DSE Graph Loaderでの変換(filter、flatMap、map)の使用

DSE Graph Loaderで変換(filter、flatMap、map)を使用する方法

すべてのデータ入力は、ユーザーが指定した関数に従って入力データの操作またはtruncateを行うための任意のユーザー変換に対応しています。DSE Graph Loaderで使用可能な変換は以下のとおりです。
注: DSE Graph Loader 6.0では、変換関数が廃止予定となる可能性があります。変更が生じる可能性があることにご留意ください。

各データ入力のデータ・レコードは、ドキュメント構造か、入力ファイルから定義された、ネストされたマップです。変換は、ネストされたマップに対して行われ、ネストされたマップを返します。指定される変換関数はすべてスレッド・セーフでなければなりません。スレッド・セーフでなければ、データ・ローダーの動作が定義されなくなります。

使用される変換はGroovyクロージャか、引数を取り、値を返し、変数に割り当てることができるオープンなコードの匿名ブロックです。このようなクロージャは、Groovyの暗黙的なパラメーター、itを使用することがよくあります。クロージャによってパラメーター・リストが明示的に定義されない場合に、常に定義されたパラメーターとして使用できるのがitです。以下の例では、入力ファイルの各レコードを取得し、変換を適用するためにitが使用されています。

マッピング・スクリプトへの変換の配置は任意です。変換を定義する前に入力ファイルが定義されていれば、マッピング・スクリプトの任意の場所に変換を配置することができます。

Groovyをよく知らない方のために、こちらで簡単にGroovyの紹介をしています。

filter

DSE Graph Loaderでfilterを使用する方法

filter関数では、入力ファイルに基準を適用し、基準に合うオブジェクトのみを選択して、そのオブジェクトを読み込むことができます。この基準では、フィールドで使用されている任意のデータ型に一致させることができます。

整数の不等価演算に基づくフィルター

この例で定義されている入力ファイルはchefsです。フィルターは、構文<input_file_name>.filter { ...}という同等のHOCON構文でフォーマットされています。ageに整数フィールドを指定すると、41歳以下のすべてのシェフをフィルターして、頂点ラベルがchefYoungのグラフに読み込むことができます。
/** SAMPLE INPUT
name|gender|status|age
Jamie Oliver|M|alive|41
**/

inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "filterData.csv").delimiter('|')

// filter
def chefsYoung = chefs.filter { it["age"].toInteger() <= 41 }

//Specifies what data source to load using which mapper (as defined inline)

load(chefsYoung).asVertices {
    label "chefYoung"
    key "name"
}
ageの値は関数演算のために整数に変換され、41の値と比較されます。
結果の値に反映されているように、基準に一致するレコードのみで頂点が作成されます。
 g.V().hasLabel('chefYoung').valueMap()
==>{gender=[M], name=[Jamie Oliver], age=[41], status=[alive]}
==>{gender=[F], name=[Amanda Cohen], age=[35], status=[alive]}
==>{gender=[M], name=[Patrick Connolly], age=[31], status=[alive]}

文字列の等価一致演算に基づくフィルター

2つのフィルターを使用したもう1つの例では、現在も活動を続けているシェフと既に亡くなっているシェフを全員検索します。
/** SAMPLE INPUT
name|gender|status|age
Jamie Oliver|M|alive|41
**/


inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "filterData.csv").delimiter('|')
def chefsAlive = chefs.filter { it["status"] == "alive" }
def chefsDeceased = chefs.filter { it["status"] == "deceased" }

load(chefsAlive).asVertices {
    label "chefAlive"
    key "name"
}

load(chefsDeceased).asVertices {
    label "chefDeceased"
    key "name"
}
このフィルターは文字列statusの値を確認し、頂点を読み込む際に使用する、頂点ラベルがそれぞれchefAliveおよびchefDeceasedの、2つの入力chefsAliveおよびchefsDeceasedを新たに作成します。
結果として作成される頂点は、次のとおりです。
// List all the living chefs
g.V().hasLabel('chefAlive').valueMap()
==>{gender=[F], name=[Alice Waters], age=[73], status=[alive]}
==>{gender=[F], name=[Patricia Curtan], age=[66], status=[alive]}
==>{gender=[F], name=[Kelsie Kerr], age=[57], status=[alive]}
==>{gender=[M], name=[Fritz Streiff], age=[500], status=[alive]}
==>{gender=[M], name=[Emeril Lagasse], age=[57], status=[alive]}
==>{gender=[M], name=[Jamie Oliver], age=[41], status=[alive]}
==>{gender=[F], name=[Amanda Cohen], age=[35], status=[alive]}
==>{gender=[M], name=[Patrick Connolly], age=[31], status=[alive]}
 
// List all the deceased chefs
g.V().hasLabel('chefDeceased').valueMap()
==>{gender=[F], name=[Julia Child], age=[500], status=[deceased]}
==>{gender=[F], name=[Simone Beck], age=[500], status=[deceased]}
==>{gender=[F], name=[Louisette Bertholie], age=[500], status=[deceased]}
==>{gender=[F], name=[Patricia Simon], age=[500], status=[deceased]}
==>{gender=[M], name=[James Beard], age=[500], status=[deceased]}

