Getting started with the Spark Cassandra Connector Java API
The Spark Cassandra Connector Java API allows you to create Java applications that use Spark to analyze Cassandra data.
Using the Java API in SBT build files
Add the following library dependency to build.sbt or another SBT build file:
libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector-java_2.10" % "1.1.2" withSources() withJavadoc()
Using the Java API in Maven build files
Add the following dependencies to the pom.xml file:
<dependencies>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.1.1</version>
<scope>provided</scope>
</dependency>
...
</dependencies>
To use the helper classes included in dse.jar in your applications, copy dse.jar to project_directory/lib and add the following to your pom.xml:
<dependency>
<groupId>com.datastax</groupId>
<artifactId>dse</artifactId>
<version>version_number</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/dse-version_number.jar</systemPath>
</dependency>
Alternatively, you can manually install dse.jar in your local Maven repository:
$ mvn install:install-file -Dfile=path/dse-version_number.jar -DgroupId=com.datastax -DartifactId=dse -Dversion=version_number -Dpackaging=jar
Then add the dependency to pom.xml:
<dependency>
<groupId>com.datastax</groupId>
<artifactId>dse</artifactId>
<version>version_number</version>
</dependency>
Accessing Cassandra data in Java applications
To perform Spark actions on Cassandra table data, you first obtain a CassandraJavaRDD object, a subclass of the JavaRDD class. The CassandraJavaRDD is the Java language equivalent of the CassandraRDD object used in Scala applications.
To create the CassandraJavaRDD object, you need to create a Spark configuration object, which is then used to create a Spark context object.
Create the Spark configuration object by calling the enrichSparkConf method of com.datastax.bdp.spark.DseSparkConfHelper, passing in a new default SparkConf object. DseSparkConfHelper is found in dse.jar. The returned SparkConf instance is automatically configured for your cluster, including the Spark Master and the Cassandra host. It can then be used to create a new com.datastax.bdp.spark.DseSparkContext object. DseSparkContext, which is also located in dse.jar, automatically adjusts your configuration for DataStax Enterprise.
SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf())
    .setAppName("My application");
SparkContext sc = DseSparkContext.apply(conf);
The default location of dse.jar depends on the type of installation:

Installation type                               | Location of dse.jar
Installer-Services and Package installations    | /usr/share/dse/dse.jar
Installer-No Services and Tarball installations | install_location/lib/dse.jar
Use the static methods of the com.datastax.spark.connector.japi.CassandraJavaUtil class to get and manipulate CassandraJavaRDD instances. To get a new CassandraJavaRDD instance, call one of the javaFunctions methods in CassandraJavaUtil and pass in a context object, then call the cassandraTable method, passing in the keyspace, the table name, and a mapping class.
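The mapping factory methods used in the following examples (mapColumnTo, mapRowTo, and mapToRow) are also static methods of CassandraJavaUtil, so the examples assume a static import:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;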
JavaRDD<String> cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapColumnTo(String.class))
.select("my_column");
Mapping Cassandra column data to Java types
You can map a single column from a table row to a Java type by passing the type to the mapColumnTo method when creating the CassandraJavaRDD<T> instance. Then call the select method to set the column name in Cassandra.
JavaRDD<Integer> cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class))
.select("column1");
JavaBeans classes can be mapped using the mapRowTo method. The JavaBeans property names should correspond to the column names following the default mapping rules. For example, the firstName property maps by default to the first_name column.
JavaRDD<Person> personRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapRowTo(Person.class));
CassandraJavaPairRDD<K, V> instances are extensions of the JavaPairRDD class and have mapping readers for rows and columns similar to the previous examples. These pair RDDs are typically used for key/value pairs, where the first type parameter is the key and the second is the value.
When mapping a single column for both the key and the value, call mapColumnTo twice, specifying the key type and the value type, then call the select method and pass in the key and value column names.
CassandraJavaPairRDD<Integer, String> pairRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapColumnTo(String.class))
select("id", "first_name");
Use the mapRowTo method to map row data to a Java type. For example, to create a pair RDD instance with the primary key as the key and a JavaBeans object as the value:
CassandraJavaPairRDD<Integer, Person> idPersonRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapRowTo(Person.class))
.select("id", "first_name", "last_name", "birthdate", "email");
Saving data to Cassandra
To save data from an RDD to Cassandra, pass the RDD to CassandraJavaUtil.javaFunctions and call the writerBuilder method, passing in the keyspace, the table name, and the type mapping information for the column or row, then call the saveToCassandra method.
CassandraJavaUtil.javaFunctions(personRdd)
.writerBuilder("my_keyspace", "my_table", mapToRow(Person.class)).saveToCassandra();