Sparkコンテキストの使用

データベース・テーブルを表すSpark RDDを取得するには、scドット(sc.)構文を使用してcassandraTableメソッドをSparkコンテキストに対して呼び出すことによって、テーブルのデータをSparkに読み込みます。

注: DSE 5.1以降では、Sparkアプリケーションのエントリー・ポイントはSparkSessionオブジェクトです。Sparkコンテキストの使用は廃止予定で、今後のリリースでは除外される可能性があります。

廃止予定のコンテキスト・オブジェクトにアクセスするには、spark.sparkContextを呼び出します。

val sc = spark.sparkContext

データベース・テーブルを表すSpark RDDを取得するには、scドット(sc.)構文を使用してcassandraTableメソッドをSparkコンテキストに対して呼び出すことによって、テーブルのデータをSparkに読み込みます。scは、Spark API SparkContextクラスを表します。

sc.cassandraTable ( "keyspace", "table name" )

デフォルトでは、DSE Sparkシェルはscオブジェクトを作成します。Sparkコンテキストは、spark.sparkContextを呼び出して、Sparkシェル内のSpark sessionオブジェクトから手動で取得できます。

val sc = spark.sparkContext()

データはScalaオブジェクトにマッピングされ、DataStax EnterpriseによってCassandraRDD[CassandraRow]が返されます。DataStax Enterpriseの外部で実行されるアプリケーションを作成するためにSpark APIを使用するには、com.datastax.spark.connector.SparkContextCassandraFunctionsをインポートします。

以下の例は、テーブルをSparkに読み込んで、テーブルをSparkから読み取る方法を示しています。

  1. cqlshを使用して、以下のキースペースとテーブルを作成します。キースペースの作成には、Analyticsデータ・センターを使用します。
    CREATE KEYSPACE test WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'Analytics' : 1};
    
    CREATE TABLE test.words (word text PRIMARY KEY, count int);

    この例では、単一ノード・クラスターをSparkモードで起動することを前提としています。

  2. データをwordsテーブルに読み込みます。
    INSERT INTO test.words (word, count) VALUES ('foo', 10);
    INSERT INTO test.words (word, count) VALUES ('bar', 20);
  3. ノードをSparkモードで起動してあるという前提で、Sparkシェルを起動します。シェルを起動するためにsudoを使用しないでください。
    bin/dse spark

    「Welcome to Spark」という出力とプロンプトが表示されます。

  4. showSchemaコマンドを使用して、userキースペースおよびテーブルを表示します。
    :showSchema

    すべてのuserキースペースに関する情報が表示されます。

    ======================================== Keyspace: HiveMetaStore ======================================== Table: MetaStore ---------------------------------------- - key : String (partition key column) - entity : String (clustering column) - value : java.nio.ByteBuffer ======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int scala> :showSchema test ======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int scala> :showSchema test words ======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int
  5. testキースペースのみに関する情報を取得します。
    :showSchema test
    
    ======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int
  6. wordsテーブルに関する情報を取得します。
    :showSchema test words
    
    ======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int
  7. test.wordsテーブルのデータを参照するベースRDDを定義します。
    val rdd = sc.cassandraTable("test", "words")
    
    rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:47

    RDDがrdd値で返されます。テーブルを読み取るには、このコマンドを使用します。

    rdd.toArray.foreach(println)
    
    CassandraRow{word: bar, count: 20} CassandraRow{word: foo, count: 10}

返されたRDDに対するメソッドを使用して、test.wordsテーブルをクエリーできるようになりました。

cassandraTablesを読み込むためのPythonサポート

Pythonでは、Sparkストリーミング・コンテキストからのcassandraTablesの読み込みと、データベースへのDStreamの保存をサポートしています。

カラム値の読み取り

CassandraRowオブジェクトのgetメソッドを使用して、テーブルのカラムを読み取ることができます。getメソッドは、カラム名またはカラム・インデックスに基づいて、個々のカラム値にアクセスします。型変換はその場で適用されます。null値が返されることが予想される場合は、getOptionバリアントを使用します。

前の例をそのまま使用し、以下の手順に従って個々のカラム値にアクセスします。
  1. RDDの最初の項目をfirstRow値に格納します。
    val firstRow = rdd.first
    firstRow: com.datastax.spark.connector.CassandraRow = CassandraRow{word: foo, count: 10}
  2. カラム名を取得します。
    rdd.columnNames
    
    res3: com.datastax.spark.connector.ColumnSelector = AllColumns
  3. 一般的なgetを使用し、戻り値の型を直接渡してテーブルをクエリーします。
    firstRow.get[Int]("count")
    
    res4: Int = 10
    firstRow.get[Long]("count")
    
    res5: Long = 10
    firstRow.get[BigInt]("count")
    
    res6: BigInt = 10
    firstRow.get[java.math.BigInteger]("count")
    
    res7: java.math.BigInteger = 10
    firstRow.get[Option[Int]]("count")
    
    res8: Option[Int] = Some(10)
    firstRow.get[Option[BigInt]]("count")
    
    res9: Option[BigInt] = Some(10)

コレクションの読み取り

CassandraRowオブジェクトのgetメソッドを使用して、テーブルのコレクション・カラムを読み取ることができます。getメソッドは、コレクション・カラムにアクセスし、対応するScalaコレクションを返します。

