DSEへのRDDデータの保存

DataStax Enterpriseを使用すると、ほぼあらゆるRDDをデータベースに保存できます。RDDをスタンドアローン・アプリケーションで使用する前に、com.datastax.spark.connectorをインポートします。

DataStax Enterpriseを使用すると、ほぼあらゆるRDDをデータベースに保存できます。カスタム・マッピングを指定しない限り、RDDのオブジェクト・クラスはタプルであるか、テーブル・カラム名に対応するプロパティ名を持つ必要があります。RDDを保存するには、キースペース名とテーブル名、さらに任意でカラムのリストを指定してsaveToCassandraメソッドを呼び出します。RDDをスタンドアローン・アプリケーションで使用する前に、com.datastax.spark.connectorをインポートしておいてください。

DataFrames APIを使用して、Spark内でデータを操作することもできます。

タプルのコレクションの保存

以下の例は、タプルのコレクションをデータベースに保存する方法を示しています。

val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
collection: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at console:22
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

この例の最後のScalaプロンプトで出力が生成されていないことは、データがデータベースに保存されたことを意味します。

cqlshで、wordsテーブルをクエリーしてすべての内容を選択します。

SELECT * FROM test.words;
 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  fox |    40

(4 rows)

caseクラス・オブジェクトのコレクションのデータベースへの保存

以下の例は、caseクラス・オブジェクトのコレクションを保存する方法を示しています。

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

cqlshで、wordsテーブルをクエリーしてすべての内容を選択します。

SELECT * FROM test.words;
 word | count
------+-------
 bar |    20
 foo |    10
 cat |    30
 fox |    40
 dog |    50
 cow |    60

プロパティ名とカラム名間のデフォルト以外のマッピングの使用

タプルおよびcaseクラスへの行のマッピングは、構成しなくてもそのまま機能しますが、データベースとScala間のマッピングを制御する必要が生じる場合があります。たとえば、JavaクラスではJavaBeansの命名規則が使用されることが多いのに対し、アクセッサーの名前にはgetissetの各プレフィックスが付きます。カラムとプロパティ間のマッピングをカスタマイズするには、適切なColumnMapper[YourClass]暗黙的オブジェクトを範囲に含めます。このオブジェクトの定義は、マッピング対象クラスのコンパニオン・オブジェクト内で行います。ColumnMapperはデータの読み込みと保存の両方に影響します。DataStax Enterpriseには、ColumnMapperの実装がいくつか含まれています。

JavaBeansの使用

Javaクラスを使用して作業するには、JavaBeanColumnMapperを使用します。オブジェクトがシリアライズ可能であることを確認してください。そうでないと、Sparkはオブジェクトをネットワーク経由で送信できません。以下の例は、JavaBeanColumnMapperの使用方法を示しています。

JavaBeansスタイル・アクセッサーを使用するには:

:paste
// Entering paste mode (ctrl-D to finish)
      

以下のimportコマンドとclass定義をペーストします。

import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
    class WordCount extends Serializable { 
      private var _word: String = ""
      private var _count: Int = 0
      def setWord(word: String) { _word = word }
      def setCount(count: Int) { _count = count }
      override def toString = _word + ":" + _count
    }
    object WordCount {
      implicit object Mapper extends JavaBeanColumnMapper[WordCount] 
    } 

CTRL Dキーを押してペースト・モードを終了します。出力は以下のようになります。

// Exiting paste mode, now interpreting.

    import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
    defined class WordCount
    defined module WordCount

    

WordCountオブジェクトをクエリーします。

sc.cassandraTable[WordCount]("test", "words").toArray

データを保存するには、ゲッターを定義する必要があります。

プロパティ名とカラム名間の関係の手動による指定

何らかの理由で名前の異なるプロパティとカラムを関連付ける必要が生じた場合は、カラム変換マップをDefaultColumnMapperまたはJavaBeanColumnMapperに渡します。

カラム名を変更するには:

:paste
// Entering paste mode (ctrl-D to finish)

import com.datastax.spark.connector.mapper.DefaultColumnMapper
case class WordCount(w: String, c: Int)
object WordCount { implicit object Mapper extends
DefaultColumnMapper[WordCount](Map("w" -> "word", "c" -> "count")) }

CTRL Dキーを押します。

// Exiting paste mode, now interpreting.

import com.datastax.spark.connector.mapper.DefaultColumnMapper
defined class WordCount
defined module WordCount

続いて以下のコマンドを入力します。

sc.cassandraTable[WordCount]("test", "words").toArray
sc.parallelize(Seq(WordCount("bar",20),WordCount("foo",40))).saveToCassandra("test", "words", SomeColumns("word", "count"))

カスタムのColumnMapperの作成

クラスのカラム・マッピングを定義するには、ColumnMapper[YourClass]トレイトを実装する適切な暗黙的オブジェクトを作成します。