Cassandraへのデータの保存

DataStax Enterpriseを使用すると、ほぼあらゆるRDDをCassandraに保存できます。RDDをスタンドアローン・アプリケーションで使用する前に、com.datastax.spark.connectorをインポートしておいてください。

DataStax Enterpriseを使用すると、ほぼあらゆるRDDをCassandraに保存できます。カスタム・マッピングを指定しない限り、RDDのオブジェクト・クラスはタプルであるか、Cassandraカラム名に対応するプロパティ名を持たなければなりません。RDDを保存するには、キースペース名とテーブル名、さらに任意でカラム・リストを指定してsaveToCassandraメソッドを呼び出します。RDDをスタンドアローン・アプリケーションで使用する前に、com.datastax.spark.connectorをインポートしておいてください。

DataFrames APIを使用して、Spark内でデータを操作することもできます。

タプルのコレクションの保存

以下の例は、タプルのコレクションをCassandraに保存する方法を示しています。

scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection:org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at console:22

scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

scala>            

この例の最後のscalaプロンプトで出力が生成されていないことは、データがCassandraに保存されたことを意味します。

cqlshで、wordsテーブルをクエリーしてすべての内容を選択します。

SELECT * FROM test.words;

word | count
------+-------
bar |    20
foo |    10
cat |    30
fox |    40

(4 rows)

caseクラス・オブジェクトのコレクションのCassandraへの保存

以下の例は、caseクラス・オブジェクトのコレクションを保存する方法を示しています。

scala> case class WordCount(word:String, count:Long)
defined class WordCount

scala> val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection:org.apache.spark.rdd.RDD[WordCount] = ParallelCollectionRDD[0] at parallelize at console:24

scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

scala>
      

cqlshで、wordsテーブルをクエリーしてすべての内容を選択します。


SELECT * FROM test.words;

word | count
------+-------
bar |    20
foo |    10
cat |    30
fox |    40
dog |    50
cow |    60

プロパティ名とカラム名間のデフォルト以外のマッピングの使用

タプルおよびcaseクラスへの行のマッピングは、構成しなくてもそのまま機能しますが、CassandraとScala間のマッピングを制御する必要が生じる場合があります。たとえば、JavaクラスではJavaBeansの命名規則が使用されることが多いのに対し、アクセッサーの名前にはget、is、setの各プレフィックスが付きます。カラムとプロパティ間のマッピングをカスタマイズするには、適切なColumnMapper[YourClass]暗黙的オブジェクトを範囲に含めます。このオブジェクトの定義は、マッピング対象クラスのコンパニオン・オブジェクト内で行います。ColumnMapperはデータの読み込みと保存の両方に影響します。DataStax Enterpriseには、ColumnMapperの実装がいくつか含まれています。

JavaBeansの使用

Javaクラスを使用して作業するには、JavaBeanColumnMapperを使用します。オブジェクトがシリアライズ可能であることを確認してください。そうでないと、Sparkはオブジェクトをネットワーク経由で送信できません。以下の例は、JavaBeanColumnMapperの使用方法を示しています。

JavaBeanスタイル・アクセッサーを使用するには:

scala> :paste
// Entering paste mode (ctrl-D to finish)
      

以下のimportコマンドとclass定義をペーストします。

    import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
class WordCount extends Serializable { 
private var _word:String = ""
private var _count:Int = 0
def setWord(word:String) { _word = word }
def setCount(count:Int) { _count = count }
override def toString = _word + ":" + _count
    }
object WordCount {
implicit object Mapper extends JavaBeanColumnMapper[WordCount] 
    } 

CTRL Dキーを押してペースト・モードを終了します。出力は以下のようになります。

    // Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
defined class WordCount
defined module WordCount

scala>

WordCountオブジェクトをクエリーします。

sc.cassandraTable[WordCount]("test", "words").toArray
res18:Array[WordCount] = Array(cow:60, bar:20, foo:10, cat:30, fox:40, dog:50)

データを保存するには、ゲッターを定義する必要があります。

プロパティ名とカラム名間の関係の手動による指定

何らかの理由で名前の異なるプロパティとカラムを関連付ける必要が生じた場合は、カラム変換マップをDefaultColumnMapperまたはJavaBeanColumnMapperに渡します。

カラム名を変更するには:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import com.datastax.spark.connector.mapper.DefaultColumnMapper
case class WordCount(w:String, c:Int)
object WordCount { implicit object Mapper extends
DefaultColumnMapper[WordCount](Map("w" -> "word", "c" -> "count")) }

CTRL Dキーを押します。

// Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.DefaultColumnMapper
defined class WordCount
defined module WordCount

続いて以下のコマンドを入力します。

scala> sc.cassandraTable[WordCount]("test", "words").toArray
res21:Array[WordCount] = Array(WordCount(cow,60), WordCount(bar,20), WordCount(foo,10), WordCount(cat,30), WordCount(fox,40), WordCount(dog,50))

scala> sc.parallelize(Seq(WordCount("bar",20),WordCount("foo",40))).saveToCassandra("test", "words", SomeColumns("word", "count"))

scala>

カスタムのColumnMapperの作成

クラスのカラム・マッピングを定義するには、ColumnMapper[YourClass]トレイトを実装する適切な暗黙的オブジェクトを作成します。