Saving RDD data to DSE

With DataStax Enterprise, you can save almost any RDD to the database. Before you use the RDD in a standalone application, import com.datastax.spark.connector.

With DataStax Enterprise, you can save almost any RDD to the database. Unless you do not provide a custom mapping, the object class of the RDD must be a tuple or have property names corresponding to table column names. To save the RDD, call the saveToCassandra method with a keyspace name, table name, and optionally, a list of columns. Before attempting to use the RDD in a standalone application, import com.datastax.spark.connector.

You can also use the DataFrames API to manipulate data within Spark.

Saving a collection of tuples

The following example shows how to save a collection of tuples to the database.

val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at console:22
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

At the last Scala prompt in this example, no output means that the data was saved to the database.

In cqlsh, query the words table to select all the contents.

SELECT * FROM test.words;
 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40

(4 rows)

Saving a collection of case class objects to the database

The following example shows how to save a collection of case class objects.

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

In cqlsh, query the words table to select all the contents.

SELECT * FROM test.words;
 word | count
------+-------
 bar |    20
 foo |    10
 cat |    30
 fox |    40
 dog |    50
 cow |    60

Using non-default property-name to column-name mappings

Mapping rows to tuples and case classes work out-of-the box, but in some cases, you might need more control over database-Scala mapping. For example, Java classes are likely to use the JavaBeans naming convention, where accessors are named with get, is or set prefixes. To customize column-property mappings, put an appropriate ColumnMapper[YourClass] implicit object in scope. Define such an object in a companion object of the class being mapped. The ColumnMapper affects both loading and saving data. DataStax Enterprise includes a few ColumnMapper implementations.

Working with JavaBeans

To work with Java classes, use JavaBeanColumnMapper. Make sure objects are serializable; otherwise Spark cannot send them over the network. The following example shows how to use the JavaBeanColumnMapper.

To use JavaBeans style accessors:

:paste
// Entering paste mode (ctrl-D to finish)
      

Paste this import command and class definition:

import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
    class WordCount extends Serializable { 
      private var _word: String = ""
      private var _count: Int = 0
      def setWord(word: String) { _word = word }
      def setCount(count: Int) { _count = count }
      override def toString = _word + ":" + _count
    }
    object WordCount {
      implicit object Mapper extends JavaBeanColumnMapper[WordCount] 
    } 

Enter CTRL D to exit paste mode. The output is:

// Exiting paste mode, now interpreting.

    import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
    defined class WordCount
    defined module WordCount

    

Query the WordCount object.

sc.cassandraTable[WordCount]("test", "words").collect

To save the data, you need to define getters.

Manually specifying a property-name to column-name relationship

If for some reason you want to associate a property with a column of a different name, pass a column translation map to the DefaultColumnMapper or JavaBeanColumnMapper.

To change column names:

:paste
// Entering paste mode (ctrl-D to finish)

import com.datastax.spark.connector.mapper.DefaultColumnMapper
case class WordCount(w: String, c: Int)
object WordCount { implicit object Mapper extends
DefaultColumnMapper[WordCount](Map("w" -> "word", "c" -> "count")) }

Enter CTRL D.

// Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.DefaultColumnMapper
defined class WordCount
defined module WordCount

Continue entering these commands:

sc.cassandraTable[WordCount]("test", "words").collect
sc.parallelize(Seq(WordCount("bar",20),WordCount("foo",40))).saveToCassandra("test", "words", SomeColumns("word", "count"))

Writing custom ColumnMappers

To define column mappings for your classes, create an appropriate implicit object implementing ColumnMapper[YourClass] trait.