Spark Streamingの概要
Spark Streamingを使用すると、Akka、Kafka、Twitterなどのソースからの生データ・ストリームを消費できます。その後、これらのデータはSparkアプリケーションによって分析し、Cassandraに格納できます。この例ではScalaを使用します。
Spark Streamingを使用するには、自分のSpark構成に基づいてorg.apache.spark.streaming.StreamingContextインスタンスを作成します。次に、入力ストリームを表すオブジェクトであるDStream(離散ストリーム)インスタンスを作成します。DStreamオブジェクトを作成するには、StreamingContextのいずれかのメソッドを呼び出すか、外部ライブラリからのユーティリティ・クラスを使用してTwitterなどの他のソースに接続します。
消費して分析したデータをCassandraに保存するには、いずれかのsaveToCassandraメソッドをストリーム・オブジェクトに対して呼び出して、キースペース名とテーブル名、そしてオプションでカラム名とバッチ・サイズを渡します。
手順
以下のScalaの例は、特定のIPアドレスおよびポートでテキスト入力ストリームに接続して、ストリーム内の単語数をカウントし、結果をCassandraに保存する方法を示しています。
例
以下の例では、文字列を反復するnc
ユーティリティを使用してサービスを開始してから、Spark Streamingを使用してそのサービスの出力を消費します。
CREATE KEYSPACE IF NOT EXISTS streaming_test
WITH REPLICATION = { 'class' :'SimpleStrategy', 'replication_factor' : 1 }
CREATE TABLE IF NOT EXISTS streaming_test.words_table
(word TEXT PRIMARY KEY, count COUNTER) session.execute(s
"TRUNCATE streaming_test.words_table"
ターミナル・ウィンドウで、以下のコマンドを入力してサービスを開始します。
nc -lk 9999 one two two three three three four four four four someword
別のターミナルで、Sparkシェルを起動します。
bin/dse spark
Sparkシェルで、以下のコマンドを入力します。
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream( "localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.saveToCassandra( "streaming_test", "words_table", SomeColumns( "word", "count"))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
exit()
cqlsh
を使用してstreaming_test
キースペースに接続し、クエリーを実行して結果を表示します。
cqlsh -k streaming_test cqlsh:streaming_test> select * from words_table; word | count ---------+------- three | 3 one | 1 two | 2 four | 4 someword | 1
次のタスク
詳細情報、APIドキュメント、例については、『Spark Streaming Programming Guide』を参照してください。