DataFrame APIの使用

Spark DataFrame APIは、DataStax Enterpriseデータを含む、名前付きのカラムにまとめられているデータ・ソースをカプセル化します。

Spark DataFrame APIは、DataStax Enterpriseデータを含む、名前付きのカラムにまとめられているデータ・ソースをカプセル化します。

Spark Cassandra Connectorは、統合されたDataSourceを提供し、DataFrameの作成を単純化します。技術的な詳細については、DataStaxで管理されているSpark Cassandra Connectorのドキュメントおよび「CassandraおよびPySparkのDataFrame」の記事を参照してください。

DataFrame APIの使用例

このPythonの例では、DataFrame APIを使用し、テーブルks.kvから読み取って、別のテーブルks.othertableに挿入する方法を示しています。

table1 = spark.read.format("org.apache.spark.sql.cassandra")
  .options(table="kv", keyspace="ks")
  .load()
table1.write.format("org.apache.spark.sql.cassandra")
  .options(table="othertable", keyspace = "ks")
  .save(mode ="append")

以下のScala例では、DSE Sparkコンソールを使用し、あるテーブルからDataFrameオブジェクトを作成して、別のテーブルに保存する方法を示しています。

val table1 = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "words", "keyspace" -> "test"))
  .load()
table1.createCassandraTable("test", "otherwords", partitionKeyColumns = Some(Seq("word")), clusteringKeyColumns = Some(Seq("count")))
table1.write.cassandraFormat("otherwords", "test").save()

書き込み操作では、Spark Cassandra Connectorに含まれているヘルパー・メソッドの1つであるcassandraFormatを使用します。これは、標準的なDataFrame操作の形式とオプションを設定する簡素化された方法です。以下のコマンドは、cassandraFormatを使用する書き込み操作に相当します。

table1.write.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "othertable", "keyspace" -> "test"))
  .save()