Using the Spark context
Starting in DSE 5.1, the entry point for Spark applications is the SparkSession object. Using the Spark context directly is deprecated and may be removed in future releases.
To access the deprecated context object, call spark.sparkContext.
val sc = spark.sparkContext
To get a Spark RDD that represents a database table, call the cassandraTable method on the Spark context using the sc-dot (sc.) syntax, where sc is an instance of the Spark API SparkContext class.
sc.cassandraTable("keyspace", "table name")
By default, the DSE Spark shell creates an sc object. The Spark context can also be retrieved manually from the Spark session object in the Spark shell by calling spark.sparkContext.
val sc = spark.sparkContext
Data is mapped into Scala objects and DataStax Enterprise returns a CassandraRDD[CassandraRow]. To use the Spark API for creating an application that runs outside DataStax Enterprise, import com.datastax.spark.connector.SparkContextCassandraFunctions.
The following example shows how to load a table into Spark and read the table from Spark.
-
Create this keyspace and table using cqlsh. Use the Analytics datacenter to create the keyspace.
CREATE KEYSPACE test WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'Analytics' : 1};
CREATE TABLE test.words (word text PRIMARY KEY, count int);
This example assumes you start a single-node cluster in Spark mode.
-
Load data into the words table.
INSERT INTO test.words (word, count) VALUES ('foo', 10);
INSERT INTO test.words (word, count) VALUES ('bar', 20);
-
Assuming you started the node in Spark mode, start the Spark shell. Do not use sudo to start the shell.
bin/dse spark
The Welcome to Spark output and prompt appear.
-
Use the showSchema command to view the user keyspaces and tables.
:showSchema
Information about all user keyspaces appears.
========================================
Keyspace: HiveMetaStore
========================================
Table: MetaStore
----------------------------------------
- key : String (partition key column)
- entity : String (clustering column)
- value : java.nio.ByteBuffer
========================================
Keyspace: test
========================================
Table: words
----------------------------------------
- word : String (partition key column)
- count : Int
-
Get information about only the test keyspace.
:showSchema test
========================================
Keyspace: test
========================================
Table: words
----------------------------------------
- word : String (partition key column)
- count : Int
-
Get information about the words table.
:showSchema test words
========================================
Keyspace: test
========================================
Table: words
----------------------------------------
- word : String (partition key column)
- count : Int
-
Define a base RDD to point to the data in the test.words table.
val rdd = sc.cassandraTable("test", "words")
rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:47
The RDD is returned in the rdd value. To read the table, use this command.
rdd.collect().foreach(println)
CassandraRow{word: bar, count: 20}
CassandraRow{word: foo, count: 10}
Now you can use methods on the returned RDD to query the test.words table.
Python support for loading cassandraTables
Python supports loading cassandraTables from a Spark streaming context and saving a DStream to the database.
Reading column values
You can read columns in a table using the get methods of the CassandraRow object. The get methods access individual column values by column name or column index. Type conversions are applied on the fly. Use getOption variants when you expect to receive null values.
Continuing with the previous example, follow these steps to access individual column values.
-
Store the first item of the RDD in the firstRow value.
val firstRow = rdd.first
firstRow: com.datastax.spark.connector.CassandraRow = CassandraRow{word: foo, count: 10}
-
Get the column names.
rdd.columnNames
res3: com.datastax.spark.connector.ColumnSelector = AllColumns
-
Use a generic get to query the table by passing the return type directly.
firstRow.get[Int]("count")
res4: Int = 10
firstRow.get[Long]("count")
res5: Long = 10
firstRow.get[BigInt]("count")
res6: BigInt = 10
firstRow.get[java.math.BigInteger]("count")
res7: java.math.BigInteger = 10
firstRow.get[Option[Int]]("count")
res8: Option[Int] = Some(10)
firstRow.get[Option[BigInt]]("count")
res9: Option[BigInt] = Some(10)
Reading collections
You can read collection columns in a table using the get methods of the CassandraRow object. The get methods access the collection column and return a corresponding Scala collection.
Assuming you set up the test keyspace earlier, follow these steps to access a collection.
-
In the test keyspace, set up a collection set using cqlsh.
CREATE TABLE test.users (username text PRIMARY KEY, emails SET<text>);
INSERT INTO test.users (username, emails) VALUES ('someone', {'someone@email.com', 's@email.com'});
-
If Spark is not running, start the Spark shell. Do not use sudo to start the shell.
bin/dse spark
The Welcome to Spark output and prompt appear.
-
Read the first row of the users table as a CassandraRow to access the collection set.
val row = sc.cassandraTable("test", "users").collect().apply(0)
row: com.datastax.spark.connector.CassandraRow = CassandraRow{username: someone, emails: {s@email.com,someone@email.com}}
-
Query the collection set from Spark.
row.getList[String]("emails")
res2: Vector[String] = Vector(s@email.com, someone@email.com)
row.get[List[String]]("emails")
res3: List[String] = List(s@email.com, someone@email.com)
row.get[Seq[String]]("emails")
res4: Seq[String] = List(s@email.com, someone@email.com)
row.get[IndexedSeq[String]]("emails")
res5: IndexedSeq[String] = Vector(s@email.com, someone@email.com)
row.get[Set[String]]("emails")
res6: Set[String] = Set(s@email.com, someone@email.com)
row.get[String]("emails")
res7: String = {s@email.com,someone@email.com}
Restricting the number of fetched columns
For performance reasons, do not fetch columns you don’t need. Use the select method to restrict the number of fetched columns:
val row = sc.cassandraTable("test", "users").select("username").collect()
row: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{username: someone})
Mapping rows to tuples and case classes
Instead of mapping your rows to objects of the CassandraRow class, you can directly unwrap column values into tuples of the desired type.
To map rows to tuples:
sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").collect()
res9: Array[(String, Int)] = Array((bar,20), (foo,10))
sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").collect()
res10: Array[(Int, String)] = Array((20,bar), (10,foo))
Define a case class with properties of the same name as the columns. For multi-word column identifiers, separate each word using an underscore when creating the columns, and use camel case abbreviation on the Scala side.
To map rows to case classes:
case class WordCount(word: String, count: Int)
defined class WordCount
sc.cassandraTable[WordCount]("test", "words").collect()
res14: Array[WordCount] = Array(WordCount(bar,20), WordCount(foo,10))
You can name columns using these conventions:
-
Use the underscore convention and lowercase letters. (Recommended)
-
Use the camel case convention, exactly the same as properties in Scala.
The following examples show valid column names.
Database column name | Scala property name |
---|---|
count | count |
column_1 | column1 |
user_name | userName |
user_address | UserAddress |
Database column name | Scala property name |
---|---|
count | count |
column1 | column1 |
userName | userName |
UserAddress | UserAddress |
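The recommended underscore convention can be illustrated with a small, self-contained sketch. The helper name underscoreToCamelCase is hypothetical and is not part of the connector API; it only mimics the naming rule described in this section, and the connector's own mapping logic may differ in detail.

```scala
// Hypothetical helper (not a connector API): derives the Scala property name
// that corresponds to an underscore-separated database column name.
def underscoreToCamelCase(column: String): String = {
  val parts = column.split("_").filter(_.nonEmpty)
  if (parts.isEmpty) column
  else parts.head + parts.tail.map(_.capitalize).mkString
}

// Column names taken from the underscore-convention examples in this section.
val columns = Seq("count", "column_1", "user_name")
val properties = columns.map(underscoreToCamelCase)
```

Under these assumptions, a table with the columns user_name and column_1 would pair with a case class such as case class User(userName: String, column1: Int), where User is an illustrative name only.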
Mapping rows to objects with a user-defined function
Invoke as on the CassandraRDD to map every row to an object of a different type. Contrary to map, as expects a function having the same number of arguments as the number of columns to be fetched. Invoking as in this way performs type conversions. Using as to directly create objects of a particular type eliminates the need to create CassandraRow objects and also decreases garbage collection pressure.
To map columns using a user-defined function:
val table = sc.cassandraTable("test", "words")
table: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[9] at RDD at CassandraRDD.scala:47
val total = table.select("count").as((c: Int) => c).sum
total: Double = 30.0
val frequencies = table.select("word", "count").as((w: String, c: Int) => (w, c / total)).collect()
frequencies: Array[(String, Double)] = Array((bar,0.6666666666666666), (foo,0.3333333333333333))
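The arithmetic behind total and frequencies can be checked without a cluster. The sketch below is plain Scala over an in-memory Seq that stands in for the two rows of test.words; it does not use the connector, and the explicit toDouble conversion is an assumption made to mirror the Double value shown for total.

```scala
// In-memory stand-in for the two rows inserted into test.words earlier.
val words = Seq(("bar", 20), ("foo", 10))

// Sum the counts as Double, mirroring the Double result shown for the RDD.
val total: Double = words.map { case (_, count) => count.toDouble }.sum

// Divide each count by the Double total, so the division is floating point.
val frequencies: Seq[(String, Double)] =
  words.map { case (word, count) => (word, count / total) }
```

Because total is a Double, count / total is floating-point division. Dividing one Int by another would truncate both frequencies to 0.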
Filtering rows on the server
To filter rows, you can use the filter transformation provided by Spark. However, the filter transformation fetches all rows from the database first and then filters them in Spark, so CPU cycles are wasted serializing and deserializing rows that are excluded from the result. To avoid this overhead, CassandraRDD provides the where method, which passes an arbitrary CQL condition to filter the row set on the server.
This example shows how to use Spark to filter rows on the server.
-
Download and unzip the CQL commands for this example. The commands in this file perform the following tasks:
-
Create a cars table in the test keyspace.
-
Index the color column.
-
Insert some data into the table.
-
Run the test_cars.cql file using cqlsh or DevCenter. For example, using cqlsh:
cqlsh -f test_cars.cql
-
Filter the rows using Spark:
sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").collect().foreach(println)
CassandraRow{id: AS-8888, model: Aston Martin DB9 Volante}
CassandraRow{id: KF-334L, model: Ford Mondeo}
CassandraRow{id: MT-8787, model: Hyundai x35}
CassandraRow{id: MZ-1038, model: Mazda CX-9}
CassandraRow{id: DG-2222, model: Dodge Avenger}
CassandraRow{id: DG-8897, model: Dodge Charger}
CassandraRow{id: BT-3920, model: Bentley Continental GT}
CassandraRow{id: IN-9964, model: Infinity FX}
sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").collect().foreach(println)
CassandraRow{id: FR-8877, model: Ferrari FF}
CassandraRow{id: FR-8877, model: Ferrari FF}
CassandraRow{id: HD-1828, model: Honda Accord}
CassandraRow{id: WX-2234, model: Toyota Yaris}
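The difference between filtering in Spark and filtering on the server can be pictured with a toy in-memory model. This is plain Scala, not connector code: the Car case class and both functions are hypothetical, the four rows are borrowed from the example output in this section, and the second element of each result simply counts how many rows would have to be transferred to Spark.

```scala
// Hypothetical row type for the cars table (only the columns used here).
final case class Car(id: String, model: String, color: String)

// Stand-in for the database table, using rows from the example output.
val cars = Seq(
  Car("KF-334L", "Ford Mondeo", "black"),
  Car("MZ-1038", "Mazda CX-9", "black"),
  Car("HD-1828", "Honda Accord", "silver"),
  Car("WX-2234", "Toyota Yaris", "silver")
)

// Models Spark's filter: all rows are transferred, then filtered in Spark.
def filterInSpark(color: String): (Seq[Car], Int) = {
  val transferred = cars
  (transferred.filter(_.color == color), transferred.size)
}

// Models where: the condition is applied first, so only matches are transferred.
def filterOnServer(color: String): (Seq[Car], Int) = {
  val transferred = cars.filter(_.color == color)
  (transferred, transferred.size)
}

val (inSpark, transferredInSpark) = filterInSpark("black")
val (onServer, transferredFromServer) = filterOnServer("black")
```

In this model both calls return the same two black cars, but the first transfers all four rows while the second transfers only two.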