Cassandra特有のプロパティの設定
Spark Cassandra Connectorのオプションを使用して、DataStax Enterprise Sparkを構成します。
Sparkの統合では、内部でSpark Cassandra Connectorを使用します。DataStax Enterprise Sparkの構成には、プロジェクトで定義されている構成オプションを使用できます。Sparkでは、spark.プレフィックスを持つシステム・プロパティが認識され、構成オブジェクトの作成時にこれらのプロパティがそのオブジェクトに暗黙的に追加されます。SparkConfコンストラクターのloadDefaultsパラメーターにfalseを渡すことによって、構成オブジェクトにシステム・プロパティが追加されるのを防ぐことができます。
Spark、Sparkシェル、その他のDataStax Enterprise Spark組み込みアプリケーションに設定を渡すには、Sparkのドキュメントで説明する、中間アプリケーションのspark-submitを使用します。
Sparkシェルの構成
dse spark [submission_arguments] [application_arguments]
--properties-file path_to_properties_file
構成設定が含まれているプロパティ・ファイルの場所。デフォルトでは、Sparkはconf/spark-defaults.confから設定を読み込みます。
--executor-memory memory
各マシンでアプリケーションに割り当てるメモリーの容量。k、m、gのいずれかのサフィックスを使用して、メモリー引数をJVM形式で指定できます。
--total-executor-cores cores
アプリケーションが使用するコアの総数。
--conf name=value
Spark構成に対する任意のSparkオプション。プレフィックス
spark
が付きます。--help
DataStax Enterprise Sparkシェル・オプション以外のすべてのオプションを含むヘルプ・メッセージを表示します。
-- jars <additional-jars>
追加JARファイルへのパスのコンマ区切りリスト。
--verbose
どの引数がSpark構成オプションとして認識され、どの引数がSparkシェルに転送されるかを表示します。
-i file
指定されたファイルからスクリプトを実行します。
Sparkアプリケーションの構成
dse spark-submit [submission_arguments] application_file [application_arguments]
spark-submit submission_arguments
:-- class class_name
アプリケーション・メイン・クラスの完全な名前。
-- name name
Spark Webアプリケーションに表示されるアプリケーション名。
-- py-files files
Pythonアプリケーション向けにPYTHONPATHで設定される、.zip、.egg、.pyの各ファイルのコンマ区切りリスト。
-- files files
エグゼキューター間で配布され、アプリケーションで使用できるファイルのコンマ区切りリスト。
-- master master_URL
SparkマスターのURL。
通常、Sparkサブミット引数は、システム・プロパティである-Dname=value
やclasspathなどのVMパラメーターに変換されます。アプリケーション引数はアプリケーションに直接渡されます。
Spark構成オブジェクト
com.datastax.bdp.spark.DseSparkContextクラスを使用してSparkコンテキスト・オブジェクトを作成し、DataStax Enterpriseクラスターに接続します。DseSparkContextクラスは機能的にはorg.apache.spark.SparkContextと同じです。
import com.datastax.bdp.spark.DseSparkContext
import org.apache.spark.SparkConf
object ConfigurationExample extends App {
def createSparkContext() = {
val conf = new SparkConf()
/* set the app name here or by using the --name option when
you submit the app */
.setAppName("Configuration example")
.forDse
new DseSparkContext.apply(conf)
}
val sc = createSparkContext()
// ...
sc.stop()
}
プロパティ・リスト
以下のCassandra特有のプロパティが認識されます。
- spark.cassandra.keyspace
- Spark SQL向けのデフォルトのキースペース。
- spark.cassandra.connection.native.port
- デフォルトは9042です。ネイティブ・クライアント・プロトコル接続に使用するポート。
- spark.cassandra.connection.rpc.port
- デフォルトは9160です。Thrift接続に使用するポート。
- spark.cassandra.connection.host
- Thrift RPCサービスとネイティブ・トランスポートが関連付けられているホスト名またはIPアドレス。このプロパティのデフォルト値は、cassandra.yamlのrpc_addressプロパティ(デフォルトでlocalhost)によって決められます。
読み取りプロパティ
- spark.cassandra.input.split.size
- デフォルトは100000です。単一のSparkパーティションに含まれる行の数の概数。この値が大きいほど、作成されるSparkタスクの数は少なくなります。値を大きくしすぎると、並列処理が制限される可能性があります。
- spark.cassandra.input.fetch.size_in_rows
- デフォルトは1000です。Cassandraへの往復ごとにフェッチされる行の数。この値を増やすと、メモリーの消費量も増加します。値を減らすと、往復回数が増加します。以前のリリースでは、このプロパティはspark.cassandra.input.page.row.sizeでした。
- spark.cassandra.input.consistency.level
- デフォルトはLOCAL_ONEです。読み取り時に使用する整合性レベル。
書き込みプロパティ
以下のプロパティをSparkConfで設定して、保存プロセスを微調整できます。
- spark.cassandra.output.batch.size.bytes
-
デフォルトはautoです。1バッチあたりのバイト数。デフォルトのautoは、コネクターがデータ量に基づいてバイト数を調整することを意味します。
- spark.cassandra.output.consistency.level
- デフォルトはLOCAL_ONEです。書き込み時に使用する整合性レベル。
- spark.cassandra.output.concurrent.writes
- デフォルトは5です。1つのSparkタスクによって並列実行されるバッチの最大数。
- spark.cassandra.output.batch.size.rows
-
デフォルトは64Kです。バッチの最大合計サイズ(単位はバイト)。
上記以外の下位プロパティの詳細については、Spark Cassandra Connectorのドキュメントを参照してください。