Using the DseGraphFrame framework for graph analytics queries
The DseGraphFrame framework provides the Spark base API for analytics operations on DSE Graph.
The DseGraphFrame framework allows you to create applications that use the Spark API for analytics operations on DataStax Graph. It is inspired by the Databricks GraphFrame library and supports a subset of the Gremlin graph traversal language. You can read DataStax Graph data into a GraphFrame and write GraphFrame objects from any format supported by Spark into DataStax Graph. You can also query DseGraphFrame vertices and edges in Spark SQL.
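As a minimal sketch of the Spark SQL route, the vertex and edge DataFrames can be registered as temporary views and queried with spark.sql. This assumes a DSE Spark shell where spark is predefined. The graph name test and the view names test_vertices and test_edges are placeholders chosen for this example, not names that DSE creates.

```scala
// Assumes a DSE Spark shell, where `spark` (a SparkSession) is predefined.
// "test" is a placeholder graph name.
val g = spark.dseGraph("test")

// Register the vertex and edge DataFrames as temporary Spark SQL views.
// The view names are arbitrary choices for this sketch.
g.V.df.createOrReplaceTempView("test_vertices")
g.E.df.createOrReplaceTempView("test_edges")

// Count vertices per label. Column names that start with "~" need backticks.
spark.sql("SELECT `~label`, count(*) AS n FROM test_vertices GROUP BY `~label`").show

// Count edges per label.
spark.sql("SELECT `~label`, count(*) AS n FROM test_edges GROUP BY `~label`").show
```

Temporary views last only for the current Spark session, so nothing is written back to the graph.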
Choosing when to use DseGraphFrame or DataStax Graph OLAP queries
DataStax Graph OLAP has broader support for Gremlin than the DseGraphFrame API. While Graph OLAP is the best choice for deep queries, simple filtering and counts are much faster using the DseGraphFrame API.
Overview of DseGraphFrame
DseGraphFrame represents a graph as two virtual tables: a vertex DataFrame and an edge DataFrame. The V() method returns the vertex DataFrame of a graph. The E() method returns the edge DataFrame of a graph.
val g = spark.dseGraph("test")
g.V.show
g.E.show
DseGraphFrame uses a GraphFrame-compatible format. This format requires the vertex DataFrame to have only one id column and the edge DataFrame to have hard-coded src and dst columns. Since DataStax Graph allows users to define any arbitrary set of columns as the vertex id, and since there is no concept of labels in GraphFrame, DseGraphFrame serializes the entire DataStax Graph id into one id column. The label is represented as part of the id and also as the ~label property column.
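One way to see this layout is to inspect the two DataFrames directly. The sketch below assumes a DSE Spark shell with spark predefined and a placeholder graph named test. It relies only on the id, src, dst, and ~label columns described above; any other columns depend on your graph schema.

```scala
// Assumes a DSE Spark shell, where `spark` is predefined.
// "test" is a placeholder graph name.
val g = spark.dseGraph("test")

// Print the vertex schema: one serialized id column, the ~label column,
// and whatever property columns the graph schema defines.
g.V.df.printSchema()

// Show a few serialized vertex ids next to their labels, without truncation.
g.V.df.select("id", "`~label`").show(5, false)

// Edges carry the serialized ids of their endpoints in src and dst.
g.E.df.select("src", "dst", "`~label`").show(5, false)
```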
Using DseGraphFrame
The spark.dseGraph method returns a DseGraphFrame object. In Scala, there's an implicit conversion between DseGraphFrame objects and GraphFrame objects.
// Load a graph
val graph = spark.dseGraph("my_graph")
// Use the TinkerPop API
graph.V().has("edge", gt(100)).count().next()
// Use the GraphFrame API
graph.find("(a)-[e]->(b); (b)-[e2]->(c)").filter("e2.`~label` = 'includedIn'").select("a.name", "e.`~label`", "b.name", "e2.`~label`", "c.name").distinct.show
// Use both the TinkerPop and GraphFrame APIs
graph.V().out().hasLabel("label").df.show
In Java, use the gf() method, or use the DseGraphFrameBuilder.dseGraph(String graphName, GraphFrame gf) method to return a GraphFrame instance.
// Load a graph
DseGraphFrame graph = DseGraphFrameBuilder.dseGraph("my_graph", spark);
// Use the TinkerPop API
graph.V().has("edge", gt(100)).count().next();
// Use the GraphFrame API; Java has no implicit conversion, so call gf() first
graph.gf().find("(a)-[e]->(b); (b)-[e2]->(c)").filter("e2.`~label` = 'includedIn'").select("a.name", "e.`~label`", "b.name", "e2.`~label`", "c.name").distinct().show();
// Use both the TinkerPop and GraphFrame APIs
graph.V().out().hasLabel("label").df().show();
To cache a graph, use the cache() or persist(level) methods.
g.cache()
The persist() method requires one of the Spark persist levels as a parameter.
import org.apache.spark.storage.StorageLevel
g.persist(StorageLevel.MEMORY_AND_DISK_SER)
To list all graph names, call the spark.dseGraphs method.
spark.dseGraphs()
See the API documentation for a full list of methods.
Writing Graph data
If you are using the DataFrame write method on a graph, convert the results to a DataFrame before calling write to avoid a collision with the TinkerPop GraphTraversal write method. Use the df or toDF methods on the graph before calling write.
val g = spark.dseGraph("test")
// df and toDF are equivalent here; use one of the two lines below
g.V.df.write.json("dsefs://mygraphs/output")
g.V.toDF.write.json("dsefs://mygraphs/output")
Using TTL and writetime in DseGraphFrame
You can read and write time-to-live (TTL) and writetime values in DseGraphFrames. The TTL value sets when the row data expires. The writetime value is the time when the row data was last modified.
To read the TTL column, map the TTL column to a metadata column name in the graph:
val g = spark.dseGraph("name", Map("ttl.v" -> "ttlOfV"))
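To read the TTL value, select it. For Classic Graph:
g.V().df().select("v").show
+--------------------------+
|                         v|
+--------------------------+
|["v value", 1232324242423]|
+--------------------------+
For DataStax Graph:
g.V().df().select("v", "ttlOfV").show
+---------+-------------+
|        v|       ttlOfV|
+---------+-------------+
|"v value"|1232324242423|
+---------+-------------+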
To write TTL values to the graph in both Classic Graph and DataStax Graph:
val g = spark.dseGraph("name", Map("ttl" -> "ttlValue"))
val df = Seq((1, 2, 6000)).toDF("k", "v", "ttlValue")
g.updateVertices(df)
Reading writetime values works the same way as reading TTL values, in both Classic Graph and DataStax Graph.
val g = spark.dseGraph("name", Map("writetime.v" -> "writetimeOfV"))
g.V().df().select("v").show
Classic Graph only has read support for writetime.
To write using DataStax Graph:
val g = spark.dseGraph("name", Map("writetime" -> "writetimeValue"))
val df = Seq((1, 2, 1000)).toDF("k", "v", "writetimeValue")
g.updateVertices(df)
See Reading and writing TTL and write time properties for information on using TTL and writetime values with Spark DataFrames.
Configuring DSE Smart Analytics query routing
By default, DataStax Graph uses the DseGraphFrameInterceptorStrategy, which automatically intercepts count, groupCount, and drop queries and routes them to DseGraphFrame to improve performance. These simpler queries skip StarGraph RDD creation, allowing for faster execution times.
If a count or groupCount query is against a snapshot or has a path length longer than 2, DSE will not intercept the query.
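The routing rule above can be summarized as a small decision function. The following is an illustrative model in plain Scala, not DSE's implementation: the QueryShape type, its fields, and the function name are hypothetical and exist only to restate the rule as described in this section.

```scala
// Illustrative model only: this is NOT how DSE implements the strategy.
object InterceptSketch {
  // Hypothetical description of a traversal: its final step, whether it
  // runs against a snapshot, and the length of its path.
  final case class QueryShape(terminalStep: String, onSnapshot: Boolean, pathLength: Int)

  // Returns true when, per the rule described above, the query would be
  // routed to DseGraphFrame.
  def isRoutedToDseGraphFrame(q: QueryShape): Boolean = q.terminalStep match {
    // count and groupCount are skipped on snapshots or paths longer than 2.
    case "count" | "groupCount" => !q.onSnapshot && q.pathLength <= 2
    // The text lists no exceptions for drop, so this model always routes it.
    case "drop" => true
    // Every other query is left to the regular OLAP execution path.
    case _ => false
  }
}
```

For example, a count over a two-step path on a live graph is routed, while the same count against a snapshot is not.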
To disable DseGraphFrameInterceptorStrategy, call the withoutStrategies method on the graph.
g.withoutStrategies(com.datastax.bdp.graph.impl.tinkerpop.optimizer.DseGraphFrameInterceptorStrategy.class)
  .V()
  .count()
Copying graphs from one cluster to another using DseGraphFrame
You can copy graph data from one cluster to another using DseGraphFrame.
Modify the configuration in the Spark session to include both cluster names and connection
points.
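import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.sql.cassandra._
spark.setCassandraConf("cluster1", CassandraConnectorConf.ConnectionHostParam.option("10.0.0.1"))
spark.setCassandraConf("cluster2", CassandraConnectorConf.ConnectionHostParam.option("10.0.0.2"))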
Connect to the graphs on the source and destination clusters.
val src = spark.dseGraph("srcGraph", Map("cluster" -> "cluster1"))
val dst = spark.dseGraph("dstGraph", Map("cluster" -> "cluster2"))
Update the vertices and edges on the destination graph using the source graph.
dst.updateVertices(src.V)
dst.updateEdges(src.E)
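After the copy, a quick sanity check is to compare vertex and edge counts on both sides. This is a rough sketch, assuming a DSE Spark shell with both clusters configured as above and the same placeholder graph and cluster names. Matching counts do not prove the data is identical, and the destination counts will be higher if that graph already held data.

```scala
// Assumes a DSE Spark shell with "cluster1" and "cluster2" already configured.
// Graph and cluster names are the placeholders used in this section.
val src = spark.dseGraph("srcGraph", Map("cluster" -> "cluster1"))
val dst = spark.dseGraph("dstGraph", Map("cluster" -> "cluster2"))

// Count rows in the vertex and edge DataFrames on each side.
val srcVertices = src.V.df.count()
val dstVertices = dst.V.df.count()
val srcEdges = src.E.df.count()
val dstEdges = dst.E.df.count()

// Print the counts side by side for a manual comparison.
println(s"vertices: source=$srcVertices destination=$dstVertices")
println(s"edges:    source=$srcEdges destination=$dstEdges")
```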
Using authorization with DseGraphFrame
If you have enabled authorization on DSE, grant execute permissions to the DseGraphRpc object.