testキースペースが既にセットアップされているという前提で、以下の手順に従ってコレクションにアクセスします。

  1. testキースペースで、cqlshを使用してコレクション・セットをセットアップします。
    CREATE TABLE test.users (
      username text PRIMARY KEY, emails SETtext);
    
    INSERT INTO test.users (username, emails) 
      VALUES ('someone', {'someone@email.com', 's@email.com'});
  2. Sparkが実行されていない場合は、Sparkシェルを起動します。シェルを起動するためにsudoを使用しないでください。
    bin/dse spark

    「Welcome to Spark」という出力とプロンプトが表示されます。

  3. コレクション・セットにアクセスするためのCassandraRDD[CassandraRow]を定義します。
    val row = sc.cassandraTable("test", "users").toArray.apply(0)
    
    row: com.datastax.spark.connector.CassandraRow = CassandraRow{username: someone, emails: {s@email.com,someone@email.com}}
  4. コレクション・セットをSparkからクエリーします。
    row.getList[String]("emails")
    
    res2: Vector[String] = Vector(s@email.com, someone@email.com)
    row.get[List[String]]("emails")
    
    res3: List[String] = List(s@email.com, someone@email.com)
    row.get[Seq[String]]("emails")
    
    res4: Seq[String] = List(s@email.com, someone@email.com)
    row.get[IndexedSeq[String]]("emails")
    
    res5: IndexedSeq[String] = Vector(s@email.com, someone@email.com)
    row.get[Set[String]]("emails")
    
    res6: Set[String] = Set(s@email.com, someone@email.com)
    row.get[String]("emails")
    
    res7: String = {s@email.com,someone@email.com}

フェッチ対象カラムの数の制限

パフォーマンス上の理由のため、不要なカラムはフェッチ対象から除外する必要があります。これを行うには、selectメソッドを使用します。

フェッチ対象カラムの数を制限するには:

val row = sc.cassandraTable("test", "users").select("username").toArray
row: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{username: someone})

行とタプルおよびcaseクラス間のマッピング

行をCassandraRowクラスのオブジェクトにマッピングする代わりに、必要な型のタプルにカラム値を直接ラップ解除することができます。

行をタプルにマッピングするには:

sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").toArray
res9: Array[(String, Int)] = Array((bar,20), (foo,10))
sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").toArray
res10: Array[(Int, String)] = Array((20,bar), (10,foo))

カラムと同じ名前のプロパティを使用してcaseクラスを定義します。複数の単語で構成されるカラムIDの場合は、カラムを作成する際、各単語をアンダースコアで区切り、Scala側でキャメル・ケース略語を使用します。

行をcaseクラスにマッピングするには:

case class WordCount(word: String, count: Int)
defined class WordCount
scala> sc.cassandraTable[WordCount]("test", "words").toArray
res14: Array[WordCount] = Array(WordCount(bar,20), WordCount(foo,20))

カラム名は、以下の規則に従って付けることができます。

  • アンダースコアと小文字を使用します。(推奨)
  • キャメル・ケースを使用し、Scalaのプロパティと完全に一致させます。

以下の例に有効なカラム名を示します。

1. 推奨命名規則
データベース・カラム名 Scalaプロパティ名
count count
column_1 column1
user_name userName
user_address UserAddress
2. 代替的な命名規則
データベース・カラム名 Scalaプロパティ名
count count
column1 column1
userName userName
UserAddress UserAddress

ユーザー定義の関数を使用した行とオブジェクト間のマッピング

asCassandraRDDに対して呼び出して、すべての行を異なる型のオブジェクトにマッピングします。mapとは対照的に、asでは、関数の引数の数がフェッチ対象カラムの数と同じである必要があります。この方法でasを呼び出すと、型変換が実行されます。asを使用して特定の型のオブジェクトを直接作成すると、CassandraRowオブジェクトを作成する必要がなくなり、また、ガーベージ・コレクションの負荷も軽減されます。

ユーザー定義の関数を使用してカラムをマッピングするには:

val table = sc.cassandraTable("test", "words")
table: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[9] at RDD at CassandraRDD.scala:47
val total = table.select("count").as((c: Int) => c).sum
total: Double = 30.0
val frequencies = table.select("word", "count").as((w: String, c: Int) => (w, c / total)).toArray
frequencies: Array[(String, Double)] = Array((bar,0.6666666666666666), (foo,0.3333333333333333))

サーバー上の行のフィルター

行をフィルターするには、Sparkのフィルター変換機能を使用します。フィルター変換機能は、まずデータベースからすべての行をフェッチしてから、それをSparkでフィルターします。一部のCPUサイクルは、結果から除外されたオブジェクトのシリアライズとデシリアライズのために浪費されます。このオーバーヘッドを防ぐため、CassandraRDDには、サーバー上の行セットをフィルターするための任意のCQL条件を渡すメソッドが用意されています。

この例は、Sparkを使用してサーバー上の行をフィルターする方法を示しています。
  1. この例のCQLコマンドをダウンロードして解凍します。このファイルに含まれているコマンドによって以下のタスクが実行されます。
    • carsテーブルをtestキースペースに作成
    • colorカラムのインデックス作成
    • テーブルへのデータの挿入
  2. cqlshまたはDevCenterを使用して、test_cars.cqlファイルを実行します。たとえば、cqlshを使用します。
    $ cqlsh -f test_cars.cql
  3. Sparkを使用して行をフィルターします。
    sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println)
    
    CassandraRow{id: AS-8888, model: Aston Martin DB9 Volante} CassandraRow{id: KF-334L, model: Ford Mondeo} CassandraRow{id: MT-8787, model: Hyundai x35} CassandraRow{id: MZ-1038, model: Mazda CX-9} CassandraRow{id: DG-2222, model: Dodge Avenger} CassandraRow{id: DG-8897, model: Dodge Charger} CassandraRow{id: BT-3920, model: Bentley Continental GT} CassandraRow{id: IN-9964, model: Infinity FX}
    sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println)
    
    CassandraRow{id: FR-8877, model: Ferrari FF} CassandraRow{id: FR-8877, model: Ferrari FF} CassandraRow{id: HD-1828, model: Honda Accord} CassandraRow{id: WX-2234, model: Toyota Yaris}