Getting started with Spark Streaming
Spark Streaming allows you to consume live data streams from sources such as Akka, Kafka, and Twitter. Spark applications can then analyze this data and store the results in the database. The examples in this section use Scala.
To use Spark Streaming, create an org.apache.spark.streaming.StreamingContext instance based on your Spark configuration. Then create a DStream (discretized stream) instance, an object that represents an input stream as a sequence of batches of data. DStream objects are created either by calling one of the methods of StreamingContext or by using a utility class from an external library to connect to other sources, such as Twitter.
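To make the idea of a discretized stream concrete, the following plain-Scala sketch groups timestamped input lines into fixed-length batches, much as a DStream presents a live stream as a sequence of RDDs, one per batch interval. It is a conceptual model, not Spark's implementation; the Record type, the discretize function, and the use of arrival timestamps in milliseconds are all hypothetical.

```scala
// Conceptual model of discretization only; this is NOT how Spark implements DStreams.
// Assumption (hypothetical): each input line carries a non-negative arrival timestamp in ms.
object DiscretizedStreamModel {
  final case class Record(timestampMs: Long, line: String)

  /** Splits records into consecutive batches of batchMs milliseconds.
    * Returns (batch start time, lines in that batch) in ascending time order. */
  def discretize(records: Seq[Record], batchMs: Long): Seq[(Long, Seq[String])] = {
    require(batchMs > 0, "batch duration must be positive")
    records
      .groupBy(r => (r.timestampMs / batchMs) * batchMs) // start time of the record's batch
      .toSeq
      .sortBy { case (start, _) => start }
      .map { case (start, rs) => start -> rs.sortBy(_.timestampMs).map(_.line) }
  }

  def main(args: Array[String]): Unit = {
    // With a 1-second batch duration, the first two lines fall into the same batch.
    val records = Seq(Record(100, "one"), Record(900, "two two"), Record(1500, "three"))
    discretize(records, batchMs = 1000).foreach(println)
  }
}
```

Each element of the result plays the role of one RDD in the DStream. Spark assigns received data to batches by the time it arrives, which is why the sketch keys on arrival timestamps.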
The data you consume and analyze is saved to the database by calling one of the saveToCassandra methods on the stream object, passing in the keyspace name, the table name, and optionally the column names and batch size.
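The column names passed to saveToCassandra tell the connector where each field of the stream's elements goes. The sketch below models this for tuples, assuming that tuple fields are matched to the listed column names by position, so that ("three", 3) saved with SomeColumns("word", "count") sets word to "three" and count to 3. TupleColumnMapping and toRow are hypothetical names; the real mapping happens inside the Spark Cassandra Connector.

```scala
// Illustrative model only; the real mapping is performed by the Spark Cassandra Connector.
// Assumption: tuple fields are matched to the SomeColumns names by position.
object TupleColumnMapping {
  /** Pairs each column name with the tuple field at the same position. */
  def toRow(columns: Seq[String], tuple: Product): Map[String, Any] = {
    require(columns.size == tuple.productArity,
      s"expected ${columns.size} fields but the tuple has ${tuple.productArity}")
    columns.zip(tuple.productIterator.toSeq).toMap
  }

  def main(args: Array[String]): Unit =
    // A (word, count) pair from the word-count stream, saved with SomeColumns("word", "count")
    println(toRow(Seq("word", "count"), ("three", 3)))
}
```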
Procedure
- Import the streaming context objects.
import org.apache.spark.streaming._
- Create a new StreamingContext object based on an existing SparkConf configuration object, passing in a batch duration that sets the interval at which streaming data is divided into batches. In the Spark shell, pass in the SparkContext (sc) that the shell creates automatically instead.
val sparkConf = ....
val ssc = new StreamingContext(sc, Seconds(1)) // Uses the context automatically created by the Spark shell
Spark allows you to specify the batch duration in milliseconds, seconds, or minutes, for example Milliseconds(500), Seconds(1), or Minutes(1).
- Import the database-specific functions for StreamingContext, DStream, and RDD objects.
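import com.datastax.spark.connector._           // RDD functions and column selectors such as SomeColumns
import com.datastax.spark.connector.streaming._ // StreamingContext and DStream functions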
- Create the DStream object that connects to the IP address and port of the service providing the data stream.
val lines = ssc.socketTextStream("<server IP address>", <server port number>) // host name (String), port (Int)
- Count the words in each batch and save the data to the table.
val words = lines.flatMap(_.split(" "))   // split each line into words
val pairs = words.map(word => (word, 1))  // pair each word with a count of 1
val wordCounts = pairs.reduceByKey(_ + _) // sum the counts for each word within the batch
wordCounts.saveToCassandra("streaming_test", "words_table", SomeColumns("word", "count"))
- Start the computation.
ssc.start()            // start receiving and processing data
ssc.awaitTermination() // block until the computation is stopped
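The word-count step runs once per batch. The following plain-Scala sketch applies the same flatMap, map, and reduce-by-key steps to an ordinary collection standing in for one batch, which can be handy for checking the logic without a cluster. BatchWordCount and countWords are hypothetical names, and Scala's local groupBy stands in for Spark's distributed reduceByKey.

```scala
// Plain Scala collections standing in for one batch (one RDD) of the DStream.
// A sketch of the per-batch logic only; Spark distributes this work across partitions.
object BatchWordCount {
  /** Same steps as lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _). */
  def countWords(batch: Seq[String]): Map[String, Int] = {
    val words = batch.flatMap(_.split(" "))  // one element per word
    val pairs = words.map(word => (word, 1)) // (word, 1) for each occurrence
    pairs
      .groupBy { case (word, _) => word }    // gather the pairs that share a key
      .map { case (word, ps) => word -> ps.map(_._2).reduce(_ + _) } // combine with _ + _
  }

  def main(args: Array[String]): Unit =
    println(countWords(Seq("one two two three three three four four four four someword")))
}
```

Note that split(" ") produces an empty string for each extra space, so input containing consecutive spaces yields a count for the empty word; splitting on the regular expression "\\s+" and dropping empty strings is one way to avoid this.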
Example
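In the following example, you start a service using the nc (netcat) utility that sends the strings you type to any connected client, then consume the output of that service using Spark Streaming. First, use cqlsh to create the keyspace and table that store the results: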
CREATE KEYSPACE IF NOT EXISTS streaming_test
  WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE IF NOT EXISTS streaming_test.words_table
  (word TEXT PRIMARY KEY, count COUNTER);
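Because count is declared as a COUNTER, Cassandra only allows it to be incremented or decremented, never overwritten. Assuming the connector therefore applies each saved (word, count) pair as an increment, every batch adds to the stored totals rather than replacing them. The plain-Scala sketch below models that accumulation across batches; CounterTableModel and applyBatch are hypothetical and stand in for the table, not for any Cassandra or connector API.

```scala
// Conceptual model of a counter table; not Cassandra or Spark Cassandra Connector code.
// Assumption: each saved (word, count) pair is applied as an increment,
// i.e. the equivalent of UPDATE ... SET count = count + n.
object CounterTableModel {
  /** Adds one batch's per-word counts to the running totals. */
  def applyBatch(table: Map[String, Long], batch: Map[String, Int]): Map[String, Long] =
    batch.foldLeft(table) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0L) + n) // missing counters start at 0
    }

  def main(args: Array[String]): Unit = {
    // Two consecutive batches; "two" appears in both, so its counter grows.
    val batches = Seq(Map("one" -> 1, "two" -> 2), Map("two" -> 1, "three" -> 3))
    println(batches.foldLeft(Map.empty[String, Long])(applyBatch))
  }
}
```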
In a terminal window, enter the following command to start the service, then type the words to send:
nc -lk 9999
one two two three three three four four four four someword
In a different terminal, start a Spark shell.
dse spark
In the Spark shell enter the following:
import org.apache.spark.streaming._
import com.datastax.spark.connector._           // provides SomeColumns
import com.datastax.spark.connector.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.saveToCassandra("streaming_test", "words_table", SomeColumns("word", "count"))
wordCounts.print() // also print each batch's counts to the console
ssc.start()
ssc.awaitTermination()
sys.exit() // reached only after the streaming context stops
Using cqlsh, connect to the streaming_test keyspace and run a query to show the results.
cqlsh -k streaming_test
select * from words_table;
 word     | count
----------+-------
    three |     3
      one |     1
      two |     2
     four |     4
 someword |     1
What's next
Run the http_receiver demo. See the Spark Streaming Programming Guide for more information, API documentation, and examples on Spark Streaming.