Sparkコンテキストの使用

Cassandraテーブルを表すSpark RDDを取得するには、scドット(sc.)構文を使用してcassandraTableメソッドをSparkコンテキストに対して呼び出すことによって、CassandraテーブルのデータをSparkに読み込みます。

Cassandraテーブルを表すSpark RDDを取得するには、scドット(sc.)構文を使用してcassandraTableメソッドをSparkコンテキストに対して呼び出すことによって、CassandraテーブルのデータをSparkに読み込みます。ここで、scは、Spark API SparkContextクラスを表します。

sc.cassandraTable ( "keyspace", "table name" )

CassandraデータはScalaオブジェクトにマッピングされ、DataStax EnterpriseによってCassandraRDD[CassandraRow]が返されます。DataStax Enterpriseの外部で実行されるアプリケーションを作成するためにSpark APIを使用するには、com.datastax.spark.connector.SparkContextCassandraFunctionsをインポートします。

以下の例は、CassandraテーブルをSparkに読み込んで、CassandraのテーブルをSparkから読み取る方法を示しています。

  1. cqlshを使用して、以下のキースペースとテーブルをCassandraで作成します。キースペースの作成には、Analyticsデータセンターを使用します。
    CREATE KEYSPACE test WITH REPLICATION = {'class' :'NetworkTopologyStrategy', 'Analytics' : 1};
    
    CREATE TABLE test.words (word text PRIMARY KEY, count int);

    この例では、単一ノード・クラスターをSparkモードで起動することを前提としています。

  2. データをwordsテーブルに読み込みます。
    INSERT INTO test.words (word, count) VALUES ('foo', 10);
    INSERT INTO test.words (word, count) VALUES ('bar', 20);
  3. ノードをSparkモードで起動してあるという前提で、Sparkシェルを起動します。シェルを起動するためにsudoを使用しないでください。
    $ bin/dse spark

    「Welcome to Spark」という出力とscalaプロンプトが表示されます。

  4. showSchemaコマンドを使用して、userキースペースおよびテーブルをCassandraで表示します。
    :showSchema

    すべてのuserキースペースに関する情報が表示されます。

    ========================================
    Keyspace:HiveMetaStore
    ========================================
    Table:MetaStore
    ----------------------------------------
    - key    :String (partition key column)
    - entity :String              (clustering column)
    - value  :java.nio.ByteBuffer 
    
    
    ========================================
    Keyspace:test
    ========================================
    Table:words
    ----------------------------------------
    - word  :String (partition key column)
    - count :Int                
    
    scala> :showSchema test
    ========================================
    Keyspace:test
    ========================================
    Table:words
    ----------------------------------------
    - word  :String (partition key column)
    - count :Int   
    
    scala> :showSchema test words
    ========================================
    Keyspace:test
    ========================================
    Table:words
    ----------------------------------------
    - word  :String (partition key column)
    - count :Int
  5. testキースペースのみに関する情報を取得します。
    :showSchema test
    
    ========================================
    Keyspace:test
    ========================================
    Table:words
    ----------------------------------------
    - word  :String (partition key column)
    - count :Int   
  6. wordsテーブルに関する情報を取得します。
    :showSchema test words
    
    ========================================
    Keyspace:test
    ========================================
    Table:words
    ----------------------------------------
    - word  :String (partition key column)
    - count :Int
  7. test.wordsテーブルのデータを参照するベースRDDを定義します。
    val rdd = sc.cassandraTable("test", "words")
    
    rdd:com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.
    CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:47

    RDDがrdd値で返されます。Cassandraテーブルを読み取るには、以下のコマンドを使用します。

    rdd.toArray.foreach(println)
    
    CassandraRow{word:bar, count: 20}
    CassandraRow{word:foo, count: 10}

返されたRDDに対するメソッドを使用して、test.wordsテーブルをクエリーできるようになりました。

cassandraTablesの読み込みに対するPythonのサポート 

Pythonでは、Sparkストリーミング・コンテキストからcassandraTablesを読み込み、DStreamをCassandraに保存する操作をサポートしています。

