Cassandra特有のプロパティの設定

Spark Cassandra Connectorのオプションを使用して、DataStax Enterprise Sparkを構成します。

Sparkの統合では、内部でSpark Cassandra Connectorを使用します。DataStax Enterprise Sparkの構成には、プロジェクトで定義されている構成オプションを使用できます。Sparkでは、spark.プレフィックスを持つシステム・プロパティが認識され、構成オブジェクトの作成時にこれらのプロパティがそのオブジェクトに暗黙的に追加されます。SparkConfコンストラクターのloadDefaultsパラメーターにfalseを渡すことによって、構成オブジェクトにシステム・プロパティが追加されるのを防ぐことができます。

Spark、Sparkシェル、その他のDataStax Enterprise Spark組み込みアプリケーションに設定を渡すには、Sparkのドキュメントで説明する、中間アプリケーションのspark-submitを使用します。

Sparkシェルの構成

以下の構文を使用して、Spark構成引数を渡します。
dse spark [submission_arguments] [application_arguments]
ここで、submission_argumentsは次のとおりです。
  • --properties-file path_to_properties_file

    構成設定が含まれているプロパティ・ファイルの場所。デフォルトでは、Sparkはconf/spark-defaults.confから設定を読み込みます。

  • --executor-memory memory

    各マシンでアプリケーションに割り当てるメモリーの容量。k、m、gのいずれかのサフィックスを使用して、メモリー引数をJVM形式で指定できます。

  • --total-executor-cores cores

    アプリケーションが使用するコアの総数。

  • --conf name=value

    Spark構成に対する任意のSparkオプション。プレフィックスsparkが付きます。

  • --help

    DataStax Enterprise Sparkシェル・オプション以外のすべてのオプションを含むヘルプ・メッセージを表示します。

  • -- jars <additional-jars>

    追加JARファイルへのパスのコンマ区切りリスト。

  • --verbose

    どの引数がSpark構成オプションとして認識され、どの引数がSparkシェルに転送されるかを表示します。

Sparkシェル・アプリケーション引数:
  • -i file

    指定されたファイルからスクリプトを実行します。

Sparkアプリケーションの構成

Sparkサブミット引数を渡すには、以下の構文を使用します。
dse spark-submit [submission_arguments] application_file [application_arguments]
すべてのsubmission_argumentsおよびこれらの追加のspark-submit submission_arguments:
  • -- class class_name

    アプリケーション・メイン・クラスの完全な名前。

  • -- name name

    Spark Webアプリケーションに表示されるアプリケーション名。

  • -- py-files files

    Pythonアプリケーション向けにPYTHONPATHで設定される、.zip、.egg、.pyの各ファイルのコンマ区切りリスト。

  • -- files files

    エグゼキューター間で配布され、アプリケーションで使用できるファイルのコンマ区切りリスト。

  • -- master master_URL

    SparkマスターのURL。

通常、Sparkサブミット引数は、システム・プロパティである-Dname=valueやclasspathなどのVMパラメーターに変換されます。アプリケーション引数はアプリケーションに直接渡されます。

Spark構成オブジェクト 

com.datastax.bdp.spark.DseSparkContextクラスを使用してSparkコンテキスト・オブジェクトを作成し、DataStax Enterpriseクラスターに接続します。DseSparkContextクラスは機能的にはorg.apache.spark.SparkContextと同じです。

import com.datastax.bdp.spark.DseSparkContext 
import org.apache.spark.SparkConf 

object ConfigurationExample extends App { 

def createSparkContext() = { 
val conf = new SparkConf()
/* set the app name here or by using the --name option when
you submit the app */
.setAppName("Configuration example") 
.forDse 

new DseSparkContext.apply(conf) 
  } 

val sc = createSparkContext() 

  // ... 

sc.stop() 
} 

プロパティ・リスト 

以下のCassandra特有のプロパティが認識されます。

spark.cassandra.keyspace
Spark SQL向けのデフォルトのキースペース。
spark.cassandra.connection.native.port
デフォルトは9042です。ネイティブ・クライアント・プロトコル接続に使用するポート。
spark.cassandra.connection.rpc.port
デフォルトは9160です。Thrift接続に使用するポート。
spark.cassandra.connection.host
Thrift RPCサービスとネイティブ・トランスポートが関連付けられているホスト名またはIPアドレス。このプロパティのデフォルト値は、cassandra.yamlのrpc_addressプロパティ(デフォルトでlocalhost)によって決められます。

読み取りプロパティ

spark.cassandra.input.split.size
デフォルトは100000です。単一のSparkパーティションに含まれる行の数の概数。この値が大きいほど、作成されるSparkタスクの数は少なくなります。値を大きくしすぎると、並列処理が制限される可能性があります。
spark.cassandra.input.fetch.size_in_rows
デフォルトは1000です。Cassandraへの往復ごとにフェッチされる行の数。この値を増やすと、メモリーの消費量も増加します。値を減らすと、往復回数が増加します。以前のリリースでは、このプロパティはspark.cassandra.input.page.row.sizeでした。
spark.cassandra.input.consistency.level
デフォルトはLOCAL_ONEです。読み取り時に使用する整合性レベル。

書き込みプロパティ

以下のプロパティをSparkConfで設定して、保存プロセスを微調整できます。

spark.cassandra.output.batch.size.bytes

デフォルトはautoです。1バッチあたりのバイト数。デフォルトのautoは、コネクターがデータ量に基づいてバイト数を調整することを意味します。

spark.cassandra.output.consistency.level
デフォルトはLOCAL_ONEです。書き込み時に使用する整合性レベル。
spark.cassandra.output.concurrent.writes
デフォルトは5です。1つのSparkタスクによって並列実行されるバッチの最大数。
spark.cassandra.output.batch.size.rows

デフォルトは64Kです。バッチの最大合計サイズ(単位はバイト)。

上記以外の下位プロパティの詳細については、Spark Cassandra Connectorのドキュメントを参照してください。