Using the DseGraphFrame framework for graph analytics queries

The DseGraphFrame framework provides a Spark-based API for analytics operations on DSE Graph.

The DseGraphFrame framework allows you to create applications that use the Spark API for analytics operations on DataStax Graph. It is inspired by the Databricks GraphFrame library and supports a subset of the Gremlin graph traversal language. You can read DataStax Graph data into a GraphFrame, and you can write GraphFrame objects built from any format supported by Spark into DataStax Graph. You can also query DseGraphFrame vertices and edges in Spark SQL.

Choosing when to use DseGraphFrame or DataStax Graph OLAP queries

DataStax Graph OLAP has broader support for Gremlin than the DseGraphFrame API. While Graph OLAP is the best choice for deep queries, simple filtering and counts are much faster using the DseGraphFrame API.

Overview of DseGraphFrame

DseGraphFrame represents a graph as two virtual tables: a vertex and an edge DataFrame. The V() method returns the vertex DataFrame of a graph. The E() method returns the edge DataFrame of a graph.

val g = spark.dseGraph("test")
g.V.show
g.E.show

DseGraphFrame uses a GraphFrame-compatible format. This format requires the vertex DataFrame to have exactly one id column and the edge DataFrame to have hard-coded src and dst columns. Because DataStax Graph allows users to define any arbitrary set of columns as the vertex id, and because GraphFrame has no concept of labels, DseGraphFrame serializes the entire DataStax Graph id into one id column. The label is represented as part of the id and also as the ~label property column.
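Because the vertex and edge tables are ordinary DataFrames, they can be queried in Spark SQL once registered as temporary views. The following is a minimal sketch, assuming a DSE Spark shell where spark is predefined and a graph named "test" exists. The view names vertices and edges are hypothetical, and createOrReplaceTempView is the standard Spark Dataset method rather than anything specific to DseGraphFrame.

```scala
// Assumption: running in a DSE Spark shell, so `spark` is already defined,
// and a graph named "test" exists.
val g = spark.dseGraph("test")

// Register the vertex and edge DataFrames as temporary views.
// The view names "vertices" and "edges" are arbitrary, hypothetical choices.
g.V.df.createOrReplaceTempView("vertices")
g.E.df.createOrReplaceTempView("edges")

// Count vertices per label. The ~label column must be quoted with backticks in SQL.
spark.sql("SELECT `~label`, count(*) AS n FROM vertices GROUP BY `~label`").show

// Inspect the GraphFrame-style src and dst columns on the edge table.
spark.sql("SELECT src, dst, `~label` FROM edges LIMIT 10").show
```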

Using DseGraphFrame

The starting point for all operations is the DseGraphFrame object. In Scala, there is an implicit conversion between DseGraphFrame objects and GraphFrame objects.

// gt() is a TinkerPop predicate; import it if it is not already in scope
import org.apache.tinkerpop.gremlin.process.traversal.P._
// Load a graph
val graph = spark.dseGraph("my_graph")
// Use the TinkerPop API
graph.V().has("edge", gt(100)).count().next()
// Use the GraphFrame API
graph.find("(a)-[e]->(b); (b)-[e2]->(c)").filter("e2.`~label` = 'includedIn'").select("a.name", "e.`~label`", "b.name", "e2.`~label`", "c.name").distinct.show
// Use both the TinkerPop and GraphFrame APIs
graph.V().out().hasLabel("label").df.show

In Java, use the gf() method, or use the DseGraphFrameBuilder.dseGraph(String graphName, GraphFrame gf) method to return a GraphFrame instance.

// gt() requires: import static org.apache.tinkerpop.gremlin.process.traversal.P.gt;
// Load a graph
DseGraphFrame graph = DseGraphFrameBuilder.dseGraph("my_graph", spark);
// Use the TinkerPop API
graph.V().has("edge", gt(100)).count().next();
// Use the GraphFrame API through gf()
graph.gf().find("(a)-[e]->(b); (b)-[e2]->(c)").filter("e2.`~label` = 'includedIn'").select("a.name", "e.`~label`", "b.name", "e2.`~label`", "c.name").distinct().show();
// Use both the TinkerPop and GraphFrame APIs
graph.V().out().hasLabel("label").df().show();

Before running complex queries, cache the graph. Use either the cache() or the persist(level) method.

g.cache()

The persist() method requires one of the Spark persist levels as a parameter.

import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER
g.persist(MEMORY_AND_DISK_SER)

To list all graph names, call the spark.dseGraphs method.

spark.dseGraphs()

See the API documentation for a full list of methods.

Writing Graph data

If you are using the DataFrame write method on a graph, convert the results to a DataFrame before calling write to avoid collision with the TinkerPop GraphTraversal write method. Use the df or toDF methods on the graph before calling write.

val g = spark.dseGraph("test")
// Use one of the following equivalent forms
g.V.df.write.json("dsefs://mygraphs/output")
g.V.toDF.write.json("dsefs://mygraphs/output")

Using TTL and writetime in DseGraphFrame

You can read and write time-to-live (TTL) and writetime values in DseGraphFrames. A TTL value specifies when the row data expires. The writetime value is the time when the row data was last modified.

To read the TTL column, map the TTL column to a metadata column name in the graph:

val g = spark.dseGraph("name", Map("ttl.v" -> "ttlOfV"))

To read the TTL value, select it. For Classic Graph:

g.V().df().select("v").show
+--------------------------+
|                         v|
+--------------------------+
|["v value", 1232324242423]|
+--------------------------+

For DataStax Graph:

g.V().df().select("v", "ttlOfV").show
+---------+-------------+
|        v|       ttlOfV|
+---------+-------------+
|"v value"|1232324242423|
+---------+-------------+

To write TTL values to the graph in both Classic Graph and DataStax Graph:

val g = spark.dseGraph("name", Map("ttl" -> "ttlValue"))
val df = Seq((1, 2, 6000)).toDF("k", "v", "ttlValue")
g.updateVertices(df)

Reading writetime values works the same way as reading TTL values in both Classic Graph and DataStax Graph.

val g = spark.dseGraph("name", Map("writetime.v" -> "writetimeOfV"))
// In DataStax Graph, also select "writetimeOfV" to show the mapped column
g.V().df().select("v").show

Classic Graph only has read support for writetime.

To write using DataStax Graph:

val g = spark.dseGraph("name", Map("writetime" -> "writetimeValue"))
val df = Seq((1, 2, 1000)).toDF("k", "v", "writetimeValue")
g.updateVertices(df)

See Reading and writing TTL and write time properties for information on using TTL and writetime values with Spark DataFrames.
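When interpreting the values read back, it can help to convert them to timestamps. The following sketch assumes the values follow the usual Cassandra conventions, where writetime is expressed in microseconds since the Unix epoch and TTL is the remaining lifetime in seconds. The object and method names are hypothetical helpers, not part of the DseGraphFrame API.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Hypothetical helpers for interpreting writetime and TTL values.
object WritetimeSketch {
  // Assumption: writetime is microseconds since the Unix epoch (Cassandra convention).
  def writetimeToInstant(writetimeMicros: Long): Instant =
    Instant.EPOCH.plus(writetimeMicros, ChronoUnit.MICROS)

  // Assumption: TTL is the remaining lifetime in seconds, measured from the read time.
  def expiresAt(readAt: Instant, ttlSeconds: Long): Instant =
    readAt.plusSeconds(ttlSeconds)

  def main(args: Array[String]): Unit = {
    val written = writetimeToInstant(1500000000000000L)
    println(written)                   // 2017-07-14T02:40:00Z
    println(expiresAt(written, 6000L)) // 2017-07-14T04:20:00Z
  }
}
```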

Configuring DSE Smart Analytics query routing

By default, DataStax Graph uses the DseGraphFrameInterceptorStrategy, which automatically intercepts count, groupCount, and drop queries and routes them to DseGraphFrame to improve performance. These simpler queries skip StarGraph RDD creation, which shortens execution times.

If a count or groupCount query is against a snapshot or has a path length longer than 2, DSE will not intercept the query.
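The routing rule described above can be summarized as a small predicate. This is only an illustrative sketch of the documented conditions, not the actual strategy implementation. QueryShape and wouldIntercept are hypothetical names, and the sketch assumes that drop queries are not subject to the snapshot and path-length exclusions, since the text states those only for count and groupCount.

```scala
// Illustrative model of the documented interception rule; not DSE code.
object InterceptSketch {
  // Hypothetical summary of a query: its terminal step, whether it targets
  // a snapshot, and the length of the traversal path.
  final case class QueryShape(terminalStep: String, onSnapshot: Boolean, pathLength: Int)

  // Steps that the text says are routed to DseGraphFrame.
  private val interceptable = Set("count", "groupCount", "drop")

  def wouldIntercept(q: QueryShape): Boolean = {
    val counting = q.terminalStep == "count" || q.terminalStep == "groupCount"
    // count and groupCount are skipped on snapshots or for path lengths above 2.
    val excluded = counting && (q.onSnapshot || q.pathLength > 2)
    interceptable.contains(q.terminalStep) && !excluded
  }

  def main(args: Array[String]): Unit = {
    println(wouldIntercept(QueryShape("count", onSnapshot = false, pathLength = 1)))      // true
    println(wouldIntercept(QueryShape("groupCount", onSnapshot = false, pathLength = 3))) // false
    println(wouldIntercept(QueryShape("count", onSnapshot = true, pathLength = 1)))       // false
  }
}
```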

To disable DseGraphFrameInterceptorStrategy, call the withoutStrategies method on the graph.

g.withoutStrategies(com.datastax.bdp.graph.impl.tinkerpop.optimizer.DseGraphFrameInterceptorStrategy.class)
.V()
.count()

Copying graphs from one cluster to another using DseGraphFrame

You can copy graph data from one cluster to another using DseGraphFrame. Modify the configuration in the Spark session to include both cluster names and connection points.

spark.setCassandraConf("cluster1", CassandraConnectorConf.ConnectionHostParam.option("10.0.0.1"))
spark.setCassandraConf("cluster2", CassandraConnectorConf.ConnectionHostParam.option("10.0.0.2"))

Connect to the graphs on the source and destination clusters.

val src = spark.dseGraph("srcGraph", Map("cluster" -> "cluster1"))
val dst = spark.dseGraph("dstGraph", Map("cluster" -> "cluster2"))

Update the vertices and edges on the destination graph using the source graph.

dst.updateVertices(src.V)
dst.updateEdges(src.E)

Using authorization with DseGraphFrame

If you have enabled authorization on DSE, grant execute permissions to the DseGraphRpc object.