Getting started with Spark Streaming

Spark Streaming allows you to consume live data streams from sources, including Akka, Kafka, and Twitter. This data can then be analyzed by Spark applications, and the data can be stored in Cassandra. This example uses Scala.

Spark Streaming allows you to consume live data streams from sources, including Akka, Kafka, and Twitter. This data can then be analyzed by Spark applications, and the data can be stored in Cassandra.

You use Spark Streaming by creating a org.apache.spark.streaming.StreamingContext instance based on your Spark configuration. You then create a DStream instance, or a discretionized stream, an object that represents an input stream. DStream objects are created by calling one of the methods of StreamingContext, or using a utility class from external libraries to connect to other sources like Twitter.

The data you consume and analyze is saved to Cassandra by calling one of the saveToCassandra methods on the stream object, passing in the keyspace name, the table name, and optionally the column names and batch size.

Checkpointing with Spark Streaming

When internal DataStax Enterprise authentication and checkpointing are on, checkpointing uses the Hadoop configuration that is separate from the configuration values in the spark-env.sh Spark configuration file that are passed into the application. DataStax Enterprise uses all of the default generated Hadoop xml files in /dse/resources/hadoop/conf. You can manually add properties to the Hadoop configuration files for parameters that are dependent on the submitted application. For example, to pass the Cassandra parameters for password authentication from the Spark configuration to the Hadoop configuration that is used for checkpointing:
Configuration hadoopConf = new Configuration();
    hadoopConf.set("cassandra.username", sparkConf.get("cassandra.username"));
    hadoopConf.set("cassandra.password", sparkConf.get("cassandra.password"));
    return JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, hadoopConf, contextFactory);

Procedure

The following Scala example demonstrates how to connect to a text input stream at a particular IP address and port, count the words in the stream, and save the results to Cassandra.

  1. Create a new StreamingContext object based on an existing SparkConf configuration object, specifying the interval in which streaming data will be divided into batches by passing in a batch duration.
    val sparkConf = ....
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    Spark allows you to specify the batch duration in milliseconds, seconds, and minutes.
  2. Import the Cassandra-specific functions for StreamingContext, DStream, and RDD objects.
    import com.datastax.spark.connector.streaming._
  3. Create the DStream object that will connect to the IP and port of the service providing the data stream.
    val lines = ssc.socketTextStream(server IP address, server port number)
  4. Count the words in each batch and save the data to the Cassandra table.
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
                          .saveToCassandra("streaming_test", "words_table", SomeColumns("word", "count"))
  5. Start the computation.
    ssc.start()
    ssc.awaitTermination()

Example

In the following example, you will start a service using the nc utility that repeats strings, then consume the output of that service using Spark Streaming.

In a terminal window, enter the following command to start the service:

$ nc -lk 9999
one two two three three three four four four four someword

In a different terminal start a Spark shell.

$ bin/dse spark

In the Spark shell enter the following:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import com.datastax.spark.connector.streaming._
import com.datastax.spark.connector.cql.CassandraConnector

val conf =  new SparkConf().setMaster( "local[2]").setAppName( "NetworkWordCount")

val ssc =  new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream( "localhost", 9999)

val words = lines.flatMap(_.split( " "))

val pairs = words.map(word => (word, 1))


CassandraConnector(conf).withSessionDo { session =>
     |   session.execute(s "CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
     |   session.execute(s "CREATE TABLE IF NOT EXISTS streaming_test.words_table (word TEXT PRIMARY KEY, count COUNTER)")
     |   session.execute(s "TRUNCATE streaming_test.words_table")
     | }

val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.saveToCassandra( "streaming_test",  "words_table", SomeColumns( "word",  "count"))

wordCounts.print()

ssc.start()

ssc.awaitTermination()
exit()

Using cqlsh connect to the streaming_test keyspace and run a query to show the results.

$ cqlsh -k streaming_test
cqlsh:streaming_test> select * from words_table;

 word    | count
---------+-------
   three |     3
     one |     1
     two |     2
    four |     4
someword |     1

What's next

See the Spark Streaming Programming Guide for more information, API documentation, and examples.