Spark Streamingの概要

Spark Streamingを使用すると、Akka、Kafka、Twitterなどのソースからの生データ・ストリームを消費できます。その後、これらのデータはSparkアプリケーションによって分析され、データベースに格納できます。この例では、Scalaを使用します。

Sparkストリーミングを使用すると、Akka、Kafka、Twitterなどのソースからの生データ・ストリームを消費できます。その後、これらのデータはSparkアプリケーションによって分析され、データベースに格納できます。

Sparkストリーミングを使用するには、自分のSpark構成に基づいてorg.apache.spark.streaming.StreamingContextインスタンスを作成します。次に、入力ストリームを表すオブジェクトであるDStream(任意化ストリーム)インスタンスを作成します。DStreamオブジェクトを作成するには、StreamingContextのいずれかのメソッドを呼び出すか、外部ライブラリからのユーティリティ・クラスを使用してTwitterなどの他のソースに接続します。

消費して分析したデータをデータベースに保存するには、いずれかのsaveToCassandraメソッドをストリーム・オブジェクトに対して呼び出して、キースペース名とテーブル名、そしてオプションでカラム名とバッチ・サイズを渡します。

注: Sparkストリーミング・アプリケーションでは、適切に操作するためにクロックを同期化する必要があります。「クロックの同期化」を参照してください。

手順

以下のScalaの例は、特定のIPアドレスおよびポートでテキスト入力ストリームに接続して、ストリーム内の単語数をカウントし、結果をデータベースに保存する方法を示しています。

  1. 新しいStreamingContextオブジェクトを既存のSparkConf構成オブジェクトに基づいて作成します。この際、バッチの持続期間の値を渡すことによって、ストリーミング・データをバッチに分割する間隔を指定します。
    val sparkConf = ....
    val ssc = new StreamingContext(sc, Seconds(1)) // Uses the context automatically created by the spark shell
    Sparkでは、バッチの持続期間をミリ秒、秒、分の各単位で指定できます。
  2. StreamingContextDStream、RDDの各オブジェクトに関するデータベース特有の関数をインポートします。
    import com.datastax.spark.connector.streaming._
  3. データ・ストリームを提供するサービスのIPとポートに接続するDStreamオブジェクトを作成します。
    val lines = ssc.socketTextStream(server IP address, server port number)
  4. 各バッチの単語数をカウントし、データをテーブルに保存します。
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)    
        .saveToCassandra("streaming_test", "words_table", SomeColumns("word", "count"))
  5. 計算を開始します。
    ssc.start()
    ssc.awaitTermination()

以下の例では、文字列を反復するncユーティリティを使用してサービスを開始してから、Sparkストリーミングを使用してそのサービスの出力を消費します。

cqlshを使用して、書き込み情報にストリーミングするために、最初にターゲット・キースペースとテーブルを作成します。
CREATE KEYSPACE IF NOT EXISTS streaming_test 
WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } 
CREATE TABLE IF NOT EXISTS streaming_test.words_table 
(word TEXT PRIMARY KEY, count COUNTER) session.execute(s 
"TRUNCATE streaming_test.words_table" 

ターミナル・ウィンドウで、以下のコマンドを入力してサービスを開始します。

$ nc -lk 9999 one two two three three three four four four four someword

別のターミナルで、Sparkシェルを起動します。

$ bin/dse spark

Sparkシェルで、以下のコマンドを入力します。

val ssc =  new StreamingContext(sc, Seconds(1)) 
val lines = ssc.socketTextStream( "localhost", 9999)
val words = lines.flatMap(_.split( " "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.saveToCassandra( "streaming_test",  "words_table", SomeColumns( "word",  "count"))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
exit()

cqlshを使用してstreaming_testキースペースに接続し、クエリーを実行して結果を表示します。

$ cqlsh -k streaming_test cqlsh:streaming_test> select * from words_table; word | count ---------+------- three | 3 one | 1 two | 2 four | 4 someword | 1

次のタスク

詳細情報、APIドキュメント、例については、『Spark Streaming Programming Guide』を参照してください。