Sparkストリーミングの概要
Sparkストリーミングを使用すると、Akka、Kafka、Twitterなどのソースからの生データ・ストリームを消費できます。その後、これらのデータはSparkアプリケーションによって分析され、データベースに格納できます。この例では、Scalaを使用します。
Sparkストリーミングを使用するには、自分のSpark構成に基づいてorg.apache.spark.streaming.StreamingContextインスタンスを作成します。次に、入力ストリームを表すオブジェクトであるDStream(任意化ストリーム)インスタンスを作成します。DStreamオブジェクトを作成するには、StreamingContextのいずれかのメソッドを呼び出すか、外部ライブラリからのユーティリティ・クラスを使用してTwitterなどの他のソースに接続します。
消費して分析したデータをデータベースに保存するには、いずれかのsaveToCassandraメソッドをストリーム・オブジェクトに対して呼び出して、キースペース名とテーブル名、そしてオプションでカラム名とバッチ・サイズを渡します。
手順
-
ストリーミング・コンテキスト・オブジェクトをインポートします。
import org.apache.spark.streaming._
-
新しいStreamingContextオブジェクトを既存のSparkConf構成オブジェクトに基づいて作成します。この際、バッチの持続期間の値を渡すことによって、ストリーミング・データをバッチに分割する間隔を指定します。
val sparkConf = .... val ssc = new StreamingContext(sc, Seconds(1)) // Uses the context automatically created by the spark shell
Sparkでは、バッチの持続期間をミリ秒、秒、分の各単位で指定できます。 -
StreamingContext、DStream、RDDの各オブジェクトに関するデータベース特有の関数をインポートします。
import com.datastax.spark.connector.streaming._
-
データ・ストリームを提供するサービスのIPとポートに接続するDStreamオブジェクトを作成します。
val lines = ssc.socketTextStream(server IP address, server port number)
-
各バッチの単語数をカウントし、データをテーブルに保存します。
val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) .saveToCassandra("streaming_test", "words_table", SomeColumns("word", "count"))
-
計算を開始します。
ssc.start() ssc.awaitTermination()
例
以下の例では、文字列を反復するnc
ユーティリティを使用してサービスを開始してから、Sparkストリーミングを使用してそのサービスの出力を消費します。
CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }; CREATE TABLE IF NOT EXISTS streaming_test.words_table (word TEXT PRIMARY KEY, count COUNTER);
ターミナル・ウィンドウで、以下のコマンドを入力してサービスを開始します。
nc -lk 9999 one two two three three three four four four four someword
別のターミナルで、Sparkシェルを起動します。
dse spark
Sparkシェルで、以下のコマンドを入力します。
import org.apache.spark.streaming._ import com.datastax.spark.connector.streaming._ val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream( "localhost", 9999) val words = lines.flatMap(_.split( " ")) val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _) wordCounts.saveToCassandra( "streaming_test", "words_table", SomeColumns( "word", "count")) wordCounts.print() ssc.start() ssc.awaitTermination() exit()
cqlsh
を使用してstreaming_test
キースペースに接続し、クエリーを実行して結果を表示します。
cqlsh -k streaming_test
select * from words_table;
word | count ---------+------- three | 3 one | 1 two | 2 four | 4 someword | 1
次のタスク
http_receiver demoを実行します。Sparkストリーミングの詳細情報、APIドキュメント、例については、『Spark Streaming Programming Guide』を参照してください。