Saving RDD data to DSE

Before you use the RDD in a standalone application, import com.datastax.spark.connector.

With DataStax Enterprise, you can save almost any RDD to the database. Unless you do not provide a custom mapping, the object class of the RDD must be a tuple or have property names corresponding to table column names. To save the RDD, call the saveToCassandra method with a keyspace name, table name, and optionally, a list of columns. Before attempting to use the RDD in a standalone application, import com.datastax.spark.connector.

You can also use the DataFrames API to manipulate data within Spark.

Saving a collection of tuples

The following example shows how to save a collection of tuples to the database.

val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at console:22
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

At the last Scala prompt in this example, no output means that the data was saved to the database.

In cqlsh, query the words table to select all the contents.

SELECT * FROM test.words;
 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40

(4 rows)

Saving a collection of case class objects to the database

The following example shows how to save a collection of case class objects.

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

In cqlsh, query the words table to select all the contents.

SELECT * FROM test.words;
 word | count
------+-------
 bar |    20
 foo |    10
 cat |    30
 fox |    40
 dog |    50
 cow |    60

Using non-default property-name to column-name mappings

Mapping rows to tuples and case classes work out-of-the box, but in some cases, you might need more control over database-Scala mapping. For example, Java classes are likely to use the JavaBeans naming convention, where accessors are named with get, is or setprefixes. To customize column-property mappings, put an appropriateColumnMapper[YourClass] implicit object in scope. Define such an object in a companion object of the class being mapped. The ColumnMapper affects both loading and saving data. DataStax Enterprise includes a few ColumnMapper implementations.

Working with JavaBeans

To work with Java classes, use JavaBeanColumnMapper. Make sure objects are serializable; otherwise Spark cannot send them over the network. The following example shows how to use the JavaBeanColumnMapper.

To use JavaBeans style accessors:

:paste
// Entering paste mode (ctrl-D to finish)

Paste this import command and class definition:

import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
    class WordCount extends Serializable {
      private var _word: String = ""
      private var _count: Int = 0
      def setWord(word: String) { _word = word }
      def setCount(count: Int) { _count = count }
      override def toString = _word + ":" + _count
    }
    object WordCount {
      implicit object Mapper extends JavaBeanColumnMapper[WordCount]
    }

Enter CTRL D to exit paste mode. The output is:

// Exiting paste mode, now interpreting.

    import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
    defined class WordCount
    defined module WordCount

Query the WordCount object.

sc.cassandraTable[WordCount]("test", "words").collect

To save the data, you need to define getters.

Manually specifying a property-name to column-name relationship

If for some reason you want to associate a property with a column of a different name, pass a column translation map to the DefaultColumnMapper or JavaBeanColumnMapper.

To change column names:

:paste
// Entering paste mode (ctrl-D to finish)

import com.datastax.spark.connector.mapper.DefaultColumnMapper
case class WordCount(w: String, c: Int)
object WordCount { implicit object Mapper extends
DefaultColumnMapper[WordCount](Map("w" -> "word", "c" -> "count")) }

Enter CTRL D.

// Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.DefaultColumnMapper
defined class WordCount
defined module WordCount

Continue entering these commands:

sc.cassandraTable[WordCount]("test", "words").collect
sc.parallelize(Seq(WordCount("bar",20),WordCount("foo",40))).saveToCassandra("test", "words", SomeColumns("word", "count"))

Writing custom ColumnMappers

To define column mappings for your classes, create an appropriate implicit object implementing ColumnMapper[YourClass] trait.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com