Using the Spark context

To get a Spark RDD that represents a Cassandra table, load data from a Cassandra table into Spark using the sc-dot (sc.) syntax to call the cassandraTable method on the Spark context.

To get a Spark RDD that represents a Cassandra table, load data from a Cassandra table into Spark using the sc-dot (sc.) syntax to call the cassandraTable method on the Spark context, where sc represents the Spark API SparkContext class.

sc.cassandraTable ( "keyspace", "table name" )

Cassandra data is mapped into Scala objects and DataStax Enterprise returns a CassandraRDD[CassandraRow]. To use the Spark API for creating an application that runs outside DataStax Enterprise, import com.datastax.spark.connector.SparkContextCassandraFunctions.

The following example shows how to load a Cassandra table into Spark and read the table in Cassandra from Spark.

  1. Create this keyspace and table in Cassandra using cqlsh. Use the Analytics datacenter to create the keyspace.
    CREATE KEYSPACE test WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'Analytics' : 1};
    
    CREATE TABLE test.words (word text PRIMARY KEY, count int);

    This example assumes you start a single-node cluster in Spark mode.

  2. Load data into the words table.
    INSERT INTO test.words (word, count) VALUES ('foo', 10);
    INSERT INTO test.words (word, count) VALUES ('bar', 20);
  3. Assuming you started the node in Spark mode, start the Spark shell. Do not use sudo to start the shell.
    $ bin/dse spark

    The Welcome to Spark output and scala prompt appears.

  4. Use the showSchema command to view the user keyspaces and tables in Cassandra.
    :showSchema

    Information about all user keyspaces appears.

    ========================================
     Keyspace: HiveMetaStore
    ========================================
     Table: MetaStore
    ----------------------------------------
     - key    : String              (partition key column)
     - entity : String              (clustering column)
     - value  : java.nio.ByteBuffer 
    
    
    ========================================
     Keyspace: test
    ========================================
     Table: words
    ----------------------------------------
     - word   : String              (partition key column)
     - count  : Int                
    
    scala> :showSchema test
    ========================================
     Keyspace: test
    ========================================
     Table: words
    ----------------------------------------
     - word  : String (partition key column)
     - count : Int   
    
    scala> :showSchema test words
    ========================================
     Keyspace: test
    ========================================
     Table: words
    ----------------------------------------
     - word  : String (partition key column)
     - count : Int
  5. Get information about only the test keyspace.
    :showSchema test
    
    ========================================
     Keyspace: test
    ========================================
     Table: words
    ----------------------------------------
     - word  : String (partition key column)
     - count : Int   
  6. Get information about the words table.
    :showSchema test words
    
    ========================================
     Keyspace: test
    ========================================
     Table: words
    ----------------------------------------
     - word  : String (partition key column)
     - count : Int
  7. Define a base RDD to point to the data in the test.words table.
    val rdd = sc.cassandraTable("test", "words")
    
    rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.
    CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:47

    The RDD is returned in the rdd value. To read the Cassandra table, use this command.

    rdd.toArray.foreach(println)
    
    CassandraRow{word: bar, count: 20}
    CassandraRow{word: foo, count: 10}

Now, you can use methods on the returned RDD to query the test.words table.

Python support for loading cassandraTables 

Python supports loading cassandraTables from a Spark Streaming context and saving a DStream to Cassandra.

Reading column values 

You can read columns in a Cassandra table using the get methods of the CassandraRow object. The get methods access individual column values by column name or column index. Type conversions are applied on the fly. Use getOption variants when you expect to receive Cassandra null values.

Continuing with the previous example, follow these steps to access individual column values.
  1. Store the first item of the rdd in the firstRow value.
    val firstRow = rdd.first
    firstRow: com.datastax.spark.connector.CassandraRow = CassandraRow{word: foo, count: 10}
  2. Get the column names.
    rdd.columnNames
    
    res3: com.datastax.spark.connector.ColumnSelector = AllColumns
  3. Use a generic get to query the table by passing the return type directly.
    firstRow.get[Int]("count")
    
    res4: Int = 10
    
    firstRow.get[Long]("count")
    
    res5: Long = 10
    
    firstRow.get[BigInt]("count")
    
    res6: BigInt = 10
    
    firstRow.get[java.math.BigInteger]("count")
    
    res7: java.math.BigInteger = 10
    
    firstRow.get[Option[Int]]("count")
    
    res8: Option[Int] = Some(10)
    
    firstRow.get[Option[BigInt]]("count")
    
    res9: Option[BigInt] = Some(10)

Reading collections 

You can read collection columns in a Cassandra table using the get methods of the CassandraRow object. The get methods access the collection column and returns a corresponding Scala collection.

