Querying Cassandra data using Spark SQL in Java

You can execute Spark SQL queries in Java applications that traverse over Cassandra column families. Java applications that query Cassandra data using Spark SQL require a Spark configuration instance and Spark context instance.

Java applications that query Cassandra data using Spark SQL first need a Spark configuration instance and Spark context instance.

Create the Spark configuration object by calling the enrichSparkConf method of com.datastax.bdp.spark.DseSparkConfHelper, creating a new default SparkConf object. DseSparkConfHelper is found in dse.jar. The returned SparkConf instance is automatically configured for your cluster, including the Spark Master and Cassandra host. This can then be used to create a new com.datastax.bdp.spark.DseSparkContext object. DseSparkContext automatically adjusts your configuration for DataStax Enterprise. It is also located in dse.jar.

The default location of the dse.jar file depends on the type of installation:
Installer-Services and Package installations /usr/share/dse/dse.jar
Installer-No Services and Tarball installations install_location/lib/dse.jar

This Spark context object is used to create a Cassandra-aware Spark SQL context object to connect to Cassandra. This object is an instance of org.apache.spark.sql.cassandra.CassandraSQLContext.

// create a new configuration
SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf())
                .setAppName( "My application");
// create a Spark context
DseSparkContext sc = new DseSparkContext.apply(conf);
// create a Cassandra Spark SQL context
CassandraSQLContext cassandraSQLContext = new CassandraSQLContext(sc);

Once the Spark SQL context has been created, you can use it to register RDDs and execute Spark SQL queries. Queries are executed by calling the CassandaSQLContext.sql method. You can register an RDD as a table by calling SchemaRDD.registerTempTable to perform further Spark SQL queries on the results.

SchemaRDD employees = cassandraContext.sql("SELECT * FROM company.employees");
employees.registerTempTable("employees");
SchemaRDD managers = cassandraContext.sql("SELECT name FROM employees WHERE role == 'Manager' ");

The returned RDD objects support the standard RDD operations.

List<String> managerNames = managers.map(new Function<Row, String>() {
    public String call(Row row) {
        return "Name: " + row.getString(0);
    }
}).collect();