完全なfilterデータ・セット

この例で使用されている完全なサンプル・データ・セットは以下のとおりです。
name|gender|status|age
Julia Child|F|deceased|500
Simone Beck|F|deceased|500
Louisette Bertholie|F|deceased|500
Patricia Simon|F|deceased|500
Alice Waters|F|alive|73
Patricia Curtan|F|alive|66
Kelsie Kerr|F|alive|57
Fritz Streiff|M|alive|500
Emeril Lagasse|M|alive|57
James Beard|M|deceased|500
Jamie Oliver|M|alive|41
Amanda Cohen|F|alive|35
Patrick Connolly|M|alive|31
既に亡くなっているシェフの年齢のプレースホルダーに500が使用されていることにご注意ください。

filterマッピング・スクリプト全文

3つのfiltersすべてを持つマップ・スクリプト全文は以下のようになります。
/** SAMPLE INPUT
name|gender|status|age
Jamie Oliver|M|alive|41
**/

// SCHEMA
schema.propertyKey('name').Text().ifNotExists().create()
schema.propertyKey('gender').Text().ifNotExists().create()
schema.propertyKey('status').Text().ifNotExists().create()
schema.propertyKey('age').Int().ifNotExists().create()

schema.vertexLabel('chefAlive').properties('name','gender','status','age').create()
schema.vertexLabel('chefAlive').index('byname').materialized().by('name').add()
schema.vertexLabel('chefDeceased').properties('name','gender','status','age').create()
schema.vertexLabel('chefDeceased').index('byname').materialized().by('name').add()
schema.vertexLabel('chefYoung').properties('name','gender','status','age').create()
schema.vertexLabel('chefYoung').index('byname').materialized().by('name').add()

// CONFIGURATION
// Configures the data loader to create the schema
config create_schema: false, load_new: true

// DATA INPUT
// Define the data input source (a file which can be specified via command line arguments)
// inputfiledir is the directory for the input files that is given in the commandline
// as the "-filename" option

inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "filterData.csv").delimiter('|')
def chefsYoung = chefs.filter { it["age"].toInteger() <= 41 }
def chefsAlive = chefs.filter { it["status"] == "alive" }
def chefsDeceased = chefs.filter { it["status"] == "deceased" }

//Specifies what data source to load using which mapper (as defined inline)

load(chefsYoung).asVertices {
    label "chefYoung"
    key "name"
}

load(chefsAlive).asVertices {
    label "chefAlive"
    key "name"
}

load(chefsDeceased).asVertices {
    label "chefDeceased"
    key "name"
}

flatMap

DSE Graph LoaderでflatMapを使用する方法

flatMap関数(別名expand)は、入力ファイルの1つのフィールドを別々のオブジェクトに分割してから読み込むことができます。一般に、この関数は、コンパクションされたデータを拡張形式に変換する際に使用されます。

レシピに対する複数の料理の値に基づくFlatMap

この例の入力ファイルはrecipesです。flatMapは、構文<input_file_name>.flatMap { ...}という同等のHOCON構文でフォーマットされています。レシピについて考えられるcuisineの選択肢をすべて特定するcuisineに対してフィールドを指定すると、頂点をグラフに読み込む際に、レシピ名と料理の種類を別の頂点として使用して、各頂点のレコードを作成することができます。
/** SAMPLE INPUT
name|cuisine
Beef Bourguignon|English::French
**/

inputfiledir = '/tmp/filter_map_flatmap/'
recipes = File.csv(inputfiledir + "flatmapData.csv").delimiter('|')

def recipesCuisine = recipes.flatMap {
  def name = it["name"];
  it["cuisine"].
    split("::").
    collect { 
       it = [ 'name': name, 'cuisine': it ] 
    }
}
//Specifies what data source to load using which mapper (as defined inline)

load(recipesCuisine).asVertices {
    label "recipe"
    key name: "name", cuisine: "cuisine"
}
flatMap関数は各レコードを取得し、レシピ名を取得して料理フィールドを分割してから、各名前/料理の組み合わせを集めて、別々の各頂点を識別する複合キーとして使用します。Groovy splitメソッドは、提供された区切り文字(::)を使用して文字列(cuisine)を分割し、文字列の配列(各料理)を返します。Groovy collectメソッドは、コレクションに対して繰り返され、コレクションの各要素を変換します。
料理に基づいて使用可能なすべての頂点が、読み込みの結果に反映されます。
g.V().valueMap()
==>{name=[Beef Bourguignon], cuisine=[English]}
==>{name=[Beef Bourguignon], cuisine=[French]}
==>{name=[Nicoise Salade], cuisine=[French]}
==>{name=[Wild Mushroom Stroganoff], cuisine=[American]}
==>{name=[Wild Mushroom Stroganoff], cuisine=[English]}

