Saving RDD data to DSE
With DataStax Enterprise, you can save almost any RDD to the database. Unless you provide a custom mapping, the object class of the RDD must be a tuple or have property names corresponding to table column names. To save the RDD, call the saveToCassandra method with a keyspace name, a table name, and optionally, a list of columns. Before using the RDD in a standalone application, import com.datastax.spark.connector.
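For a standalone application, a minimal sketch might look like the following. The application name and the contact point (127.0.0.1) are placeholders, and the connector dependency is assumed to be on the classpath.
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._   // brings saveToCassandra and SomeColumns into scope

object SaveWordsApp {
  def main(args: Array[String]): Unit = {
    // The host value is a placeholder; point it at one of your DSE Analytics nodes.
    val conf = new SparkConf()
      .setAppName("SaveWordsApp")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)

    // Save a small collection of tuples to the test.words table
    val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
    collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

    sc.stop()
  }
}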
Saving a collection of tuples
The following example shows how to save a collection of tuples to the database.
val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:22
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
The last command in this example produces no output, which means the data was saved to the database.
In cqlsh, query the words table to select all the contents.
SELECT * FROM test.words;
 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40

(4 rows)
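You can also read the rows back from the Spark shell as tuples. This is a sketch assuming the same shell session as above, with com.datastax.spark.connector._ imported.
// Read the rows back as (word, count) tuples; select fixes the column order
val rows = sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").collect
rows.foreach(println)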
Saving a collection of case class objects to the database
The following example shows how to save a collection of case class objects.
case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
In cqlsh, query the words table to select all the contents.
SELECT * FROM test.words;
 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40
  dog |    50
  cow |    60

(6 rows)
Using non-default property-name to column-name mappings
Mapping rows to tuples and case classes works out of the box, but in some cases you might need more control over the mapping between the database and Scala. For example, Java classes are likely to use the JavaBeans naming convention, where accessors are named with get, is, or set prefixes. To customize column-to-property mappings, put an appropriate ColumnMapper[YourClass] implicit object in scope. Define such an object in the companion object of the class being mapped. The ColumnMapper affects both loading and saving data. DataStax Enterprise includes several ColumnMapper implementations.
Working with JavaBeans
To work with Java classes, use JavaBeanColumnMapper. Make sure objects are serializable; otherwise Spark cannot send them over the network. The following example shows how to use the JavaBeanColumnMapper.
To use JavaBeans style accessors:
:paste
// Entering paste mode (ctrl-D to finish)
Paste this import command and class definition:
import com.datastax.spark.connector.mapper.JavaBeanColumnMapper

class WordCount extends Serializable {
  private var _word: String = ""
  private var _count: Int = 0
  def setWord(word: String) { _word = word }
  def setCount(count: Int) { _count = count }
  override def toString = _word + ":" + _count
}

object WordCount {
  implicit object Mapper extends JavaBeanColumnMapper[WordCount]
}
Press Ctrl-D to exit paste mode. The output is:
// Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
defined class WordCount
defined module WordCount
Query the words table, mapping each row to a WordCount object:
sc.cassandraTable[WordCount]("test", "words").collect
To save the data, you need to define getters.
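For example, the following sketch adds getters to the class defined above so that a collection of WordCount objects can be written back. The values "elk" and 70 are arbitrary example data.
import com.datastax.spark.connector.mapper.JavaBeanColumnMapper

class WordCount extends Serializable {
  private var _word: String = ""
  private var _count: Int = 0
  def getWord: String = _word        // getters let the connector read property values when saving
  def setWord(word: String) { _word = word }
  def getCount: Int = _count
  def setCount(count: Int) { _count = count }
  override def toString = _word + ":" + _count
}

object WordCount {
  implicit object Mapper extends JavaBeanColumnMapper[WordCount]
}
With the getters in place, saving works the same way as for tuples and case classes:
val wc = new WordCount
wc.setWord("elk")
wc.setCount(70)
sc.parallelize(Seq(wc)).saveToCassandra("test", "words", SomeColumns("word", "count"))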
Manually specifying a property-name to column-name relationship
If for some reason you want to associate a property with a column of a different name, pass a column translation map to the DefaultColumnMapper or JavaBeanColumnMapper.
To change column names:
:paste
// Entering paste mode (ctrl-D to finish)

import com.datastax.spark.connector.mapper.DefaultColumnMapper

case class WordCount(w: String, c: Int)

object WordCount {
  implicit object Mapper extends DefaultColumnMapper[WordCount](
    Map("w" -> "word", "c" -> "count"))
}
Press Ctrl-D to exit paste mode. The output is:
// Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.DefaultColumnMapper
defined class WordCount
defined module WordCount
Continue entering these commands:
sc.cassandraTable[WordCount]("test", "words").collect
sc.parallelize(Seq(WordCount("bar", 20), WordCount("foo", 40))).saveToCassandra("test", "words", SomeColumns("word", "count"))
Writing custom ColumnMappers
To define column mappings for your classes, create an appropriate implicit object that implements the ColumnMapper[YourClass] trait.
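The methods required by the trait differ between connector versions, so a from-scratch implementation is not shown here; in many cases it is simpler to extend one of the bundled mappers. The following sketch uses a hypothetical Temperature class and assumed column names.
import com.datastax.spark.connector.mapper.DefaultColumnMapper

// Hypothetical class; sensor_id and value_celsius are assumed column names
case class Temperature(sensorId: String, valueCelsius: Double)

object Temperature {
  // Extends the bundled DefaultColumnMapper instead of implementing ColumnMapper from scratch
  implicit object Mapper extends DefaultColumnMapper[Temperature](
    Map("sensorId" -> "sensor_id", "valueCelsius" -> "value_celsius"))
}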