Setting Cassandra-specific properties

Use the Spark Cassandra Connector options to configure DataStax Enterprise Spark.

DataStax Enterprise Spark integration uses the Spark Cassandra Connector under the hood. You can use the configuration options defined in that project to configure DataStax Enterprise Spark. Spark recognizes system properties having the spark. preface and adds the properties to the configuration object implicitly upon creation. You can avoid adding system properties to the configuration object by passing false for the loadDefaults parameter in the SparkConf constructor.

You pass settings for Spark Shell and other DSE Spark built-in applications and tools in the same way as you normally pass system properties to JVM. You manually set these properties in the Spark configuration object definition of your own application. However, DataStax recommends using the DseSparkConfHelper method to gather application command line arguments, recognize the system property-style parameters (-Dname=value), and configure the system properties (setSystemPropertiesFromArgs method).

The forDse method applied on Spark configuration object makes the Spark configuration DSE-compatible, which is required for most DSE Spark applications. The forDse method converts all the DSE-compatible Cassandra settings into Spark Cassandra Connector-compatible settings. For example, in DataStax Enterprise, you typically pass cassandra.username to specify the user when authentication is enabled, while in the Connector, you need to use spark.cassandra.auth.username instead of cassandra.username. A table of properties lists settings for configuring the Cassandra connection in DSE Spark environment. The following simple example shows how to use the DseSparkConfHelper:

import com.datastax.bdp.spark.DseSparkConfHelper._ 
import org.apache.spark.{SparkConf, SparkContext} 

object ConfigurationExample extends App { 

  def createSparkContext() = { 
    /* here we get the application arguments, recognize which of them are
   system property-like, set these system properties, and finally filter them out; the returned
   collection is without those recognized arguments */ 
    val args = setSystemPropertiesFromArgs(this.args) 
    val myJar = getClass.getProtectionDomain.getCodeSource.getLocation.getPath 

    val conf = new SparkConf()
      .setAppName("Configuration example") 
      /* we can refer to args(0) - it doesn't matter whether any
         properties are passed before Spark master address or not, 
         because these properties are filtered out from args by 
         setSystemPropertiesFromArgs method */ 
      .setMaster(args(0)) 
      .setJars(Array(myJar))
      .forDse 

    new SparkContext(conf) 
  } 

  val sc = createSparkContext() 

  // ... 

  sc.stop() 
} 

For some operations, such as CFS access, Spark uses Hadoop. Spark creates the Hadoop configuration during initialization of SparkContext. The Hadoop configuration contains all the options provided in DSE Hadoop configuration files. To customize the Hadoop configuration on the application level, add the configuration entries you want to set or change to Spark configuration object, prepending them with spark.hadoop. prefix. Using this prefix designates the SparkContext as a Hadoop option that needs to be added to Hadoop configuration used by all the executors. The prefix is removed automatically before applying the option to the configuration. To get the SparkContext to use the Hadoop configuration object, access the hadoopConfiguration field of the SparkContext.

The forDse method takes the prefixed Cassandra specific properties from your SparkConf object and copies the properties, so that the parameters are properly passed to Hadoop configuration and then used by CFS connections. For example, assume that cassandra.username is set in SparkConf. By applying forDse, cassandra.username is replicated for use by the connector as spark.cassandra.auth.username and for use by CFS as spark.hadoop.cassandra.username.

Property list

The following Cassandra-specific properties are recognized:

cassandra.connection.native.port
Default = 9042. Port for native client protocol connections.
spark.cassandra.connection.rpc.port
Default = 9160. Port for thrift connections.
spark.cassandra.connection.host
Default = Spark master address. Address of the Cassandra node to contact to obtain connections to Cassandra. Used only initially to fetch the list of other nodes in the cluster. Subsequent connections are made to the closest node.
spark.cassandra.input.split.size
Default = 100000. Approximate number of rows in a single Spark partition. The higher the value, the fewer Spark tasks are created. Increasing the value too much may limit the parallelism level.
spark.cassandra.input.page.row.size
Default = 1000. Number of rows being fetched per roundtrip to Cassandra. Increasing this value increases memory consumption. Decreasing the value increases the number of roundtrips.
spark.cassandra.username
User name for authenticating in Cassandra.
spark.cassandra.password
Password for authenticating in Cassandra.