Getting started with the Spark Cassandra Connector

The Spark Cassandra Connector allows you to create Java applications that use Spark to analyze database data. See the Spark Cassandra Connector Java Doc on GitHub. See the component versions for the latest version of the Spark Cassandra Connector used by DataStax Enterprise.

Using the Java API in SBT build files

Add the following library dependency to the build.sbt or other SBT build file.

// Make sure this DSE version matches the version of your DSE cluster.
val dseVersion = "6.0.0"
// SBT 0.13.13 or later is required because of a dependency resolution bug.
libraryDependencies += "com.datastax.dse" % "dse-spark-dependencies" % dseVersion % "provided"

For example project templates, see https://github.com/datastax/SparkBuildExamples.

Using the Java API in Maven build files

Add the following dependency to the pom.xml file, with the dse.version property set to the version of your DSE cluster:

    <dependency>
      <groupId>com.datastax.dse</groupId>
      <artifactId>dse-spark-dependencies</artifactId>
      <version>${dse.version}</version>
      <scope>provided</scope>
    </dependency>

Then add the DataStax repository:

    <repository>
      <id>DataStax-Repo</id>
      <url>https://repo.datastax.com/public-repos/</url>
    </repository>

For example project templates, see https://github.com/datastax/SparkBuildExamples.

Accessing database data in Scala applications

To perform Spark actions on table data, you first obtain an RDD object. To create the RDD object, create a Spark configuration object, use it to create a Spark context object, and then call the cassandraTable method on the context.

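import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Point the connector at a database node, then build the Spark context.
val conf = new SparkConf(true)
   .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("dse://127.0.0.1:7077", "test", conf)
// cassandraTable is added to SparkContext by the connector's implicits.
val rdd = sc.cassandraTable("my_keyspace", "my_table")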

To save data to the database in Scala applications, use the saveToCassandra method, passing in the keyspace, table, and mapping information.

val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("my_keyspace", "my_table", SomeColumns("key", "value"))

To perform DSE Graph queries in a Scala application, you can cast a CassandraConnector session to a com.datastax.driver.dse.DseSession and then run graph statements using the executeGraph method.

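import com.datastax.driver.dse.DseSession
import com.datastax.spark.connector.cql.CassandraConnector
val session = CassandraConnector(sc.getConf).withSessionDo(session => session.asInstanceOf[DseSession])
session.executeGraph(graphStatement) // graphStatement is a placeholder for your graph statement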

Accessing database data in Java applications

To perform Spark actions on table data, you first obtain a CassandraJavaRDD object, a subclass of the JavaRDD class. The CassandraJavaRDD is the Java language equivalent of the CassandraRDD object used in Scala applications.

To create the CassandraJavaRDD object, create a Spark configuration object, which is then used to create a Spark context object.

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

SparkConf conf = new SparkConf()
        .setAppName("My application");
SparkContext sc = new SparkContext(conf);

Use the static methods of the com.datastax.spark.connector.japi.CassandraJavaUtil class to get and manipulate CassandraJavaRDD instances. To get a new CassandraJavaRDD instance, call one of the javaFunctions methods in CassandraJavaUtil, pass in a context object, and then call the cassandraTable method and pass in the keyspace, table name, and mapping class. The examples in this section assume that the CassandraJavaUtil mapping methods, such as mapColumnTo and mapRowTo, are statically imported.

JavaRDD<String> cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable("my_keyspace", "my_table", mapColumnTo(String.class))
        .select("my_column");

Mapping column data to Java types

You can specify the Java type of a single column from a table row when creating the CassandraJavaRDD<T> instance: call the mapColumnTo method and pass in the type, then call the select method to set the column name.

JavaRDD<Integer> cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class))
        .select("column1");

JavaBeans classes can be mapped using the mapRowTo method. The JavaBeans property names should correspond to the column names following the default mapping rules. For example, the firstName property will map by default to the first_name column name.

JavaRDD<Person> personRdd = CassandraJavaUtil.javaFunctions(sc)
                .cassandraTable("my_keyspace", "my_table", mapRowTo(Person.class));
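
The naming rule mentioned above (firstName maps to first_name) can be illustrated with a small standalone helper. This is only a sketch of the rule as this page describes it, not the connector's own implementation; the class name ColumnNameSketch and the method toColumnName are hypothetical and exist only for this illustration.

```java
// Hypothetical helper illustrating the camelCase -> snake_case naming rule
// described above. It is NOT part of the Spark Cassandra Connector API.
final class ColumnNameSketch {

    private ColumnNameSketch() {
        // Utility class; not meant to be instantiated.
    }

    // Converts a JavaBeans property name such as "firstName" into the
    // column name it would correspond to under the rule, "first_name".
    static String toColumnName(String propertyName) {
        StringBuilder column = new StringBuilder();
        for (int i = 0; i < propertyName.length(); i++) {
            char c = propertyName.charAt(i);
            if (Character.isUpperCase(c)) {
                // Start a new word: add an underscore unless at the beginning.
                if (i > 0) {
                    column.append('_');
                }
                column.append(Character.toLowerCase(c));
            } else {
                column.append(c);
            }
        }
        return column.toString();
    }
}
```

Under this sketch, a property named lastName corresponds to the last_name column, while an all-lowercase property such as birthdate is left unchanged.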

CassandraJavaPairRDD<K, V> instances are extensions of the JavaPairRDD class, and have mapping readers for rows and columns similar to the previous examples. These pair RDDs are typically used for key/value pairs, where the first type is the key and the second type is the value.

When mapping a single column for both the key and the value, call mapColumnTo once with the key type and once with the value type, then call the select method and pass in the key and value column names.

CassandraJavaPairRDD<Integer, String> pairRdd = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapColumnTo(String.class))
        .select("id", "first_name");

Use the mapRowTo method to map row data to a Java type. For example, to create a pair RDD instance with the primary key and then a JavaBeans object:

CassandraJavaPairRDD<Integer, Person> idPersonRdd = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapRowTo(Person.class))
        .select("id", "first_name", "last_name", "birthdate", "email");
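
The Person class used in these examples is not shown on this page. A minimal sketch of a JavaBean whose properties line up with the selected columns might look like the following. The field types are assumptions: id is an Integer to match mapColumnTo(Integer.class), birthdate is assumed to be a java.util.Date, and the remaining properties are assumed to be strings. The class is left package-private only so the sketch fits in a single file; JavaBeans are conventionally public classes in their own source file.

```java
import java.io.Serializable;
import java.util.Date;

// Hypothetical Person bean for the examples on this page. Property names
// follow the column names id, first_name, last_name, birthdate, and email.
// Field types are assumptions, not taken from a real table schema.
class Person implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String firstName;
    private String lastName;
    private Date birthdate;
    private String email;

    // JavaBeans require a no-argument constructor.
    public Person() {
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    public Date getBirthdate() { return birthdate; }
    public void setBirthdate(Date birthdate) { this.birthdate = birthdate; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
}
```

The class implements Serializable because Spark may need to ship objects held in an RDD between nodes.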

Saving data to the database in Java applications

To save data from an RDD to the database, pass the RDD to CassandraJavaUtil.javaFunctions and call the writerBuilder method on the result, passing in the keyspace, table name, and optionally type mapping information for the column or row. Then call the saveToCassandra method on the returned builder.

CassandraJavaUtil.javaFunctions(personRdd)
        .writerBuilder("my_keyspace", "my_table", mapToRow(Person.class))
        .saveToCassandra();

