Using the Cassandra context

An alternative to the Spark context for creating a CassandraRDD is the Cassandra context (cc). The Cassandra context is an object that offers handy methods for browsing the Cassandra schema through auto-completion. By browsing the schema, you can get the RDD of the right table. The Cassandra context also contains appropriate case classes defined for all the user tables. The Cassandra context offers no additional capabilities over the Spark context; it is simply an alternative collection of helpers and shortcuts. By default, Cassandra context files are generated in ~/.spark/cassandra-context. The Cassandra context is deprecated and might be modified or removed in the future.

To generate the Cassandra context, start the Spark shell:
$ dse spark-with-cc

To query Cassandra, use cc followed by the names of the generated Scala objects, such as the Cassandra keyspace, table, and one of the table methods. The cc object represents the CassandraContext instance.

cc.<keyspace>.<table>.(rdd | genericRdd | newRow | write)
rdd
Returns an RDD of case class instances associated with the corresponding table.
genericRdd
Returns a CassandraRDD[CassandraRow] for the corresponding table, the same as sc.cassandraTable(<keyspace>, <table>).
newRow
Creates a new instance of the case class associated with the corresponding table.
write(<rdd>)
Saves the given RDD into the corresponding table, the same as rdd.saveToCassandra(<keyspace>, <table>).
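
For example, assuming the test.words table used in the examples below and an RDD of its rows named samples (as created in the first example), the table methods map directly onto plain Spark context calls. A minimal sketch; the shell pre-imports the connector implicits:

scala> val viaCc = cc.test.words.genericRdd              // generic rows via the Cassandra context
scala> val viaSc = sc.cassandraTable("test", "words")    // the same data via the Spark context
scala> cc.test.words.write(samples)                      // save via the Cassandra context
scala> samples.saveToCassandra("test", "words")          // the same effect via the connector API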

Examples

scala> val samples = sc.parallelize(Seq(cc.test.words.newRow("Data", Some(5)), cc.test.words.newRow("Stax", Some(10)), cc.test.words.newRow("Enterprise", Some(15))))
samples: org.apache.spark.rdd.RDD[com.datastax.bdp.spark.cassandra.test$words$Row] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> cc.test.words.write(samples)

scala> cc.test.words.rdd.toArray.foreach(println)
test.words(word=bar, count=Some(20))
test.words(word=Data, count=Some(5))
test.words(word=foo, count=Some(10))
test.words(word=cat, count=Some(30))
test.words(word=Enterprise, count=Some(15))
test.words(word=Stax, count=Some(10))
test.words(word=fox, count=Some(40))
test.words(word=dog, count=Some(50))
test.words(word=cow, count=Some(60))
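
The same rows can be read back through the generic interface and mapped to plain tuples. A minimal sketch, assuming the standard CassandraRow accessors getString and getIntOption; it prints the (word, count) pairs shown above:

scala> cc.test.words.genericRdd.map(row => (row.getString("word"), row.getIntOption("count"))).toArray.foreach(println)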

The Scala method can be any method available on the resulting object, such as count, get, or take, as shown in the following examples.
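
For instance, count returns the number of rows in the table. A minimal sketch; the value shown assumes the nine words rows from the example above, and the resN counter depends on your session:

scala> cc.test.words.rdd.count
res3: Long = 9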

Example: Querying Cassandra

This example shows how to query the portfolio manager database using the Cassandra context. It uses several portfolio manager demo tables, which you load into Cassandra by running the pricer scripts in the demos/portfolio_manager/bin directory. The example in Getting started with Shark steps through running these scripts to set up the database.
  1. Start the Spark shell and generate the Cassandra context.
    $ dse spark-with-cc
  2. In the Spark shell, type cc. and press the TAB key to list non-system keyspaces.

    If you started the node as a Spark node, the output includes the available API methods and the Cassandra keyspaces.

    scala> cc.
    HiveMetaStore   PortfolioDemo   asInstanceOf    isInstanceOf    test   toString
  3. Add PortfolioDemo. to the entry, and press the TAB key.

    The output is a list of tables in the PortfolioDemo keyspace:

    scala> cc.PortfolioDemo.
    HistLoss       Portfolios     StockHist      Stocks         asInstanceOf   isInstanceOf   toString
  4. Add Stocks. to the entry and press the TAB key. The output is the list of available methods, which were previously described.
    scala> cc.PortfolioDemo.Stocks.
    asInstanceOf   genericRdd     isInstanceOf   newRow         rdd            toString       write

    You can invoke these methods on the object that represents the Cassandra table.

  6. Query the PortfolioDemo keyspace using the Stocks object. DataStax Enterprise provides a Scala object for each table in the keyspace:
    scala> cc.PortfolioDemo.Stocks
    res4: com.datastax.bdp.spark.cassandra.PortfolioDemo$Stocks$Table 
    = com.datastax.bdp.spark.cassandra.PortfolioDemo$Stocks$Table@4b94ba59
  6. Create an RDD from the Cassandra table having a generic row mapping.
    scala> cc.PortfolioDemo.Stocks.genericRdd
    res5: com.datastax.bdp.spark.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] =
     CassandraRDD[0] at RDD at CassandraRDD.scala:47
  7. Use tab-completion to get help about how to create an instance of a dedicated case class for the table. Enter this line, but do not press RETURN:
    scala> cc.PortfolioDemo.Stocks.newRow
  8. Press the TAB key. You see the help for creating the instance.
    def newRow(key: String, column1: String, value: Option[Double]): PortfolioDemo$Stocks$Row
  9. Create the instance of a dedicated case class for the table.
    scala> cc.PortfolioDemo.Stocks.newRow("some key", "some column1", Some(3.3d))
    res7: com.datastax.bdp.spark.cassandra.PortfolioDemo$Stocks$Row = PortfolioDemo.Stocks(key=some key, 
    column1=some column1, value=Some(3.3))
  10. Select the first row of data in the Stocks table.
    scala> cc.PortfolioDemo.Stocks.rdd.first
    res8: com.datastax.bdp.spark.cassandra.PortfolioDemo$Stocks$Row = PortfolioDemo.Stocks(key=DKK, 
    column1=price, value=Some(43.23847112571667))
  11. Query the Stocks table to select the first 3 rows.
    scala> cc.PortfolioDemo.Stocks.rdd.take(3)
    res9: Array[com.datastax.bdp.spark.cassandra.PortfolioDemo$Stocks$Row] = Array(PortfolioDemo.Stocks
    (key=DKK, column1=price, value=Some(43.23847112571667)), PortfolioDemo.Stocks(key=GLP, column1=price, 
    value=Some(3.6448786678725864)), PortfolioDemo.Stocks(key=PCL, column1=price, value=Some(30.152568555524205)))
  12. Query the Stocks table to get the key value of the first row.
    scala> cc.PortfolioDemo.Stocks.rdd.first.key
    res10: String = DKK
  13. Query the Stocks table to get the contents of the first row of the value column.
    scala> cc.PortfolioDemo.Stocks.rdd.first.value.get
    res11: Double = 43.23847112571667
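
Building on these steps, standard RDD operations compose with the typed rows. For example, the average stock value could be computed as follows; a minimal sketch, where flatMap drops rows whose value is None and mean() comes from Spark's DoubleRDDFunctions:

scala> cc.PortfolioDemo.Stocks.rdd.flatMap(_.value).mean()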

Refreshing the Cassandra context

The :refreshCC command refreshes the Cassandra context without restarting the Spark shell. Refreshing regenerates the Cassandra context classes incrementally, rebuilding only the classes for schema objects that changed. You can restrict the refresh to a particular keyspace, or to a keyspace and table. The syntax is:

:refreshCC <keyspace> <table>
Refreshes classes and dependent classes for the given table only.
:refreshCC <keyspace>
Refreshes classes for the given keyspace only. Intended to be used after refreshing table classes for the keyspace.
:refreshCC
Refreshes all Cassandra context classes.
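
For example, after creating a new table in the test keyspace from cqlsh (a hypothetical test.sentences table; a minimal sketch), refresh only the classes for that table and use them right away:

scala> :refreshCC test sentences
scala> cc.test.sentences.rdd.first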