Spark Cassandra Connector Java APIの概要

Spark Cassandra Connector Java APIを使用すると、Cassandraデータの分析にSparkを使用するJavaアプリケーションを作成できます。

Spark Cassandra Connector Java APIを使用すると、Cassandraデータの分析にSparkを使用するJavaアプリケーションを作成できます。GitHubにあるSpark Cassandra ConnectorのJavaドキュメントを参照してください。DataStax Enterpriseが使用している最新バージョンのSpark Cassandra Connectorについては、コンポーネントのバージョンを参照してください。

SBTビルド・ファイルでのJava APIの使用 

以下のライブラリ依存関係をbuild.sbtまたはSBTビルド・ファイルに追加します。

libraryDependencies + =" com.datastax.spark " %%" spark-cassandra-connector-java_2.10 " %" 1.6.1 " withSources() withJavadoc()

Mavenビルド・ファイルでのJava APIの使用 

以下の依存関係をpom.xmlファイルに追加します。

<dependencies>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
version1.6.1</version>
</dependency>
    ...
</dependencies>

アプリケーションのdse.jarに含まれているhelperクラスを使用するには、dse.jarproject_directory/libにコピーし、以下の依存関係をpom.xmlファイルに追加します。

<dependency>
<groupId>com.datastax</groupId>
<artifactId>dse</artifactId>
versionversion number</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/dse-version number.jar</systemPath>
</dependency>

Alternately, you can manually install dse.jar in your local repository.

mvn install:install-file -Dfile=path/dse-version number.jar -DgroupId=com.datastax -DartifactId=dse -Dversion=version number -Dpackaging=jar

次に、依存関係をpom.xmlに追加します。

<dependency>
<groupId>com.datastax</groupId>
<artifactId>dse</artifactId>
versionversion number</version>
</dependency>

Scalaアプリケーション内のCassandraデータへのアクセス 

Cassandraテーブル・データに対してSpark操作を実行するには、まずRDDオブジェクトを取得する必要があります。RDDオブジェクトを作成するには、Sparkコンテキスト・オブジェクトを作成するために使用されるSpark構成オブジェクトを作成します。

import com.datastax.spark.connector._
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf)
val rdd = sc.cassandraTable("my_keyspace", "my_table")

データをScalaアプリケーション内のCassandraに保存するには、saveToCassandraメソッドを使用して、キースペース、テーブル、およびマッピング情報を渡します。

val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("my_keyspace", "my_table", SomeColumns("key", "value"))

Javaアプリケーション内のCassandraデータへのアクセス 

Cassandraテーブル・データに対してSparkアクションを実行するには、まず、JavaRDDクラスのサブクラスであるCassandraJavaRDDオブジェクトを取得します。CassandraJavaRDDは、Scalaアプリケーションで使用されるCassandraRDDオブジェクトのJava言語版です。

CassandraJavaRDDオブジェクトを作成するには、Sparkコンテキスト・オブジェクトを作成するために使用されるSpark構成オブジェクトを作成します。

SparkConf conf = new SparkConf()
.setAppName( "My application");
SparkContext sc = new SparkContext(conf);
dse.jarファイルのデフォルトの場所は、インストールのタイプによって異なります。
Installer-Servicesおよびパッケージ・インストール /usr/share/dse/dse.jar
Installer-No Servicesおよびtarボール・インストール install_location/lib/dse.jar

CassandraJavaRDDインスタンスの取得と操作には、com.datastax.spark.connector.japi.CassandraJavaUtilクラスの静的メソッドを使用します。新しいCassandraJavaRDDインスタンスを取得するには、CassandraJavaUtilのいずれかのjavaFunctionsメソッドを呼び出してコンテキスト・オブジェクトに渡してから、cassandraTableメソッドを呼び出して、キースペース、テーブル名、およびマッピング・クラスを渡します。

JavaRDDstring cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", .mapColumnTo(String.class))
.select("my_column");

Java型へのCassandraカラム・データのマッピング 

テーブル行の1つのカラムのJava型を指定するには、CassandraJavaRDD<T>インスタンスの作成時に型を指定し、mapColumnToメソッドを呼び出して、型を渡します。次に、selectメソッドを呼び出してカラム名をCassandraで設定します。

JavaRDD<Integer> cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", .mapColumnTo(Integer.class))
.select("column1");

JavaBeansクラスは、mapRowToメソッドを使用してマッピングできます。JavaBeansプロパティ名は、デフォルトのマッピング・ルールに従ってカラム名に対応させる必要があります。たとえば、firstNameプロパティは、デフォルトでfirst_nameカラム名にマッピングされます。

JavaRDD<Person> personRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapRowTo(Person.class));

CassandraJavaPairRDD<T, T>インスタンスは、JavaPairRDDクラスの拡張であり、行とカラムに関して前出の例と同様にマッピング読み取り側が存在します。これらのペアRDDは、通常、キー/値のペアに使用され、最初の型がキー、2番目の型が値になります。

キーと値の両方に1つのカラムをマッピングする場合は、mapColumnToを呼び出して、キーと値の型を指定してから、selectメソッドを呼び出し、キーと値のカラム名を渡します。

CassandraJavaPairRDD<Integer, String> pairRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapColumnTo(String.class))
select("id", "first_name");

mapRowToメソッドは、行データをJava型にマッピングするために使用します。たとえば、プライマリ・キーを使用してペアRDDインスタンスを作成し、次にJavaBeansオブジェクトを作成するには、次のコードを使用します。

CassandraJavaPairRDD<Integer, Person> idPersonRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapRowTo(Person.class))
.select("id", "first_name", "last_name", "birthdate", "email");

Javaアプリケーション内のCassandraへのデータの保存 

RDDからCassandraにデータを保存するには、writerBuilderメソッドをCassandraJavaRDDインスタンスに対して呼び出し、キースペースとテーブル名、そして任意でカラムまたは行の型マッピング情報を渡します。

CassandraJavaUtil.javaFunctions(personRdd)
.writerBuilder("my_keyspace", "my_table", mapToRow(Person.class)).saveToCassandra();