グラフ分析クエリーでのDseGraphFrameフレームワークの使用

DseGraphFrameフレームワークは、DSE Graphでの分析操作で用いるSparkベースのAPIを提供します。

DseGraphFrameフレームワークを使用すると、DSE Graphでの分析操作にSpark APIを使用するアプリケーションを作成することができます。これはDatabricksのGraphFrameライブラリにヒントを得たもので、Gremlinグラフ探索言語のサブセットに対応しています。DSE GraphデータをGraphFrameに読み出し、Sparkでサポートされている任意の形式からGraphFrameオブジェクトをDSE Graphに書き出すことができます。

DseGraphFrameクエリーを使用する状況とDSE Graph OLAPクエリーを使用する状況の選択

DSE Graph OLAPのGremlinに対する対応範囲はDseGraphFrame APIよりも広範です。詳細なクエリーを行う場合はGraph OLAPが最適ですが、単純なフィルター処理やカウントの場合はDseGraphFrame APIを使用した方がはるかに速く処理できます。

DseGraphFrameの概要

DseGraphFrameは、グラフを2つの仮想テーブル(頂点とエッジのDataFrame)で表現します。V()メソッドは、グラフの頂点DataFrameを返します。E()メソッドは、グラフのエッジDataFrameを返します。

val g = spark.dseGraph("test")
g.V.show
g.E.show

DseGraphFrameでは、GraphFrame互換の形式が使用されます。この形式では、頂点DataFrameidカラムを1つだけ指定し、エッジDataFrameにハードコード化されたsrcカラムとdstカラムを指定する必要があります。DSE Graphでは、ユーザーが任意のカラム・セットを頂点IDとして定義でき、GraphFrameにはラベルの概念がないため、DseGraphFrameは、DSE Graph id全体を1つのidカラムにシリアライズします。ラベルはidの一部として、また~labelプロパティ・カラムとして表されます。

DseGraphFrameの使用

すべての操作の開始点となるのがDseGraphFrameオブジェクトです。Scalaでは、DseGraphFrameオブジェクトとGraphFrameオブジェクトの間に暗黙的変換があります。
// load a graph
val graph = spark.dseGraph("my_graph")
//use the TinkerPop API 
graph.V().has("edge", gt(100)).count().next()
// use the GraphFrame API 
graph.find("(a)-[e]->(b); (b)-[e2]->(c)").filter("e2.`~label` = 'includedIn'").select("a.name", "e.`~label`", "b.name", "e2.`~label`", "c.name").distinct.show
// Use both the TinkerPop and GraphFrame APIs:
graph.V().out().hasLabel("label").df.show

Javaの場合は、gf()メソッドを使用するか、DseGraphFrameBuilder.dseGraph(String graphName, GraphFrame gf)メソッドを使用してGraphFrameインスタンスを返します。

//load a graph
GraphFrame graph = DseGraphFrameBuilder.dseGraph("my_graph", spark);
//use the TinkerPop API 
graph.V().has("edge", gt(100)).count().next()
// use the GraphFrame API 
graph.find("(a)-[e]->(b); (b)-[e2]->(c)").filter("e2.label = 'includedIn'").select("a.name", "e.`~label`", "b.name", "e2.`~label`", "c.name").distinct().show()
// Use both the TinkerPop and GraphFrame APIs:
graph.V().out().hasLabel("label").df().show()
複雑なクエリーを行う前に、グラフをキャッシュすることを強くお勧めします。キャッシュするには、cache()メソッドか、persist(level)メソッドを使用します。
g.cache()

persist()メソッドを使用するには、パラメーターとしてSpark保持レベルのいずれかを指定する必要があります。

g.persist(MEMORY_AND_DISK_SER)
表 1. DseGraphFrameメソッド・リスト
メソッド 説明
gf() GraphFrameオブジェクトを返します。
V() 頂点の探索を開始するDseGraphTraversal[Vertex]オブジェクトを返します。
E() エッジの探索を開始するDseGraphTraversal[Edge]オブジェクトを返します。
cache() Sparkでグラフ・データをキャッシュします。
persist(level) Spark保持レベルのいずれかを使用して、グラフ・データをキャッシュします。
deleteVertices() 頂点を削除します。
deleteVertices(label: String) 指定されたラベルを持つすべての頂点を削除します。
deleteEdges() エッジを削除します。
deleteVertexProperties() 頂点のプロパティを削除します。
deleteEdgeProperties() エッジのプロパティを削除します。
updateVertices(df: DataFrame, labels: Seq[String] = Seq.empty) 既存の頂点のプロパティを変更するか、新しい頂点を挿入します。

オプションのパラメーターlabelsは、更新する頂点をDataFrameから導き出すのではなく、提供されたラベルに対して繰り返し行うことで、更新のパフォーマンスを向上させます。

updateEdges(df: DataFrame, labels: Seq[String] = Seq.empty) 既存のエッジのプロパティを変更するか、新しいエッジを挿入します。

オプションのパラメーターlabelsは、更新するエッジをDataFrameから導き出すのではなく、提供されたラベルに対して繰り返し行うことで、更新のパフォーマンスを向上させます。