完全なflatMapデータ・セット

この例で使用されている完全なサンプル・データ・セットは以下のとおりです。
name|cuisine
Beef Bourguignon|English::French
Nicoise Salade|French
Wild Mushroom Stroganoff|American::English

flatMapマッピング・スクリプト全文

flatMapを持つマップ・スクリプト全文は以下のようになります。
/** SAMPLE INPUT
name|cuisine
Beef Bourguignon|English::French
**/

// SCHEMA
schema.propertyKey('name').Text().ifNotExists().create()
schema.propertyKey('cuisine').Text().ifNotExists().create()

schema.vertexLabel('recipe').properties('name','cuisine').create()
schema.vertexLabel('recipe').index('byname').materialized().by('name').add()

// CONFIGURATION
// Configures the data loader to create the schema
config create_schema: false, load_new: true

// DATA INPUT
// Define the data input source (a file which can be specified via command line arguments)
// inputfiledir is the directory for the input files that is given in the commandline
// as the "-filename" option

inputfiledir = '/tmp/filter_map_flatmap/'
recipes = File.csv(inputfiledir + "flatmapData.csv").delimiter('|')

def recipesCuisine = recipes.flatMap {
  def name = it["name"];
  it["cuisine"].
     split("::").
     collect { 
        it = [ 'name': name, 'cuisine': it ] 
     }
}
//Specifies what data source to load using which mapper (as defined inline)

load(recipesCuisine).asVertices {
    label "recipe"
    key name: "name", cuisine: "cuisine"
}

map

DSE Graph Loaderでmapを使用する方法

map()(別名transform())は、フィールドの値に関数を適用してからデータを読み込みます。

mapによる任意の文字から小文字へのgenderフィールドの変換

この例の入力ファイルはauthorInputです。mapは、構文<input_file_name>.map { ...}という同等のHOCON構文でフォーマットされています。フィールドgenderを指定すると、ネストされたマップauthorInputの各gender値において、Groovy toLowerCase()メソッドが実行されます。

inputfiledir = '/tmp/TEXT/'
authorInput = File.text(inputfiledir + "author.dat").
    delimiter("|").
    header('name', 'gender')

authorInput = authorInput.map { it['gender'] = it['gender'].toLowerCase(); it }

このmap()変換によって、グラフのgender値が小文字のみになります。

genderの大文字と小文字の区別に加えた変更が、読み込みの結果に反映されます。
g.V().valueMap()
==>{gender=[f], name=[Julia Child], age=[500]}
==>{gender=[f], name=[Simone Beck], age=[500]}
==>{gender=[f], name=[Louisette Bertholie], age=[500]}
==>{gender=[f], name=[Patricia Simon], age=[500]}
==>{gender=[f], name=[Alice Waters], age=[73]}
==>{gender=[f], name=[Patricia Curtan], age=[66]}
==>{gender=[f], name=[Kelsie Kerr], age=[57]}
==>{gender=[m], name=[Fritz Streiff], age=[500]}
==>{gender=[m], name=[Emeril Lagasse], age=[57]}
==>{gender=[m], name=[James Beard], age=[500]}
==>{gender=[m], name=[Jamie Oliver], age=[41]}
==>{gender=[f], name=[Amanda Cohen], age=[35]}
==>{gender=[m], name=[Patrick Connolly], age=[31]}

完全なmapデータ・セット

この例で使用されている完全なサンプル・データ・セットは以下のとおりです。
name|gender|age
Julia Child|F|500
Simone Beck|F|500
Louisette Bertholie|F|500
Patricia Simon|F|500
Alice Waters|F|73
Patricia Curtan|F|66
Kelsie Kerr|F|57
Fritz Streiff|M|500
Emeril Lagasse|M|57
James Beard|M|500
Jamie Oliver|M|41
Amanda Cohen|F|35
Patrick Connolly|M|31

mapマッピング・スクリプト全文

mapを持つマップ・スクリプト全文は以下のようになります。
/** SAMPLE INPUT
name|gender|age
Jamie Oliver|M|41
**/

// SCHEMA
schema.propertyKey('name').Text().ifNotExists().create()
schema.propertyKey('gender').Text().ifNotExists().create()
schema.propertyKey('age').Int().ifNotExists().create()

schema.vertexLabel('chef').properties('name','gender','age').create()
schema.vertexLabel('chef').index('byname').materialized().by('name').add()

// CONFIGURATION
// Configures the data loader to create the schema
config create_schema: false, load_new: true

// DATA INPUT
// Define the data input source (a file which can be specified via command line arguments)
// inputfiledir is the directory for the input files that is given in the commandline
// as the "-filename" option

inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "mapData.csv").delimiter('|')
chefInput = chefs.map { it['gender'] = it['gender'].toLowerCase(); it }

//Specifies what data source to load using which mapper (as defined inline)

load(chefInput).asVertices {
    label "chef"
    key "name"
}