Spark Cassandra Connector Java APIの概要
Spark Cassandra Connector Java APIを使用すると、Cassandraデータの分析にSparkを使用するJavaアプリケーションを作成できます。
Spark Cassandra Connector Java APIを使用すると、Cassandraデータの分析にSparkを使用するJavaアプリケーションを作成できます。GitHubにあるSpark Cassandra ConnectorのJavaドキュメントを参照してください。DataStax Enterpriseが使用している最新バージョンのSpark Cassandra Connectorについては、コンポーネントのバージョンを参照してください。
SBTビルド・ファイルでのJava APIの使用
以下のライブラリ依存関係をbuild.sbtまたはSBTビルド・ファイルに追加します。
libraryDependencies + =" com.datastax.spark " %%" spark-cassandra-connector-java_2.10 " %" 1.6.1 " withSources() withJavadoc()
Mavenビルド・ファイルでのJava APIの使用
以下の依存関係をpom.xmlファイルに追加します。
<dependencies>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
version1.6.1</version>
</dependency>
...
</dependencies>
アプリケーションのdse.jarに含まれているhelperクラスを使用するには、dse.jarをproject_directory/libにコピーし、以下の依存関係をpom.xmlファイルに追加します。
<dependency>
<groupId>com.datastax</groupId>
<artifactId>dse</artifactId>
versionversion number</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/dse-version number.jar</systemPath>
</dependency>
Alternately, you can manually install dse.jar in your local repository.
mvn install:install-file -Dfile=path/dse-version number.jar -DgroupId=com.datastax -DartifactId=dse -Dversion=version number -Dpackaging=jar
次に、依存関係をpom.xmlに追加します。
<dependency>
<groupId>com.datastax</groupId>
<artifactId>dse</artifactId>
versionversion number</version>
</dependency>
Scalaアプリケーション内のCassandraデータへのアクセス
Cassandraテーブル・データに対してSpark操作を実行するには、まずRDDオブジェクトを取得する必要があります。RDDオブジェクトを作成するには、Sparkコンテキスト・オブジェクトを作成するために使用されるSpark構成オブジェクトを作成します。
import com.datastax.spark.connector._
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf)
val rdd = sc.cassandraTable("my_keyspace", "my_table")
データをScalaアプリケーション内のCassandraに保存するには、saveToCassandraメソッドを使用して、キースペース、テーブル、およびマッピング情報を渡します。
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("my_keyspace", "my_table", SomeColumns("key", "value"))
Javaアプリケーション内のCassandraデータへのアクセス
Cassandraテーブル・データに対してSparkアクションを実行するには、まず、JavaRDDクラスのサブクラスであるCassandraJavaRDDオブジェクトを取得します。CassandraJavaRDDは、Scalaアプリケーションで使用されるCassandraRDDオブジェクトのJava言語版です。
CassandraJavaRDDオブジェクトを作成するには、Sparkコンテキスト・オブジェクトを作成するために使用されるSpark構成オブジェクトを作成します。
SparkConf conf = new SparkConf()
.setAppName( "My application");
SparkContext sc = new SparkContext(conf);
Installer-Servicesおよびパッケージ・インストール | /usr/share/dse/dse.jar |
Installer-No Servicesおよびtarボール・インストール | install_location/lib/dse.jar |
CassandraJavaRDDインスタンスの取得と操作には、com.datastax.spark.connector.japi.CassandraJavaUtilクラスの静的メソッドを使用します。新しいCassandraJavaRDDインスタンスを取得するには、CassandraJavaUtilのいずれかのjavaFunctions
メソッドを呼び出してコンテキスト・オブジェクトに渡してから、cassandraTable
メソッドを呼び出して、キースペース、テーブル名、およびマッピング・クラスを渡します。
JavaRDDstring cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", .mapColumnTo(String.class))
.select("my_column");
Java型へのCassandraカラム・データのマッピング
テーブル行の1つのカラムのJava型を指定するには、CassandraJavaRDD<T>インスタンスの作成時に型を指定し、mapColumnTo
メソッドを呼び出して、型を渡します。次に、select
メソッドを呼び出してカラム名をCassandraで設定します。
JavaRDD<Integer> cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", .mapColumnTo(Integer.class))
.select("column1");
JavaBeansクラスは、mapRowTo
メソッドを使用してマッピングできます。JavaBeansプロパティ名は、デフォルトのマッピング・ルールに従ってカラム名に対応させる必要があります。たとえば、firstName
プロパティは、デフォルトでfirst_name
カラム名にマッピングされます。
JavaRDD<Person> personRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapRowTo(Person.class));
CassandraJavaPairRDD<T, T>インスタンスは、JavaPairRDDクラスの拡張であり、行とカラムに関して前出の例と同様にマッピング読み取り側が存在します。これらのペアRDDは、通常、キー/値のペアに使用され、最初の型がキー、2番目の型が値になります。
キーと値の両方に1つのカラムをマッピングする場合は、mapColumnTo
を呼び出して、キーと値の型を指定してから、select
メソッドを呼び出し、キーと値のカラム名を渡します。
CassandraJavaPairRDD<Integer, String> pairRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapColumnTo(String.class))
select("id", "first_name");
mapRowTo
メソッドは、行データをJava型にマッピングするために使用します。たとえば、プライマリ・キーを使用してペアRDDインスタンスを作成し、次にJavaBeansオブジェクトを作成するには、次のコードを使用します。
CassandraJavaPairRDD<Integer, Person> idPersonRdd = CassandraJavaUtil.javaFunctions(sc)
.cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapRowTo(Person.class))
.select("id", "first_name", "last_name", "birthdate", "email");
Javaアプリケーション内のCassandraへのデータの保存
RDDからCassandraにデータを保存するには、writerBuilder
メソッドをCassandraJavaRDDインスタンスに対して呼び出し、キースペースとテーブル名、そして任意でカラムまたは行の型マッピング情報を渡します。
CassandraJavaUtil.javaFunctions(personRdd)
.writerBuilder("my_keyspace", "my_table", mapToRow(Person.class)).saveToCassandra();