Setting Cassandra-specific properties
Use the Spark Cassandra Connector options to configure DataStax Enterprise Spark.
Spark integration uses the Spark Cassandra Connector 1.1 under the hood. You can use the configuration options defined in that project to configure DataStax Enterprise Spark. Spark recognizes system properties having the spark. prefix and adds the properties to the configuration object implicitly upon creation. You can avoid adding system properties to the configuration object by passing false for the loadDefaults parameter in the SparkConf constructor.
You pass settings for Spark, Spark Shell, and other DSE Spark built-in applications using the intermediate application spark-submit, described in Spark 1.1 documentation.
Configuring the Spark shell
dse spark[-with-cc] [submission arguments] [application arguments]
- Submission arguments:
- --properties-file <path-to-properties-file>
The location of the properties file having the configuration settings. By default, Spark loads the settings from conf/spark-defaults.conf.
- --executor-memory <memory>.
How much memory to allocate on each machine for the application. You can provide the memory argument in JVM format using either the k, m, or g suffix.
- --total-executor-cores <cores>
The total number of cores the application uses
- --conf name=value
An arbitrary Spark option to the Spark configuration prefixed by spark.
- --help
Shows a help message that displays all options except DataStax Enterprise Spark shell options.
- -- jars <additional-jars>
A comma-separated list of paths to additional jar files.
- --verbose
Displays which arguments are recognized as Spark configuration options and which are forwarded to the Spark Shell.
- --properties-file <path-to-properties-file>
- Spark shell application arguments:
- -i <file>
Runs a script from the specified file.
- -i <file>
Configuring Spark applications
dse spark-submit[-with-cc] [submission arguments] <application file> [application arguments]
- All the submission arguments listed in the previous section, and additionally:
-
- -- class <class-name>
The full name of the application main class
- -- name <name>
The application name as displayed in the Spark web-app
- -- py-files <files>
A comma-separated list of the .zip, .egg or .py files, which will be set on PYTHONPATH for Python applications
- -- files <files>
A comma-separated list files that are distributed among the executors and available for the application.
- -- master <master URL>
The URL of the Spark master.
- -- class <class-name>
- Application file, a jar or py file that contains the application to run
Passed without any control argument; acts as a separator between Spark configuration arguments and custom application arguments.
In general, Spark submission arguments are translated into system properties
-Dname=value
and other VM params like classpath. The application arguments
are passed directly to the application.
Spark configuration object
The forDse method applied on Spark configuration object makes the Spark configuration DSE-compatible, which is required for most DSE Spark applications. The forDse method converts all the DSE-compatible Cassandra settings into Spark Cassandra Connector-compatible settings. For example, in DataStax Enterprise, you typically pass cassandra.username to specify the user when authentication is enabled, while in the Connector, you need to use spark.cassandra.auth.username instead of cassandra.username. A table of properties lists settings for configuring the Cassandra connection in DSE Spark environment. The following simple example shows how to use the DseSparkConfHelper:
import com.datastax.bdp.spark.DseSparkConfHelper._ import org.apache.spark.{SparkConf, SparkContext} object ConfigurationExample extends App { def createSparkContext() = { val conf = new SparkConf() /* set the app name here or by using the --name option when you submit the app */ .setAppName("Configuration example") .forDse new SparkContext(conf) } val sc = createSparkContext() // ... sc.stop() }
The main jar file is added automatically to the configuration. For some operations, such as CFS access, Spark uses Hadoop. Spark creates the Hadoop configuration during initialization of SparkContext. The Hadoop configuration contains all the options provided in DSE Hadoop configuration files. To customize the Hadoop configuration on the application level, add the configuration entries you want to set or change to Spark configuration object, prepending them with spark.hadoop. prefix. Using this prefix designates the SparkContext as a Hadoop option that needs to be added to Hadoop configuration used by all the executors. The prefix is removed automatically before applying the option to the configuration. To get the SparkContext to use the Hadoop configuration object, access the hadoopConfiguration field of the SparkContext.
The forDse method takes the prefixed Cassandra specific properties from your SparkConf object and copies the properties, so that the parameters are properly passed to Hadoop configuration and then used by CFS connections. For example, assume that cassandra.username is set in SparkConf. By applying forDse, cassandra.username is replicated for use by the connector as spark.cassandra.auth.username and for use by CFS as spark.hadoop.cassandra.username.
Property list
The following key Cassandra-specific properties are recognized:
- spark.cassandra.keyspace
- The default keyspace for Spark SQL.
- spark.cassandra.connection.native.port
- Default = 9042. Port for native client protocol connections.
- spark.cassandra.connection.rpc.port
- Default = 9160. Port for thrift connections.
- spark.cassandra.connection.host
- The host name or IP address to which the Thrift RPC service and native transport is bound. The rpc_address property in the cassandra.yaml, which is localhost by default, determines the default value of this property.
Read properties
- spark.cassandra.input.split.size
- Default = 100000. Approximate number of rows in a single Spark partition. The higher the value, the fewer Spark tasks are created. Increasing the value too much may limit the parallelism level.
- spark.cassandra.input.page.row.size
- Default = 1000. Number of rows being fetched per roundtrip to Cassandra. Increasing this value increases memory consumption. Decreasing the value increases the number of roundtrips.
- spark.cassandra.input.consistency.level
- Default = LOCAL_ONE. Consistency level to use when reading.
Write properties
You can set the following properties in SparkConf to fine tune the saving process.
- spark.cassandra.output.batch.size.bytes
-
Default = auto. Number of bytes per single batch. The default, auto, means the connector adjusts the number of bytes based on the amount of data.
- spark.cassandra.output.consistency.level
- Default = LOCAL_ONE. Consistency level to use when writing.
- spark.cassandra.output.concurrent.writes
- Default = 5. Maximum number of batches executed in parallel by a single Spark
- spark.cassandra.output.batch.size.rows
-
Default = 64K. The maximum total size of the batch in bytes.
Connector 1.1 documentation describes additional, low-level properties.
Package installations | /etc/cassandra/cassandra.yaml |
Tarball installations | install_location/resources/cassandra/conf/cassandra.yaml |
Running Spark commands against a remote cluster
To run Spark commands against a remote cluster, you must copy your Hadoop configuration files from one of the remote nodes to the local client machine.
Installer-Services and Package installations | /etc/dse/hadoop/ |
Installer-No Services and Tarball installations | install_location/resources/hadoop/conf/ |