DSE Graph and Graph Analytics
DSE Graph allows you to perform OLAP queries using Spark.
Many local graph traversals can be executed in real time at high transactional loads. When the density of the graph is too high or the branching factor (the number of connected nodes at each level of the graph) too large, the memory and computation requirements to answer OLTP queries go beyond what is acceptable under typical application workloads. These types of queries are called deep queries.
Scan queries are queries that touch either an entire graph or large parts of the graph. They typically traverse a large number of vertices and edges. For example, a query on a social network graph that searches for friends of friends is a scan query.
For applications that use deep and scan queries, using an OLAP query results in better performance.
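For instance, the friends-of-friends scan query mentioned above can be expressed as an ordinary Gremlin traversal and run against a graph's OLAP traversal source once it is aliased to g, as described in the following section. This is a minimal sketch; the person label and knows edge label are illustrative and assume a simple social network schema:
// Runs on Spark via the OLAP traversal source; scans every person vertex in the graph
g.V().hasLabel('person').out('knows').out('knows').dedup().count()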
Performing OLAP queries using DSE Graph
Every graph created in DSE Graph has an OLAP traversal source, a, that is available to the Gremlin console and DataStax Studio. This traversal source uses the SparkGraphComputer to analyze queries and execute them against the underlying DSE Analytics nodes. The nodes must be started with Graph and Spark enabled to access the OLAP traversal source. You must connect to the Spark Master node for the datacenter, either by running the console from the Spark Master or by specifying the Spark Master in the hosts field of the Gremlin console YAML file. For one-off or single-session OLAP queries, alias database.a to g and create the query. For example, in the Gremlin console:
:remote config alias g database.a
g.V().count()
If you are performing multiple queries against different parts of the graph, use graph.snapshot() to return an OLAP traversal source for each part of the graph. For example, in the Gremlin console:
categories = graph.snapshot().vertices('category1', 'category2').create()
To create a snapshot, supply all the vertex labels that the snapshot will traverse. For example, the following query touches both the Person and Address vertices.
def person = graph.snapshot().vertices('Person', 'Address').create()
person.V().hasLabel('Person').out('HAS_ADDRESS').count()
Use the conf() method on the snapshot before you call create() to set TinkerPop's SparkGraphComputer configuration options. For example, to explicitly set the storage level for the snapshot to MEMORY_ONLY:
graph.snapshot().vertices("vertexlabel_alice", "vertexlabel_bob").edges("edgelabel_carol").conf("gremlin.spark.persistStorageLevel", "MEMORY_ONLY").create()
Setting Spark properties from Gremlin
Spark properties can be set from Gremlin using the graph.configuration.setProperty method on the graph.
:remote config alias g database.a
g.graph.configuration.setProperty("property name", value)
By default, Spark applications will use all available resources on the node, so no other Spark application can run. Limit the application's resources before running OLAP traversals by setting the maximum number of cores and the amount of memory used by the traversal. This is particularly important on servers with very large numbers of cores and large amounts of memory.
For example, this request sets 10 executors, each with 1 core and 4 GB of memory:
:remote config alias g example_graph.a
==>g=example_graph.a
g.graph.configuration.setProperty("spark.cores.max", 10)
g.graph.configuration.setProperty("spark.executor.memory", "4g")
g.graph.configuration.setProperty("spark.executor.cores", "1")
The spark.cores.max property sets the maximum number of cores used by Spark. Setting this property lower than the total number of cores limits the number of nodes on which the queries will be run. The spark.executor.memory property sets the amount of memory used by each executor. The spark.executor.cores property sets the number of cores used by each executor.
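To confirm the values that the next OLAP traversal will pick up, the same configuration object can be read back. This is a minimal sketch; getProperty is assumed to be available on the configuration object that graph.configuration exposes, mirroring setProperty above:
// Read back the resource limits set in the previous example
g.graph.configuration.getProperty("spark.cores.max")
g.graph.configuration.getProperty("spark.executor.memory")
g.graph.configuration.getProperty("spark.executor.cores")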
Before you configure Spark properties from Gremlin, kill the currently running Spark context from the Spark web UI. This kills all currently running Gremlin OLAP queries. From the Spark web UI, find the application named Apache TinkerPop's Spark-Gremlin and click kill next to the Application ID.
OLAP traversals create many intermediate objects during execution. These objects are garbage-collected by the JVM, so we recommend configuring a larger pool of executors, each with smaller memory and CPU allocations. This differs from non-graph Spark jobs, which typically perform better with fewer executors that have more memory and CPU resources.
We recommend allocating executors with no more than 8 cores (1 should work in most cases) to reduce garbage collection pauses and improve OLAP traversal performance. The memory available to Spark should be spread equally among the cores. For example, if you have 3 nodes and each has 24 cores and 96 GB dedicated to Spark, you have 24 * 3 = 72 cores and 96 GB * 3 = 288 GB of memory. To allocate all resources, request 72 single-core executors with 4 GB of memory each:
:remote config alias g example_graph.a
==>g=example_graph.a
g.graph.configuration.setProperty("spark.cores.max", 72)
g.graph.configuration.setProperty("spark.executor.memory", "4g")
g.graph.configuration.setProperty("spark.executor.cores", "1")
Some OLAP queries and most DseGraphFrame queries use Spark SQL joins for traversals. Spark has a predefined number of partitions for merge joins, set to 200 by default. This can create huge Spark partitions for very large graphs, which slows query execution.
To reduce the size of single partitions and improve performance, increase the number of shuffle partitions by setting the spark.sql.shuffle.partitions property to a larger number. We recommend setting spark.sql.shuffle.partitions to 2-4 times the number of Spark cluster cores. For example, if you have a 200-core cluster, set spark.sql.shuffle.partitions to 400 or 800.
:remote config alias g example_graph.a
==>g=example_graph.a
g.graph.configuration.setProperty("spark.sql.shuffle.partitions", 500)
DSE supports Spark dynamic resource allocation. To use dynamic resource allocation in Graph OLAP queries, configure the Spark context with the following properties:
:remote config alias g gods.a
g.graph.configuration.setProperty("spark.dynamicAllocation.enabled", "true")
g.graph.configuration.setProperty("spark.shuffle.service.enabled", "true")
g.graph.configuration.setProperty("spark.shuffle.service.port", "7437")
DSE's spark.shuffle.service.port differs from the default Spark port, so you need to specify it when using the Spark context in Graph OLAP queries.
To manage idle executor timeouts, set spark.dynamicAllocation.executorIdleTimeout to the number of seconds before a timeout occurs:
g.graph.configuration.setProperty("spark.dynamicAllocation.executorIdleTimeout", "60")
If you create snapshots, the executors are not shut down by default. To allow the executors to be shut down, set spark.dynamicAllocation.cachedExecutorIdleTimeout to the number of seconds before a timeout occurs:
g.graph.configuration.setProperty("spark.dynamicAllocation.cachedExecutorIdleTimeout", "60")
When to use analytic OLAP queries
On large graphs, OLAP queries typically perform better for deep queries. However, executing deep queries as part of an OLTP load may make sense if they are rarely performed. For example, an online payment provider will favor OLTP queries to process payments quickly, but may require a deep query if there are indications of fraud in a transaction. While the deep query may take much longer as an OLTP workload, the application as a whole will perform better than if it were segmented into OLTP and OLAP queries.
Long running and periodic processes like recommendation engines and search engines that analyze an entire graph are the ideal use cases for OLAP queries. However, one-off data analysis operations that involve deep queries or that scan the entire database also can benefit from being run as OLAP queries. See DSE Graph, OLTP, and OLAP for detailed information on performance differences between OLTP and OLAP queries.
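As a rough sketch of the difference (the graph name, labels, and edge names below are illustrative), the same Gremlin console session can point the g alias at either traversal source; the shallow, selective lookup suits the OLTP source, while the deep traversal is better handled by the Spark-backed OLAP source:
// OLTP traversal source: low-latency, selective lookups
:remote config alias g example_graph.g
g.V().has('person', 'name', 'alice').out('knows').limit(10)
// OLAP traversal source: deep or whole-graph traversals executed on Spark
:remote config alias g example_graph.a
g.V().hasLabel('person').out('knows').out('knows').out('knows').dedup().count()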
Best practices for deleting large numbers of edges and vertices
When deleting large numbers of edges or vertices from a graph, you may get error messages in subsequent queries due to the large number of tombstones left in the database before they are automatically removed.
The log entries for such errors resemble the following:
ERROR [ReadStage-32] 2017-05-18 11:18:29,289 StorageProxy.java:2068 - Scanned over 100001 tombstones during query 'SELECT * FROM t33215.PhoneNumber_p WHERE token(community_id) > -7331398285705078207 AND token(community_id) <= -6858404847917653807 LIMIT 1000' (last scanned row partion key was ((216134144), 1250272)); query aborted
To avoid these errors, reduce the number of tombstones per request by setting the spark.cassandra.input.split.size_in_mb property to a smaller size than the default of 64 MB. The spark.cassandra.input.split.size_in_mb property sets the approximate size of data that the Spark Cassandra Connector requests with each individual CQL query.
The following example shows how to set the spark.cassandra.input.split.size_in_mb property to 1 MB and then drop all phone number vertices from a graph.
:remote config alias g example_graph.a
g.graph.configuration.setProperty("spark.cassandra.input.split.size_in_mb", "1")
g.V().hasLabel("PhoneNumber").drop().iterate()
DSE authentication and OLAP queries
If DSE authentication is enabled, the internal user dse_inproc_user runs the application, not the user who submitted the Graph OLAP query.