Spark Streamingの概要

Spark Streamingを使用すると、Akka、Kafka、Twitterなどのソースからの生データ・ストリームを消費できます。その後、これらのデータはSparkアプリケーションによって分析し、Cassandraに格納できます。この例ではScalaを使用します。

Spark Streamingを使用すると、Akka、Kafka、Twitterなどのソースからの生データ・ストリームを消費できます。その後、これらのデータはSparkアプリケーションによって分析し、Cassandraに格納できます。

Spark Streamingを使用するには、自分のSpark構成に基づいてorg.apache.spark.streaming.StreamingContextインスタンスを作成します。次に、入力ストリームを表すオブジェクトであるDStream離散ストリーム)インスタンスを作成します。DStreamオブジェクトを作成するには、StreamingContextのいずれかのメソッドを呼び出すか、外部ライブラリからのユーティリティ・クラスを使用してTwitterなどの他のソースに接続します。

消費して分析したデータをCassandraに保存するには、いずれかのsaveToCassandraメソッドをストリーム・オブジェクトに対して呼び出して、キースペース名とテーブル名、そしてオプションでカラム名とバッチ・サイズを渡します。

手順

以下のScalaの例は、特定のIPアドレスおよびポートでテキスト入力ストリームに接続して、ストリーム内の単語数をカウントし、結果をCassandraに保存する方法を示しています。

  1. 新しいStreamingContextオブジェクトを既存のSparkConf構成オブジェクトに基づいて作成します。この際、バッチの持続期間の値を渡すことによって、ストリーミング・データをバッチに分割する間隔を指定します。
    val sparkConf = ....
    val ssc = new StreamingContext(sc, Seconds(1)) // Uses the context automatically created by the spark shell
    Sparkでは、バッチの持続期間をミリ秒、秒、分の各単位で指定できます。
  2. StreamingContextDStream、RDDの各オブジェクトに関するCassandra特有の関数をインポートします。
    import com.datastax.spark.connector.streaming._
  3. データ・ストリームを提供するサービスのIPとポートに接続するDStreamオブジェクトを作成します。
    val lines = ssc.socketTextStream(server IP address, server port number)
  4. 各バッチの単語数をカウントし、データをCassandraテーブルに保存します。
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)    
    .saveToCassandra("streaming_test", "words_table", SomeColumns("word", "count"))
  5. 計算を開始します。
    ssc.start()
    ssc.awaitTermination()

以下の例では、文字列を反復するncユーティリティを使用してサービスを開始してから、Spark Streamingを使用してそのサービスの出力を消費します。

cqlshを使用して、ストリーミング用の書き込み先ターゲットのキースペースおよびテーブルを作成することから始めます。
CREATE KEYSPACE IF NOT EXISTS streaming_test 
WITH REPLICATION = { 'class' :'SimpleStrategy', 'replication_factor' : 1 } 
CREATE TABLE IF NOT EXISTS streaming_test.words_table 
(word TEXT PRIMARY KEY, count COUNTER) session.execute(s 
"TRUNCATE streaming_test.words_table" 

ターミナル・ウィンドウで、以下のコマンドを入力してサービスを開始します。

nc -lk 9999
one two two three three three four four four four someword

別のターミナルで、Sparkシェルを起動します。

bin/dse spark

Sparkシェルで、以下のコマンドを入力します。

val ssc =  new StreamingContext(sc, Seconds(1)) 
val lines = ssc.socketTextStream( "localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.saveToCassandra( "streaming_test",  "words_table", SomeColumns( "word",  "count"))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
exit()

cqlshを使用してstreaming_testキースペースに接続し、クエリーを実行して結果を表示します。

cqlsh -k streaming_test
cqlsh:streaming_test> select * from words_table;

word | count
---------+-------
three |     3
one |     1
two |     2
four |     4
someword |     1

次のタスク

詳細情報、APIドキュメント、例については、『Spark Streaming Programming Guide』を参照してください。