Setting Cassandra-specific properties

Use the Spark Cassandra Connector options to configure DataStax Enterprise Spark.

Spark integration uses the Spark Cassandra Connector 1.1 under the hood. You can use the configuration options defined in that project to configure DataStax Enterprise Spark. Spark recognizes system properties having the spark. prefix and adds the properties to the configuration object implicitly upon creation. You can avoid adding system properties to the configuration object by passing false for the loadDefaults parameter in the SparkConf constructor.
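As a minimal sketch of the loadDefaults behavior described above, the following Scala snippet builds a configuration that does not pick up system properties having the spark. prefix. The object name, the application name, and the host address are illustrative assumptions; only the SparkConf(loadDefaults) constructor and its setAppName, set, and contains methods come from Spark itself.

```scala
import org.apache.spark.SparkConf

object LoadDefaultsExample {
  // Builds a configuration that ignores spark.* system properties.
  // The app name and host below are placeholders for illustration.
  def isolatedConf(): SparkConf =
    new SparkConf(false) // loadDefaults = false
      .setAppName("Isolated configuration")
      .set("spark.cassandra.connection.host", "127.0.0.1")
}
```

Because loadDefaults is false, every option the application needs must be set explicitly on the returned object.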

You pass settings for Spark, the Spark shell, and other DSE Spark built-in applications using the intermediate application spark-submit, described in the Spark 1.1 documentation.

Configuring the Spark shell

You pass Spark configuration arguments using the following syntax:
dse spark[-with-cc] [submission arguments] [application arguments]
  • Submission arguments:
    • --properties-file <path-to-properties-file>

      The location of the properties file having the configuration settings. By default, Spark loads the settings from conf/spark-defaults.conf.

    • --executor-memory <memory>

      How much memory to allocate on each machine for the application. You can provide the memory argument in JVM format using the k, m, or g suffix.

    • --total-executor-cores <cores>

      The total number of cores the application uses.

    • --conf name=value

      Adds an arbitrary option, whose name starts with the spark. prefix, to the Spark configuration.

    • --help

      Shows a help message that displays all options except DataStax Enterprise Spark shell options.

    • --jars <additional-jars>

      A comma-separated list of paths to additional jar files.

    • --verbose

      Displays which arguments are recognized as Spark configuration options and which are forwarded to the Spark Shell.

  • Spark shell application arguments:
    • -i <file>

      Runs a script from the specified file.

Configuring Spark applications

You pass the Spark submission arguments using the following syntax:
dse spark-submit[-with-cc] [submission arguments] <application file> [application arguments]
  • All the submission arguments listed in the previous section, and additionally:
    • --class <class-name>

      The full name of the application main class.

    • --name <name>

      The application name as displayed in the Spark web application.

    • --py-files <files>

      A comma-separated list of the .zip, .egg, or .py files to set on PYTHONPATH for Python applications.

    • --files <files>

      A comma-separated list of files that are distributed among the executors and available for the application.

    • --master <master URL>

      The URL of the Spark master.

  • Application file, a jar or py file that contains the application to run

    Passed without any control argument; acts as a separator between Spark configuration arguments and custom application arguments.

In general, Spark submission arguments are translated into system properties -Dname=value and other VM params like classpath. The application arguments are passed directly to the application.
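The translation described above can be pictured with a small, self-contained Scala sketch. It is not the spark-submit implementation: the object and function names are hypothetical, and only a handful of flags are covered. The flag-to-property pairs shown (for example, --executor-memory to spark.executor.memory) are the standard Spark property names for those options.

```scala
object SubmitArgsSketch {
  // A few submission flags and the Spark properties they correspond to.
  val flagToProperty: Map[String, String] = Map(
    "--executor-memory"      -> "spark.executor.memory",
    "--total-executor-cores" -> "spark.cores.max",
    "--name"                 -> "spark.app.name",
    "--master"               -> "spark.master"
  )

  // Turns "--flag value" and "--conf name=value" pairs into -Dname=value strings.
  // Tokens that are not recognized are skipped.
  def toSystemProperties(args: List[String]): List[String] = args match {
    case "--conf" :: pair :: rest if pair.contains("=") =>
      s"-D$pair" :: toSystemProperties(rest)
    case flag :: value :: rest if flagToProperty.contains(flag) =>
      s"-D${flagToProperty(flag)}=$value" :: toSystemProperties(rest)
    case _ :: rest => toSystemProperties(rest)
    case Nil       => Nil
  }
}
```

For instance, passing List("--executor-memory", "2g", "--conf", "spark.cassandra.connection.host=10.0.0.1") to toSystemProperties yields one -D entry for spark.executor.memory and one for spark.cassandra.connection.host.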

Spark configuration object

The forDse method applied to the Spark configuration object makes the Spark configuration DSE-compatible, which is required for most DSE Spark applications. The forDse method converts all the DSE-compatible Cassandra settings into Spark Cassandra Connector-compatible settings. For example, in DataStax Enterprise, you typically pass cassandra.username to specify the user when authentication is enabled, while in the Connector, you need to use spark.cassandra.auth.username instead of cassandra.username. The property list in this topic describes the settings for configuring the Cassandra connection in a DSE Spark environment. The following simple example shows how to use the DseSparkConfHelper:

import com.datastax.bdp.spark.DseSparkConfHelper._ 
import org.apache.spark.{SparkConf, SparkContext} 

object ConfigurationExample extends App { 

  def createSparkContext() = { 
    val conf = new SparkConf()
     /* set the app name here or by using the --name option when
        you submit the app */
      .setAppName("Configuration example") 
      .forDse 

    new SparkContext(conf) 
  } 

  val sc = createSparkContext() 

  // ... 

  sc.stop() 
} 

The main jar file is added automatically to the configuration. For some operations, such as CFS access, Spark uses Hadoop. Spark creates the Hadoop configuration during initialization of the SparkContext. The Hadoop configuration contains all the options provided in the DSE Hadoop configuration files. To customize the Hadoop configuration at the application level, add the configuration entries you want to set or change to the Spark configuration object, prepending them with the spark.hadoop. prefix. The prefix designates an entry as a Hadoop option that needs to be added to the Hadoop configuration used by all the executors. The prefix is removed automatically before the option is applied to the configuration. To get the Hadoop configuration object that the SparkContext uses, access the hadoopConfiguration field of the SparkContext.
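The prefix handling can be modeled in a few lines of plain Scala. This is only a sketch that treats the configuration as a Map; the object and function names are hypothetical, and io.file.buffer.size is used merely as an example of a Hadoop option. In a real application, the resulting entries would be visible through the hadoopConfiguration field of the SparkContext.

```scala
object HadoopPrefixSketch {
  val Prefix = "spark.hadoop."

  // Selects the entries marked as Hadoop options and strips the prefix,
  // leaving the keys as they would appear in the Hadoop configuration.
  def hadoopOptions(sparkConf: Map[String, String]): Map[String, String] =
    sparkConf.collect {
      case (key, value) if key.startsWith(Prefix) =>
        key.stripPrefix(Prefix) -> value
    }
}
```

Entries without the prefix, such as spark.app.name, are left out of the result because they are not Hadoop options.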

The forDse method takes the prefixed Cassandra-specific properties from your SparkConf object and copies them, so that the parameters are properly passed to the Hadoop configuration and then used by CFS connections. For example, assume that cassandra.username is set in SparkConf. By applying forDse, cassandra.username is replicated for use by the connector as spark.cassandra.auth.username and for use by CFS as spark.hadoop.cassandra.username.
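To make the replication concrete, here is a hedged sketch that models only the cassandra.username case from the example above, using a plain Map in place of SparkConf. It is not the DseSparkConfHelper implementation; the object and function names are hypothetical.

```scala
object ForDseSketch {
  // Copies cassandra.username, when present, to the keys used by
  // the connector and by CFS. Other entries are left untouched.
  def replicateUsername(conf: Map[String, String]): Map[String, String] =
    conf.get("cassandra.username") match {
      case Some(user) =>
        conf ++ Map(
          "spark.cassandra.auth.username"   -> user,
          "spark.hadoop.cassandra.username" -> user
        )
      case None => conf
    }
}
```

The original cassandra.username entry stays in place, so the result contains three keys with the same value.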

Property list

The following key Cassandra-specific properties are recognized:

spark.cassandra.keyspace
The default keyspace for Spark SQL.
spark.cassandra.connection.native.port
Default = 9042. Port for native client protocol connections.
spark.cassandra.connection.rpc.port
Default = 9160. Port for Thrift connections.
spark.cassandra.connection.host
The host name or IP address to which the Thrift RPC service and native transport are bound. The rpc_address property in the cassandra.yaml file, which is localhost by default, determines the default value of this property.

Read properties

spark.cassandra.input.split.size
Default = 100000. Approximate number of rows in a single Spark partition. The higher the value, the fewer Spark tasks are created. Increasing the value too much may limit the parallelism level.
spark.cassandra.input.page.row.size
Default = 1000. Number of rows being fetched per roundtrip to Cassandra. Increasing this value increases memory consumption. Decreasing the value increases the number of roundtrips.
spark.cassandra.input.consistency.level
Default = LOCAL_ONE. Consistency level to use when reading.

Write properties

You can set the following properties in SparkConf to fine-tune the saving process.

spark.cassandra.output.batch.size.bytes
Default = 64K. The maximum total size of the batch in bytes.
spark.cassandra.output.consistency.level
Default = LOCAL_ONE. Consistency level to use when writing.
spark.cassandra.output.concurrent.writes
Default = 5. Maximum number of batches executed in parallel by a single Spark task.
spark.cassandra.output.batch.size.rows
Default = auto. Number of rows per single batch. The default, auto, means the connector adjusts the number of rows based on the amount of data in each row.

Connector 1.1 documentation describes additional, low-level properties.
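As an illustration of setting the read and write properties listed above, the following sketch sets a few of them on a SparkConf. The object name, application name, host address, and all values are assumptions chosen for the example, not recommendations.

```scala
import org.apache.spark.SparkConf

object ConnectorTuningExample {
  // Returns a SparkConf with a few connection, read, and write
  // properties set explicitly. All values are illustrative.
  def tunedConf(): SparkConf =
    new SparkConf()
      .setAppName("Connector tuning example")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .set("spark.cassandra.input.split.size", "50000")
      .set("spark.cassandra.input.consistency.level", "LOCAL_QUORUM")
      .set("spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
      .set("spark.cassandra.output.concurrent.writes", "10")
}
```

In a DSE application, the same set calls can be chained before forDse, as in the ConfigurationExample shown earlier.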

The location of the cassandra.yaml file depends on the type of installation:
Package installations: /etc/cassandra/cassandra.yaml
Tarball installations: install_location/resources/cassandra/conf/cassandra.yaml

Running Spark commands against a remote cluster 

To run Spark commands against a remote cluster, you must copy your Hadoop configuration files from one of the remote nodes to the local client machine.

The default location of the Hadoop configuration files depends on the type of installation:
Installer-Services and Package installations: /etc/dse/hadoop/
Installer-No Services and Tarball installations: install_location/resources/hadoop/conf/