Using the DseGraphFrame framework for graph analytics queries
The DseGraphFrame framework provides the Spark base API for analytics operations on DSE Graph.
The DseGraphFrame framework allows you to create applications that use the Spark
API for analytics operations on DSE Graph. It is inspired by the Databricks GraphFrames library and supports a subset of the Gremlin graph traversal
language. You can read DSE Graph data into a GraphFrame, and you can write
GraphFrame objects from any format supported by Spark into DSE Graph. You can
also query DseGraphFrame vertices and edges in Spark SQL.
Choosing when to use DseGraphFrame or DSE Graph OLAP queries
DSE Graph OLAP has broader support for
Gremlin than the DseGraphFrame API. While Graph OLAP is the best choice for
deep queries, simple filtering and counts are much faster using the
DseGraphFrame API.
Overview of DseGraphFrame
DseGraphFrame represents a graph as two virtual tables: a vertex and an edge
DataFrame. The V() method returns the vertex
DataFrame of a graph. The E() method returns the edge
DataFrame of a graph.
val g = spark.dseGraph("test")
g.V.show
g.E.show
DseGraphFrame uses a GraphFrame-compatible format. This
format requires the vertex DataFrame to have only one id
column and the edge DataFrame to have hard-coded src and
dst columns. Because DSE Graph allows users to define an arbitrary set of
columns as the vertex id, and because GraphFrame has no concept of labels,
DseGraphFrame serializes the entire DSE Graph id into a single
id column. The label is represented both as part of the id and
as the ~label property column.
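To make the id flattening concrete, here is a minimal pure-Scala sketch that needs neither Spark nor DSE. The case classes DseVertex, VertexRow, and EdgeRow and the functions flattenId, toVertexRow, and toEdgeRow are hypothetical, and the string encoding (the label and each id column joined with ':') is an illustrative assumption, not the format DSE actually uses. Treat real DseGraphFrame ids as opaque values.

```scala
// Hypothetical model of a DSE Graph vertex: a label plus an arbitrary,
// ordered set of id columns (for example, partition and clustering keys).
final case class DseVertex(label: String, idColumns: Seq[(String, Any)], props: Map[String, Any])

// GraphFrame-compatible rows: exactly one `id` column per vertex and
// fixed `src`/`dst` columns per edge. `label` stands in for `~label`.
final case class VertexRow(id: String, label: String, props: Map[String, Any])
final case class EdgeRow(src: String, dst: String, label: String)

object IdFlattening {
  // Illustrative encoding only: the label followed by each id column as key=value.
  // Real DseGraphFrame ids use a different serialization and should be treated as opaque.
  def flattenId(label: String, idColumns: Seq[(String, Any)]): String =
    (label +: idColumns.map { case (k, v) => s"$k=$v" }).mkString(":")

  // The label ends up twice: inside the flattened id and in its own column.
  def toVertexRow(v: DseVertex): VertexRow =
    VertexRow(flattenId(v.label, v.idColumns), v.label, v.props)

  // Edges reference vertices only through the flattened ids.
  def toEdgeRow(from: DseVertex, to: DseVertex, label: String): EdgeRow =
    EdgeRow(flattenId(from.label, from.idColumns), flattenId(to.label, to.idColumns), label)

  def main(args: Array[String]): Unit = {
    val alice = DseVertex("person", Seq("name" -> "alice", "dept" -> 7), Map("age" -> 30))
    val bob   = DseVertex("person", Seq("name" -> "bob", "dept" -> 7), Map("age" -> 25))
    println(toVertexRow(alice))
    println(toEdgeRow(alice, bob, "knows"))
  }
}
```

Because a composite key collapses into one string, two vertices with the same label but different id column values still get distinct ids, which is what lets the src and dst columns stay single-valued.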
Using DseGraphFrame
Use the spark.dseGraph() method to load a graph into a DseGraphFrame object. In Scala,
there is an implicit conversion between DseGraphFrame objects and
GraphFrame objects.
import com.datastax.bdp.graph.spark.graphframe._
import org.apache.tinkerpop.gremlin.process.traversal.P.gt
// load a graph
val graph = spark.dseGraph("my_graph")
// use the TinkerPop API
graph.V().has("edge", gt(100)).count().next()
// use the GraphFrame API
graph.find("(a)-[e]->(b); (b)-[e2]->(c)").filter("e2.`~label` = 'includedIn'").select("a.name", "e.`~label`", "b.name", "e2.`~label`", "c.name").distinct.show
// use both the TinkerPop and GraphFrame APIs
graph.V().out().hasLabel("label").df.show
In Java, use the gf() method, or use the
DseGraphFrameBuilder.dseGraph(String graphName, GraphFrame gf) method to
return a GraphFrame instance.
import static org.apache.tinkerpop.gremlin.process.traversal.P.gt;
// load a graph
DseGraphFrame graph = DseGraphFrameBuilder.dseGraph("my_graph", spark);
// use the TinkerPop API
graph.V().has("edge", gt(100)).count().next();
// use the GraphFrame API through gf()
graph.gf().find("(a)-[e]->(b); (b)-[e2]->(c)")
    .filter("e2.`~label` = 'includedIn'")
    .select("a.name", "e.`~label`", "b.name", "e2.`~label`", "c.name")
    .distinct()
    .show();
// use both the TinkerPop and GraphFrame APIs
graph.V().out().hasLabel("label").df().show();
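The motif query in the examples above can be hard to read at first. The following pure-Scala sketch, which uses plain collections instead of Spark, shows what it computes: every two-hop path a -e-> b -e2-> c whose second edge has the label includedIn, projected to names and labels and then de-duplicated. The Vertex and Edge case classes, the twoHopIncludedIn function, and the sample data are hypothetical; GraphFrames itself evaluates motifs as DataFrame joins.

```scala
// Hypothetical, simplified vertex and edge records (not DSE or GraphFrames types).
final case class Vertex(id: String, name: String)
final case class Edge(src: String, dst: String, label: String)

object MotifSketch {
  type Row = (String, String, String, String, String)

  // Collections equivalent of:
  //   find("(a)-[e]->(b); (b)-[e2]->(c)")
  //     .filter("e2.`~label` = 'includedIn'")
  //     .select("a.name", "e.`~label`", "b.name", "e2.`~label`", "c.name")
  //     .distinct
  def twoHopIncludedIn(vertices: Seq[Vertex], edges: Seq[Edge]): Seq[Row] = {
    val byId = vertices.map(v => v.id -> v).toMap
    val rows = for {
      e  <- edges
      e2 <- edges
      if e.dst == e2.src          // the shared name b joins the two edge patterns
      if e2.label == "includedIn" // the filter on the second edge's label
      a  <- byId.get(e.src).toList
      b  <- byId.get(e.dst).toList
      c  <- byId.get(e2.dst).toList
    } yield (a.name, e.label, b.name, e2.label, c.name)
    rows.distinct // parallel edges would otherwise produce duplicate rows
  }

  def main(args: Array[String]): Unit = {
    val vertices = Seq(Vertex("1", "alice"), Vertex("2", "graphBook"), Vertex("3", "dbSeries"))
    val edges = Seq(
      Edge("1", "2", "authored"),
      Edge("1", "2", "authored"), // a parallel edge with the same label
      Edge("2", "3", "includedIn"),
      Edge("1", "3", "likes")
    )
    twoHopIncludedIn(vertices, edges).foreach(println)
  }
}
```

The parallel authored edge shows why the original query ends with distinct: without it, the same projected row would appear once per matching edge pair.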
To cache a DseGraphFrame, use the cache() or persist(level)
methods.
g.cache()
The persist() method requires one of the Spark storage levels (StorageLevel) as a
parameter.
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER
g.persist(MEMORY_AND_DISK_SER)
To list all graph names, call the spark.dseGraphs method.
spark.dseGraphs()
See the API documentation for a full list of methods.
Configuring DSE Smart Analytics query routing
By default, DSE Graph uses the DseGraphFrameInterceptorStrategy, which
automatically intercepts count, groupCount, and
drop queries and routes them to DseGraphFrame to improve
performance. These simpler queries skip StarGraph RDD creation, which
shortens execution times.
If a count or groupCount query is against a snapshot or has
a path length longer than 2, DSE will not intercept the query.
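The routing rule described above can be written down as a small predicate. This is a hypothetical model for reasoning about which traversals are routed, not DSE's implementation: TraversalInfo and its fields are invented for illustration, pathLength stands for the traversal's path length as this section uses the term, and treating drop as always routed is an assumption, because the exceptions are stated only for count and groupCount.

```scala
// Hypothetical summary of a traversal; not a DSE or TinkerPop type.
final case class TraversalInfo(terminalStep: String, onSnapshot: Boolean, pathLength: Int)

object InterceptionRule {
  def isIntercepted(t: TraversalInfo): Boolean = t.terminalStep match {
    // Counting queries are routed unless they run against a snapshot
    // or have a path length longer than 2.
    case "count" | "groupCount" => !t.onSnapshot && t.pathLength <= 2
    // Assumption: no exceptions are stated for drop, so it is always routed here.
    case "drop" => true
    // Any other traversal stays on the regular Graph OLAP path.
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    println(isIntercepted(TraversalInfo("count", onSnapshot = false, pathLength = 1)))
    println(isIntercepted(TraversalInfo("groupCount", onSnapshot = false, pathLength = 3)))
  }
}
```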
To disable DseGraphFrameInterceptorStrategy, call the
withoutStrategies method on the graph traversal source.
g.withoutStrategies(com.datastax.bdp.graph.impl.tinkerpop.optimizer.DseGraphFrameInterceptorStrategy.class).V().count()
Copying graphs from one cluster to another using DseGraphFrame
You can copy graph data from one DSE cluster to another using
DseGraphFrame. Modify the configuration in the Spark session to include
both cluster names and connection points.
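import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf
spark.setCassandraConf("cluster1", CassandraConnectorConf.ConnectionHostParam.option("10.0.0.1"))
spark.setCassandraConf("cluster2", CassandraConnectorConf.ConnectionHostParam.option("10.0.0.2"))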
Connect to the graphs on the source and destination clusters.
spark.conf.set("cluster", "cluster1")
val source = spark.dseGraph("srcGraph")
spark.conf.set("cluster", "cluster2")
val dst = spark.dseGraph("dstGraph")
Update the vertices and edges on the destination graph using the source graph.
dst.updateVertices(source.V)
dst.updateEdges(source.E)
Using authorization with DseGraphFrame
If you have enabled authorization on DSE, grant
execute permissions to the DseGraphRpc object.
