Transform Functions
A Pulsar transform function is a low-code implementation of Pulsar functions.
Functions receive data from one or more input topics, apply user-supplied processing, and publish the results to another topic.
Custom functions are a powerful feature, but for common data transformations, we now include Transform Functions.
Use Drop-fields, Flatten, Compute, and more without coding or deep schema knowledge.
DataStax has created the following transform functions for common data transforms, but we’re always experimenting with new ones.
Check back as the list grows, or let us know some functions you’d find helpful in your deployment.
Unqualified orgs can use transform functions without upgrading to a Pay As You Go plan.
Transforms
Cast
The cast transform function modifies the key or value schema to a target compatible schema.
Compute
The compute transform function computes new field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten.
Drop-fields
The drop-fields transform function drops fields from structured data.
Merge KeyValue
The merge-key-value transform function merges the fields of KeyValue records where both the key and value are structured data with the same schema type.
Unwrap KeyValue
The unwrap-key-value transform function extracts the KeyValue’s key or value and makes it the record value (if the record is a KeyValue).
Configuration
The TransformFunction reads its configuration as JSON from the Function userConfig parameter in the format:
{
  "steps": [
    {
      "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
    },
    {
      "type": "merge-key-value"
    },
    {
      "type": "unwrap-key-value"
    },
    {
      "type": "cast", "schema-type": "STRING"
    }
  ]
}
Transform functions are performed in the order in which they appear in the steps array.
Each step is defined by its type and uses its own arguments.
Each step can be dynamically toggled on or off by supplying a when condition that evaluates to true or false.
For example, the above configuration applied on a KeyValue<AVRO, AVRO> input record with value {key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} will return transformed values after each step:
{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
|
| "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
|
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
|
| "type": "merge-key-value"
|
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
|
| "type": "unwrap-key-value"
|
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
|
| "type": "cast", "schema-type": "STRING"
|
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
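The step-by-step flow above can be modeled in a few lines of plain Python. This is only an illustrative sketch, not the TransformFunction implementation: records are modeled as dicts, the helper names (drop_fields, merge_key_value, unwrap_key_value, cast_to_string, run_steps) are hypothetical, the when condition is modeled as a Python callable rather than the condition a real step would carry in its configuration, and letting value fields win over key fields on a name collision during the merge is an assumption this page does not confirm.

```python
import json


def drop_fields(record, fields, part=None):
    """Remove the named fields from the key, the value, or both (part=None)."""
    out = {"key": dict(record["key"]), "value": dict(record["value"])}
    for side in ("key", "value"):
        if part in (None, side):
            for name in fields:
                out[side].pop(name, None)
    return out


def merge_key_value(record):
    """Copy the key's fields into the value; the key itself is unchanged.

    Assumption: on a name collision the value's field is kept.
    """
    merged = {**record["key"], **record["value"]}
    return {"key": dict(record["key"]), "value": merged}


def unwrap_key_value(record):
    """Replace the KeyValue record with just its value."""
    return record["value"]


def cast_to_string(record):
    """Model a cast to STRING as JSON serialization of the record."""
    return json.dumps(record)


def run_steps(record, steps):
    """Apply (function, when) pairs in order; skip a step whose when is false."""
    for func, when in steps:
        if when is None or when(record):
            record = func(record)
    return record


record = {
    "key": {"keyField1": "key1", "keyField2": "key2", "keyField3": "key3"},
    "value": {"valueField1": "value1", "valueField2": "value2", "valueField3": "value3"},
}

steps = [
    (lambda r: drop_fields(r, ["keyField1", "keyField2"], part="key"), None),
    (merge_key_value, None),
    (unwrap_key_value, None),
    (cast_to_string, None),
]

result = run_steps(record, steps)
print(result)

# A step whose condition evaluates to false is skipped entirely.
skipped = run_steps(record, [(merge_key_value, lambda r: False), (unwrap_key_value, None)])
print(skipped)
```

In this model, skipping the merge step means the unwrap step sees the original value, so the key fields never reach the output. That is the practical reason the order of the steps array matters.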
Deploy with Pulsar CLI
Luna Streaming 2.10+ is required to deploy custom functions in Pulsar.
The transform function .nar lives in the /functions directory of your Pulsar deployment.
Pulsar standalone
To deploy the built-in transform function locally in Pulsar standalone:
- Start Pulsar standalone:
./bin/pulsar standalone
- Create a transform function in localrun mode:
./bin/pulsar-admin functions localrun \
  --jar functions/pulsar-transformations-2.0.1.nar \
  --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
  --inputs my-input-topic \
  --output my-output-topic \
  --user-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}'
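Hand-quoting the JSON passed to --user-config is easy to get wrong. As a sketch, the Python helper below (build_localrun_args is a hypothetical name, not part of Pulsar) serializes the steps with json.dumps and assembles the same localrun arguments used in the command above. The topic names and .nar path are simply the ones from that example.

```python
import json
import shlex

TRANSFORM_CLASS = "com.datastax.oss.pulsar.functions.transforms.TransformFunction"


def build_localrun_args(steps, inputs, output,
                        nar="functions/pulsar-transformations-2.0.1.nar"):
    """Return the pulsar-admin localrun argument list for the given steps."""
    user_config = json.dumps({"steps": steps})
    return [
        "./bin/pulsar-admin", "functions", "localrun",
        "--jar", nar,
        "--classname", TRANSFORM_CLASS,
        "--inputs", inputs,
        "--output", output,
        "--user-config", user_config,
    ]


steps = [
    {"type": "drop-fields", "fields": "password"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"},
    {"type": "cast", "schema-type": "STRING"},
]

args = build_localrun_args(steps, "my-input-topic", "my-output-topic")

# shlex.join quotes each argument so the JSON survives the shell intact.
print(shlex.join(args))
```

The argument list can also be passed directly to subprocess.run, which avoids shell quoting altogether.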
Pulsar cluster
To deploy a built-in transform function to a Pulsar cluster:
- Create a built-in transform function:
./bin/pulsar-admin functions create \
  --tenant $TENANT \
  --namespace $NAMESPACE \
  --name transform-function \
  --inputs persistent://$TENANT/$NAMESPACE/$INPUT_TOPIC \
  --output persistent://$TENANT/$NAMESPACE/$OUTPUT_TOPIC \
  --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
  --jar functions/pulsar-transformations-2.0.1.nar
Result:
Created successfully
- Confirm your function has been created with the Pulsar CLI:
./bin/pulsar-admin functions list --tenant $TENANT
Result:
cast-function flatten-function transform-function transform-function-2
Deploy with Astra Streaming
Deploy transform functions in the Functions tab of the Astra Portal.
The process is similar to creating a function in the Astra Portal, but with a few additional steps.
- After naming your new function, select the Use DataStax transform function option.
- Select a transform function from the list of available functions.
- Select the transform function’s namespace and input topic(s).
- Select the transform function’s namespace, output topic, and log topic. The log topic is a separate output topic for messages containing additional loglevel, fqn, and instance properties.
- Specify advanced configuration options.
- Pass JSON configuration values with your function, if applicable. For more, see the transform function Configuration table.
- Select Create. The transform function will initialize and begin processing data changes.
- Confirm your function has been created with the Pulsar CLI:
./bin/pulsar-admin functions list --tenant $TENANT
Result:
cast-function flatten-function transform-function transform-function-2
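Before pasting JSON configuration values into the function form, it can help to check that the text has the shape described in the Configuration section. The following is a minimal sketch under stated assumptions: check_transform_config is a hypothetical helper, and DOCUMENTED_TYPES lists only the step types described on this page, so a type flagged here may still be valid in your deployment.

```python
import json

# Step types described on this page; other transform functions may exist.
DOCUMENTED_TYPES = {
    "cast",
    "compute",
    "drop-fields",
    "merge-key-value",
    "unwrap-key-value",
}


def check_transform_config(text):
    """Return a list of problems found in a transform configuration string."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as err:
        return [f"not valid JSON: {err}"]
    if not isinstance(config, dict):
        return ["top level must be a JSON object"]
    steps = config.get("steps")
    if not isinstance(steps, list):
        return ["steps must be an array"]

    problems = []
    for index, step in enumerate(steps):
        if not isinstance(step, dict):
            problems.append(f"step {index}: must be an object")
            continue
        step_type = step.get("type")
        if not isinstance(step_type, str):
            problems.append(f"step {index}: missing string type")
        elif step_type not in DOCUMENTED_TYPES:
            problems.append(f"step {index}: type {step_type} is not described on this page")
    return problems


good = '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "cast", "schema-type": "STRING"}]}'
bad = '{"steps": [{"fields": "password"}, {"type": "rename"}]}'

print(check_transform_config(good))
print(check_transform_config(bad))
```

The check covers structure only. It does not validate the arguments of each step, which are specific to the step's type.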
What’s next?
For more, see Astra Streaming functions or the Pulsar documentation.