Transform Functions
A Pulsar transform function is a low-code implementation of Pulsar functions.
Functions receive data from one or more input topics, apply user-supplied processing, and publish the results to another topic.
Custom functions are a powerful feature, but for common data transformations, we now include transform functions.
Transform functions let you drop fields, flatten data, compute values, and more without writing code or needing deep schema knowledge.
DataStax has created the following transform functions for common data transforms, but we’re always experimenting with new ones.
Check back as the list grows, or let us know which functions you’d find helpful in your deployment.
Unqualified orgs can use transform functions without upgrading to a Pay As You Go plan.
Transforms
- Cast: The cast transform function modifies the key or value schema to a target compatible schema.
- Compute: The compute transform function computes new field values based on an expression evaluated at runtime. If the field already exists, it is overwritten.
- Drop-fields: The drop-fields transform function drops fields from structured data.
- Drop: The drop transform function drops a record from further processing.
- Flatten: The flatten transform function flattens structured data.
- Merge KeyValue: The merge KeyValue transform function merges the fields of KeyValue records where both the key and value are structured data with the same schema type.
- Unwrap KeyValue: If the record is a KeyValue, the unwrap KeyValue transform function extracts the KeyValue’s key or value and makes it the record value.
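To make the semantics of a few of these transforms concrete, here is a minimal sketch that models structured records as nested Python dictionaries. It is not the DataStax implementation. The function names are hypothetical, real records carry Avro or JSON schemas that are not modeled here, and the "_" default delimiter for flatten is an assumption of this sketch.

```python
from typing import Any, Optional

# Hypothetical model: a structured record is a (possibly nested) dict.
# Real transform functions operate on schema-aware Pulsar records.

def drop_fields(record: dict[str, Any], fields: list[str]) -> dict[str, Any]:
    """Return a copy of the record without the named top-level fields."""
    return {name: v for name, v in record.items() if name not in fields}

def drop(record: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Drop the whole record: nothing is passed on for further processing."""
    return None

def flatten(record: dict[str, Any], delimiter: str = "_", prefix: str = "") -> dict[str, Any]:
    """Flatten nested dicts into one level, joining field names with the delimiter.

    The "_" default is an assumption of this sketch.
    """
    flat: dict[str, Any] = {}
    for name, v in record.items():
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(v, dict):
            # Recurse into nested structures, extending the name prefix.
            flat.update(flatten(v, delimiter, full_name))
        else:
            flat[full_name] = v
    return flat

print(flatten({"a": {"b": 1, "c": {"d": 2}}, "e": 3}))
# {'a_b': 1, 'a_c_d': 2, 'e': 3}
```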
Configuration
The TransformFunction reads its configuration as JSON from the Function userConfig parameter in the following format:
{
"steps": [
{
"type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
},
{
"type": "merge-key-value"
},
{
"type": "unwrap-key-value"
},
{
"type": "cast", "schema-type": "STRING"
}
]
}
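Hand-writing this JSON inside a shell command is error-prone. The sketch below assembles the same configuration as Python data and serializes it with the standard `json` module. The helper `build_user_config` is hypothetical. It checks only that every step names a `type`, because each step's remaining keys are that transform's own arguments.

```python
import json

def build_user_config(steps: list[dict]) -> str:
    """Serialize transform steps into the JSON string passed as userConfig."""
    for i, step in enumerate(steps):
        # Every step must name its transform; other keys are its arguments.
        if "type" not in step:
            raise ValueError(f"step {i} has no 'type'")
    return json.dumps({"steps": steps})

user_config = build_user_config([
    {"type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"},
    {"type": "cast", "schema-type": "STRING"},
])
print(user_config)
```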
Transform steps are applied in the order in which they appear in the steps array.
Each step is defined by its type and uses its own arguments.
Each step can be dynamically toggled on or off by supplying a when condition that evaluates to true or false.
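The ordering and `when` behavior can be sketched as a simple loop. Steps run in array order, and a step whose condition evaluates to false is skipped. This sketch makes one simplifying assumption: `when` is modeled as a Python predicate, whereas the real function evaluates a condition expression. The `run_steps` helper and the `transforms` registry are hypothetical.

```python
from typing import Any, Callable, Optional

Step = dict[str, Any]

def run_steps(record: Any, steps: list[Step],
              transforms: dict[str, Callable[[Any, Step], Optional[Any]]]) -> Optional[Any]:
    """Apply steps in array order, skipping any step whose 'when' is false."""
    for step in steps:
        when = step.get("when")
        # Hypothetical: 'when' is a Python predicate, not the real expression syntax.
        if when is not None and not when(record):
            continue
        record = transforms[step["type"]](record, step)
        if record is None:  # a drop step removed the record
            return None
    return record

transforms = {
    "drop-fields": lambda r, s: {k: v for k, v in r.items() if k not in s["fields"].split(",")},
    "drop": lambda r, s: None,
}
steps = [
    {"type": "drop-fields", "fields": "password"},
    {"type": "drop", "when": lambda r: r.get("status") == "deleted"},
]
print(run_steps({"user": "a", "password": "x", "status": "active"}, steps, transforms))
# {'user': 'a', 'status': 'active'}
```

In this example, the drop step runs only for records whose `status` is `deleted`. Every other record passes through with `password` removed.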
For example, if the previous configuration is applied to a KeyValue<AVRO, AVRO> input record, the following transformed values are returned after each step:
# Original input record
{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}}
# Transformations
(KeyValue<AVRO, AVRO>)
|
| "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
|
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
|
| "type": "merge-key-value"
|
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
|
| "type": "unwrap-key-value"
|
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
|
| "type": "cast", "schema-type": "STRING"
|
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
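The walk-through above can be reproduced with plain dictionaries. This is a sketch only. A KeyValue record is modeled as a `(key, value)` tuple, Avro schemas are not modeled, and the cast to STRING is approximated with `json.dumps`. Three more assumptions are labeled in the comments. A missing `part` applies to both key and value. On a field-name clash during merge, the value's field wins. Unwrap keeps the value, as in the example.

```python
import json
from typing import Any, Optional

# Hypothetical model: KeyValue record = (key dict, value dict).
KeyValue = tuple[dict[str, Any], dict[str, Any]]

def drop_fields(kv: KeyValue, fields: str, part: Optional[str] = None) -> KeyValue:
    names = set(fields.split(","))
    key, value = kv
    # Assumption: part=None applies the drop to both key and value.
    if part in (None, "key"):
        key = {k: v for k, v in key.items() if k not in names}
    if part in (None, "value"):
        value = {k: v for k, v in value.items() if k not in names}
    return key, value

def merge_key_value(kv: KeyValue) -> KeyValue:
    key, value = kv
    # Key fields first, as in the example; assumption: value wins on a clash.
    return key, {**key, **value}

def unwrap_key_value(kv: KeyValue) -> dict[str, Any]:
    # Keep the value, as the example does.
    return kv[1]

def cast_to_string(record: dict[str, Any]) -> str:
    return json.dumps(record)

record = ({"keyField1": "key1", "keyField2": "key2", "keyField3": "key3"},
          {"valueField1": "value1", "valueField2": "value2", "valueField3": "value3"})
record = drop_fields(record, "keyField1,keyField2", part="key")
record = merge_key_value(record)
record = unwrap_key_value(record)
print(cast_to_string(record))
# {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"}
```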
Deploy with Pulsar CLI
Luna Streaming 2.10+ is required to deploy custom functions in Pulsar.
The transform function .nar lives in the /functions directory of your Pulsar deployment.
To deploy the built-in transform function locally in Pulsar standalone, do the following:
- Start Pulsar standalone:
  ./bin/pulsar standalone
- Create a transform function in localrun mode:
  ./bin/pulsar-admin functions localrun \
    --jar functions/pulsar-transformations-2.0.1.nar \
    --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
    --inputs my-input-topic \
    --output my-output-topic \
    --user-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}'
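Quoting the `--user-config` JSON by hand is the most fragile part of the localrun command. The sketch below builds the same argument list in Python. `json.dumps` produces the JSON, and `shlex.join` from the standard library prints a safely quoted, copy-pasteable shell command. It only prints the command. Running it, for example with `subprocess.run(argv, check=True)`, assumes you are in the Pulsar installation directory.

```python
import json
import shlex

user_config = {"steps": [
    {"type": "drop-fields", "fields": "password"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"},
    {"type": "cast", "schema-type": "STRING"},
]}

argv = [
    "./bin/pulsar-admin", "functions", "localrun",
    "--jar", "functions/pulsar-transformations-2.0.1.nar",
    "--classname", "com.datastax.oss.pulsar.functions.transforms.TransformFunction",
    "--inputs", "my-input-topic",
    "--output", "my-output-topic",
    # json.dumps guarantees valid JSON; no hand-written escaping needed.
    "--user-config", json.dumps(user_config),
]

# shlex.join quotes each argument so the JSON survives the shell intact.
print(shlex.join(argv))
```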
To deploy a built-in transform function to a Pulsar cluster, do the following:
- Create a built-in transform function with the Pulsar CLI:
  ./bin/pulsar-admin functions create \
    --tenant $TENANT \
    --namespace $NAMESPACE \
    --name transform-function \
    --inputs persistent://$TENANT/$NAMESPACE/$INPUT_TOPIC \
    --output persistent://$TENANT/$NAMESPACE/$OUTPUT_TOPIC \
    --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
    --jar functions/pulsar-transformations-2.0.1.nar
Result
Created successfully
- Confirm your function has been created:
  ./bin/pulsar-admin functions list --tenant $TENANT
Result
cast-function
flatten-function
transform-function
transform-function-2
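The cluster commands above use fully qualified topic names built from `$TENANT`, `$NAMESPACE`, `$INPUT_TOPIC`, and `$OUTPUT_TOPIC`. The sketch below assembles that create command in Python and adds a small check of the `functions list` output. Three parts are assumptions. The fallback values such as `my-tenant` are hypothetical placeholders. The create command above passes no `--user-config`, so this sketch appends one in the same form as the localrun example. The `is_listed` helper is hypothetical.

```python
import json
import os
import shlex

def topic_name(tenant: str, namespace: str, topic: str) -> str:
    """Fully qualified persistent topic name, as in the create command above."""
    return f"persistent://{tenant}/{namespace}/{topic}"

def is_listed(list_output: str, name: str) -> bool:
    """True if 'name' appears as a whole entry in 'functions list' output."""
    return name in list_output.split()

# Hypothetical fallbacks, used only when the environment variables are unset.
tenant = os.environ.get("TENANT", "my-tenant")
namespace = os.environ.get("NAMESPACE", "my-namespace")

argv = [
    "./bin/pulsar-admin", "functions", "create",
    "--tenant", tenant,
    "--namespace", namespace,
    "--name", "transform-function",
    "--inputs", topic_name(tenant, namespace, os.environ.get("INPUT_TOPIC", "my-input-topic")),
    "--output", topic_name(tenant, namespace, os.environ.get("OUTPUT_TOPIC", "my-output-topic")),
    "--classname", "com.datastax.oss.pulsar.functions.transforms.TransformFunction",
    "--jar", "functions/pulsar-transformations-2.0.1.nar",
    # Assumption: steps are supplied the same way as in the localrun example.
    "--user-config", json.dumps({"steps": [{"type": "cast", "schema-type": "STRING"}]}),
]
print(shlex.join(argv))
```

Matching whole entries with `split()` avoids a false positive, such as finding `transform-function` inside `transform-function-2`.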
Deploy with Astra Streaming
Deploy transform functions in the Functions tab of the Astra Portal.
The process is similar to creating a function in the Astra Portal, but with a few additional steps.
- After naming your new function, select the Use DataStax transform function option.
- Select a transform function from the list of available functions.
- Select the transform function’s namespace and input topic(s).
- Select the transform function’s namespace, output topic, and log topic.
  The log topic is a separate output topic for messages containing additional loglevel, fqn, and instance properties.
- Specify advanced configuration options, if applicable.
- Pass JSON configuration values with your function, if applicable.
  For the available options, see Configuration.
- Select Create. The transform function initializes and begins processing data changes.
- Confirm your function has been created with the Pulsar CLI:
  ./bin/pulsar-admin functions list --tenant $TENANT
Result
cast-function
flatten-function
transform-function
transform-function-2
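Log topic messages carry `loglevel`, `fqn`, and `instance` properties, so they can be filtered without parsing the message body. The sketch below models each log message as a `(payload, properties)` pair. In a real consumer, the properties would come from the Pulsar client, for example `Message.properties()` in the Python client. The property values used here, such as `ERROR` and `t/ns/f`, are illustrative assumptions. The helper names are hypothetical.

```python
from collections import Counter
from typing import Iterable

# Hypothetical model of a log-topic message: (payload, message properties).
LogMessage = tuple[str, dict[str, str]]

def count_by_level(messages: Iterable[LogMessage]) -> Counter:
    """Count log messages per 'loglevel' property."""
    return Counter(props.get("loglevel", "UNKNOWN") for _, props in messages)

def errors_for(messages: Iterable[LogMessage], fqn: str) -> list[str]:
    """Payloads of ERROR-level messages emitted by the function named 'fqn'."""
    return [payload for payload, props in messages
            if props.get("fqn") == fqn and props.get("loglevel") == "ERROR"]

msgs = [
    ("started", {"loglevel": "INFO", "fqn": "t/ns/f", "instance": "0"}),
    ("bad record", {"loglevel": "ERROR", "fqn": "t/ns/f", "instance": "0"}),
    ("other", {"loglevel": "ERROR", "fqn": "t/ns/g", "instance": "1"}),
]
print(count_by_level(msgs))
print(errors_for(msgs, "t/ns/f"))
```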