Spark JVMs and memory management 

Spark jobs running on DataStax Enterprise are divided among several different JVM processes, each with different memory requirements.

Cassandra and Spark Master JVMs 

The Spark Master runs in the same process as Cassandra, but its memory usage is negligible. The only way Spark could cause an OutOfMemoryError in Cassandra is indirectly, by executing Cassandra queries that fill the client request queue: for example, a query with a high limit and paging disabled, or a very large batch used to update or insert data in a table. The Cassandra heap size is controlled by MAX_HEAP_SIZE in cassandra-env.sh. If you see an OutOfMemoryError in system.log, treat it as a standard Cassandra OutOfMemoryError and follow the usual troubleshooting steps.
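
As a first troubleshooting step, it can help to confirm where the error was logged. The following is a minimal sketch, not a DataStax tool: it scans log text for the string OutOfMemoryError. The function names and the sample log lines are hypothetical, and the path to system.log depends on your installation, so it is passed in by the caller.

```python
from pathlib import Path

OOM_MARKER = "OutOfMemoryError"


def find_oom_lines(log_text):
    """Return (line_number, line) pairs for lines that mention OutOfMemoryError."""
    return [
        (number, line)
        for number, line in enumerate(log_text.splitlines(), start=1)
        if OOM_MARKER in line
    ]


def scan_log(path):
    """Scan a log file on disk; the path is supplied by the caller."""
    # errors="replace" keeps the scan going if the log contains undecodable bytes.
    return find_oom_lines(Path(path).read_text(errors="replace"))


if __name__ == "__main__":
    # Hypothetical sample lines; real system.log entries will look different.
    sample = "\n".join([
        "INFO  starting request",
        "ERROR java.lang.OutOfMemoryError: Java heap space",
        "INFO  shutting down",
    ])
    for number, line in find_oom_lines(sample):
        print(f"{number}: {line}")
```

The same helper could be pointed at an executor stderr log, since an OutOfMemoryError is reported with the same exception name there.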

Spark executor JVMs 

The Spark executor is where Spark performs transformations and actions on the RDDs and is usually where a Spark-related OutOfMemoryError would occur. An OutOfMemoryError in an executor will show up in the stderr log for the currently executing application (usually in /var/lib/spark). There are several configuration settings that control executor memory and they interact in complicated ways.

  • SPARK_WORKER_MEMORY in spark-env.sh is the maximum amount of memory to give all executors for all applications running on a particular node.
  • initial_spark_worker_resources in dse.yaml is used to automatically calculate SPARK_WORKER_MEMORY when SPARK_WORKER_MEMORY is commented out in spark-env.sh (as it is by default). The calculation uses the following formula:

    initial_spark_worker_resources * (total system memory - memory assigned to Cassandra)

  • spark.executor.memory is a system property that controls how much executor memory a specific application gets. It must be less than or equal to SPARK_WORKER_MEMORY. It can be specified in the constructor for the SparkContext in the driver application, or via --conf spark.executor.memory or --executor-memory command line options when submitting the job using spark-submit.
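
The formula and the constraint on spark.executor.memory can be worked through with a few lines of arithmetic. This is a minimal sketch under stated assumptions: the function names are hypothetical, all values are in megabytes, the sample numbers (a 0.7 fraction, 32 GB of system memory, 8 GB assigned to Cassandra) are illustrative only, and truncating the result to a whole number of megabytes is an assumption rather than documented DSE behavior.

```python
def worker_memory_mb(resource_fraction, total_system_mb, cassandra_mb):
    """Apply: initial_spark_worker_resources * (total system memory - memory assigned to Cassandra)."""
    available_mb = total_system_mb - cassandra_mb
    if available_mb < 0:
        raise ValueError("Cassandra memory exceeds total system memory")
    # Assumption: truncate to whole megabytes; DSE may round differently.
    return int(resource_fraction * available_mb)


def executor_fits(executor_mb, worker_mb):
    """spark.executor.memory must be less than or equal to SPARK_WORKER_MEMORY."""
    return executor_mb <= worker_mb


if __name__ == "__main__":
    # Illustrative values only: 0.7 of (32 GB total - 8 GB for Cassandra).
    worker_mb = worker_memory_mb(0.7, 32 * 1024, 8 * 1024)
    print(worker_mb)                       # 17203
    print(executor_fits(4096, worker_mb))  # True
```

With these sample numbers, a 4 GB executor fits within the calculated worker memory, while a request larger than about 16.8 GB would not.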

The default location of the spark-env.sh file depends on the type of installation:

  • Installer-Services and Package installations: /etc/dse/spark/spark-env.sh
  • Installer-No Services and Tarball installations: install_location/resources/spark/conf/spark-env.sh

The location of the dse.yaml file depends on the type of installation:

  • Installer-Services: /etc/dse/dse.yaml
  • Package installations: /etc/dse/dse.yaml
  • Installer-No Services: install_location/resources/dse/conf/dse.yaml
  • Tarball installations: install_location/resources/dse/conf/dse.yaml

The location of the cassandra-env.sh file depends on the type of installation:

  • Package installations: /etc/dse/cassandra/cassandra-env.sh
  • Tarball installations: install_location/resources/cassandra/conf/cassandra-env.sh

The client driver JVM 

The driver is the client program for the Spark job. Normally it shouldn't need very large amounts of memory because most of the data should be processed within the executor. If it does need more than a few gigabytes, your application may be using an anti-pattern such as pulling all of the data in an RDD into a local data structure with collect or take. Generally you should never use collect in production code, and if you use take, you should take only a few records. If the driver runs out of memory, you will see the OutOfMemoryError in the driver stderr or wherever it has been configured to log. Driver memory is controlled in one of two places:

  • SPARK_DRIVER_MEMORY in spark-env.sh
  • The spark.driver.memory system property, which can be specified with the --conf spark.driver.memory or --driver-memory command-line options when submitting the job using spark-submit. It cannot be specified in the SparkContext constructor because by that point the driver has already started.
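
Because driver memory has to be set before the driver starts, it is typically passed on the spark-submit command line alongside executor memory. The following is a minimal sketch that only assembles and prints such a command; it does not run it. The helper name build_submit_command, the application file my_job.py, and the memory sizes are hypothetical, and the exact launcher invocation on your cluster may differ.

```python
import shlex


def build_submit_command(app, executor_memory, driver_memory, extra_conf=None):
    """Assemble a spark-submit argument list with explicit memory settings."""
    command = [
        "spark-submit",
        "--executor-memory", executor_memory,  # sets spark.executor.memory
        "--driver-memory", driver_memory,      # sets spark.driver.memory
    ]
    # Any other properties are passed as --conf key=value pairs.
    for key, value in sorted((extra_conf or {}).items()):
        command += ["--conf", f"{key}={value}"]
    command.append(app)
    return command


if __name__ == "__main__":
    # Hypothetical application and sizes, for illustration only.
    cmd = build_submit_command("my_job.py", "4g", "2g")
    print(shlex.join(cmd))
    # spark-submit --executor-memory 4g --driver-memory 2g my_job.py
```

The equivalent --conf form would pass spark.executor.memory and spark.driver.memory as key=value pairs through extra_conf instead of using the dedicated flags.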

Spark worker JVMs 

The worker is a watchdog process that spawns the executor, and it should never need its heap size increased. The worker's heap size is controlled by SPARK_DAEMON_MEMORY in spark-env.sh. SPARK_DAEMON_MEMORY also affects the heap size of the Spark SQL Thrift server.