Luna Streaming Pulsar transform functions configuration reference

Pulsar transform functions are low-code implementations of Pulsar functions.

All functions receive data from one or more input topics, apply user-supplied processing, and then publish the results to another topic. Custom functions are a powerful feature, but writing custom code for common data transformation tasks like type casting and concatenation can be tedious.

To assist with these common tasks, DataStax offers built-in transform functions that are ready to deploy without writing custom code.

This reference explains how to configure transform functions generally, and it describes the parameters available for each function.

To deploy transform functions, see Deploy and manage functions.

Configure transform functions

The TransformFunction reads its configuration as JSON from a function’s userConfig parameter at runtime.

Transform functions are defined as an array of steps, where each step object represents a single transformation operation, including the type (function name) and its parameters. If parameters are omitted, default values are used where available. For example:

{
  "steps": [
    {
      "type": "drop-fields",
      "fields": "keyField1,keyField2",
      "part": "key"
    }
  ]
}

You can use when conditions for dynamic transformations that only run if the condition is true.
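
For example, the following sketch adds a when condition to a drop-fields step so that the field is removed only from matching records. The accountType and password field names are assumptions for illustration; the condition syntax and available record attributes are described in Conditional steps:

{
  "steps": [
    {
      "type": "drop-fields",
      "fields": "password",
      "part": "value",
      "when": "value.accountType == 'internal'"
    }
  ]
}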

You can combine multiple transform functions into a pipeline that progressively modifies records as they pass through each step. Transform functions are performed in the order in which they appear in the steps array. For example, the following configuration runs the drop-fields, merge-key-value, unwrap-key-value, and cast transform functions consecutively:

{
  "steps": [
    {
      "type": "drop-fields",
      "fields": "keyField1,keyField2",
      "part": "key"
    },
    {
      "type": "merge-key-value"
    },
    {
      "type": "unwrap-key-value"
    },
    {
      "type": "cast",
      "schema-type": "STRING"
    }
  ]
}

Using the preceding pipeline, the following example illustrates how a KeyValue<AVRO, AVRO> input record is progressively transformed by each step:

# Original input record
{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}}

# drop-fields removes `keyField1` and `keyField2` from the `key`
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)

# merge-key-value combines the `key` into the `value`
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)

# unwrap-key-value extracts the `value` and drops the `key`
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)

# cast converts the record to STRING type
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)

Expression language

Pulsar transform functions use an expression language to evaluate when conditions and expression parameters.

The expression language supports several operators and functions.

You can use dot notation (.) to access field properties or map keys. For example, value.field1.subField, properties.key1, or "fn:concat(value.valueField1, value.valueField2)".

Arithmetic operators
  • Addition: +

  • Subtraction: - (binary and unary)

  • Multiplication: *

  • Division: /, div

  • Modulus: %, mod

Logical operators
  • Logical AND: and, &&

  • Logical OR: or, ||

  • Logical NOT: not, !

Relational operators
  • Equal: ==, eq

  • Not equal: !=, ne

  • Less than: <, lt

  • Greater than: >, gt

  • Less than or equal: <=, le

  • Greater than or equal: >=, ge

Functions

Utility methods are available under the fn namespace. For example, to calculate the current time, use fn:now().

  • uppercase(input): Converts the input string to uppercase. If the input isn’t a string, it attempts a string conversion. If the input is null, it returns null.

  • lowercase(input): Converts the input string to lowercase. If the input isn’t a string, it attempts a string conversion. If the input is null, it returns null.

  • contains(input, value): Returns true if the given value exists in the input. If either argument is null, it returns false. If either argument isn’t a string, it attempts string conversion.

  • trim(input): Returns the input string with any leading and trailing spaces removed. If the input isn’t a string, it attempts a string conversion.

  • concat(input1, input2): Returns a string concatenation of two inputs. If either input is null, it is treated as an empty string. For example, "fn:concat(value.field, '_suffix')" concatenates the contents of value.field and _suffix into a single string.

  • coalesce(value, valueIfNull): Returns a field’s actual value if it isn’t null, otherwise it returns the fallback (valueIfNull). For example, coalesce(value.field1, 'default') returns the value of value.field1 if it exists, otherwise it returns the string 'default'.

  • now(): Returns the current epoch millis.

  • dateadd(input, delta, unit): Performs date/time arithmetic operations on the input date/time.

    • input: The starting date/time, expressed as either epoch millis or an RFC3339-formatted timestamp like 2022-10-14T10:15:30+01:00.

    • delta: The amount of the unit to add to the input. For subtraction, provide a negative value.

    • unit: The unit of time to add or subtract. Can be one of years, months, days, hours, minutes, seconds, or millis.
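
For example, the following compute step sketch stamps each record with an expiration timestamp seven days in the future by nesting fn:now() inside fn:dateadd. The value.expiresAt field name is an assumption for illustration; the compute transform function and its parameters are described later in this reference:

{
  "steps": [
    {
      "type": "compute",
      "fields": [
        { "name": "value.expiresAt", "expression": "fn:dateadd(fn:now(), 7, 'days')", "type": "DATETIME" }
      ]
    }
  ]
}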

Conditional steps

Every transform pipeline step can accept an optional when condition that is evaluated at runtime against the current record.

When writing when conditions, be aware of any record transformation that occurs in the transform pipeline prior to the when condition. The condition is evaluated against the current state of the record at that point in the pipeline. This can be different from the original input record.

You can use the expression language syntax in when statements to define the evaluation logic.

when conditions can access the following record attributes for evaluation:

  • key: The key portion of the record in a KeyValue schema.

  • value: The value portion of the record in a KeyValue schema, or the message payload itself.

  • messageKey: An optional key that messages can be tagged with, such as the partition key.

  • topicName: An optional attribute containing the name of the topic where the record originated, also known as the input topic.

  • destinationTopic: The name of the topic where the transformed record will be sent, also known as the output topic.

  • eventTime: An optional timestamp attached to a record by its source, such as the original timestamp attached to a Pulsar message.

  • properties: Optional user-defined properties that can be attached to a record.

Use dot notation (.) to access sub-properties within an attribute that has a schema-full key or value. For example, key.keyField1 or value.valueField1.nestedValueField.

Example: when expressions and evaluations

For this example, assume that a transform pipeline includes when conditions with the following expressions:

  • key.compound.uuid == 'uuidValue': Does the uuid field in the key.compound object equal the string value uuidValue?

  • key.compound.timestamp <= 10: Is the timestamp field in the key.compound object less than or equal to 10?

  • value.first == 'f1' && value.last.toUpperCase() == 'L1': Does the first field in the value object contain the exact string f1, and does the last field in the value object, when converted to uppercase, match the exact string L1?

  • value.rank <= 1 && value.address.zipcode.substring(0, 3) == 'abc': Is the rank field in the value object less than or equal to 1, and does a substring extracted from the zipcode field in the value.address object match the exact string abc?

Then, assume that the expressions are evaluated against the following KeyValue record:

{
  "key": {
    "compound": {
      "uuid": "uuidValue",
      "timestamp": 1663616014
    }
  },
  "value" : {
    "first" : "f1",
    "last" : "l1",
    "rank" : 1,
    "address" : {
      "zipcode" : "abc-def"
    }
  }
}

The when conditions are evaluated as follows:

  • key.compound.uuid == 'uuidValue' evaluates to true and triggers the conditional step.

  • key.compound.timestamp <= 10 evaluates to false and doesn’t trigger the conditional step.

  • value.first == 'f1' && value.last.toUpperCase() == 'L1' evaluates to true and triggers the conditional step.

  • value.rank <= 1 && value.address.zipcode.substring(0, 3) == 'abc' evaluates to true and triggers the conditional step.

Cast

The cast transform function casts the record to a compatible target schema type.

The step name is cast, and the UserConfig is specified as follows:

{
  "steps": [
    {
      "type": "cast",
      "schema-type": "STRING",
      "part": "value"  # Optional
    }
  ]
}
Cast transform function parameters

schema-type

The target schema type. Only STRING is available.

part

When using cast on KeyValue data, set the part of the message to cast:

  • key: The transform function runs on the key portion only.

  • value: The transform function runs on the value portion only.

  • null or absent: The transform function runs on both the key and value portions.

For example, given an AVRO message with the following payload:

{field1: value1, field2: value2}

The function casts the keys and values as strings if part isn’t set in the function configuration:

Result
{"field1": "value1", "field2": "value2"}

If part is set to either key or value, only the specified part is cast to string while the other part remains unchanged. For example, if part is set to value, the function casts only the value portions:

Result
{field1: "value1", field2: "value2"}

Compute

The compute transform function computes field values based on an expression that is evaluated at runtime. If the field already exists, the function overwrites the existing value.

The step name is compute and the function’s UserConfig is configured as follows:

{
  "steps": [
    {
      "type": "compute",
      "fields": [
        { "name": "key.newKeyField", "expression" : "5*3", "type": "INT32" },
        { "name": "value.valueField", "expression" : "fn:concat(value.valueField, '_suffix')", "type": "STRING" }
      ]
    }
  ]
}
Compute transform function parameters

fields

An array of JSON objects describing how to calculate values for fields. Each object represents one field. Each object can contain the field parameters: name, expression, type, and optional.

name

Used in a field object to specify the name of the field to compute.

To compute the fields in KeyValue records, use the key. or value. prefix to select the part of the message to compute. For example, key.fieldName or value.fieldName.

You can also compute values from the message headers: destinationTopic, messageKey, and properties.

Be aware that properties is a map of key-value pairs that are referenced using dot notation, such as properties.key0.

expression

Used in a field object to define the expression to evaluate at runtime. This is the computation logic that generates the value that is assigned to the field.

Supports the expression language syntax.

type

Used in a field object to set the data type of the computed field. This becomes the new field’s schema type in the transformed message.

The following types are supported:

  • STRING: The computed value is treated as a string.

  • INT32: The computed value is treated as a 32-bit integer, such as the result of a mathematical expression like 5 * 3, or transformation of a string to an integer like "2147483647".

  • INT64: The computed value is treated as a 64-bit integer, such as the result of a mathematical expression like 1 + 1, or transformation of a string to an integer like "9223372036854775807".

  • FLOAT: The computed value is treated as a 32-bit floating point, such as the result of a mathematical expression like 1.1 + 1.1, or transformation of a string to a float like "3402823999.999".

  • DOUBLE: The computed value is treated as a 64-bit floating point, such as the result of a mathematical expression like 7.999 + 7.999, or transformation of a string to a double like "1.79769313486231570e+308".

  • BOOLEAN: The computed value is treated as a Boolean, either from the literal values true or false, or from a logical expression that evaluates to true or false, such as 1 == 1 or value.stringField == 'matching string'.

  • DATE: The computed value is expected to be a date without a time-zone in RFC3339 format, such as "2021-12-03".

  • TIME: The computed value is expected to be a time without a time-zone in RFC3339 format, such as "20:15:45".

  • DATETIME: The computed value is expected to be a date-time with an offset from UTC in RFC3339 format, such as "2022-10-02T01:02:03+02:00", "2019-10-02T01:02:03Z", or the result of an expression language function like fn:now().

optional

Can be used in a field object to specify whether the computed field is optional in the transformed message’s schema. This is useful for expressions that can resolve to null.

If true (default), then the field is optional.

If false, then the field is required, and it must have a non-null value in the transformed message.
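
For example, the following sketch pairs optional: false with fn:coalesce so that a required field always receives a non-null value, even when the source field is null. The value.region field name and the 'unknown' fallback are assumptions for illustration:

{
  "steps": [
    {
      "type": "compute",
      "fields": [
        { "name": "value.region", "expression": "fn:coalesce(value.region, 'unknown')", "type": "STRING", "optional": false }
      ]
    }
  ]
}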

Multiply and concatenate example

The following compute transform function, created with the Pulsar Admin CLI, applies preprocessing to outgoing messages. Specifically, it performs multiplication and concatenation operations.

./bin/pulsar-admin functions create \
--tenant ${TENANT} \
--namespace ${NAMESPACE} \
--name transform-function \
--inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \
--output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \
--classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
--jar functions/pulsar-transformations-2.0.1.nar \
--transform-function-config '{"steps": [{"type": "compute", "fields":[
    {"name": "key.newKeyField", "expression" : "5*3", "type": "INT32"},
    {"name": "value.valueField", "expression" : "fn:concat(value.valueField, '\''_suffix'\'')", "type": "STRING"}]}
]}'

Given an AVRO message with the following payload:

{key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>)

The function outputs the following record to the output topic:

Result
{key={keyField: key, newKeyField: 15}, value={valueField: value_suffix}} (KeyValue<AVRO, AVRO>)
Message routing example

The following compute transform function, created with the Pulsar Admin CLI, applies preprocessing to outgoing messages. Specifically, it reroutes the destination topics and modifies message properties.

./bin/pulsar-admin functions create \
--tenant ${TENANT} \
--namespace ${NAMESPACE} \
--name transform-function \
--inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \
--output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \
--classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
--jar functions/pulsar-transformations-2.0.1.nar \
--transform-function-config '{"steps": [{"type": "compute", "fields":[
    {"name": "destinationTopic", "expression" : "'\''routed'\''", "type": "STRING"},
    {"name": "properties.k1", "expression" : "'\''overwritten'\''", "type": "STRING"},
    {"name": "properties.k2", "expression" : "'\''new'\''", "type": "STRING"}]}
]}'

Given an AVRO message with the following payload:

{key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic: out1, properties: {k1:v1}

The function outputs the following record to the output topic:

Result
{key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic:routed, properties: {k1:overwritten, k2:new}

Drop

The drop transform function drops a record from further processing.

The step name is drop, and the UserConfig is configured as follows:

{
  "steps": [
    {
      "type": "drop",
      "when": "value.firstName == value1" # Optional
    }
  ]
}

The when parameter is optional, but it is required to drop records selectively. For example, the preceding function drops only records where the firstName field contains the exact string value1.

All records are dropped if when isn’t set.

There is no output for drop steps. The record either continues to be processed by the transform function pipeline, or it is discarded.
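
For example, the following pipeline sketch discards records tagged with a hypothetical env=test property, and then casts the surviving records to STRING:

{
  "steps": [
    {
      "type": "drop",
      "when": "properties.env == 'test'"
    },
    {
      "type": "cast",
      "schema-type": "STRING"
    }
  ]
}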

Drop fields

The drop-fields transform function drops fields from AVRO structured data.

The step name is drop-fields, and the UserConfig is configured as follows:

{
  "steps": [
    {
      "type": "drop-fields",
      "fields": "password,other",
      "part": "value" # Optional
    }
  ]
}
Drop fields transform function parameters

fields

A comma-separated list of fields to drop.

part

When using drop-fields on KeyValue data, set the part of the message to drop fields from:

  • key: Fields are dropped from the key portion only.

  • value: Fields are dropped from the value portion only.

  • null or absent: Fields are dropped from both the key and value portions.

For example, given an AVRO message with the following payload:

{name: value1, password: value2}

The preceding drop-fields function looks for the fields specified in the fields parameter (password and other), and then outputs only the fields that weren’t dropped:

Result
{"name": "value1"}

Flatten

The flatten transform function converts multi-level (nested) structured data into single-level structured data.

It generates the names of the new, flattened fields by concatenating the original nested field names. For example, the structure {grandparent{parent{child1,child2}}} is flattened into two fields named grandparent_parent_child1 and grandparent_parent_child2.

The step name is flatten, and the UserConfig is configured as follows:

{
  "steps": [
    {
      "type": "flatten",
      "delimiter": "_",   # Optional
      "part": "value"     # Optional
    }
  ]
}
Flatten transform function parameters

delimiter

The delimiter character to use when concatenating the new field names. The default is underscore (_).

part

When using flatten on KeyValue data, set the part of the message to flatten:

  • key: The transform function runs on the key portion only.

  • value: The transform function runs on the value portion only.

  • null or absent: The transform function runs on both the key and value portions.

For example, given an AVRO message with the following payload:

{field1: {field11: value11, field12: value12}}

The flatten function flattens the structure using the specified delimiter (or _ if no delimiter is specified):

Result
{field1_field11: value11, field1_field12: value12}

If part is set to either key or value, only the specified part of the message is flattened, and the other part remains unchanged.
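
For example, the following sketch uses a double-underscore delimiter, an arbitrary choice for illustration:

{
  "steps": [
    {
      "type": "flatten",
      "delimiter": "__"
    }
  ]
}

Applied to the preceding payload, this configuration produces {field1__field11: value11, field1__field12: value12}.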

Merge KeyValue

The merge-key-value transform function merges the fields of KeyValue records if the key and value are both structured data with the same schema type. Only AVRO structured data is supported.

The step name is merge-key-value and the UserConfig is specified as follows:

{
  "steps": [
    {
      "type": "merge-key-value"
    }
  ]
}

This function has no additional parameters.

For example, given an AVRO message with the following payload:

{key={keyField: key}, value={valueField: value}}

The merge-key-value function combines the KeyValue fields because both are AVRO structured data:

Result
{key={keyField: key}, value={keyField: key, valueField: value}}

Unwrap KeyValue

For KeyValue records only, the unwrap-key-value transform function extracts the key or value from the KeyValue, and then makes it into the record value.

The step name is unwrap-key-value, and the UserConfig is specified as follows:

{
  "steps": [
    {
      "type": "unwrap-key-value",
      "unwrapKey": false
    }
  ]
}

The unwrapKey parameter is optional. By default, unwrapKey is false, and the function unwraps the value from the KeyValue. Set this parameter to true to unwrap the key instead.

For example, given an AVRO message with the following payload:

{key={keyField: key}, value={valueField: value}}

The function extracts the value from the KeyValue, and then outputs only the contents of the unwrapped portion:

Result
{valueField: value}
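
If unwrapKey is instead set to true, the function extracts the key, and the same input produces the following output:

Result
{keyField: key}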
