Using transforms (filter, flatMap, and map) with DSE Graph Loader

All data inputs support arbitrary user transformations that manipulate or truncate the input data according to a user-provided function. The available transforms for DSE Graph Loader are filter, flatMap, and map.

As of DSE Graph Loader 6.0, transformation functions may be deprecated; be aware that changes may occur.

The data record for each data input is a document structure or nested map defined from an input file. A transformation acts upon the nested map and returns a nested map. Any provided transformation function must be thread-safe or the behavior of the data loader becomes undefined.
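
The thread-safety requirement can be illustrated with plain Groovy. This is a minimal sketch, not DSE Graph Loader code: the records are hypothetical, and parallelStream() is only an assumed way of calling a closure from several threads, not a description of how the loader schedules work.

```groovy
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

// Hypothetical records standing in for rows of an input file.
def rows = (1..1000).collect { n -> [status: (n % 2 == 0 ? "alive" : "deceased")] }

// Stateless: the result depends only on the record passed in,
// so concurrent calls cannot interfere with each other.
def isAlive = { it["status"] == "alive" }

// Stateful: if a transform has to touch shared state, a thread-safe type
// such as AtomicInteger is used instead of a plain counter variable.
def seen = new AtomicInteger(0)
def countingIsAlive = { rec -> seen.incrementAndGet(); isAlive(rec) }

// Call the closure from several threads (illustration only).
def kept = rows.parallelStream().
    filter { rec -> countingIsAlive(rec) }.
    collect(Collectors.toList())

assert seen.get() == 1000
assert kept.size() == 500
```

A closure that reads only its argument, like isAlive here, is the simplest way to stay thread-safe.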

The transforms used are Groovy closures: open, anonymous blocks of code that can take arguments, return values, and be assigned to a variable. These closures often make use of the Groovy implicit parameter, it. When a closure does not explicitly define a parameter list, it is always available as a defined parameter. In the following examples, it is used to get each record in an input file and apply the transformation.
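
As a small illustration of the implicit parameter, the following plain Groovy sketch defines the same closure twice, once with it and once with a named parameter. The variable names and the sample record are chosen for illustration only.

```groovy
// A closure assigned to a variable. With no parameter list, the single
// argument is available through the implicit parameter `it`.
def isAlive = { it["status"] == "alive" }

// The same logic written with an explicit, named parameter.
def isAliveExplicit = { rec -> rec["status"] == "alive" }

// A record shaped like one row of the sample chef data.
def chef = [name: "Jamie Oliver", gender: "M", status: "alive", age: "41"]

assert isAlive(chef)
assert isAliveExplicit(chef)
```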

The placement of the transform in the mapping script is arbitrary; as long as the input file is defined before the transform is defined, a transform may be placed anywhere in the mapping script.

Here’s a simple introduction to Groovy for those unfamiliar with it.

filter

The filter function can apply criteria to the input file, selecting only the objects that meet the criteria and loading them. The criteria can match any data type used in a field.

Filter based on inequality operation on integer

The defined input file in this example is chefs. The filter is applied to the input file using the syntax <input_file_name>.filter { ... }. Given an integer field for age, all chefs 41 years old and younger can be selected and loaded into the graph with the vertex label chefYoung:

/** SAMPLE INPUT
name|gender|status|age
Jamie Oliver|M|alive|41
**/

inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "filterData.csv").delimiter('|')

// filter
def chefsYoung = chefs.filter { it["age"].toInteger() <= 41 }

//Specifies what data source to load using which mapper (as defined inline)

load(chefsYoung).asVertices {
    label "chefYoung"
    key "name"
}

The value for age is converted to an Integer for the function operation, and compared to the value of 41.

Only the records that match the criteria will create vertices, as reflected in the resulting values:

g.V().hasLabel('chefYoung').valueMap()
==>{gender=[M], name=[Jamie Oliver], age=[41], status=[alive]}
==>{gender=[F], name=[Amanda Cohen], age=[35], status=[alive]}
==>{gender=[M], name=[Patrick Connolly], age=[31], status=[alive]}
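
The effect of the age filter can be reproduced outside the loader with plain Groovy. In this sketch, findAll is used as a stand-in for filter, and the rows are a hand-copied subset of the sample data with the age held as a string, which is an assumption suggested by the toInteger() call in the mapping script.

```groovy
// A subset of the sample rows; the age values are strings.
def rows = [
    [name: "Julia Child",      age: "500"],
    [name: "Alice Waters",     age: "73"],
    [name: "Jamie Oliver",     age: "41"],
    [name: "Amanda Cohen",     age: "35"],
    [name: "Patrick Connolly", age: "31"],
]

// Keep only the rows whose age, converted to an integer, is 41 or less.
def young = rows.findAll { it["age"].toInteger() <= 41 }
assert young*.name == ["Jamie Oliver", "Amanda Cohen", "Patrick Connolly"]

// Why the conversion matters: strings compare character by character,
// so a hypothetical age of "100" would sort before "41".
assert "100" < "41"
assert "100".toInteger() > "41".toInteger()
```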

Filter based on equality match operation on string

Another example uses two filters to separate the chefs who are alive from those who are deceased:

/** SAMPLE INPUT
name|gender|status|age
Jamie Oliver|M|alive|41
**/


inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "filterData.csv").delimiter('|')
def chefsAlive = chefs.filter { it["status"] == "alive" }
def chefsDeceased = chefs.filter { it["status"] == "deceased" }

load(chefsAlive).asVertices {
    label "chefAlive"
    key "name"
}

load(chefsDeceased).asVertices {
    label "chefDeceased"
    key "name"
}

The filters check the value of the string field status and create two new inputs, chefsAlive and chefsDeceased, to use for loading the vertices with the respective vertex labels chefAlive and chefDeceased.

The resulting vertices are:

// List all the living chefs
g.V().hasLabel('chefAlive').valueMap()
==>{gender=[F], name=[Alice Waters], age=[73], status=[alive]}
==>{gender=[F], name=[Patricia Curtan], age=[66], status=[alive]}
==>{gender=[F], name=[Kelsie Kerr], age=[57], status=[alive]}
==>{gender=[M], name=[Fritz Streiff], age=[500], status=[alive]}
==>{gender=[M], name=[Emeril Lagasse], age=[57], status=[alive]}
==>{gender=[M], name=[Jamie Oliver], age=[41], status=[alive]}
==>{gender=[F], name=[Amanda Cohen], age=[35], status=[alive]}
==>{gender=[M], name=[Patrick Connolly], age=[31], status=[alive]}

// List all the deceased chefs
g.V().hasLabel('chefDeceased').valueMap()
==>{gender=[F], name=[Julia Child], age=[500], status=[deceased]}
==>{gender=[F], name=[Simone Beck], age=[500], status=[deceased]}
==>{gender=[F], name=[Louisette Bertholie], age=[500], status=[deceased]}
==>{gender=[F], name=[Patricia Simon], age=[500], status=[deceased]}
==>{gender=[M], name=[James Beard], age=[500], status=[deceased]}

Full filter data set

The full sample data set used in this example:

name|gender|status|age
Julia Child|F|deceased|500
Simone Beck|F|deceased|500
Louisette Bertholie|F|deceased|500
Patricia Simon|F|deceased|500
Alice Waters|F|alive|73
Patricia Curtan|F|alive|66
Kelsie Kerr|F|alive|57
Fritz Streiff|M|alive|500
Emeril Lagasse|M|alive|57
James Beard|M|deceased|500
Jamie Oliver|M|alive|41
Amanda Cohen|F|alive|35
Patrick Connolly|M|alive|31

Note the use of 500 as a placeholder for the age of deceased chefs. The same value also appears for Fritz Streiff, who is listed as alive.
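
A placeholder value like this affects any filter that compares ages. The following plain Groovy sketch parses the data set above and contrasts a naive "older than 60" filter with one that skips the placeholder. The threshold of 60 and the variable names are illustrative assumptions, and findAll stands in for filter.

```groovy
// The full sample data set from this section.
def text = '''name|gender|status|age
Julia Child|F|deceased|500
Simone Beck|F|deceased|500
Louisette Bertholie|F|deceased|500
Patricia Simon|F|deceased|500
Alice Waters|F|alive|73
Patricia Curtan|F|alive|66
Kelsie Kerr|F|alive|57
Fritz Streiff|M|alive|500
Emeril Lagasse|M|alive|57
James Beard|M|deceased|500
Jamie Oliver|M|alive|41
Amanda Cohen|F|alive|35
Patrick Connolly|M|alive|31'''

// Turn each data line into a map keyed by the header fields.
def lines = text.readLines()
def header = lines.head().tokenize('|')
def rows = lines.tail().collect { line ->
    [header, line.tokenize('|')].transpose().collectEntries { it }
}

// Naive filter: every record carrying the placeholder 500 also passes.
def olderNaive = rows.findAll { it["age"].toInteger() > 60 }

// Guarded filter: skip the placeholder before comparing.
def older = rows.findAll {
    def age = it["age"].toInteger()
    age != 500 && age > 60
}

assert olderNaive.size() == 8
assert older*.name == ["Alice Waters", "Patricia Curtan"]
```

In a mapping script, the same guard would presumably go inside the closure passed to chefs.filter.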

Full filter mapping script

The full mapping script with all three filters:

/** SAMPLE INPUT
name|gender|status|age
Jamie Oliver|M|alive|41
**/

// SCHEMA
schema.propertyKey('name').Text().ifNotExists().create()
schema.propertyKey('gender').Text().ifNotExists().create()
schema.propertyKey('status').Text().ifNotExists().create()
schema.propertyKey('age').Int().ifNotExists().create()

schema.vertexLabel('chefAlive').properties('name','gender','status','age').create()
schema.vertexLabel('chefAlive').index('byname').materialized().by('name').add()
schema.vertexLabel('chefDeceased').properties('name','gender','status','age').create()
schema.vertexLabel('chefDeceased').index('byname').materialized().by('name').add()
schema.vertexLabel('chefYoung').properties('name','gender','status','age').create()
schema.vertexLabel('chefYoung').index('byname').materialized().by('name').add()

// CONFIGURATION
// Configures the data loader not to create the schema; it is defined explicitly above
config create_schema: false, load_new: true

// DATA INPUT
// Define the data input source (a file which can be specified via command line arguments)
// inputfiledir is the directory for the input files that is given in the commandline
// as the "-filename" option

inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "filterData.csv").delimiter('|')
def chefsYoung = chefs.filter { it["age"].toInteger() <= 41 }
def chefsAlive = chefs.filter { it["status"] == "alive" }
def chefsDeceased = chefs.filter { it["status"] == "deceased" }

//Specifies what data source to load using which mapper (as defined inline)

load(chefsYoung).asVertices {
    label "chefYoung"
    key "name"
}

load(chefsAlive).asVertices {
    label "chefAlive"
    key "name"
}

load(chefsDeceased).asVertices {
    label "chefDeceased"
    key "name"
}

flatMap

The flatMap function (also called expand) can break a single field in the input file into separate objects before loading them. In general, this function is used to convert more compacted data into an expanded form.

FlatMap based on multiple cuisine values for a recipe

The input file for this example is recipes. The flatMap is applied to the input file using the syntax <input_file_name>.flatMap { ... }. Given a cuisine field that lists all the possible cuisine choices for a recipe, a separate record, and therefore a separate vertex, can be created for each pairing of recipe name and cuisine type when loading the vertices into the graph:

/** SAMPLE INPUT
name|cuisine
Beef Bourguignon|English::French
**/

inputfiledir = '/tmp/filter_map_flatmap/'
recipes = File.csv(inputfiledir + "flatmapData.csv").delimiter('|')

def recipesCuisine = recipes.flatMap {
  def name = it["name"]
  it["cuisine"].
    split("::").
    collect { cuisine ->
       [ 'name': name, 'cuisine': cuisine ]
    }
}
//Specifies what data source to load using which mapper (as defined inline)

load(recipesCuisine).asVertices {
    label "recipe"
    key name: "name", cuisine: "cuisine"
}

The flatMap function gets each record, retrieves the recipe name, splits the cuisine field, and then collects each name/cuisine pair to use as the composite key for identifying each separate vertex. The Groovy split method splits a string (cuisine) using the supplied delimiter (::) and returns an array of strings (each cuisine). The Groovy collect method iterates over a collection and transforms each element of the collection.

The result of the loading reflects all the possible vertices based on cuisine:

g.V().valueMap()
==>{name=[Beef Bourguignon], cuisine=[English]}
==>{name=[Beef Bourguignon], cuisine=[French]}
==>{name=[Nicoise Salade], cuisine=[French]}
==>{name=[Wild Mushroom Stroganoff], cuisine=[American]}
==>{name=[Wild Mushroom Stroganoff], cuisine=[English]}

Full flatMap data set

The full sample data set used in this example:

name|cuisine
Beef Bourguignon|English::French
Nicoise Salade|French
Wild Mushroom Stroganoff|American::English

Full flatMap mapping script

The full mapping script with flatMap:

/** SAMPLE INPUT
name|cuisine
Beef Bourguignon|English::French
**/

// SCHEMA
schema.propertyKey('name').Text().ifNotExists().create()
schema.propertyKey('cuisine').Text().ifNotExists().create()

schema.vertexLabel('recipe').properties('name','cuisine').create()
schema.vertexLabel('recipe').index('byname').materialized().by('name').add()

// CONFIGURATION
// Configures the data loader not to create the schema; it is defined explicitly above
config create_schema: false, load_new: true

// DATA INPUT
// Define the data input source (a file which can be specified via command line arguments)
// inputfiledir is the directory for the input files that is given in the commandline
// as the "-filename" option

inputfiledir = '/tmp/filter_map_flatmap/'
recipes = File.csv(inputfiledir + "flatmapData.csv").delimiter('|')

def recipesCuisine = recipes.flatMap {
  def name = it["name"]
  it["cuisine"].
     split("::").
     collect { cuisine ->
        [ 'name': name, 'cuisine': cuisine ]
     }
}
//Specifies what data source to load using which mapper (as defined inline)

load(recipesCuisine).asVertices {
    label "recipe"
    key name: "name", cuisine: "cuisine"
}

map

How to use map with DSE Graph Loader

The map() function (also called transform()) applies a function to a field’s values before loading the data.

Map converts the gender field from any case to lowercase

The input file for this example is authorInput. The map is applied to the input file using the syntax <input_file_name>.map { ... }. Given a field gender, the Groovy toLowerCase() method is applied to the gender value of each record in authorInput:

inputfiledir = '/tmp/TEXT/'
authorInput = File.text(inputfiledir + "author.dat").
    delimiter("|").
    header('name', 'gender')

authorInput = authorInput.map { it['gender'] = it['gender'].toLowerCase(); it }

This map() transformation ensures that the gender values in the graph are only lowercase.

The result of the loading reflects the change to the case of gender:

g.V().valueMap()
==>{gender=[f], name=[Julia Child], age=[500]}
==>{gender=[f], name=[Simone Beck], age=[500]}
==>{gender=[f], name=[Louisette Bertholie], age=[500]}
==>{gender=[f], name=[Patricia Simon], age=[500]}
==>{gender=[f], name=[Alice Waters], age=[73]}
==>{gender=[f], name=[Patricia Curtan], age=[66]}
==>{gender=[f], name=[Kelsie Kerr], age=[57]}
==>{gender=[m], name=[Fritz Streiff], age=[500]}
==>{gender=[m], name=[Emeril Lagasse], age=[57]}
==>{gender=[m], name=[James Beard], age=[500]}
==>{gender=[m], name=[Jamie Oliver], age=[41]}
==>{gender=[f], name=[Amanda Cohen], age=[35]}
==>{gender=[m], name=[Patrick Connolly], age=[31]}
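
The lowercase transformation can be sketched in plain Groovy, with collect standing in for map. The two rows are taken from the sample data. The copying variant is an alternative shown for comparison and is an assumption, not something the mapping script above uses.

```groovy
// Two sample rows, one map per record.
def rows = [
    [name: "Julia Child",  gender: "F"],
    [name: "Jamie Oliver", gender: "M"],
]

// Copying variant: build a new map per record and leave the input untouched.
def copied = rows.collect { it + [gender: it["gender"].toLowerCase()] }
assert copied*.gender == ["f", "m"]
assert rows*.gender == ["F", "M"]

// In-place variant, as in the mapping script: update the field,
// then return the record itself with the trailing `it`.
def lowered = rows.collect { it["gender"] = it["gender"].toLowerCase(); it }
assert lowered*.gender == ["f", "m"]
assert rows*.gender == ["f", "m"]   // the original records were modified
```

The trailing it matters because the closure has to return the whole record, not just the new field value.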

Full map data set

The full sample data set used in this example:

name|gender|age
Julia Child|F|500
Simone Beck|F|500
Louisette Bertholie|F|500
Patricia Simon|F|500
Alice Waters|F|73
Patricia Curtan|F|66
Kelsie Kerr|F|57
Fritz Streiff|M|500
Emeril Lagasse|M|57
James Beard|M|500
Jamie Oliver|M|41
Amanda Cohen|F|35
Patrick Connolly|M|31

Full map mapping script

The full mapping script with map:

/** SAMPLE INPUT
name|gender|age
Jamie Oliver|M|41
**/

// SCHEMA
schema.propertyKey('name').Text().ifNotExists().create()
schema.propertyKey('gender').Text().ifNotExists().create()
schema.propertyKey('age').Int().ifNotExists().create()

schema.vertexLabel('chef').properties('name','gender','age').create()
schema.vertexLabel('chef').index('byname').materialized().by('name').add()

// CONFIGURATION
// Configures the data loader not to create the schema; it is defined explicitly above
config create_schema: false, load_new: true

// DATA INPUT
// Define the data input source (a file which can be specified via command line arguments)
// inputfiledir is the directory for the input files that is given in the commandline
// as the "-filename" option

inputfiledir = '/tmp/filter_map_flatmap/'
chefs = File.csv(inputfiledir + "mapData.csv").delimiter('|')
chefInput = chefs.map { it['gender'] = it['gender'].toLowerCase(); it }

//Specifies what data source to load using which mapper (as defined inline)

load(chefInput).asVertices {
    label "chef"
    key "name"
}

