DSE Graph and Graph Analytics

DSE Graph allows you to perform OLAP queries using Spark.

Many local graph traversals can be executed in real time at high transactional loads. When the density of the graph is too high or the branching factor too large (the number of connected nodes at each level of the graph), the memory and computation requirements to answer OLTP queries go beyond what is acceptable under typical application workloads. These type of queries are called deep queries.

Scan queries are queries that touch either an entire graph or large parts of the graph. They typically traverse a large number of vertices and edges. For example, a query on a social network graph that searches for friends of friends is a scan query.

For applications that use deep and scan queries, using a OLAP query will result in better performance.

Performing OLAP queries using DSE Graph

Every graph created in DSE Graph has an OLAP traversal source a that is available to gremlin-console and DataStax Studio. This traversal source uses the SparkGraphComputer to analyze queries and execute them against the underlying DSE Analytics nodes. The nodes must be started with Graph and Spark enabled to access the OLAP traversal source. You must connect to the Spark Master node for the datacenter by either running the console from the Spark Master or specifying the Spark Master in the hosts field of the Gremlin console yaml file. For one-off or single-session OLAP queries, alias database.a to g and create the query. For example in the Gremlin console:

:remote config alias g database.a 
g.V().count()

If you are performing multiple queries against different parts of the graph, use graph.snapshot() to return an OLAP traversal source for each part of the graph. For example, in the Gremlin console:

categories = graph.snapshot().vertices('category1', 'category2').create()

To create a snapshot, supply all the vertices the snapshot will traverse. For example, the following query touches both the Person and Address vertices.

def person = graph.snapshot().vertices('Person', 'Address').create()
person.V().hasLabel('Person').out('HAS_ADDRESS').count()

Use the conf() method on the snapshot before you call create() to set TinkerPop's SparkGraphComputer configuration options. For example, to explicitly set the storage level for the snapshot to MEMORY_ONLY:

graph.snapshot().vertices("vertexlabel_alice", "vertexlabel_bob").edges("edgelabel_carol").conf("gremlin.spark.persistStorageLevel", "MEMORY_ONLY").create()

Setting Spark properties from Gremlin

Spark properties can be set from Gremlin using the graph.configuration.setProperty method on the graph.

:remote config alias g database.a
g.graph.configuration.setProperty("property name", value)

By default, Spark applications will use all available resources on the node, so no other Spark application can run. Limit the application's resources before running OLAP traversals by setting the maximum number of cores and the amount of memory used by the traversal. This is particularly important on servers with very large amounts of cores and memory.

For example this request sets 10 executors with 1 core and 4 GB of memory each:

:remote config alias g example_graph.a
==>g=example_graph.a
g.graph.configuration.setProperty("spark.cores.max", 10)
g.graph.configuration.setProperty("spark.executor.memory", "4g")
g.graph.configuration.setProperty("spark.executor.cores", "1")

The spark.cores.max property sets the maximum number of cores used by Spark. Setting this property lower than the total number of cores limits the number of nodes on which the queries will be run. The spark.executor.memory property sets the amount of memory used for each executor. The spark.executor.cores property sets the number of cores used for each executor.

Before you configure Spark properties from Gremlin kill the currently-running Spark context from the Spark web UI. This will kill all currently running Gremlin OLAP queries. From the Spark web UI, find the application named Apache TinkerPop's Spark-Gremlin and click kill next to the Application ID.

OLAP traversals create many intermediate objects during execution. These objects are garbage-collected by the JVM, so we recommend configuring a larger pool of executors each with smaller memory and CPU resources, compared to non-graph Spark jobs which typically perform better with fewer executors with higher memory and CPU resources.

We recommend allocating executors with no more then 8 cores (1 should work in most cases) to reduce garbage collection pauses and improve OLAP traversal performance. The memory available to Spark should be equally spread among the cores. For example, if you have 3 nodes and each has 24 cores and 96 GB dedicated to Spark you have 24 * 3 = 72 cores and 96 GB * 3 = 188 GB memory. To allocate all resources you should request 72 single core executors with 4 GB of memory each:

