Saving data to Cassandra

With DataStax Enterprise, you can save almost any RDD to Cassandra. Unless you provide a custom mapping, the object class of the RDD must be a tuple or have property names that correspond to Cassandra column names. To save the RDD, call the saveToCassandra method with a keyspace name, a table name, and optionally, a list of columns. Before you use the RDD in a standalone application, import com.datastax.spark.connector._ so that saveToCassandra is available on the RDD.
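
For a standalone application, the steps above might look like the following minimal sketch. The object name SaveWords, the application name, and the connection host 127.0.0.1 are assumptions for illustration. The sketch also assumes that the test.words table already exists with the columns word and count, and that the Spark master is supplied when the application is submitted.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._ // adds saveToCassandra to RDDs

// Hypothetical application object; the name is illustrative only.
object SaveWords {
  def main(args: Array[String]): Unit = {
    // Assumption: a Cassandra node is reachable at 127.0.0.1.
    val conf = new SparkConf()
      .setAppName("SaveWords")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)
    try {
      // Tuples are mapped to the listed columns by position.
      val words = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
      words.saveToCassandra("test", "words", SomeColumns("word", "count"))
    } finally {
      sc.stop()
    }
  }
}
```

Without the connector import, the call to saveToCassandra does not compile, because the method is added to RDDs through an implicit conversion.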

Saving a collection of tuples 

The following example shows how to save a collection of tuples to Cassandra.

scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:22

scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

scala>            

The saveToCassandra call prints nothing. When the scala prompt returns without an error, the data was saved to Cassandra.

In cqlsh, query the words table to select all the contents.

SELECT * FROM test.words;

 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40

(4 rows)

Saving a collection of case class objects to Cassandra 

The following example shows how to save a collection of case class objects.

scala> case class WordCount(word: String, count: Long)
defined class WordCount

scala> val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection: org.apache.spark.rdd.RDD[WordCount] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

scala>

In cqlsh, query the words table to select all the contents.

SELECT * FROM test.words;

 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40
  dog |    50
  cow |    60
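
Because the list of columns is optional, a case class whose property names match the table's column names can probably be saved without SomeColumns. The following is a sketch under assumptions: it runs in the Spark shell with sc in scope, the test.words table has only the columns word and count, and the row ("owl", 70) is made-up sample data.

```scala
import com.datastax.spark.connector._

// Property names word and count match the column names of test.words.
case class WordCount(word: String, count: Long)

// No SomeColumns argument: the columns are selected by property name.
sc.parallelize(Seq(WordCount("owl", 70))).saveToCassandra("test", "words")
```

Listing the columns explicitly, as in the earlier examples, remains the safer choice when the class has fewer properties than the table has columns.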

Using non-default property-name to column-name mappings 

Mapping rows to tuples and case classes works out of the box, but in some cases you might need more control over the Cassandra-Scala mapping. For example, Java classes are likely to use the JavaBeans naming convention, where accessors are named with get, is, or set prefixes. To customize column-property mappings, put an appropriate ColumnMapper[YourClass] implicit object in scope. Define this object in the companion object of the class being mapped. The ColumnMapper affects both loading and saving data. DataStax Enterprise includes a few ColumnMapper implementations.
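
To make the JavaBeans naming convention concrete, the following sketch derives a property name from an accessor name. It is an illustration only: propertyName is a hypothetical helper, not part of the connector, and it simplifies the real JavaBeans rules (for example, it ignores the special handling of names that start with several capital letters).

```scala
// Hypothetical helper: strip a get, set, or is prefix and lower-case
// the first letter of what remains. Returns None for other method names.
def propertyName(accessor: String): Option[String] =
  Seq("get", "set", "is").collectFirst {
    case prefix if accessor.startsWith(prefix) &&
        accessor.length > prefix.length &&
        accessor.charAt(prefix.length).isUpper =>
      val rest = accessor.drop(prefix.length)
      s"${rest.head.toLower}${rest.tail}"
  }

println(propertyName("getWord"))  // Some(word)
println(propertyName("setCount")) // Some(count)
println(propertyName("toString")) // None
```

Under this convention, the accessors getWord and setWord both refer to a property named word, which is the name matched against the Cassandra column.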

Working with JavaBeans

To work with Java classes, use JavaBeanColumnMapper. Make sure objects are serializable; otherwise Spark cannot send them over the network. The following example shows how to use the JavaBeanColumnMapper.

To use JavaBean style accessors:

scala> :paste
// Entering paste mode (ctrl-D to finish)
      

Paste this import command and class definition:

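    import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
    // Serializable so that Spark can send instances over the network.
    class WordCount extends Serializable {
      private var _word: String = ""
      private var _count: Int = 0
      // JavaBean-style setters, used when loading rows from Cassandra.
      def setWord(word: String): Unit = { _word = word }
      def setCount(count: Int): Unit = { _count = count }
      override def toString: String = _word + ":" + _count
    }
    object WordCount {
      // Implicit mapper in the companion object, found automatically.
      implicit object Mapper extends JavaBeanColumnMapper[WordCount]
    }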

Press Ctrl+D to exit paste mode. The output is:

    // Exiting paste mode, now interpreting.

    import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
    defined class WordCount
    defined module WordCount

    scala>

Read the rows of the words table as WordCount objects.

scala> sc.cassandraTable[WordCount]("test", "words").toArray
res18: Array[WordCount] = Array(cow:60, bar:20, foo:10, cat:30, fox:40, dog:50)

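The class above defines only setters, which is enough for loading data. To save WordCount objects to Cassandra, the class must also define getters, because the mapper reads property values through them.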
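
A version of the bean with getters might look like the following sketch. The getter names getWord and getCount follow the JavaBeans convention and are assumptions, as is the sample value ("owl", 70). The import and the companion object with the JavaBeanColumnMapper are left out here so that the class stands on its own; they stay as in the earlier listing.

```scala
// Sketch: the WordCount bean with both getters and setters.
class WordCount extends Serializable {
  private var _word: String = ""
  private var _count: Int = 0
  def getWord: String = _word
  def setWord(word: String): Unit = { _word = word }
  def getCount: Int = _count
  def setCount(count: Int): Unit = { _count = count }
  override def toString: String = _word + ":" + _count
}

// Populate a bean through its setters and read it back.
val wc = new WordCount
wc.setWord("owl")
wc.setCount(70)
println(wc) // owl:70
```

With the mapper in scope, an RDD of such beans can then be passed to saveToCassandra("test", "words", SomeColumns("word", "count")) in the same way as the tuples and case class objects shown earlier.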

Manually specifying a property-name to column-name relationship 

To associate a property with a column of a different name, pass a column translation map to the DefaultColumnMapper or JavaBeanColumnMapper.

To change column names:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import com.datastax.spark.connector.mapper.DefaultColumnMapper
case class WordCount(w: String, c: Int)
object WordCount {
  implicit object Mapper extends DefaultColumnMapper[WordCount](Map("w" -> "word", "c" -> "count"))
}

Press Ctrl+D to exit paste mode. The output is:

// Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.DefaultColumnMapper
defined class WordCount
defined module WordCount

Continue entering these commands:

scala> sc.cassandraTable[WordCount]("test", "words").toArray
res21: Array[WordCount] = Array(WordCount(cow,60), WordCount(bar,20), WordCount(foo,10), WordCount(cat,30), WordCount(fox,40), WordCount(dog,50))

scala> sc.parallelize(Seq(WordCount("bar",20),WordCount("foo",40))).saveToCassandra("test", "words", SomeColumns("word", "count"))

scala>

Writing custom ColumnMappers 

To define column mappings for your classes, create an implicit object that implements the ColumnMapper[YourClass] trait.