Using the Spark context
Steps for getting a Spark RDD that represents a Cassandra table.
To get a Spark RDD that represents a Cassandra table, load the table into Spark by using the sc-dot (sc.) syntax to call the cassandraTable method on the Spark context. sc is an instance of the Spark API SparkContext class.
sc.cassandraTable("keyspace", "table name")
Cassandra data is mapped into Scala objects, and DataStax Enterprise returns a CassandraRDD[CassandraRow]. To use the Spark API in an application that runs outside DataStax Enterprise, import com.datastax.spark.connector.SparkContextCassandraFunctions.
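For example, a minimal sketch of that usage in application code follows. The wildcard import shown here is an assumption that also brings the cassandraTable implicit into scope, and the helper name is illustrative only; the test.words table it reads is created in the steps below.

import org.apache.spark.SparkContext
import com.datastax.spark.connector._   // adds the cassandraTable method to SparkContext

// Returns a CassandraRDD[CassandraRow] for the test.words table.
def wordRows(sc: SparkContext) = sc.cassandraTable("test", "words")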
The following example shows how to load a Cassandra table into Spark and read the table in Cassandra from Spark.
- Create this keyspace and table in Cassandra using cqlsh. Use the Analytics data center to create the keyspace.
CREATE KEYSPACE test WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'Analytics' : 1};
CREATE TABLE test.words (word text PRIMARY KEY, count int);
This example assumes you start a single-node cluster in Spark mode.
- Load data into the words table.
INSERT INTO test.words (word, count) VALUES ('foo', 10);
INSERT INTO test.words (word, count) VALUES ('bar', 20);
- Assuming you started the node in Spark mode, start the Spark shell. Do not use sudo to start the shell.
$ bin/dse spark
The Welcome to Spark output and the scala prompt appear.
- Use the showSchema command, available in DataStax Enterprise 4.5.1 and later, to view the user keyspaces and tables in Cassandra.
scala> :showSchema
Information about all user keyspaces appears.
========================================
Keyspace: HiveMetaStore
========================================
 Table: MetaStore
 ----------------------------------------
 - key    : String (partition key column)
 - entity : String (clustering column)
 - value  : java.nio.ByteBuffer

========================================
Keyspace: test
========================================
 Table: words
 ----------------------------------------
 - word  : String (partition key column)
 - count : Int
- Get information about only the test keyspace.
scala> :showSchema test
========================================
Keyspace: test
========================================
 Table: words
 ----------------------------------------
 - word  : String (partition key column)
 - count : Int
- Get information about the words table.
scala> :showSchema test words
========================================
Keyspace: test
========================================
 Table: words
 ----------------------------------------
 - word  : String (partition key column)
 - count : Int
- Define a base RDD to point to the data in the test.words table.
scala> val rdd = sc.cassandraTable("test", "words")
rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:47
The RDD is returned in the rdd value. To read the contents of the Cassandra table from Spark, use the following command.
scala> rdd.toArray.foreach(println)
CassandraRow{word: bar, count: 20}
CassandraRow{word: foo, count: 10}
Now, you can use methods on the returned RDD to query the test.words table.
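For example, standard RDD operations work directly on the returned CassandraRDD. This is only a sketch; the commented values follow from the two rows inserted earlier.

val rowCount  = rdd.count                                  // 2 rows: foo and bar
val bigCounts = rdd.filter(_.getInt("count") > 15)         // keep rows with count greater than 15
bigCounts.toArray.foreach(println)                         // prints CassandraRow{word: bar, count: 20}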
Reading column values
You can read columns in a Cassandra table using the get methods of the CassandraRow object. The get methods access individual column values by column name or column index. Type conversions are applied on the fly. Use getOption variants when you expect to receive Cassandra null values.
- Store the first item of the rdd in the firstRow value.
scala> val firstRow = rdd.first
firstRow: com.datastax.spark.connector.CassandraRow = CassandraRow{word: foo, count: 10}
- Get the column names.
scala> rdd.columnNames
res3: com.datastax.spark.connector.ColumnSelector = AllColumns
- Use a generic get to query the table by passing the return type directly.
scala> firstRow.get[Int]("count")
res4: Int = 10

scala> firstRow.get[Long]("count")
res5: Long = 10

scala> firstRow.get[BigInt]("count")
res6: BigInt = 10

scala> firstRow.get[java.math.BigInteger]("count")
res7: java.math.BigInteger = 10

scala> firstRow.get[Option[Int]]("count")
res8: Option[Int] = Some(10)

scala> firstRow.get[Option[BigInt]]("count")
res9: Option[BigInt] = Some(10)
Reading collections
You can read collection columns in a Cassandra table using the get methods of the CassandraRow object. The get methods access the collection column and return a corresponding Scala collection.
Assuming you set up the test keyspace earlier, follow these steps to access a Cassandra collection.
- In the test keyspace, set up a collection set using cqlsh.
CREATE TABLE test.users (username text PRIMARY KEY, emails SET<text>);
INSERT INTO test.users (username, emails) VALUES ('someone', {'someone@email.com', 's@email.com'});
- If Spark is not running, start the Spark shell. Do not use sudo to start the shell.
$ bin/dse spark
The Welcome to Spark output and the scala prompt appear.
- Define a CassandraRDD[CassandraRow] to access the collection set.
scala> val row = sc.cassandraTable("test", "users").toArray.apply(0)
row: com.datastax.spark.connector.CassandraRow = CassandraRow{username: someone, emails: {s@email.com,someone@email.com}}
- Query the collection set in Cassandra from Spark.
scala> row.getList[String]("emails")
res2: Vector[String] = Vector(s@email.com, someone@email.com)

scala> row.get[List[String]]("emails")
res3: List[String] = List(s@email.com, someone@email.com)

scala> row.get[Seq[String]]("emails")
res4: Seq[String] = List(s@email.com, someone@email.com)

scala> row.get[IndexedSeq[String]]("emails")
res5: IndexedSeq[String] = Vector(s@email.com, someone@email.com)

scala> row.get[Set[String]]("emails")
res6: Set[String] = Set(s@email.com, someone@email.com)

scala> row.get[String]("emails")
res7: String = {s@email.com,someone@email.com}
Restricting the number of fetched columns
For performance reasons, you should not fetch columns you don't need. You can achieve this with select:
To restrict the number of fetched columns:
scala> val row = sc.cassandraTable("test", "users").select("username").toArray
row: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{username: someone})
Mapping rows to tuples and case classes
Instead of mapping your Cassandra rows to objects of the CassandraRow class, you can directly unwrap column values into tuples of the desired type.
To map rows to tuples:
scala> sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").toArray
res9: Array[(String, Int)] = Array((bar,20), (foo,10))

scala> sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").toArray
res10: Array[(Int, String)] = Array((20,bar), (10,foo))
Define a case class with properties named the same as the Cassandra columns. For multi-word column identifiers, separate each word with an underscore in Cassandra, and use camel case on the Scala side.
To map rows to case classes:
scala> case class WordCount(word: String, count: Int)
defined class WordCount

scala> sc.cassandraTable[WordCount]("test", "words").toArray
res14: Array[WordCount] = Array(WordCount(bar,20), WordCount(foo,10))
You can name columns in Cassandra using these conventions:
- Use the underscore convention and lowercase letters. (Recommended)
- Use the camel case convention, exactly the same as properties in Scala.
The following examples show valid column names.
Underscore convention (recommended):

Cassandra column name | Scala property name
---|---
count | count
column_1 | column1
user_name | userName
user_address | userAddress
Camel case convention:

Cassandra column name | Scala property name
---|---
count | count
column1 | column1
userName | userName
UserAddress | UserAddress
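For example, assuming a hypothetical test.people table created with the recommended underscore convention, the matching case class uses camel case property names. The table and class names here are illustrative only.

// Hypothetical table, for illustration only:
// CREATE TABLE test.people (user_name text PRIMARY KEY, user_address text);
case class Person(userName: String, userAddress: String)

val people = sc.cassandraTable[Person]("test", "people")   // columns map to properties by naming convention
people.toArray.foreach(println)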
Mapping rows to objects with a user-defined function
Invoke as on the CassandraRDD to map every row to an object of a different type. Unlike map, as expects a function with the same number of arguments as the number of columns to be fetched, and it performs type conversions on the fly. Using as to directly create objects of a particular type eliminates the need to create CassandraRow objects and also decreases garbage collection pressure.
To map columns using a user-defined function:
scala> val table = sc.cassandraTable("test", "words")
table: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[9] at RDD at CassandraRDD.scala:47

scala> val total = table.select("count").as((c: Int) => c).sum
total: Double = 30.0

scala> val frequencies = table.select("word", "count").as((w: String, c: Int) => (w, c / total)).toArray
frequencies: Array[(String, Double)] = Array((bar,0.6666666666666666), (foo,0.3333333333333333))
Filtering rows on the server
To filter rows, you can use the filter transformation provided by Spark. The filter transformation fetches all rows from Cassandra first and then filters them in Spark, so some CPU cycles are wasted serializing and deserializing objects excluded from the result. To avoid this overhead, CassandraRDD has a where method that passes an arbitrary CQL condition to filter the row set on the server.
- Download and unzip the CQL commands for this example. The commands in this file perform the following tasks:
- Create a cars table in the test keyspace.
- Index the color column.
- Insert some data into the table.
- Run the test_cars.cql file on the DataStax Enterprise command line. On Linux for example:
$ Install_Directory/bin/cqlsh -f test_cars.cql
- Filter rows using Spark:
scala> sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println)
CassandraRow{id: AS-8888, model: Aston Martin DB9 Volante}
CassandraRow{id: KF-334L, model: Ford Mondeo}
CassandraRow{id: MT-8787, model: Hyundai x35}
CassandraRow{id: MZ-1038, model: Mazda CX-9}
CassandraRow{id: DG-2222, model: Dodge Avenger}
CassandraRow{id: DG-8897, model: Dodge Charger}
CassandraRow{id: BT-3920, model: Bentley Continental GT}
CassandraRow{id: IN-9964, model: Infinity FX}

scala> sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println)
CassandraRow{id: FR-8877, model: Ferrari FF}
CassandraRow{id: FR-8877, model: Ferrari FF}
CassandraRow{id: HD-1828, model: Honda Accord}
CassandraRow{id: WX-2234, model: Toyota Yaris}
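For comparison, a sketch of the same black-car query written with Spark's client-side filter transformation. This version fetches every row of test.cars into Spark before applying the predicate, which is the overhead that where avoids.

val blackCars = sc.cassandraTable("test", "cars")
  .select("id", "model", "color")                          // color must be fetched to filter on it in Spark
  .filter(_.getString("color") == "black")                 // filtering happens in Spark, not in Cassandra
blackCars.toArray.foreach(println)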
Running standalone Spark jobs
Use the dsetool sparkmaster command to get the master location for standalone jobs that create a SparkContext manually. The Spark Master runs on the same node as the integrated Hadoop job tracker.
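A minimal sketch of such a job follows. The master URL, connection host, and object name are illustrative values only; the master URL stands in for whatever dsetool sparkmaster reports on your cluster.

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._                       // cassandraTable implicit for SparkContext

object StandaloneWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StandaloneWordCount")
      .setMaster("spark://10.10.1.1:7077")                  // value reported by dsetool sparkmaster
      .set("spark.cassandra.connection.host", "10.10.1.1")  // address of a Cassandra node
    val sc = new SparkContext(conf)

    // Sum the count column of test.words (30 for the rows inserted earlier).
    val total = sc.cassandraTable("test", "words").map(_.getInt("count")).fold(0)(_ + _)
    println("Total count: " + total)
    sc.stop()
  }
}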