:remote config alias g example_graph.a
==>g=example_graph.a
g.graph.configuration.setProperty("spark.cores.max", 72)
g.graph.configuration.setProperty("spark.executor.memory", "4g")
g.graph.configuration.setProperty("spark.executor.cores", "1")

Some OLAP queries and most DseGraphFrame queries use Spark SQL joins for traversals. Spark has a predefined number of partitions to perform merge joins, by default set to 200. This can create huge Spark partitions for very large graphs, which slows query execution.

To reduce the size of single partitions and improve performance increase the number of shuffle partitions by setting the spark.sql.shuffle.partitions property to a larger number. We recommend the spark.sql.shuffle.partitions is set to 2-4 times the number of Spark cluster cores. So if you have a 200 core cluster, set spark.sql.shuffle.partitions to 400 or 800.

:remote config alias g example_graph.a
==>g=example_graph.a
g.graph.configuration.setProperty("spark.sql.shuffle.partitions", 500)

DSE supports Spark dynamic resource allocation. To use dynamic resource allocation in Graph OLAP queries configure the Spark context with the following properties:

:remote config alias g gods.a
g.graph.configuration.setProperty("spark.dynamicAllocation.enabled", "true")
g.graph.configuration.setProperty("spark.shuffle.service.enabled", "true")
g.graph.configuration.setProperty("spark.shuffle.service.port", "7437")

DSE's spark.shuffle.service.port differs from the default Spark port, so you need to specify it when using the Spark context in Graph OLAP queries.

To manage idle executors timeouts, set spark.dynamicAllocation.executorIdleTimeout to the number of seconds before a timeout occurs.

g.graph.configuration.setProperty("spark.dynamicAllocation.executorIdleTimeout", "60")

If you create snapshots, the executors will not be shutdown by default. To allow the executors to be shutdown, set spark.dynamicAllocation.cachedExecutorIdleTimeout to the numebr of seconds before a timeout occurs:

g.graph.configuration.setProperty("spark.dynamicAllocation.cachedExecutorIdleTimeout", "60")

When to use analytic OLAP queries

On large graphs, OLAP queries typically perform better for deep queries. However, executing deep queries as part of an OLTP load may make sense if they are rarely performed. For example, on online payment provider will favor OLTP queries to process payments quickly, but may require a deep query if there are indications of fraud in the transaction. While the deep query may take much longer as an OLTP workload, on the whole the performance of the application will be faster than segmenting the application into OLTP and OLAP queries.

Long running and periodic processes like recommendation engines and search engines that analyze an entire graph are the ideal use cases for OLAP queries. However, one-off data analysis operations that involve deep queries or that scan the entire database also can benefit from being run as OLAP queries. See DSE Graph, OLTP, and OLAP for detailed information on performance differences between OLTP and OLAP queries.

Best practices for deleting large numbers of edges and vertices

When deleting large numbers of edges or vertices from a graph, you may end up getting error messages in subsequent queries due the large number of tombstones left in the database before they are automatically removed.

The log entries for such errors resemble the following:

ERROR [ReadStage-32] 2017-05-18 11:18:29,289  StorageProxy.java:2068 - Scanned over 100001 tombstones during query 'SELECT * FROM t33215.PhoneNumber_p WHERE token(community_id) > -7331398285705078207 AND token(community_id) <= -6858404847917653807 LIMIT 1000' (last scanned row partion key was ((216134144), 1250272)); query aborted

To avoid these errors, reduce the number of tombstones per request by setting the spark.cassandra.input.split.size_in_mb property to a smaller size than the default of 64 MB. The spark.cassandra.input.split.size_in_mb property sets the approximate size of data the Spark Cassandra Connector will request with each individual CQL query.

The following example shows how to set the spark.cassandra.input.split.size_in_mb property to 1 MB and then to drop all phone number vertices from a graph.

:remote config alias g example_graph.a
g.graph.configuration.setProperty("spark.cassandra.input.split.size_in_mb", "1")
g.V().hasLabel("PhoneNumber").drop().iterate()

DSE authentication and OLAP queries

If DSE authentication is enabled, the internal user dse_inproc_user runs the application, not the user who submitted the Graph OLAP query.