Using the Spark context
To get a Spark RDD that represents a Cassandra table, call the cassandraTable method on the Spark context using the sc-dot (sc.) syntax, where sc is an instance of the Spark API SparkContext class.
sc.cassandraTable("keyspace", "table name")
Cassandra data is mapped into Scala objects and DataStax Enterprise returns a CassandraRDD[CassandraRow]. To use the Spark API for creating an application that runs outside DataStax Enterprise, import com.datastax.spark.connector.SparkContextCassandraFunctions.
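A standalone application along these lines could look like the following minimal sketch. It is an illustration under stated assumptions, not a reference setup: the object name WordsApp, the helper buildConf, the application name, the local[2] master, and the 127.0.0.1 contact point are all hypothetical, and the wildcard import com.datastax.spark.connector._ is assumed to bring the cassandraTable method into scope for the connector version in use. It reads the test.words table used in the example on this page.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._ // assumed to add cassandraTable to SparkContext

object WordsApp {
  // Hypothetical helper: builds a configuration that points at one Cassandra node.
  def buildConf(cassandraHost: String): SparkConf =
    new SparkConf(true)
      .setAppName("words-app")  // assumed application name
      .setMaster("local[2]")    // assumed master; replace with your cluster URL
      .set("spark.cassandra.connection.host", cassandraHost)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(buildConf("127.0.0.1")) // assumed contact point
    try {
      // Same call as in the Spark shell: returns a CassandraRDD[CassandraRow].
      val rdd = sc.cassandraTable("test", "words")
      rdd.collect().foreach(println)
    } finally {
      sc.stop() // release cluster resources even if the read fails
    }
  }
}
```

Inside the DataStax Enterprise Spark shell none of this setup is needed, because sc is already created for you.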
The following example shows how to load a Cassandra table into Spark and read the table in Cassandra from Spark.
- Create this keyspace and table in Cassandra using cqlsh. Use the Analytics datacenter to create the keyspace.
CREATE KEYSPACE test WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'Analytics' : 1};
CREATE TABLE test.words (word text PRIMARY KEY, count int);
This example assumes you start a single-node cluster in Spark mode.
- Load data into the words table.
INSERT INTO test.words (word, count) VALUES ('foo', 10);
INSERT INTO test.words (word, count) VALUES ('bar', 20);
- Assuming you started the node in Spark mode, start the Spark shell. Do not use sudo to start the shell.
$ bin/dse spark
The Welcome to Spark output and the scala prompt appear.
- Use the showSchema command to view the user keyspaces and tables in Cassandra.
:showSchema
Information about all user keyspaces appears.
========================================
Keyspace: HiveMetaStore
========================================
Table: MetaStore
----------------------------------------
- key : String (partition key column)
- entity : String (clustering column)
- value : java.nio.ByteBuffer
========================================
Keyspace: test
========================================
Table: words
----------------------------------------
- word : String (partition key column)
- count : Int
- Get information about only the test keyspace.
:showSchema test
========================================
Keyspace: test
========================================
Table: words
----------------------------------------
- word : String (partition key column)
- count : Int
- Get information about the words table.
:showSchema test words
========================================
Keyspace: test
========================================
Table: words
----------------------------------------
- word : String (partition key column)
- count : Int
- Define a base RDD to point to the data in the test.words table.
val rdd = sc.cassandraTable("test", "words")
rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:47
The RDD is returned in the rdd value. To read the Cassandra table, use this command.
rdd.toArray.foreach(println)
CassandraRow{word: bar, count: 20}
CassandraRow{word: foo, count: 10}
Now you can use methods on the returned RDD to query the test.words table.
Python support for loading cassandraTables
Python supports loading cassandraTables from a Spark streaming context and saving a DStream to Cassandra.
Reading column values
You can read columns in a Cassandra table using the get methods of the CassandraRow object. The get methods access individual column values by column name or column index. Type conversions are applied on the fly. Use getOption variants when you expect to receive Cassandra null values.
- Store the first item of the RDD in the firstRow value.
val firstRow = rdd.first
firstRow: com.datastax.spark.connector.CassandraRow = CassandraRow{word: foo, count: 10}
- Get the column names.
rdd.columnNames
res3: com.datastax.spark.connector.ColumnSelector = AllColumns
- Use a generic get to query the table by passing the return type directly.
firstRow.get[Int]("count")
res4: Int = 10
firstRow.get[Long]("count")
res5: Long = 10
firstRow.get[BigInt]("count")
res6: BigInt = 10
firstRow.get[java.math.BigInteger]("count")
res7: java.math.BigInteger = 10
firstRow.get[Option[Int]]("count")
res8: Option[Int] = Some(10)
firstRow.get[Option[BigInt]]("count")
res9: Option[BigInt] = Some(10)
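The introduction to this section also mentions access by column index and the getOption variants, which the steps above do not show. The following is a minimal sketch, assuming the Spark shell provides sc, that test.words holds the two rows inserted earlier, that the typed getters getString, getInt, and getIntOption are available on CassandraRow in your connector version, and that column indexes follow the order given to select. The value names are hypothetical.

```scala
import com.datastax.spark.connector._

// Select the columns explicitly so that index 0 is word and index 1 is count.
val r = sc.cassandraTable("test", "words").select("word", "count").first

val byName: Int = r.getInt("count")   // typed getter, by column name
val byIndex: Int = r.get[Int](1)      // generic getter, by column index
val word: String = r.getString(0)     // typed getter, by column index

// The Option variant yields None instead of failing when the column is null.
val maybeCount: Option[Int] = r.getIntOption("count")
val countOrZero: Int = maybeCount.getOrElse(0)
```

Preferring the Option variants for any column that may be null keeps the null handling explicit at the point where the value is read.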
Reading collections
You can read collection columns in a Cassandra table using the get methods of the CassandraRow object. The get methods access the collection column and return a corresponding Scala collection.
Assuming you set up the test keyspace earlier, follow these steps to access a Cassandra collection.
- In the test keyspace, set up a collection set using cqlsh.
CREATE TABLE test.users (username text PRIMARY KEY, emails SET<text>);
INSERT INTO test.users (username, emails) VALUES ('someone', {'someone@email.com', 's@email.com'});
- If Spark is not running, start the Spark shell. Do not use sudo to start the shell.
$ bin/dse spark
The Welcome to Spark output and the scala prompt appear.
- Define a CassandraRDD[CassandraRow] to access the collection set.
val row = sc.cassandraTable("test", "users").toArray.apply(0)
row: com.datastax.spark.connector.CassandraRow = CassandraRow{username: someone, emails: {s@email.com,someone@email.com}}
- Query the collection set in Cassandra from Spark.
row.getList[String]("emails")
res2: Vector[String] = Vector(s@email.com, someone@email.com)
row.get[List[String]]("emails")
res3: List[String] = List(s@email.com, someone@email.com)
row.get[Seq[String]]("emails")
res4: Seq[String] = List(s@email.com, someone@email.com)
row.get[IndexedSeq[String]]("emails")
res5: IndexedSeq[String] = Vector(s@email.com, someone@email.com)
row.get[Set[String]]("emails")
res6: Set[String] = Set(s@email.com, someone@email.com)
row.get[String]("emails")
res7: String = {s@email.com,someone@email.com}
Restricting the number of fetched columns
For performance reasons, do not fetch columns you don't need. The select method fetches only the columns you name.
To restrict the number of fetched columns:
val row = sc.cassandraTable("test", "users").select("username").toArray
row: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{username: someone})
Mapping rows to tuples and case classes
Instead of mapping your Cassandra rows to objects of the CassandraRow class, you can directly unwrap column values into tuples of the desired type.
To map rows to tuples:
sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").toArray
res9: Array[(String, Int)] = Array((bar,20), (foo,10))
sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").toArray
res10: Array[(Int, String)] = Array((20,bar), (10,foo))
Define a case class with properties of the same name as the Cassandra columns. For multi-word column identifiers, separate each word using an underscore in Cassandra, and use camel case abbreviation on the Scala side.
To map rows to case classes:
case class WordCount(word: String, count: Int)
defined class WordCount
sc.cassandraTable[WordCount]("test", "words").toArray
res14: Array[WordCount] = Array(WordCount(bar,20), WordCount(foo,10))
You can name columns in Cassandra using these conventions:
- Use the underscore convention and lowercase letters. (Recommended)
- Use the camel case convention, exactly the same as properties in Scala.
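The following examples show valid column names. The first table uses the underscore convention in Cassandra, and the second uses the camel case convention.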
Cassandra column name | Scala property name |
---|---|
count | count |
column_1 | column1 |
user_name | userName |
user_address | UserAddress |
Cassandra column name | Scala property name |
---|---|
count | count |
column1 | column1 |
userName | userName |
UserAddress | UserAddress |
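As a hedged illustration of the underscore convention, the sketch below maps a multi-word column to a camel case property. The table test.user_profiles, its columns, the inserted row, and the UserProfile class are hypothetical and not part of the original example. The sketch assumes the Spark shell provides sc and that the connector converts a nullable column into an Option field.

```scala
import com.datastax.spark.connector._

// Hypothetical table, created beforehand in cqlsh:
//   CREATE TABLE test.user_profiles (user_name text PRIMARY KEY, age int);
//   INSERT INTO test.user_profiles (user_name, age) VALUES ('someone', 30);

// user_name in Cassandra maps to userName in Scala.
// Option[Int] tolerates rows in which age is null.
case class UserProfile(userName: String, age: Option[Int])

val profiles = sc.cassandraTable[UserProfile]("test", "user_profiles").collect()
profiles.foreach(println)
```

Keeping lowercase underscore names in Cassandra avoids quoting identifiers in CQL, while the Scala side keeps its usual naming style.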
Mapping rows to objects with a user-defined function
Invoke the as method on the CassandraRDD to map every row to an object of a different type. Unlike map, as expects a function that takes the same number of arguments as the number of columns fetched. Invoking as in this way performs type conversions. Using as to create objects of a particular type directly eliminates the need to create CassandraRow objects and also decreases garbage collection pressure.
To map columns using a user-defined function:
val table = sc.cassandraTable("test", "words")
table: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[9] at RDD at CassandraRDD.scala:47
val total = table.select("count").as((c: Int) => c).sum
total: Double = 30.0
val frequencies = table.select("word", "count").as((w: String, c: Int) => (w, c / total)).toArray
frequencies: Array[(String, Double)] = Array((bar,0.6666666666666666), (foo,0.3333333333333333))
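To make the contrast between map and as concrete, here is a minimal side-by-side sketch. It assumes the Spark shell provides sc and that test.words holds the two rows inserted earlier. The value names viaMap and viaAs are hypothetical.

```scala
import com.datastax.spark.connector._

val table = sc.cassandraTable("test", "words")

// With map: a CassandraRow is built for each row, then values are read by name.
val viaMap = table.select("word", "count")
  .map(row => (row.getString("word"), row.getInt("count")))
  .collect()

// With as: the two selected columns are passed directly to a two-argument function,
// so the function arity must match the number of selected columns.
val viaAs = table.select("word", "count")
  .as((w: String, c: Int) => (w, c))
  .collect()
```

Both values should hold the same pairs. The as form skips the intermediate CassandraRow objects, which is where the reduced garbage collection pressure described above comes from.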
Filtering rows on the server
To filter rows, you can use the filter transformation provided by Spark. However, the filter transformation fetches all rows from Cassandra first and then filters them in Spark, so CPU cycles are wasted serializing and deserializing rows that are excluded from the result. To avoid this overhead, CassandraRDD has a where method that passes an arbitrary CQL condition to Cassandra and filters the row set on the server.
- Download and unzip the CQL commands for this example. The commands in this file perform the following tasks:
- Create a cars table in the test keyspace.
- Index the color column.
- Insert some data into the table.
- Run the test_cars.cql file using cqlsh or DevCenter. For example, using cqlsh:
cqlsh -f test_cars.cql
- Filter the rows using Spark:
sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println)
CassandraRow{id: AS-8888, model: Aston Martin DB9 Volante}
CassandraRow{id: KF-334L, model: Ford Mondeo}
CassandraRow{id: MT-8787, model: Hyundai x35}
CassandraRow{id: MZ-1038, model: Mazda CX-9}
CassandraRow{id: DG-2222, model: Dodge Avenger}
CassandraRow{id: DG-8897, model: Dodge Charger}
CassandraRow{id: BT-3920, model: Bentley Continental GT}
CassandraRow{id: IN-9964, model: Infinity FX}
sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println)
CassandraRow{id: FR-8877, model: Ferrari FF}
CassandraRow{id: FR-8877, model: Ferrari FF}
CassandraRow{id: HD-1828, model: Honda Accord}
CassandraRow{id: WX-2234, model: Toyota Yaris}
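The difference between filtering in Spark and filtering on the server can be sketched as follows. This is an illustration, assuming the Spark shell provides sc, that the test.cars table from test_cars.cql is loaded, and that its id, model, and color columns can be read as strings. The value names are hypothetical.

```scala
import com.datastax.spark.connector._

val cars = sc.cassandraTable("test", "cars")

// Client side: every row is fetched and deserialized, then filtered in Spark.
// The color column must be fetched too, because the predicate runs in Spark.
val blackInSpark = cars
  .filter(row => row.getStringOption("color") == Some("black"))
  .map(row => (row.getString("id"), row.getString("model")))
  .collect()

// Server side: the CQL condition is sent to Cassandra, so only matching rows
// and only the selected columns travel to Spark.
val blackOnServer = cars
  .select("id", "model")
  .where("color = ?", "black")
  .map(row => (row.getString("id"), row.getString("model")))
  .collect()
```

Both approaches should return the same cars. The server-side form relies on the index on the color column that the CQL file creates.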