Querying Cassandra data using Spark SQL in Java
You can run Spark SQL queries against Cassandra tables (column families) from Java applications. A Java application that queries Cassandra data using Spark SQL first needs a Spark configuration instance and a Spark context instance.
Create the Spark configuration object by calling the enrichSparkConf method of com.datastax.bdp.spark.DseSparkConfHelper, passing it a new default SparkConf object. DseSparkConfHelper is found in dse.jar. The returned SparkConf instance is automatically configured for your cluster, including the Spark Master and Cassandra host. Use it to create a new com.datastax.bdp.spark.DseSparkContext object. DseSparkContext, which is also located in dse.jar, automatically adjusts your configuration for DataStax Enterprise.
Installation type | Default location of dse.jar |
Installer-Services and Package installations | /usr/share/dse/dse.jar |
Installer-No Services and Tarball installations | install_location/lib/dse.jar |
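The mapping from installation type to dse.jar location can be captured in a small helper, for example when a build or launch script needs to put dse.jar on the classpath. The following is a minimal sketch, not part of DataStax Enterprise: the class name DseJarLocator, the enum InstallType, and the method defaultJarPath are hypothetical names introduced for illustration, and the install_location value must be supplied by the caller.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not shipped with DSE): maps an installation type
// to the default dse.jar location listed in the table above.
final class DseJarLocator {

    // Illustrative names for the two rows of the table.
    enum InstallType { SERVICES_OR_PACKAGE, NO_SERVICES_OR_TARBALL }

    // Returns the default dse.jar path for the given installation type.
    // installLocation is only used for No Services and Tarball installations.
    static Path defaultJarPath(InstallType type, Path installLocation) {
        switch (type) {
            case SERVICES_OR_PACKAGE:
                return Paths.get("/usr/share/dse/dse.jar");
            case NO_SERVICES_OR_TARBALL:
                if (installLocation == null) {
                    throw new IllegalArgumentException(
                        "install_location is required for this installation type");
                }
                // install_location/lib/dse.jar
                return installLocation.resolve("lib").resolve("dse.jar");
            default:
                throw new IllegalArgumentException("Unknown installation type: " + type);
        }
    }
}
```

For a tarball installation, passing the directory where DataStax Enterprise was unpacked as installLocation yields the lib/dse.jar path beneath it. The helper only builds the path and does not check that the file exists.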
The DseSparkContext object is then used to create a Cassandra-aware Spark SQL context object that connects to Cassandra. This object is an instance of org.apache.spark.sql.cassandra.CassandraSQLContext.
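import org.apache.spark.SparkConf;
import org.apache.spark.sql.cassandra.CassandraSQLContext;
import com.datastax.bdp.spark.DseSparkConfHelper;
import com.datastax.bdp.spark.DseSparkContext;

// create a new configuration, enriched with the cluster settings
SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf())
    .setAppName("My application");
// create a Spark context
DseSparkContext sc = new DseSparkContext(conf);
// create a Cassandra Spark SQL context
CassandraSQLContext cassandraSQLContext = new CassandraSQLContext(sc);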
Once the Spark SQL context has been created, you can use it to register RDDs and execute Spark SQL queries. Execute queries by calling the CassandraSQLContext.sql method. To run further Spark SQL queries on the results, register the returned RDD as a table by calling SchemaRDD.registerTempTable.
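// query the employees table in the company keyspace
SchemaRDD employees = cassandraSQLContext.sql("SELECT * FROM company.employees");
// register the result as a temporary table for further queries
employees.registerTempTable("employees");
SchemaRDD managers = cassandraSQLContext.sql("SELECT name FROM employees WHERE role = 'Manager'");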
The returned RDD objects support the standard RDD operations.
List<String> managerNames = managers.map(new Function<Row, String>() {
    public String call(Row row) {
        return "Name: " + row.getString(0);
    }
}).collect();