Using the Spark context

To get a Spark RDD that represents a database table, load data from the table into Spark using the sc-dot (sc.) syntax to call the cassandraTable method on the Spark context.

Starting in DSE 5.1, the entry point for Spark applications is the SparkSession object. Using the Spark context directly is deprecated and may be removed in future releases.

To access the deprecated context object, call spark.sparkContext:

val sc = spark.sparkContext

Call the cassandraTable method on the Spark context, where sc represents the Spark API SparkContext class:

sc.cassandraTable("keyspace", "table_name")

By default, the DSE Spark shell creates an sc object. The Spark context can be manually retrieved from the Spark session object in the Spark shell by calling spark.sparkContext.

val sc = spark.sparkContext

Data is mapped into Scala objects and DataStax Enterprise returns a CassandraRDD[CassandraRow]. To use the Spark API for creating an application that runs outside DataStax Enterprise, import com.datastax.spark.connector.SparkContextCassandraFunctions.
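
For example, a standalone application might look roughly like the following. This is a minimal sketch, not the definitive setup: it assumes the blanket connector import com.datastax.spark.connector._ and that the connection host is supplied through the spark.cassandra.connection.host property.

import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._  // brings cassandraTable into scope on SparkContext

object WordsReader {
  def main(args: Array[String]): Unit = {
    // The host below is an assumption for this sketch; point it at a DSE Analytics node.
    val spark = SparkSession.builder()
      .appName("words-reader")
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .getOrCreate()

    val sc = spark.sparkContext
    sc.cassandraTable("test", "words").collect().foreach(println)

    spark.stop()
  }
}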

The following example shows how to load a table into Spark and read the table from Spark.

  1. Create this keyspace and table using cqlsh. Use the Analytics datacenter to create the keyspace.

    CREATE KEYSPACE test WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'Analytics' : 1};
    
    CREATE TABLE test.words (word text PRIMARY KEY, count int);

    This example assumes you start a single-node cluster in Spark mode.

  2. Load data into the words table.

    INSERT INTO test.words (word, count) VALUES ('foo', 10);
    INSERT INTO test.words (word, count) VALUES ('bar', 20);
  3. Assuming you started the node in Spark mode, start the Spark shell. Do not use sudo to start the shell.

    bin/dse spark

    The Welcome to Spark output and prompt appear.

  4. Use the showSchema command to view the user keyspaces and tables.

    :showSchema

    Information about all user keyspaces appears.

    ========================================
     Keyspace: HiveMetaStore
    ========================================
     Table: MetaStore
    ----------------------------------------
     - key    : String              (partition key column)
     - entity : String              (clustering column)
     - value  : java.nio.ByteBuffer
    
    
    ========================================
     Keyspace: test
    ========================================
     Table: words
    ----------------------------------------
     - word   : String              (partition key column)
     - count  : Int
  5. Get information about only the test keyspace.

    :showSchema test
    ========================================
     Keyspace: test
    ========================================
     Table: words
    ----------------------------------------
     - word  : String (partition key column)
     - count : Int
  6. Get information about the words table.

    :showSchema test words
    ========================================
     Keyspace: test
    ========================================
     Table: words
    ----------------------------------------
     - word  : String (partition key column)
     - count : Int
  7. Define a base RDD to point to the data in the test.words table.

    val rdd = sc.cassandraTable("test", "words")
    rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.
    CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:47

    The RDD is returned in the rdd value. To read the table, use this command.

    rdd.collect().foreach(println)
    CassandraRow{word: bar, count: 20}
    CassandraRow{word: foo, count: 10}

Now, you can use methods on the returned RDD to query the test.words table.
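
For example, standard RDD operations work directly on the rows. A minimal sketch using the sample data loaded above (getInt is one of the typed column getters on CassandraRow):

rdd.count()                                   // number of rows in test.words
rdd.map(_.getInt("count")).sum()              // 30.0 for the sample data
rdd.filter(_.getInt("count") > 15).collect()  // only the rows where count > 15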

Python support for loading cassandraTables

Python supports loading cassandraTables from a Spark streaming context and saving a DStream to the database.

Reading column values

You can read columns in a table using the get methods of the CassandraRow object. The get methods access individual column values by column name or column index. Type conversions are applied on the fly. Use getOption variants when you expect to receive null values.

Continuing with the previous example, follow these steps to access individual column values.

  1. Store the first item of the RDD in the firstRow value.

    val firstRow = rdd.first
    firstRow: com.datastax.spark.connector.CassandraRow = CassandraRow{word: foo, count: 10}
  2. Get the column names.

    rdd.columnNames
    res3: com.datastax.spark.connector.ColumnSelector = AllColumns
  3. Use a generic get to query the table by passing the return type directly.

    firstRow.get[Int]("count")
    res4: Int = 10
    firstRow.get[Long]("count")
    res5: Long = 10
    firstRow.get[BigInt]("count")
    res6: BigInt = 10
    firstRow.get[java.math.BigInteger]("count")
    res7: java.math.BigInteger = 10
    firstRow.get[Option[Int]]("count")
    res8: Option[Int] = Some(10)
    firstRow.get[Option[BigInt]]("count")
    res9: Option[BigInt] = Some(10)

Reading collections

You can read collection columns in a table using the get methods of the CassandraRow object. The get methods access the collection column and return a corresponding Scala collection.

Assuming you set up the test keyspace earlier, follow these steps to access a collection.

  1. In the test keyspace, set up a collection set using cqlsh.

    CREATE TABLE test.users (
      username text PRIMARY KEY, emails SET<text>);
    
    INSERT INTO test.users (username, emails)
      VALUES ('someone', {'someone@email.com', 's@email.com'});
  2. If Spark is not running, start the Spark shell. Do not use sudo to start the shell.

    bin/dse spark

    The Welcome to Spark output and prompt appear.

  3. Define a CassandraRDD[CassandraRow] to access the collection set.

    val row = sc.cassandraTable("test", "users").collect().apply(0)
    row: com.datastax.spark.connector.CassandraRow = CassandraRow{username: someone,
    emails: {s@email.com,someone@email.com}}
  4. Query the collection set from Spark.

    row.getList[String]("emails")
    res2: Vector[String] = Vector(s@email.com, someone@email.com)
    row.get[List[String]]("emails")
    res3: List[String] = List(s@email.com, someone@email.com)
    row.get[Seq[String]]("emails")
    res4: Seq[String] = List(s@email.com, someone@email.com)
    row.get[IndexedSeq[String]]("emails")
    res5: IndexedSeq[String] = Vector(s@email.com, someone@email.com)
    row.get[Set[String]]("emails")
    res6: Set[String] = Set(s@email.com, someone@email.com)
    row.get[String]("emails")
    res7: String = {s@email.com,someone@email.com}

Restricting the number of fetched columns

For performance reasons, you should not fetch columns you don’t need. You can achieve this with the select method.

To restrict the number of fetched columns:

val row = sc.cassandraTable("test", "users").select("username").collect()
row: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{username: someone})

Mapping rows to tuples and case classes

Instead of mapping your rows to objects of the CassandraRow class, you can directly unwrap column values into tuples of the desired type.

To map rows to tuples:

sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").collect()
res9: Array[(String, Int)] = Array((bar,20), (foo,10))
sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").collect()
res10: Array[(Int, String)] = Array((20,bar), (10,foo))
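
Because the result is an ordinary RDD of pairs, the usual pair-RDD operations apply to it. A small sketch using the sample data:

val pairs = sc.cassandraTable[(String, Int)]("test", "words").select("word", "count")
pairs.collectAsMap()                                           // e.g. Map(bar -> 20, foo -> 10)
pairs.map { case (w, c) => s"$w appears $c times" }.collect()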

Define a case class with properties named the same as the columns. For multi-word column identifiers, separate the words with underscores in the database column names, and use camel case for the corresponding Scala properties.

To map rows to case classes:

case class WordCount(word: String, count: Int)
defined class WordCount
 sc.cassandraTable[WordCount]("test", "words").collect()
res14: Array[WordCount] = Array(WordCount(bar,20), WordCount(foo,10))
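
The mapped objects behave like elements of any other RDD, so the case class fields can be accessed in a type-safe way. A minimal sketch reusing the WordCount class defined above:

val words = sc.cassandraTable[WordCount]("test", "words")
words.filter(_.count > 15).collect()          // Array(WordCount(bar,20)) for the sample data
words.map(w => (w.word, w.count)).collect()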

You can name columns using these conventions:

  • Use the underscore convention and lowercase letters. (Recommended)

  • Use the camel case convention, exactly the same as properties in Scala.

The following examples show valid column names.

Valid column names

Database column name   Scala property name
count                  count
column_1               column1
user_name              userName
user_address           userAddress
count                  count
column1                column1
userName               userName
UserAddress            UserAddress

Mapping rows to objects with a user-defined function

Invoke as on the CassandraRDD to map every row to an object of a different type. Contrary to map, as expects a function having the same number of arguments as the number of columns to be fetched. Invoking as in this way performs type conversions. Using as to directly create objects of a particular type eliminates the need to create CassandraRow objects and also decreases garbage collection pressure.

To map columns using a user-defined function:

val table = sc.cassandraTable("test", "words")
table: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.
CassandraRow] = CassandraRDD[9] at RDD at CassandraRDD.scala:47
val total = table.select("count").as((c: Int) => c).sum
total: Double = 30.0
val frequencies = table.select("word", "count").as((w: String, c: Int) => (w, c / total)).collect()
frequencies: Array[(String, Double)] = Array((bar,0.6666666666666666), (foo,0.3333333333333333))

Filtering rows on the server

To filter rows, you can use the filter transformation provided by Spark. The filter transformation fetches all rows from the database first and then filters them in Spark, so some CPU cycles are wasted serializing and deserializing objects that are excluded from the result. To avoid this overhead, CassandraRDD provides a where method that passes an arbitrary CQL condition to filter the row set on the server.
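
For example, both of the following express the same query against the cars table that is set up in the example below (where the color column is indexed), but the first fetches every row and filters it in Spark, while the second pushes the condition down to the database:

// Client-side: all rows are fetched, then filtered in Spark.
sc.cassandraTable("test", "cars").filter(_.getString("color") == "black")

// Server-side: only the matching rows are fetched.
sc.cassandraTable("test", "cars").where("color = ?", "black")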

This example shows how to use Spark to filter rows on the server.

  1. Download and unzip the CQL commands for this example. The commands in this file perform the following tasks:

    • Create a cars table in the test keyspace.

    • Index the color column.

    • Insert some data into the table.

  2. Run the test_cars.cql file using cqlsh or DevCenter. For example, using cqlsh:

    $ cqlsh -f test_cars.cql
  3. Filter the rows using Spark:

    sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").collect().foreach(println)
    CassandraRow{id: AS-8888, model: Aston Martin DB9 Volante}
    CassandraRow{id: KF-334L, model: Ford Mondeo}
    CassandraRow{id: MT-8787, model: Hyundai x35}
    CassandraRow{id: MZ-1038, model: Mazda CX-9}
    CassandraRow{id: DG-2222, model: Dodge Avenger}
    CassandraRow{id: DG-8897, model: Dodge Charger}
    CassandraRow{id: BT-3920, model: Bentley Continental GT}
    CassandraRow{id: IN-9964, model: Infinity FX}
    sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").collect().foreach(println)
    CassandraRow{id: FR-8877, model: Ferrari FF}
    CassandraRow{id: FR-8877, model: Ferrari FF}
    CassandraRow{id: HD-1828, model: Honda Accord}
    CassandraRow{id: WX-2234, model: Toyota Yaris}