カラム値の読み取り 

CassandraRowオブジェクトのgetメソッドを使用してCassandraテーブルのカラムを読み取ることができます。getメソッドは、カラム名またはカラム・インデックスに基づいて、個々のカラム値にアクセスします。型変換はその場で適用されます。Cassandraのnull値が返されることが予想される場合は、getOptionバリアントを使用します。

前の例をそのまま使用し、以下の手順に従って個々のカラム値にアクセスします。
  1. RDDの最初の項目をfirstRow値に格納します。
    val firstRow = rdd.first
    firstRow:com.datastax.spark.connector.CassandraRow = CassandraRow{word:foo, count: 10}
  2. カラム名を取得します。
    rdd.columnNames
    
    res3:com.datastax.spark.connector.ColumnSelector = AllColumns
  3. 一般的なgetを使用し、戻り値の型を直接渡してテーブルをクエリーします。
    firstRow.get[Int]("count")
    
    res4:Int = 10
    
    firstRow.get[Long]("count")
    
    res5:Long = 10
    
    firstRow.get[BigInt]("count")
    
    res6:BigInt = 10
    
    firstRow.get[java.math.BigInteger]("count")
    
    res7:java.math.BigInteger = 10
    
    firstRow.get[Option[Int]]("count")
    
    res8:Option[Int] = Some(10)
    
    firstRow.get[Option[BigInt]]("count")
    
    res9:Option[BigInt] = Some(10)

コレクションの読み取り 

CassandraRowオブジェクトのgetメソッドを使用してCassandraテーブルのコレクション・カラムを読み取ることができます。getメソッドは、コレクション・カラムにアクセスし、対応するScalaコレクションを返します。

testキースペースがすでにセットアップされているという前提で、以下の手順に従ってCassandraコレクションにアクセスします。

  1. testキースペースで、cqlshを使用してコレクション・セットをセットアップします。
    CREATE TABLE test.users (
    username text PRIMARY KEY, emails SETtext);
    
    INSERT INTO test.users (username, emails) 
    VALUES ('someone', {'someone@email.com', 's@email.com'});
  2. Sparkが実行されていない場合は、Sparkシェルを起動します。シェルを起動するためにsudoを使用しないでください。
    $ bin/dse spark

    「Welcome to Spark」という出力とプロンプトが表示されます。

  3. コレクション・セットにアクセスするためのCassandraRDD[CassandraRow]を定義します。
    val row = sc.cassandraTable("test", "users").toArray.apply(0)
    
    row:com.datastax.spark.connector.CassandraRow = CassandraRow{username:someone, 
    emails:{s@email.com,someone@email.com}}
  4. Cassandraのコレクション・セットをSparkからクエリーします。
    row.getList[String]("emails")
    
    res2:Vector[String] = Vector(s@email.com, someone@email.com)
    
    row.get[List[String]]("emails")
    
    res3:List[String] = List(s@email.com, someone@email.com)
    
    row.get[Seq[String]]("emails")
    
    res4:Seq[String] = List(s@email.com, someone@email.com)
    
    row.get[IndexedSeq[String]]("emails")
    
    res5:IndexedSeq[String] = Vector(s@email.com, someone@email.com)
    
    row.get[Set[String]]("emails")
    
    res6:Set[String] = Set(s@email.com, someone@email.com)
    
    row.get[String]("emails")
    
    res7:String = {s@email.com,someone@email.com}

フェッチ対象カラムの数の制限 

パフォーマンス上の理由のため、不要なカラムはフェッチ対象から除外する必要があります。これを行うには、selectメソッドを使用します。

フェッチ対象カラムの数を制限するには:

val row = sc.cassandraTable("test", "users").select("username").toArray
row:Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{username:someone})

行とタプルおよびcaseクラス間のマッピング 

Cassandra行をCassandraRowクラスのオブジェクトにマッピングする代わりに、必要な型のタプルにカラム値を直接ラップ解除することができます。

行をタプルにマッピングするには:

sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").toArray
res9:Array[(String, Int)] = Array((bar,20), (foo,10))
sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").toArray
res10:Array[(Int, String)] = Array((20,bar), (10,foo))

