Using transforms (filter, flatMap, and map) with DSE Graph Loader
How to use transforms (filter, flatMap, and map) with DSE Graph Loader
The data record for each data input is a document structure, or nested map, defined from an input file. A transformation acts upon the nested map and returns a nested map. Any provided transformation function must be thread-safe, or the behavior of the data loader is undefined.
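Because the loader may invoke a transform from multiple threads concurrently, a closure that reads and mutates only the record it receives is safe, while one that touches shared mutable state is not. An illustrative sketch (the chefs input and field names follow the examples later in this section):

```groovy
// Safe: the closure touches only the record it receives
def normalized = chefs.map { it['name'] = it['name'].trim(); it }

// Unsafe: the closure mutates shared state from concurrent threads
def count = 0
def counted = chefs.map { count++; it }   // race condition; do not do this
```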
The transforms are Groovy closures: anonymous blocks of code that can take arguments, return values, and be assigned to a variable. These closures often make use of the Groovy implicit parameter it. When a closure does not explicitly define a parameter list, it is always available as a defined parameter. In the following examples, it is used to get each record in an input file and apply the transformation.
A transform may be placed anywhere in the mapping script, provided the input file it operates on is defined before the transform.
Here's a simple introduction to Groovy for those unfamiliar with it.
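For readers new to Groovy, a minimal sketch of closures and the implicit it parameter, in plain Groovy and independent of the loader:

```groovy
// A closure with an explicit parameter list
def square = { int x -> x * x }
assert square(4) == 16

// With no parameter list, Groovy supplies the implicit parameter "it"
def upper = { it.toUpperCase() }
assert upper("chef") == "CHEF"

// In a loader transform, "it" is the record: a map of field name to value
def isAlive = { it["status"] == "alive" }
assert isAlive([name: "Jamie Oliver", status: "alive"])
```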
filter
How to use filter with DSE Graph Loader
The filter function applies criteria to the input file, selecting for loading only the objects that meet the criteria. The criteria can match any data type used in a field.
Filter based on an inequality operation on an integer

The filter is applied to the input file using the syntax <input_file_name>.filter { ... }. Given an integer field age, all chefs 41 years old and younger can be filtered and loaded into the graph with the vertex label chefYoung:

/** SAMPLE INPUT
name|gender|status|age
Jamie Oliver|M|alive|41
**/
inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "filterData.csv").delimiter('|')
// filter
def chefsYoung = chefs.filter { it["age"].toInteger() <= 41 }
// Specifies what data source to load using which mapper (as defined inline)
load(chefsYoung).asVertices {
    label "chefYoung"
    key "name"
}

The value for age is converted to an integer for the function operation and compared to the value 41.

g.V().hasLabel('chefYoung').valueMap()
==>{gender=[M], name=[Jamie Oliver], age=[41], status=[alive]}
==>{gender=[F], name=[Amanda Cohen], age=[35], status=[alive]}
==>{gender=[M], name=[Patrick Connolly], age=[31], status=[alive]}
Filter based on an equality match on a string

/** SAMPLE INPUT
name|gender|status|age
Jamie Oliver|M|alive|41
**/
inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "filterData.csv").delimiter('|')
def chefsAlive = chefs.filter { it["status"] == "alive" }
def chefsDeceased = chefs.filter { it["status"] == "deceased" }
load(chefsAlive).asVertices {
    label "chefAlive"
    key "name"
}
load(chefsDeceased).asVertices {
    label "chefDeceased"
    key "name"
}

The filter checks the value of the string status and creates two new inputs, chefsAlive and chefsDeceased, to use for loading the vertices with the respective vertex labels chefAlive and chefDeceased.

// List all the living chefs
g.V().hasLabel('chefAlive').valueMap()
==>{gender=[F], name=[Alice Waters], age=[73], status=[alive]}
==>{gender=[F], name=[Patricia Curtan], age=[66], status=[alive]}
==>{gender=[F], name=[Kelsie Kerr], age=[57], status=[alive]}
==>{gender=[M], name=[Fritz Streiff], age=[500], status=[alive]}
==>{gender=[M], name=[Emeril Lagasse], age=[57], status=[alive]}
==>{gender=[M], name=[Jamie Oliver], age=[41], status=[alive]}
==>{gender=[F], name=[Amanda Cohen], age=[35], status=[alive]}
==>{gender=[M], name=[Patrick Connolly], age=[31], status=[alive]}

// List all the deceased chefs
g.V().hasLabel('chefDeceased').valueMap()
==>{gender=[F], name=[Julia Child], age=[500], status=[deceased]}
==>{gender=[F], name=[Simone Beck], age=[500], status=[deceased]}
==>{gender=[F], name=[Louisette Bertholie], age=[500], status=[deceased]}
==>{gender=[F], name=[Patricia Simon], age=[500], status=[deceased]}
==>{gender=[M], name=[James Beard], age=[500], status=[deceased]}
Full filter data set

name|gender|status|age
Julia Child|F|deceased|500
Simone Beck|F|deceased|500
Louisette Bertholie|F|deceased|500
Patricia Simon|F|deceased|500
Alice Waters|F|alive|73
Patricia Curtan|F|alive|66
Kelsie Kerr|F|alive|57
Fritz Streiff|M|alive|500
Emeril Lagasse|M|alive|57
James Beard|M|deceased|500
Jamie Oliver|M|alive|41
Amanda Cohen|F|alive|35
Patrick Connolly|M|alive|31

Note the use of 500 as a placeholder for the age of deceased chefs.

Full filter mapping script

The full mapping script using filters:

/** SAMPLE INPUT
name|gender|status|age
Jamie Oliver|M|alive|41
**/
// SCHEMA
schema.propertyKey('name').Text().ifNotExists().create()
schema.propertyKey('gender').Text().ifNotExists().create()
schema.propertyKey('status').Text().ifNotExists().create()
schema.propertyKey('age').Int().ifNotExists().create()
schema.vertexLabel('chefAlive').properties('name','gender','status','age').create()
schema.vertexLabel('chefAlive').index('byname').materialized().by('name').add()
schema.vertexLabel('chefDeceased').properties('name','gender','status','age').create()
schema.vertexLabel('chefDeceased').index('byname').materialized().by('name').add()
schema.vertexLabel('chefYoung').properties('name','gender','status','age').create()
schema.vertexLabel('chefYoung').index('byname').materialized().by('name').add()
// CONFIGURATION
// Configures whether the data loader creates the schema
config create_schema: false, load_new: true
// DATA INPUT
// Define the data input source (a file which can be specified via command line arguments)
// inputfiledir is the directory for the input files that is given on the command line
// as the "-filename" option
inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "filterData.csv").delimiter('|')
def chefsYoung = chefs.filter { it["age"].toInteger() <= 41 }
def chefsAlive = chefs.filter { it["status"] == "alive" }
def chefsDeceased = chefs.filter { it["status"] == "deceased" }
// Specifies what data source to load using which mapper (as defined inline)
load(chefsYoung).asVertices {
    label "chefYoung"
    key "name"
}
load(chefsAlive).asVertices {
    label "chefAlive"
    key "name"
}
load(chefsDeceased).asVertices {
    label "chefDeceased"
    key "name"
}
flatMap
How to use flatMap with DSE Graph Loader
The flatMap function (also called expand) can break a single field in the input file into separate objects before loading them. In general, this function is used to convert compact data into an expanded form.
FlatMap based on multiple cuisine values for a recipe

The flatMap is applied to the input file using the syntax <input_file_name>.flatMap { ... }. Given a field cuisine that identifies all the possible cuisine choices for a recipe, a record can be created for each name/cuisine pair, so that each pair becomes a separate vertex when the vertices are loaded into the graph:

/** SAMPLE INPUT
name|cuisine
Beef Bourguignon|English::French
**/
inputfiledir = '/tmp/filter_map_flatmap/'
recipes = File.csv(inputfiledir + "flatmapData.csv").delimiter('|')
def recipesCuisine = recipes.flatMap {
    def name = it["name"];
    it["cuisine"].
        split("::").
        collect { it = [ 'name': name, 'cuisine': it ] }
}
// Specifies what data source to load using which mapper (as defined inline)
load(recipesCuisine).asVertices {
    label "recipe"
    key name: "name", cuisine: "cuisine"
}

The flatMap function gets each record, retrieves the recipe name, splits the cuisine field, and then collects each name/cuisine pair to use as the composite key identifying each separate vertex. The Groovy split method splits a string (cuisine) using the supplied delimiter (::) and returns an array of strings (each cuisine). The Groovy collect method iterates over a collection and transforms each element of the collection.

g.V().valueMap()
==>{name=[Beef Bourguignon], cuisine=[English]}
==>{name=[Beef Bourguignon], cuisine=[French]}
==>{name=[Nicoise Salade], cuisine=[French]}
==>{name=[Wild Mushroom Stroganoff], cuisine=[American]}
==>{name=[Wild Mushroom Stroganoff], cuisine=[English]}
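The split/collect pipeline inside the flatMap can be seen in isolation with plain Groovy, using hypothetical values outside the loader:

```groovy
def name = "Beef Bourguignon"
// split produces ["English", "French"]; collect builds one map per cuisine
def expanded = "English::French".split("::").collect { [ 'name': name, 'cuisine': it ] }
assert expanded == [ [name: "Beef Bourguignon", cuisine: "English"],
                     [name: "Beef Bourguignon", cuisine: "French"] ]
```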
Full flatMap data set

name|cuisine
Beef Bourguignon|English::French
Nicoise Salade|French
Wild Mushroom Stroganoff|American::English
Full flatMap mapping script

The full mapping script using flatMap:

/** SAMPLE INPUT
name|cuisine
Beef Bourguignon|English::French
**/
// SCHEMA
schema.propertyKey('name').Text().ifNotExists().create()
schema.propertyKey('cuisine').Text().ifNotExists().create()
schema.vertexLabel('recipe').properties('name','cuisine').create()
schema.vertexLabel('recipe').index('byname').materialized().by('name').add()
// CONFIGURATION
// Configures whether the data loader creates the schema
config create_schema: false, load_new: true
// DATA INPUT
// Define the data input source (a file which can be specified via command line arguments)
// inputfiledir is the directory for the input files that is given on the command line
// as the "-filename" option
inputfiledir = '/tmp/filter_map_flatmap/'
recipes = File.csv(inputfiledir + "flatmapData.csv").delimiter('|')
def recipesCuisine = recipes.flatMap {
    def name = it["name"];
    it["cuisine"].
        split("::").
        collect { it = [ 'name': name, 'cuisine': it ] }
}
// Specifies what data source to load using which mapper (as defined inline)
load(recipesCuisine).asVertices {
    label "recipe"
    key name: "name", cuisine: "cuisine"
}
map
How to use map with DSE Graph Loader
The map() function (also called transform()) applies a function to a field's values before loading the data.
map converts the gender field to lowercase

The input for this example, authorInput, is read from the file author.dat. The map is applied to the input file using the syntax <input_file_name>.map { ... }. Given a field gender, the Groovy toLowerCase() method is applied to each gender value in the nested map authorInput:
inputfiledir = '/tmp/TEXT/'
authorInput = File.text(inputfiledir + "author.dat").
    delimiter("|").
    header('name', 'gender')
authorInput = authorInput.map { it['gender'] = it['gender'].toLowerCase(); it }
This map() transformation ensures that the gender values in the graph are only lowercase:

g.V().valueMap()
==>{gender=[f], name=[Julia Child], age=[500]}
==>{gender=[f], name=[Simone Beck], age=[500]}
==>{gender=[f], name=[Louisette Bertholie], age=[500]}
==>{gender=[f], name=[Patricia Simon], age=[500]}
==>{gender=[f], name=[Alice Waters], age=[73]}
==>{gender=[f], name=[Patricia Curtan], age=[66]}
==>{gender=[f], name=[Kelsie Kerr], age=[57]}
==>{gender=[m], name=[Fritz Streiff], age=[500]}
==>{gender=[m], name=[Emeril Lagasse], age=[57]}
==>{gender=[m], name=[James Beard], age=[500]}
==>{gender=[m], name=[Jamie Oliver], age=[41]}
==>{gender=[f], name=[Amanda Cohen], age=[35]}
==>{gender=[m], name=[Patrick Connolly], age=[31]}
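Because each transform returns a new input, transforms can also be applied in sequence. A hedged sketch, not part of the original examples, assuming an input chefs whose records carry both gender and status fields (as in filterData.csv):

```groovy
// Normalize gender first, then select only the living chefs
def chefsClean = chefs.map { it['gender'] = it['gender'].toLowerCase(); it }
def chefsCleanAlive = chefsClean.filter { it['status'] == 'alive' }

load(chefsCleanAlive).asVertices {
    label "chef"
    key "name"
}
```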
Full map data set

name|gender|age
Julia Child|F|500
Simone Beck|F|500
Louisette Bertholie|F|500
Patricia Simon|F|500
Alice Waters|F|73
Patricia Curtan|F|66
Kelsie Kerr|F|57
Fritz Streiff|M|500
Emeril Lagasse|M|57
James Beard|M|500
Jamie Oliver|M|41
Amanda Cohen|F|35
Patrick Connolly|M|31
Full map mapping script

The full mapping script using map:

/** SAMPLE INPUT
name|gender|age
Jamie Oliver|M|41
**/
// SCHEMA
schema.propertyKey('name').Text().ifNotExists().create()
schema.propertyKey('gender').Text().ifNotExists().create()
schema.propertyKey('age').Int().ifNotExists().create()
schema.vertexLabel('chef').properties('name','gender','age').create()
schema.vertexLabel('chef').index('byname').materialized().by('name').add()
// CONFIGURATION
// Configures whether the data loader creates the schema
config create_schema: false, load_new: true
// DATA INPUT
// Define the data input source (a file which can be specified via command line arguments)
// inputfiledir is the directory for the input files that is given on the command line
// as the "-filename" option
inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "mapData.csv").delimiter('|')
chefInput = chefs.map { it['gender'] = it['gender'].toLowerCase(); it }
//Specifies what data source to load using which mapper (as defined inline)
load(chefInput).asVertices {
    label "chef"
    key "name"
}