Saving data to Cassandra
With DataStax Enterprise, you can save almost any RDD to Cassandra. Unless you provide a custom mapping, the objects in the RDD must be tuples or instances of a class whose property names correspond to Cassandra column names. To save the RDD, call the saveToCassandra method with a keyspace name, a table name, and, optionally, a list of columns. Before using the RDD in a standalone application, import com.datastax.spark.connector._ in your code.
Saving a collection of tuples
The following example shows how to save a collection of tuples to Cassandra.
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:22

scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

scala>
The absence of output after the saveToCassandra call indicates that the data was saved to Cassandra.
In cqlsh, query the words table to view all of its rows:
SELECT * FROM test.words;

 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40

(4 rows)
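As the example suggests, tuple elements are paired with the columns listed in SomeColumns by position: the first element goes to word and the second to count. The plain-Scala sketch below models that pairing without using the connector. The helper tupleToRow is hypothetical and only illustrates the idea; it is not how the connector is implemented.

```scala
// Hypothetical helper, not part of the connector: pairs each tuple element,
// in order, with the column name at the same position.
def tupleToRow(columns: Seq[String], tuple: Product): Map[String, Any] = {
  require(columns.size == tuple.productArity, "expected one column name per tuple element")
  columns.zip(tuple.productIterator.toList).toMap
}

val columns = Seq("word", "count")
// The same tuples that the REPL example saves to test.words.
val rows = Seq(("cat", 30), ("fox", 40)).map(t => tupleToRow(columns, t))
rows.foreach(println)
```

Because the pairing is positional, the order of names in SomeColumns has to match the order of the tuple elements.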
Saving a collection of case class objects to Cassandra
The following example shows how to save a collection of case class objects.
scala> case class WordCount(word: String, count: Long)
defined class WordCount

scala> val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection: org.apache.spark.rdd.RDD[WordCount] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

scala>
In cqlsh, query the words table to view all of its rows:
SELECT * FROM test.words;

 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40
  dog |    50
  cow |    60
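With a case class, the correspondence is by name rather than by position: the field names word and count match the column names. The plain-Scala sketch below shows that correspondence using productElementNames, which requires Scala 2.13 or later. The helper caseClassToRow is hypothetical and does not reproduce the connector's internals.

```scala
// Same case class as in the REPL example above.
case class WordCount(word: String, count: Long)

// Hypothetical helper, not part of the connector: uses each case class field
// name as the column name for that field's value.
def caseClassToRow(value: Product): Map[String, Any] =
  value.productElementNames.zip(value.productIterator).toMap

val rows = Seq(WordCount("dog", 50), WordCount("cow", 60)).map(caseClassToRow)
rows.foreach(println)
```

If a field were renamed (for example, to w), its name would no longer match a column. The next sections show how a ColumnMapper handles that case.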
Using non-default property-name to column-name mappings
Mapping rows to tuples and case classes works out of the box, but in some cases you might need more control over the Cassandra-to-Scala mapping. For example, Java classes typically follow the JavaBeans naming convention, in which accessors are named with get, is, or set prefixes. To customize the mapping between columns and properties, put an appropriate implicit ColumnMapper[YourClass] object in scope. Define this object in the companion object of the class being mapped so that it is found without an extra import. The ColumnMapper affects both loading and saving data. DataStax Enterprise includes a few ColumnMapper implementations.
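The companion object works as a location because, when the Scala compiler resolves an implicit of type F[T], it also searches the companion object of T. The plain-Scala sketch below demonstrates that rule with a hypothetical ColumnNames type class standing in for ColumnMapper. Neither ColumnNames nor describe is part of the connector.

```scala
// Hypothetical stand-in for a column mapper: lists the column names for T.
trait ColumnNames[T] {
  def columns: Seq[String]
}

class WordCount(val word: String, val count: Int)

object WordCount {
  // Found automatically: the companion object of WordCount is part of the
  // implicit scope of ColumnNames[WordCount], so no import is needed.
  implicit object Mapper extends ColumnNames[WordCount] {
    def columns: Seq[String] = Seq("word", "count")
  }
}

// Hypothetical method that resolves ColumnNames[T] implicitly, similar in
// spirit to how the connector picks up a ColumnMapper for the mapped class.
def describe[T](table: String)(implicit names: ColumnNames[T]): String =
  s"$table(${names.columns.mkString(", ")})"

println(describe[WordCount]("words"))
```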
Working with JavaBeans
To work with Java classes, use JavaBeanColumnMapper. Make sure objects are serializable; otherwise Spark cannot send them over the network. The following example shows how to use the JavaBeanColumnMapper.
To use JavaBean style accessors:
scala> :paste
// Entering paste mode (ctrl-D to finish)
Paste this import statement and the class and companion object definitions:
import com.datastax.spark.connector.mapper.JavaBeanColumnMapper

class WordCount extends Serializable {
  private var _word: String = ""
  private var _count: Int = 0
  def setWord(word: String): Unit = { _word = word }
  def setCount(count: Int): Unit = { _count = count }
  override def toString = _word + ":" + _count
}

object WordCount {
  implicit object Mapper extends JavaBeanColumnMapper[WordCount]
}
Press Ctrl+D to exit paste mode. The output is:
// Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
defined class WordCount
defined module WordCount

scala>
Query the words table, mapping each row to a WordCount object:
scala> sc.cassandraTable[WordCount]("test", "words").toArray
res18: Array[WordCount] = Array(cow:60, bar:20, foo:10, cat:30, fox:40, dog:50)
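The setters are enough for loading rows into WordCount objects. To save WordCount objects to Cassandra, the class must also define getters, such as getWord and getCount.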
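As a sketch of the getters this step calls for, the class below adds getWord and getCount next to the setters. It then uses the standard java.beans.Introspector to list the bean properties defined by the get/set pairs. This only demonstrates the JavaBeans naming convention. It does not claim that JavaBeanColumnMapper uses Introspector internally, and the JavaBeanColumnMapper companion object is left out so the sketch runs without the connector.

```scala
import java.beans.Introspector

class WordCount extends Serializable {
  private var _word: String = ""
  private var _count: Int = 0
  // Getters: read the values when objects are saved.
  def getWord: String = _word
  def getCount: Int = _count
  // Setters: fill in new objects when rows are loaded.
  def setWord(word: String): Unit = { _word = word }
  def setCount(count: Int): Unit = { _count = count }
  override def toString: String = _word + ":" + _count
}

// Bean property names derived from the accessor names (getWord -> word).
// getClass also produces a read-only "class" property, which is filtered out.
val properties: Seq[String] =
  Introspector.getBeanInfo(classOf[WordCount])
    .getPropertyDescriptors
    .map(_.getName)
    .filterNot(_ == "class")
    .toSeq
    .sorted

println(properties)
```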
Manually specifying a property-name to column-name relationship
To associate a property with a column that has a different name, pass a column translation map to the DefaultColumnMapper or JavaBeanColumnMapper constructor.
To change column names:
scala> :paste
// Entering paste mode (ctrl-D to finish)

import com.datastax.spark.connector.mapper.DefaultColumnMapper

case class WordCount(w: String, c: Int)

object WordCount {
  implicit object Mapper extends DefaultColumnMapper[WordCount](Map("w" -> "word", "c" -> "count"))
}
Press Ctrl+D to exit paste mode. The output is:
// Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.DefaultColumnMapper
defined class WordCount
defined module WordCount
Continue entering these commands:
scala> sc.cassandraTable[WordCount]("test", "words").toArray
res21: Array[WordCount] = Array(WordCount(cow,60), WordCount(bar,20), WordCount(foo,10), WordCount(cat,30), WordCount(fox,40), WordCount(dog,50))

scala> sc.parallelize(Seq(WordCount("bar", 20), WordCount("foo", 40))).saveToCassandra("test", "words", SomeColumns("word", "count"))

scala>
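The translation map renames properties in both directions: property w is stored in column word, and c is stored in column count. The plain-Scala sketch below applies such a map to case class field names and falls back to the field name when the map has no entry. The helper columnNames is hypothetical and is not how DefaultColumnMapper is implemented. productElementNames requires Scala 2.13 or later.

```scala
case class WordCount(w: String, c: Int)

// Same translation map as in the DefaultColumnMapper example above.
val translation: Map[String, String] = Map("w" -> "word", "c" -> "count")

// Hypothetical helper: maps each field name of a case class instance to its
// column name, keeping the field name when the map has no entry for it.
def columnNames(value: Product, translation: Map[String, String]): Seq[String] =
  value.productElementNames.map(name => translation.getOrElse(name, name)).toList

println(columnNames(WordCount("bar", 20), translation))
```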
Writing custom ColumnMappers
To define column mappings for your own classes, create an implicit object that implements the ColumnMapper[YourClass] trait.
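The abstract members of the connector's ColumnMapper trait differ between connector versions and are not reproduced here. The plain-Scala sketch below models only the idea described in this section: a mapper, defined in the companion object and found implicitly, that converts an object to column values for saving and converts column values back to an object for loading. The trait SimpleColumnMapper and the helpers save and load are hypothetical.

```scala
// Hypothetical two-way mapper; this is NOT the connector's ColumnMapper trait.
trait SimpleColumnMapper[T] {
  def toColumns(value: T): Map[String, Any]
  def fromColumns(row: Map[String, Any]): T
}

case class WordCount(w: String, c: Int)

object WordCount {
  // Maps property w to column word and property c to column count.
  implicit object Mapper extends SimpleColumnMapper[WordCount] {
    def toColumns(value: WordCount): Map[String, Any] =
      Map("word" -> value.w, "count" -> value.c)
    def fromColumns(row: Map[String, Any]): WordCount =
      WordCount(row("word").asInstanceOf[String], row("count").asInstanceOf[Int])
  }
}

// Hypothetical save/load helpers that resolve the mapper implicitly
// from the companion object of T.
def save[T](values: Seq[T])(implicit mapper: SimpleColumnMapper[T]): Seq[Map[String, Any]] =
  values.map(mapper.toColumns)

def load[T](rows: Seq[Map[String, Any]])(implicit mapper: SimpleColumnMapper[T]): Seq[T] =
  rows.map(mapper.fromColumns)

val rows = save(Seq(WordCount("bar", 20), WordCount("foo", 40)))
println(rows)
println(load[WordCount](rows))
```

Keeping both directions in one object mirrors the point made earlier that a ColumnMapper affects both loading and saving.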