Cassandraカラムと同じ名前のプロパティを使用してcaseクラスを定義します。複数の単語で構成されるカラムIDの場合は、Cassandraで各単語をアンダースコアで区切り、Scala側でキャメル・ケース略語を使用します。

行をcaseクラスにマッピングするには:

case class WordCount(word:String, count:Int)
defined class WordCount
scala> sc.cassandraTable[WordCount]("test", "words").toArray
res14:Array[WordCount] = Array(WordCount(bar,20), WordCount(foo,20))

Cassandraのカラム名は、以下の規則に従って付けることができます。

  • アンダースコアと小文字を使用します(推奨)。
  • キャメル・ケースを使用し、Scalaのプロパティと完全に一致させます。

以下の例に有効なカラム名を示します。

推奨命名規則
Cassandraカラム名 Scalaプロパティ名
count count
column_1 column1
user_name userName
user_address UserAddress
代替的な命名規則
Cassandraカラム名 Scalaプロパティ名
count count
column1 column1
userName userName
UserAddress UserAddress

ユーザー定義の関数を使用した行とオブジェクト間のマッピング 

asCassandraRDDに対して呼び出して、すべての行を異なる型のオブジェクトにマッピングします。mapとは対照的に、asでは、関数の引数の数がフェッチ対象カラムの数と同じである必要があります。この方法でasを呼び出すと、型変換が実行されます。asを使用して特定の型のオブジェクトを直接作成すると、CassandraRowオブジェクトを作成する必要がなくなり、また、ガーベージ・コレクションの負荷も軽減されます。

ユーザー定義の関数を使用してカラムをマッピングするには:

val table = sc.cassandraTable("test", "words")
table:com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.
CassandraRow] = CassandraRDD[9] at RDD at CassandraRDD.scala:47
val total = table.select("count").as((c:Int) => c).sum
total:Double = 30.0
val frequencies = table.select("word", "count").as((w:String, c:Int) => (w, c / total)).toArray
frequencies:Array[(String, Double)] = Array((bar,0.6666666666666666), (foo,0.3333333333333333))

サーバー上の行のフィルター 

行をフィルターするには、Sparkのフィルター変換機能を使用します。フィルター変換機能は、まずCassandraからすべての行をフェッチしてから、それをSparkでフィルターします。一部のCPUサイクルは、結果から除外されたオブジェクトのシリアライズとデシリアライズのために浪費されます。このオーバーヘッドを防ぐため、CassandraRDDには、サーバー上の行セットをフィルターするための任意のCQL条件を渡すメソッドが用意されています。

この例は、Sparkを使用してサーバー上の行をフィルターする方法を示しています。
  1. この例のCQLコマンドをダウンロードして解凍します。このファイルに含まれているコマンドによって以下のタスクが実行されます。
    • carsテーブルをtestキースペースに作成
    • colorカラムのインデックス作成
    • テーブルへのデータの挿入
  2. cqlshまたはDevCenterを使用して、test_cars.cqlファイルを実行します。たとえば、cqlshを使用します。
    cqlsh -f test_cars.cql
  3. Sparkを使用して行をフィルターします。
    sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println)
    
    CassandraRow{id:AS-8888, model:Aston Martin DB9 Volante}
    CassandraRow{id:KF-334L, model:Ford Mondeo}
    CassandraRow{id:MT-8787, model:Hyundai x35}
    CassandraRow{id:MZ-1038, model:Mazda CX-9}
    CassandraRow{id:DG-2222, model:Dodge Avenger}
    CassandraRow{id:DG-8897, model:Dodge Charger}
    CassandraRow{id:BT-3920, model:Bentley Continental GT}
    CassandraRow{id:IN-9964, model:Infinity FX}
    
    sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println)
    
    CassandraRow{id:FR-8877, model:Ferrari FF}
    CassandraRow{id:FR-8877, model:Ferrari FF}
    CassandraRow{id:HD-1828, model:Honda Accord}
    CassandraRow{id:WX-2234, model:Toyota Yaris}