Luna Streaming Pulsar transform functions configuration reference
Pulsar transform functions are low-code implementations of Pulsar functions.
All functions receive data from one or more input topics, apply user-supplied processing, and then publish the results to another topic. Custom functions are a powerful feature, but writing custom code for common data transformation tasks like type casting and concatenation can be tedious.
To assist with these common tasks, DataStax offers built-in transform functions that are ready to deploy without writing custom code.
This reference explains how to configure transform functions, and it describes the parameters available for each function.
To deploy transform functions, see Deploy and manage functions.
Configure transform functions
The TransformFunction reads its configuration as JSON from a function’s userConfig parameter at runtime.
Transform functions are defined as an array of steps, where each step object represents a single transformation operation, including the type (function name) and its parameters.
If parameters are omitted, default values are used where available.
For example:
{
"steps": [
{
"type": "drop-fields",
"fields": "keyField1,keyField2",
"part": "key"
}
]
}
You can combine multiple transform functions into a pipeline that progressively modifies records as they pass through each step.
Transform functions are performed in the order in which they appear in the steps array.
For example, the following configuration runs the drop-fields, merge-key-value, unwrap-key-value, and cast transform functions consecutively:
{
"steps": [
{
"type": "drop-fields",
"fields": "keyField1,keyField2",
"part": "key"
},
{
"type": "merge-key-value"
},
{
"type": "unwrap-key-value"
},
{
"type": "cast",
"schema-type": "STRING"
}
]
}
Using the preceding pipeline, the following example illustrates how a KeyValue<AVRO, AVRO> input record is progressively transformed by each step:
# Original input record
{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}}
# drop-fields removes `keyField1` and `keyField2` from the `key`
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
# merge-key-value combines the `key` into the `value`
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
# unwrap-key-value extracts the `value` and drops the `key`
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
# cast converts the record to STRING type
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
Expression language
Pulsar transform functions use an expression language to evaluate when conditions and expression parameters.
The expression language supports several operators and functions.
You can use dot notation (.) to access field properties or map keys.
For example, value.field1.subField, properties.key1, or "fn:concat(value.valueField1, value.valueField2)".
- Arithmetic operators
  - Addition: +
  - Subtraction: - (binary and unary)
  - Multiplication: *
  - Division: /, div
  - Modulus: %, mod
- Logical operators
  - Logical AND: and, &&
  - Logical OR: or, ||
  - Logical NOT: not, !
- Relational operators
  - Equal: ==, eq
  - Not equal: !=, ne
  - Less than: <, lt
  - Greater than: >, gt
  - Less than or equal: <=, le
  - Greater than or equal: >=, ge
- Functions
  Utility methods are available under the fn namespace. For example, to calculate the current time, use fn:now(). A combined sketch follows this list.
  - uppercase(input): Converts a string input to uppercase. If the input isn't a string, it attempts a string conversion. If the input is null, it returns null.
  - lowercase(input): Converts a string input to lowercase. If the input isn't a string, it attempts a string conversion. If the input is null, it returns null.
  - contains(input, value): Returns true if the given value exists in the input. If either argument is null, it returns false. If either argument isn't a string, it attempts string conversion.
  - trim(input): Returns the input string with any leading and trailing spaces removed. If the input isn't a string, it attempts a string conversion.
  - concat(input1, input2): Returns the string concatenation of the two inputs. If either input is null, it is treated as an empty string. For example, "fn:concat(value.field, '_suffix')" concatenates the contents of value.field and _suffix into a single string.
  - coalesce(value, valueIfNull): Returns a field's actual value if it isn't null; otherwise, it returns the fallback (valueIfNull). For example, coalesce(value.field1, 'default') returns the value of value.field1 if it exists; otherwise, it returns the string 'default'.
  - now(): Returns the current epoch millis.
  - dateadd(input, delta, unit): Performs date/time arithmetic operations on the input date/time.
    - input: The starting date/time, expressed as either epoch millis or an RFC 3339-formatted timestamp like 2022-10-14T10:15:30+01:00.
    - delta: The amount of the unit to add to the input. For subtraction, provide a negative value.
    - unit: The unit of time to add or subtract. Can be one of years, months, days, hours, minutes, seconds, or millis.
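For instance, the following sketch combines several of these functions in a compute step (described in the Compute section below). The field names (value.firstName, value.lastName, and so on) are hypothetical:
{
  "steps": [
    {
      "type": "compute",
      "fields": [
        { "name": "value.fullName", "expression": "fn:concat(value.firstName, value.lastName)", "type": "STRING" },
        { "name": "value.city", "expression": "fn:coalesce(value.city, 'unknown')", "type": "STRING" },
        { "name": "value.processedAt", "expression": "fn:now()", "type": "INT64" }
      ]
    }
  ]
}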
Conditional steps
Every transform pipeline step can accept an optional when condition that is evaluated at runtime against the current record.
When writing when conditions, be aware of any record transformation that occurs in the transform pipeline prior to the when condition.
The condition is evaluated against the current state of the record at that point in the pipeline.
This can be different from the original input record.
You can use the expression language syntax in when statements to define the evaluation logic.
when conditions can access the following record attributes for evaluation:
- key: The key portion of the record in a KeyValue schema.
- value: The value portion of the record in a KeyValue schema, or the message payload itself.
- messageKey: An optional key that messages can be tagged with, such as the partition key.
- topicName: An optional attribute containing the name of the topic where the record originated, also known as the input topic.
- destinationTopic: The name of the topic where the transformed record will be sent, also known as the output topic.
- eventTime: An optional timestamp attached to a record by its source, such as the original timestamp attached to a Pulsar message.
- properties: Optional user-defined properties that can be attached to a record.
Use dot notation (.) to access sub-properties within an attribute that has a schema-full key or value.
For example, key.keyField1 or value.valueField1.nestedValueField.
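For instance, the following sketch shows a conditional drop-fields step that runs only for records from a particular input topic. The topic name and field name are hypothetical:
{
  "steps": [
    {
      "type": "drop-fields",
      "fields": "internalField",
      "when": "topicName == 'persistent://public/default/raw-events'"
    }
  ]
}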
Example: when expressions and evaluations
For this example, assume that a transform pipeline includes when conditions with the following expressions:
- key.compound.uuid == 'uuidValue': Does the uuid field in the key.compound object equal the string value uuidValue?
- key.compound.timestamp <= 10: Is the timestamp field in the key.compound object less than or equal to 10?
- value.first == 'f1' && value.last.toUpperCase() == 'L1': Does the first field in the value object contain the exact string f1, and does the last field in the value object, when converted to uppercase, match the exact string L1?
- value.rank <= 1 && value.address.substring(0, 3) == 'abc': Is the rank field in the value object less than or equal to 1, and does an extracted substring from the address field in the value object match the exact string abc?
Then, assume that the expressions are evaluated against the following KeyValue record:
{
  "key": {
    "compound": {
      "uuid": "uuidValue",
      "timestamp": 1663616014
    }
  },
  "value": {
    "first": "f1",
    "last": "l1",
    "rank": 1,
    "address": {
      "zipcode": "abc-def"
    }
  }
}
The when conditions are evaluated as follows:
- key.compound.uuid == 'uuidValue' evaluates to true and triggers the conditional step.
- key.compound.timestamp <= 10 evaluates to false and doesn't trigger the conditional step.
- value.first == 'f1' && value.last.toUpperCase() == 'L1' evaluates to true and triggers the conditional step.
- value.rank <= 1 && value.address.substring(0, 3) == 'abc' evaluates to true and triggers the conditional step.
Cast
The cast transform function casts the record data to a compatible target schema type.
The step name is cast, and the UserConfig is specified as follows:
{
"steps": [
{
"type": "cast",
"schema-type": "STRING",
"part": "value" # Optional
}
]
}
| Parameter | Description |
|---|---|
| schema-type | The target schema type. Only STRING is supported. |
| part | When using KeyValue data, defines whether the cast is applied to the key or the value. If omitted, the cast is applied to both the key and the value. |
For example, given an AVRO message with the following payload:
{field1: value1, field2: value2}
The function casts the keys and values as strings if part isn’t set in the function configuration:
{"field1": "value1", "field2": "value2"}
If part is set to either key or value, only the specified part is cast to string while the other part remains unchanged.
For example, if part is set to value, the function casts only the value portions:
{field1: "value1", field2: "value2"}
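As a sketch, the following configuration casts only the key portion of a KeyValue record to STRING and leaves the value unchanged:
{
  "steps": [
    {
      "type": "cast",
      "schema-type": "STRING",
      "part": "key"
    }
  ]
}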
Compute
The compute transform function computes field values based on an expression that is evaluated at runtime.
If the field already exists, the function overwrites the existing value.
The step name is compute and the function’s UserConfig is configured as follows:
{
"steps": [
{
"type": "compute",
"fields": [
{ "name": "key.newKeyField", "expression" : "5*3", "type": "INT32" },
{ "name": "value.valueField", "expression" : "fn:concat(value.valueField, '_suffix')", "type": "STRING" }
]
}
]
}
| Parameter | Description |
|---|---|
| fields | An array of JSON objects describing how to calculate values for fields. Each object represents one field. Each object can contain the field parameters: name, expression, type, and optional. |
| name | Used in a fields object. The name of the field to compute. To compute the fields in the key or value parts of the message, prefix the name with key. or value. (as in key.newKeyField). You can also compute values from the message headers: destinationTopic, messageKey, and properties.<key>. Be aware that properties is a map of keys and values. |
| expression | Used in a fields object. The expression that computes the field value, evaluated at runtime. Supports the expression language syntax. |
| type | Used in a fields object. The type of the computed field, which translates to the schema type of the new field in the transformed message. The following types are supported: STRING, INT8, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, DATE, TIME, TIMESTAMP, LOCAL_DATE_TIME, LOCAL_TIME, LOCAL_DATE, INSTANT, BYTES, and DECIMAL. The type of a computed message header is always STRING. |
| optional | Can be used in a fields object. If true (the default), the computed field is marked as optional in the schema of the transformed message, which is useful when the expression can evaluate to null. If false, the field is required. |
Multiply and concatenate example
The following compute transform function, created with the Pulsar Admin CLI, applies preprocessing to outgoing messages.
Specifically, it performs multiplication and concatenation operations.
./bin/pulsar-admin functions create \
--tenant ${TENANT} \
--namespace ${NAMESPACE} \
--name transform-function \
--inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \
--output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \
--classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
--jar functions/pulsar-transformations-2.0.1.nar \
--transform-function-config '{"steps": [{"type": "compute", "fields":[
{"name": "key.newKeyField", "expression" : "5*3", "type": "INT32"},
{"name": "value.valueField", "expression" : "fn:concat(value.valueField, '\''_suffix'\'')", "type": "STRING"}]}
]}'
Given an AVRO message with the following payload:
{key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>)
The function outputs the following record to the output topic:
{key={keyField: key, newKeyField: 15}, value={valueField: value_suffix}} (KeyValue<AVRO, AVRO>)
Message routing example
The following compute transform function, created with the Pulsar Admin CLI, applies preprocessing to outgoing messages.
Specifically, it reroutes the destination topics and modifies message properties.
./bin/pulsar-admin functions create \
--tenant ${TENANT} \
--namespace ${NAMESPACE} \
--name transform-function \
--inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \
--output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \
--classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
--jar functions/pulsar-transformations-2.0.1.nar \
--transform-function-config '{"steps": [{"type": "compute", "fields":[
{"name": "destinationTopic", "expression" : "'\''routed'\''", "type": "STRING"},
{"name": "properties.k1", "expression" : "'\''overwritten'\''", "type": "STRING"},
{"name": "properties.k2", "expression" : "'\''new'\''", "type": "STRING"}]}
]}'
Given an AVRO message with the following payload:
{key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic: out1, properties: {k1:v1}
The function outputs the following record to the output topic:
{key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic:routed, properties: {k1:overwritten, k2:new}
Drop
The drop transform function drops a record from further processing.
The step name is drop, and the UserConfig is configured as follows:
{
"steps": [
{
"type": "drop",
"when": "value.firstName == value1" # Optional
}
]
}
| Parameter | Description |
|---|---|
| when | The condition that triggers the drop, written in the expression language syntax. All records are dropped if when is unset. |
There is no output for drop steps.
The record either continues to be processed by the transform function pipeline, or it is discarded.
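As a sketch, the following pipeline discards records whose firstName field (hypothetical) equals value1, and then casts all remaining records to STRING:
{
  "steps": [
    {
      "type": "drop",
      "when": "value.firstName == 'value1'"
    },
    {
      "type": "cast",
      "schema-type": "STRING"
    }
  ]
}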
Drop fields
The drop-fields transform function drops fields from AVRO structured data.
The step name is drop-fields, and the UserConfig is configured as follows:
{
"steps": [
{
"type": "drop-fields",
"fields": "password,other",
"part": "value" # Optional
}
]
}
| Parameter | Description |
|---|---|
| fields | A comma-separated list of fields to drop. |
| part | When using KeyValue data, defines whether the fields are dropped from the key or the value. If omitted, the fields are dropped from both the key and the value. |
For example, given an AVRO message with the following payload:
{name: value1, password: value2}
The preceding drop-fields function looks for the fields specified in the fields parameter (password and other), and then outputs only the fields that weren’t dropped:
{"name": "value1"}
Flatten
The flatten transform function converts multi-level (nested) structured data into single-level structured data.
It generates the names of the new, flattened fields by concatenating the original nested field names.
For example, the structure {grandparent{parent{child1,child2}}} is flattened into two fields named grandparent_parent_child1 and grandparent_parent_child2.
The step name is flatten, and the UserConfig is configured as follows:
{
"steps": [
{
"type": "flatten",
"delimiter": "_", # Optional
"part": "value" # Optional
}
]
}
| Parameter | Description |
|---|---|
| delimiter | The delimiter to use when concatenating the new field names. The default is an underscore (_). |
| part | When using KeyValue data, defines whether the key or the value is flattened. If omitted, both the key and the value are flattened. |
For example, given an AVRO message with the following payload:
{field1: {field11: value11, field12: value12}}
The flatten function flattens the structure using the specified delimiter (or _ if no delimiter is specified):
{field1_field11: value11, field1_field12: value12}
If part is set to either key or value, only the specified part of the message is flattened, and the other part remains unchanged.
Merge KeyValue
The merge-key-value transform function merges the fields of KeyValue records if the key and value are both structured data with the same schema type.
Only AVRO structured data is supported.
The step name is merge-key-value and the UserConfig is specified as follows:
{
"steps": [
{
"type": "merge-key-value"
}
]
}
This function has no additional parameters.
For example, given an AVRO message with the following payload:
{key={keyField: key}, value={valueField: value}}
The merge-key-value function combines the KeyValue fields because both are AVRO structured data:
{key={keyField: key}, value={keyField: key, valueField: value}}
Unwrap KeyValue
For KeyValue records only, the unwrap-key-value transform function extracts the key or value from the KeyValue, and then makes it into the record value.
The step name is unwrap-key-value, and the UserConfig is specified as follows:
{
"steps": [
{
"type": "unwrap-key-value",
"unwrapKey": false
}
]
}
The unwrapKey parameter is optional.
By default, unwrapKey is false, and the function unwraps the value from the KeyValue.
Set this parameter to true to unwrap the key instead.
For example, given an AVRO message with the following payload:
{key={keyField: key}, value={valueField: value}}
The function extracts the value from the KeyValue, and then outputs only the contents of the unwrapped portion:
{valueField: value}
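For comparison, assuming the same input record with unwrapKey set to true, the function would output the contents of the key instead:
{keyField: key}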