Spark Cassandra Connector Java APIの概要

Spark Cassandra Connector Java APIを使用すると、データベース・データの分析にSparkを使用するJavaアプリケーションを作成できます。

Spark Cassandra Connector Java APIを使用すると、データベース・データの分析にSparkを使用するJavaアプリケーションを作成できます。 GitHubで、Spark Cassandra Connectorの「Javaドキュメント」を参照してください。 DataStax Enterpriseによって使用されるSpark Cassandra Connectorの最新バージョンについては、コンポーネントのバージョンを確認してください。

SBTビルド・ファイルでのJava APIの使用

以下のライブラリ依存関係をSBTビルド・ファイル(build.sbtなど)に追加します。

libraryDependencies ++= Seq(
  "com.datastax.dse" % "dse-spark-dependencies" % dseVersion % "provided"  excludeAll (
    ExclusionRule("com.datastax.dse", "dse-java-driver-core"),
    ExclusionRule("org.apache.solr", "solr-solrj")
  ),
  "com.datastax.dse" % "dse-java-driver-core" % "1.2.3",
  "org.apache.solr" % "solr-solrj" % "6.0.1"
)

プロジェクトのテンプレートの例については、次のURLを参照してください。 https://github.com/datastax/SparkBuildExamples

Mavenビルド・ファイルでのJava APIの使用

以下の依存関係をpom.xmlファイルに追加します。

    <dependency>
      <groupId>com.datastax.dse</groupId>
      <artifactId>dse-spark-dependencies</artifactId>
      <version>${dse.version}</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>com.datastax.dse</groupId>
          <artifactId>dse-java-driver-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.solr</groupId>
          <artifactId>solr-solrj</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

次に、DataStaxリポジトリを追加します。

  <repositories>
    <repository>
      <id>DataStax-Repo</id>
      <url>https://datastax.artifactoryonline.com/datastax/public-repos/</url>
    </repository>
  </repositories>

プロジェクトのテンプレートの例については、次のURLを参照してください。 https://github.com/datastax/SparkBuildExamples

Scalaアプリケーション内のデータベース・データへのアクセス

テーブル・データに対してSparkアクションを実行するには、まず、RDDオブジェクトを取得します。RDDオブジェクトを作成するには、後のSparkコンテキスト・オブジェクトの作成に使用されるSpark構成オブジェクトを作成します。

import com.datastax.spark.connector._
val conf = new SparkConf(true)
   .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("dse://127.0.0.1:7077", "test", conf)
val rdd = sc.cassandraTable("my_keyspace", "my_table")

Scalaアプリケーションのデータベースにデータを保存するには、saveToCassandraメソッドを使用して、キースペース、テーブル、およびマッピング情報を渡します。

val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("my_keyspace", "my_table", SomeColumns("key", "value"))

ScalaアプリケーションでDSE Graphクエリーを実行するには、CassandraConnectorセッションをcom.datastax.driver.dse.DseSessionにキャストしてから、executeGraphメソッドを使用してグラフ・ステートメントを実行します。

val session = CassandraConnector(sc.getConf).withSessionDo(session => session.asInstanceOf[DseSession])
session.executeGraph(graph statement)

Javaアプリケーション内のデータベース・データへのアクセス

テーブル・データに対してSparkアクションを実行するには、まず、JavaRDDクラスのサブクラスであるCassandraJavaRDDオブジェクトを取得します。CassandraJavaRDDは、Scalaアプリケーションで使用されるCassandraRDDオブジェクトのJava言語版です。

CassandraJavaRDDオブジェクトを作成するには、後のSparkコンテキスト・オブジェクトの作成に使用されるSpark構成オブジェクトを作成します。

SparkConf conf = new SparkConf()
                .setAppName( "My application");
SparkContext sc = new SparkContext(conf);
dse-spark-version.jarファイルのデフォルトの場所は、インストールのタイプによって異なります。

パッケージ・インストールInstaller-Servicesインストール

/usr/share/dse/dse-spark-version.jar

tarボール・インストールInstaller-No Servicesインストール

installation_location/lib/dse-spark-version.jar

CassandraJavaRDDインスタンスの取得と操作には、com.datastax.spark.connector.japi.CassandraJavaUtilクラスの静的メソッドを使用します。新しいCassandraJavaRDDインスタンスを取得するには、CassandraJavaUtilのいずれかのjavaFunctionsメソッドを呼び出し、コンテキスト・オブジェクトを渡してから、cassandraTableメソッドを呼び出して、キースペース、テーブル名、およびマッピング・クラスを渡します。

JavaRDDstring cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable("my_keyspace", "my_table", .mapColumnTo(String.class))
        .select("my_column");

Java型へのカラム・データのマッピング

テーブル行の1つのカラムのJava型を指定するには、CassandraJavaRDD<T>インスタンスの作成時に型を指定し、mapColumnToメソッドを呼び出して、型を渡します。次に、selectメソッドを呼び出してカラム名を設定します。

JavaRDD<Integer> cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable("my_keyspace", "my_table", .mapColumnTo(Integer.class))
        .select("column1");

JavaBeansクラスは、mapRowToメソッドを使用してマッピングできます。JavaBeansプロパティ名は、デフォルトのマッピング・ルールに従ってカラム名に対応させる必要があります。たとえば、firstNameプロパティは、デフォルトでfirst_nameカラム名にマッピングされます。

JavaRDD<Person> personRdd = CassandraJavaUtil.javaFunctions(sc)
                .cassandraTable("my_keyspace", "my_table", mapRowTo(Person.class));

CassandraJavaPairRDD<T, T>インスタンスは、JavaPairRDDクラスの拡張であり、行とカラムに関して前出の例と同様にマッピング読み取り側が存在します。これらのペアRDDは、通常、キー/値のペアに使用され、最初の型がキー、2番目の型が値になります。

キーと値の両方に1つのカラムをマッピングする場合は、mapColumnToを呼び出して、キーと値の型を指定してから、selectメソッドを呼び出し、キーと値のカラム名を渡します。

CassandraJavaPairRDD<Integer, String> pairRdd = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapColumnTo(String.class))
        select("id", "first_name");

mapRowToメソッドは、行データをJava型にマッピングするために使用します。たとえば、プライマリ・キーを使用してペアRDDインスタンスを作成し、次にJavaBeansオブジェクトを作成するには、次のコードを使用します。

CassandraJavaPairRDD<Integer, Person> idPersonRdd = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable("my_keyspace", "my_table", mapColumnTo(Integer.class), mapRowTo(Person.class))
        .select("id", "first_name", "last_name", "birthdate", "email");

Javaアプリケーションでのデータベースへのデータの保存

RDDからデータベースにデータを保存するには、writerBuilderメソッドをCassandraJavaRDDインスタンスに対して呼び出し、キースペースとテーブル名、そして任意でカラムまたは行の型マッピング情報を渡します。

CassandraJavaUtil.javaFunctions(personRdd)
                .writerBuilder("my_keyspace", "my_table", mapToRow(Person.class)).saveToCassandra();