Assuming you set up the test keyspace earlier, follow these steps to access a Cassandra collection.

  1. In the test keyspace, set up a collection set using cqlsh.
    CREATE TABLE test.users (
      username text PRIMARY KEY, emails SETtext);
    
    INSERT INTO test.users (username, emails) 
      VALUES ('someone', {'someone@email.com', 's@email.com'});
  2. If Spark is not running, start the Spark shell. Do not use sudo to start the shell.
    $ bin/dse spark

    The Welcome to Spark output and scala prompt appears.

  3. Define a CassandraRDD[CassandraRow] to access the collection set.
    val row = sc.cassandraTable("test", "users").toArray.apply(0)
    
    row: com.datastax.spark.connector.CassandraRow = CassandraRow{username: someone, 
    emails: {s@email.com,someone@email.com}}
  4. Query the collection set in Cassandra from Spark.
    row.getList[String]("emails")
    
    res2: Vector[String] = Vector(s@email.com, someone@email.com)
    
    row.get[List[String]]("emails")
    
    res3: List[String] = List(s@email.com, someone@email.com)
    
    row.get[Seq[String]]("emails")
    
    res4: Seq[String] = List(s@email.com, someone@email.com)
    
    row.get[IndexedSeq[String]]("emails")
    
    res5: IndexedSeq[String] = Vector(s@email.com, someone@email.com)
    
    row.get[Set[String]]("emails")
    
    res6: Set[String] = Set(s@email.com, someone@email.com)
    
    row.get[String]("emails")
    
    res7: String = {s@email.com,someone@email.com}

Restricting the number of fetched columns 

For performance reasons, you should not fetch columns you don't need. You can achieve this with the select method:

To restrict the number of fetched columns:

val row = sc.cassandraTable("test", "users").select("username").toArray
row: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{username: someone})

Mapping rows to tuples and case classes 

Instead of mapping your Cassandra rows to objects of the CassandraRow class, you can directly unwrap column values into tuples of the desired type.

To map rows to tuples:

sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").toArray
res9: Array[(String, Int)] = Array((bar,20), (foo,10))
sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").toArray
res10: Array[(Int, String)] = Array((20,bar), (10,foo))

Define a case class with properties of the same name as the Cassandra columns. For multi-word column identifiers, separate each word using an underscore in Cassandra, and use camel case abbreviation on the Scala side.

To map rows to case classes:

case class WordCount(word: String, count: Int)
defined class WordCount
scala> sc.cassandraTable[WordCount]("test", "words").toArray
res14: Array[WordCount] = Array(WordCount(bar,20), WordCount(foo,20))

You can name columns in Cassandra using these conventions:

  • Use the underscore convention and lowercase letters. (Recommended)
  • Use the camel case convention, exactly the same as properties in Scala.

The following examples show valid column names.

Recommended naming convention
Cassandra column name Scala property name
count count
column_1 column1
user_name userName
user_address UserAddress
Alternative naming convention
Cassandra column name Scala property name
count count
column1 column1
userName userName
UserAddress UserAddress

Mapping rows to objects with a user-defined function 

Invoke as on the CassandraRDD to map every row to an object of a different type. Contrary to map, as expects a function having the same number of arguments as the number of columns to be fetched. Invoking as in this way performs type conversions. Using as to directly create objects of a particular type eliminates the need to create CassandraRow objects and also decreases garbage collection pressure.

To map columns using a user-defined function:

val table = sc.cassandraTable("test", "words")
table: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.
CassandraRow] = CassandraRDD[9] at RDD at CassandraRDD.scala:47
val total = table.select("count").as((c: Int) => c).sum
total: Double = 30.0
val frequencies = table.select("word", "count").as((w: String, c: Int) => (w, c / total)).toArray
frequencies: Array[(String, Double)] = Array((bar,0.6666666666666666), (foo,0.3333333333333333))

Filtering rows on the server 

To filter rows, you can use the filter transformation provided by Spark. Filter transformation fetches all rows from Cassandra first and then filters them in Spark. Some CPU cycles are wasted serializing and deserializing objects excluded from the result. To avoid this overhead, CassandraRDD has a method that passes an arbitrary CQL condition to filter the row set on the server.

This example shows how to use Spark to filter rows on the server.
  1. Download and unzip the CQL commands for this example. The commands in this file perform the following tasks:
    • Create a cars table in the test keyspace.
    • Index the color column.
    • Insert some data into the table
  2. Run the test_cars.cql file using cqlsh or DevCenter. For example using cqlsh:
    cqlsh -f test_cars.cql
  3. Filter the rows using Spark:
    sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println)
    
    CassandraRow{id: AS-8888, model: Aston Martin DB9 Volante}
    CassandraRow{id: KF-334L, model: Ford Mondeo}
    CassandraRow{id: MT-8787, model: Hyundai x35}
    CassandraRow{id: MZ-1038, model: Mazda CX-9}
    CassandraRow{id: DG-2222, model: Dodge Avenger}
    CassandraRow{id: DG-8897, model: Dodge Charger}
    CassandraRow{id: BT-3920, model: Bentley Continental GT}
    CassandraRow{id: IN-9964, model: Infinity FX}
    
    sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println)
    
    CassandraRow{id: FR-8877, model: Ferrari FF}
    CassandraRow{id: FR-8877, model: Ferrari FF}
    CassandraRow{id: HD-1828, model: Honda Accord}
    CassandraRow{id: WX-2234, model: Toyota Yaris}