Sparkコンテキストの使用
データベース・テーブルを表すSpark RDDを取得するには、scドット(sc.)構文を使用してcassandraTableメソッドをSparkコンテキストに対して呼び出すことによって、テーブルのデータをSparkに読み込みます。
廃止予定のコンテキスト・オブジェクトにアクセスするには、spark.sparkContextを呼び出します。
val sc = spark.sparkContext
データベース・テーブルを表すSpark RDDを取得するには、scドット(sc.)構文を使用してcassandraTableメソッドをSparkコンテキストに対して呼び出すことによって、テーブルのデータをSparkに読み込みます。scは、Spark API SparkContextクラスを表します。
sc.cassandraTable ( "keyspace", "table name" )
デフォルトでは、DSE Sparkシェルはscオブジェクトを作成します。Sparkコンテキストは、spark.sparkContextを呼び出して、Sparkシェル内のSpark sessionオブジェクトから手動で取得できます。
val sc = spark.sparkContext()
データはScalaオブジェクトにマッピングされ、DataStax EnterpriseによってCassandraRDD[CassandraRow]が返されます。DataStax Enterpriseの外部で実行されるアプリケーションを作成するためにSpark APIを使用するには、com.datastax.spark.connector.SparkContextCassandraFunctionsをインポートします。
以下の例は、テーブルをSparkに読み込んで、テーブルをSparkから読み取る方法を示しています。
cqlsh
を使用して、以下のキースペースとテーブルを作成します。キースペースの作成には、Analyticsデータ・センターを使用します。CREATE KEYSPACE test WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'Analytics' : 1}; CREATE TABLE test.words (word text PRIMARY KEY, count int);
この例では、単一ノード・クラスターをSparkモードで起動することを前提としています。
- データを
words
テーブルに読み込みます。INSERT INTO test.words (word, count) VALUES ('foo', 10); INSERT INTO test.words (word, count) VALUES ('bar', 20);
- ノードをSparkモードで起動してあるという前提で、Sparkシェルを起動します。シェルを起動するために
sudo
を使用しないでください。bin/dse spark
「Welcome to Spark」という出力とプロンプトが表示されます。
- showSchemaコマンドを使用して、userキースペースおよびテーブルを表示します。
:showSchema
すべてのuserキースペースに関する情報が表示されます。
======================================== Keyspace: HiveMetaStore ======================================== Table: MetaStore ---------------------------------------- - key : String (partition key column) - entity : String (clustering column) - value : java.nio.ByteBuffer ======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int scala> :showSchema test ======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int scala> :showSchema test words ======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int
test
キースペースのみに関する情報を取得します。:showSchema test
======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int
words
テーブルに関する情報を取得します。:showSchema test words
======================================== Keyspace: test ======================================== Table: words ---------------------------------------- - word : String (partition key column) - count : Int
test.words
テーブルのデータを参照するベースRDDを定義します。val rdd = sc.cassandraTable("test", "words")
rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:47
RDDがrdd値で返されます。テーブルを読み取るには、このコマンドを使用します。
rdd.toArray.foreach(println)
CassandraRow{word: bar, count: 20} CassandraRow{word: foo, count: 10}
返されたRDDに対するメソッドを使用して、test.words
テーブルをクエリーできるようになりました。
cassandraTablesを読み込むためのPythonサポート
Pythonでは、Sparkストリーミング・コンテキストからのcassandraTables
の読み込みと、データベースへのDStreamの保存をサポートしています。
カラム値の読み取り
CassandraRowオブジェクトのgetメソッドを使用して、テーブルのカラムを読み取ることができます。getメソッドは、カラム名またはカラム・インデックスに基づいて、個々のカラム値にアクセスします。型変換はその場で適用されます。null値が返されることが予想される場合は、getOptionバリアントを使用します。
- RDDの最初の項目を
firstRow
値に格納します。val firstRow = rdd.first
firstRow: com.datastax.spark.connector.CassandraRow = CassandraRow{word: foo, count: 10}
- カラム名を取得します。
rdd.columnNames
res3: com.datastax.spark.connector.ColumnSelector = AllColumns
- 一般的なgetを使用し、戻り値の型を直接渡してテーブルをクエリーします。
firstRow.get[Int]("count")
res4: Int = 10
firstRow.get[Long]("count")
res5: Long = 10
firstRow.get[BigInt]("count")
res6: BigInt = 10
firstRow.get[java.math.BigInteger]("count")
res7: java.math.BigInteger = 10
firstRow.get[Option[Int]]("count")
res8: Option[Int] = Some(10)
firstRow.get[Option[BigInt]]("count")
res9: Option[BigInt] = Some(10)
コレクションの読み取り
CassandraRowオブジェクトのgetメソッドを使用して、テーブルのコレクション・カラムを読み取ることができます。getメソッドは、コレクション・カラムにアクセスし、対応するScalaコレクションを返します。
test
キースペースが既にセットアップされているという前提で、以下の手順に従ってコレクションにアクセスします。
test
キースペースで、cqlshを使用してコレクション・セットをセットアップします。CREATE TABLE test.users ( username text PRIMARY KEY, emails SETtext); INSERT INTO test.users (username, emails) VALUES ('someone', {'someone@email.com', 's@email.com'});
- Sparkが実行されていない場合は、Sparkシェルを起動します。シェルを起動するためにsudoを使用しないでください。
bin/dse spark
「Welcome to Spark」という出力とプロンプトが表示されます。
- コレクション・セットにアクセスするためのCassandraRDD[CassandraRow]を定義します。
val row = sc.cassandraTable("test", "users").toArray.apply(0)
row: com.datastax.spark.connector.CassandraRow = CassandraRow{username: someone, emails: {s@email.com,someone@email.com}}
- コレクション・セットをSparkからクエリーします。
row.getList[String]("emails")
res2: Vector[String] = Vector(s@email.com, someone@email.com)
row.get[List[String]]("emails")
res3: List[String] = List(s@email.com, someone@email.com)
row.get[Seq[String]]("emails")
res4: Seq[String] = List(s@email.com, someone@email.com)
row.get[IndexedSeq[String]]("emails")
res5: IndexedSeq[String] = Vector(s@email.com, someone@email.com)
row.get[Set[String]]("emails")
res6: Set[String] = Set(s@email.com, someone@email.com)
row.get[String]("emails")
res7: String = {s@email.com,someone@email.com}
フェッチ対象カラムの数の制限
パフォーマンス上の理由のため、不要なカラムはフェッチ対象から除外する必要があります。これを行うには、selectメソッドを使用します。
フェッチ対象カラムの数を制限するには:
val row = sc.cassandraTable("test", "users").select("username").toArray
row: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{username: someone})
行とタプルおよびcaseクラス間のマッピング
行をCassandraRowクラスのオブジェクトにマッピングする代わりに、必要な型のタプルにカラム値を直接ラップ解除することができます。
行をタプルにマッピングするには:
sc.cassandraTable[(String, Int)]("test", "words").select("word", "count").toArray
res9: Array[(String, Int)] = Array((bar,20), (foo,10))
sc.cassandraTable[(Int, String)]("test", "words").select("count", "word").toArray
res10: Array[(Int, String)] = Array((20,bar), (10,foo))
カラムと同じ名前のプロパティを使用してcaseクラスを定義します。複数の単語で構成されるカラムIDの場合は、カラムを作成する際、各単語をアンダースコアで区切り、Scala側でキャメル・ケース略語を使用します。
行をcaseクラスにマッピングするには:
case class WordCount(word: String, count: Int)
defined class WordCount
scala> sc.cassandraTable[WordCount]("test", "words").toArray
res14: Array[WordCount] = Array(WordCount(bar,20), WordCount(foo,20))
カラム名は、以下の規則に従って付けることができます。
- アンダースコアと小文字を使用します。(推奨)
- キャメル・ケースを使用し、Scalaのプロパティと完全に一致させます。
以下の例に有効なカラム名を示します。
データベース・カラム名 | Scalaプロパティ名 |
---|---|
count | count |
column_1 | column1 |
user_name | userName |
user_address | UserAddress |
データベース・カラム名 | Scalaプロパティ名 |
---|---|
count | count |
column1 | column1 |
userName | userName |
UserAddress | UserAddress |
ユーザー定義の関数を使用した行とオブジェクト間のマッピング
asをCassandraRDDに対して呼び出して、すべての行を異なる型のオブジェクトにマッピングします。mapとは対照的に、asでは、関数の引数の数がフェッチ対象カラムの数と同じである必要があります。この方法でasを呼び出すと、型変換が実行されます。asを使用して特定の型のオブジェクトを直接作成すると、CassandraRowオブジェクトを作成する必要がなくなり、また、ガーベージ・コレクションの負荷も軽減されます。
ユーザー定義の関数を使用してカラムをマッピングするには:
val table = sc.cassandraTable("test", "words")
table: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[9] at RDD at CassandraRDD.scala:47
val total = table.select("count").as((c: Int) => c).sum
total: Double = 30.0
val frequencies = table.select("word", "count").as((w: String, c: Int) => (w, c / total)).toArray
frequencies: Array[(String, Double)] = Array((bar,0.6666666666666666), (foo,0.3333333333333333))
サーバー上の行のフィルター
行をフィルターするには、Sparkのフィルター変換機能を使用します。フィルター変換機能は、まずデータベースからすべての行をフェッチしてから、それをSparkでフィルターします。一部のCPUサイクルは、結果から除外されたオブジェクトのシリアライズとデシリアライズのために浪費されます。このオーバーヘッドを防ぐため、CassandraRDDには、サーバー上の行セットをフィルターするための任意のCQL条件を渡すメソッドが用意されています。
- この例のCQLコマンドをダウンロードして解凍します。このファイルに含まれているコマンドによって以下のタスクが実行されます。
- carsテーブルをtestキースペースに作成
- colorカラムのインデックス作成
- テーブルへのデータの挿入
cqlsh
またはDevCenterを使用して、test_cars.cqlファイルを実行します。たとえば、cqlsh
を使用します。$ cqlsh -f test_cars.cql
- Sparkを使用して行をフィルターします。
sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println)
CassandraRow{id: AS-8888, model: Aston Martin DB9 Volante} CassandraRow{id: KF-334L, model: Ford Mondeo} CassandraRow{id: MT-8787, model: Hyundai x35} CassandraRow{id: MZ-1038, model: Mazda CX-9} CassandraRow{id: DG-2222, model: Dodge Avenger} CassandraRow{id: DG-8897, model: Dodge Charger} CassandraRow{id: BT-3920, model: Bentley Continental GT} CassandraRow{id: IN-9964, model: Infinity FX}
sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "silver").toArray.foreach(println)
CassandraRow{id: FR-8877, model: Ferrari FF} CassandraRow{id: FR-8877, model: Ferrari FF} CassandraRow{id: HD-1828, model: Honda Accord} CassandraRow{id: WX-2234, model: Toyota Yaris}