Saving RDD data to DSE
With DataStax Enterprise, you can save almost any RDD to the database.
Unless you provide a custom mapping, the object class of the RDD must be a tuple or have property names corresponding to table column names.
To save the RDD, call the saveToCassandra method with a keyspace name, a table name, and optionally a list of columns.
Before attempting to use the RDD in a standalone application, import com.datastax.spark.connector.
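A minimal sketch of the import as it is typically written in the Spark shell or a standalone application (the wildcard form brings saveToCassandra, cassandraTable, and SomeColumns into scope):

// Spark Cassandra Connector implicits and helper types
import com.datastax.spark.connector._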
You can also use the DataFrames API to manipulate data within Spark.
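For example, a minimal sketch of reading and writing the same table through the DataFrames API, assuming a Spark 2 SparkSession named spark and the connector's org.apache.spark.sql.cassandra data source (on older Spark versions, use sqlContext.read instead):

// Load the test.words table as a DataFrame.
val wordsDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "words"))
  .load()

// Append rows to the same table.
wordsDF.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "words"))
  .mode("append")
  .save()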
Saving a collection of tuples
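The examples below assume that the test keyspace and the words table already exist and contain the rows bar and foo. A sketch of a compatible schema, if you need to create it in cqlsh (your actual schema may differ):

CREATE KEYSPACE IF NOT EXISTS test
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS test.words (
  word text PRIMARY KEY,
  count int
);

INSERT INTO test.words (word, count) VALUES ('bar', 20);
INSERT INTO test.words (word, count) VALUES ('foo', 10);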
The following example shows how to save a collection of tuples to the database.
val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:22
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
No output at the last Scala prompt means that the data was saved to the database.
In cqlsh, query the words table to select all the contents.
SELECT * FROM test.words;
 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40

(4 rows)
Saving a collection of case class objects to the database
The following example shows how to save a collection of case class objects.
case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
In cqlsh, query the words table to select all the contents.
SELECT * FROM test.words;
 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40
  dog |    50
  cow |    60

(6 rows)
Using non-default property-name to column-name mappings
Mapping rows to tuples and case classes works out of the box, but in some cases you might need more control over the mapping between database columns and Scala properties.
For example, Java classes are likely to use the JavaBeans naming convention, where accessors are named with get, is, or set prefixes.
To customize column-property mappings, put an appropriate ColumnMapper[YourClass] implicit object in scope.
Typically, you define such an object in a companion object of the class being mapped, as shown in the examples below.
The ColumnMapper affects both loading and saving data.
DataStax Enterprise includes a few ColumnMapper implementations.
Working with JavaBeans
To work with Java classes, use JavaBeanColumnMapper.
Make sure objects are serializable; otherwise Spark cannot send them over the network.
The following example shows how to use the JavaBeanColumnMapper.
To use JavaBeans-style accessors:
:paste
// Entering paste mode (ctrl-D to finish)
Paste this import command and class definition:
import com.datastax.spark.connector.mapper.JavaBeanColumnMapper

class WordCount extends Serializable {
  private var _word: String = ""
  private var _count: Int = 0
  def setWord(word: String) { _word = word }
  def setCount(count: Int) { _count = count }
  override def toString = _word + ":" + _count
}

object WordCount {
  implicit object Mapper extends JavaBeanColumnMapper[WordCount]
}
Press Ctrl-D to exit paste mode. The output is:
// Exiting paste mode, now interpreting.
import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
defined class WordCount
defined module WordCount
Query the table, mapping rows to WordCount objects:
sc.cassandraTable[WordCount]("test", "words").collect
To save the data, you need to define getters.
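For example, a minimal sketch of the same bean with JavaBeans-style getters added so that saveToCassandra can read the property values; paste it in :paste mode as before (the elk row below is illustrative):

import com.datastax.spark.connector._
import com.datastax.spark.connector.mapper.JavaBeanColumnMapper

class WordCount extends Serializable {
  private var _word: String = ""
  private var _count: Int = 0
  // Getters are required for saving: JavaBeanColumnMapper maps getWord and
  // getCount to the word and count columns.
  def getWord(): String = _word
  def getCount(): Int = _count
  def setWord(word: String) { _word = word }
  def setCount(count: Int) { _count = count }
  override def toString = _word + ":" + _count
}

object WordCount {
  implicit object Mapper extends JavaBeanColumnMapper[WordCount]
}

// Build one bean and save it to the existing test.words table.
val elk = new WordCount
elk.setWord("elk")
elk.setCount(70)
sc.parallelize(Seq(elk)).saveToCassandra("test", "words", SomeColumns("word", "count"))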
Manually specifying a property-name to column-name relationship
If for some reason you want to associate a property with a column of a different name, pass a column translation map to the DefaultColumnMapper or JavaBeanColumnMapper.
To change column names:
:paste
// Entering paste mode (ctrl-D to finish)
import com.datastax.spark.connector.mapper.DefaultColumnMapper
case class WordCount(w: String, c: Int)
object WordCount {
  implicit object Mapper extends DefaultColumnMapper[WordCount](
    Map("w" -> "word", "c" -> "count"))
}
Press Ctrl-D to exit paste mode.
// Exiting paste mode, now interpreting.
import com.datastax.spark.connector.mapper.DefaultColumnMapper
defined class WordCount
defined module WordCount
Continue entering these commands:
sc.cassandraTable[WordCount]("test", "words").collect
sc.parallelize(Seq(WordCount("bar",20),WordCount("foo",40))).saveToCassandra("test", "words", SomeColumns("word", "count"))
Writing custom ColumnMappers
To define column mappings for your classes, create an appropriate implicit object implementing the ColumnMapper[YourClass] trait.
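Implementing the trait from scratch is rarely necessary. For non-default name mappings, a sketch like the following is usually enough; it reuses the provided DefaultColumnMapper rather than a fully custom implementation, and the class and column names are hypothetical:

import com.datastax.spark.connector._
import com.datastax.spark.connector.mapper.DefaultColumnMapper

// Hypothetical class whose property names differ from the table's column names.
case class Pet(petName: String, petAge: Int)

object Pet {
  // The implicit mapper in the companion object is picked up automatically,
  // and affects both loading (cassandraTable) and saving (saveToCassandra).
  implicit object Mapper extends DefaultColumnMapper[Pet](
    Map("petName" -> "name", "petAge" -> "age"))
}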