Transform Functions

A Pulsar transform function is a low-code implementation of Pulsar functions.
Functions receive data from one or more input topics, apply user-supplied processing, and publish the results to another topic.
Custom functions are a powerful feature, but for common data transformations we now include transform functions.
Use them to drop fields, flatten structured data, compute new values, and more, without writing code or needing deep schema knowledge.
DataStax has created the following transform functions for common data transforms, and we’re always experimenting with new ones. Check back as the list grows, or let us know which functions you’d find helpful in your deployment.

Unqualified orgs can use transform functions without upgrading to a Pay As You Go plan.

Transforms

Cast

The cast transform function converts the key or value schema to a compatible target schema, such as STRING.
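The following Python sketch models what a cast to STRING does to a structured value. It serializes the value to a JSON string, which matches the last step of the worked example under Configuration. This is a hypothetical model: `cast_to_string` is not part of the transform function's API, and the real function operates on Pulsar schemas rather than Python dicts.

```python
import json


def cast_to_string(record: dict) -> str:
    """Hypothetical model of {"type": "cast", "schema-type": "STRING"}:
    a structured (e.g. AVRO) value becomes its JSON text representation."""
    return json.dumps(record)


value = {"keyField3": "key3", "valueField1": "value1"}
print(cast_to_string(value))  # {"keyField3": "key3", "valueField1": "value1"}
```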

Compute

The compute transform function computes new field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten.

Drop-fields

The drop-fields transform function drops fields from structured data.
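The `fields` argument is a comma-separated list of field names, as in the Configuration example (`"fields": "keyField1,keyField2"`). Below is a minimal Python model of dropping those fields from one structured value. The `drop_fields` helper is hypothetical. Its choices to trim spaces around names and to silently ignore names that are not present belong to this sketch and are not documented behavior.

```python
def drop_fields(record: dict, fields: str) -> dict:
    """Hypothetical model of {"type": "drop-fields", "fields": "..."} on one record part."""
    to_drop = {name.strip() for name in fields.split(",") if name.strip()}
    # Keep every field whose name is not listed; unknown names are simply ignored.
    return {name: v for name, v in record.items() if name not in to_drop}


key = {"keyField1": "key1", "keyField2": "key2", "keyField3": "key3"}
print(drop_fields(key, "keyField1,keyField2"))  # {'keyField3': 'key3'}
```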

Drop

The drop transform function drops a record from further processing.
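Drop is most useful together with a `when` condition (see Configuration), so that only matching records are discarded. The sketch below models this with a Python predicate standing in for the condition, because this page does not document the `when` expression syntax. Returning `None` to mean "dropped" is a convention of this sketch, not of the function.

```python
from typing import Callable, Optional


def drop(record: dict, when: Optional[Callable[[dict], bool]] = None) -> Optional[dict]:
    """Hypothetical model of {"type": "drop"}: return None (dropped) when the
    condition holds. With no condition, every record is dropped."""
    if when is None or when(record):
        return None
    return record


records = [{"level": "DEBUG"}, {"level": "ERROR"}]
kept = [r for r in records if drop(r, when=lambda rec: rec["level"] == "DEBUG") is not None]
print(kept)  # [{'level': 'ERROR'}]
```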

Flatten

The flatten transform function flattens nested structured data into a single level of fields.
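The sketch below shows the idea behind flattening. Each nested field becomes a top-level field whose name joins the path of field names. The underscore delimiter and the `delimiter` parameter are assumptions for illustration, since this page does not state how flattened names are built, and `flatten` is a hypothetical helper.

```python
def flatten(record: dict, delimiter: str = "_", prefix: str = "") -> dict:
    """Hypothetical model of {"type": "flatten"}: nested dicts become top-level
    fields whose names join the path with `delimiter` (assumed default "_")."""
    flat = {}
    for name, v in record.items():
        path = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(v, dict):
            flat.update(flatten(v, delimiter, path))  # recurse into nested structure
        else:
            flat[path] = v
    return flat


user = {"id": 7, "address": {"city": "Paris", "geo": {"lat": 48.8}}}
print(flatten(user))  # {'id': 7, 'address_city': 'Paris', 'address_geo_lat': 48.8}
```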

Merge KeyValue

The merge-key-value transform function merges the fields of KeyValue records where both the key and value are structured data with the same schema type.
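In the worked example under Configuration, merge-key-value copies the key's fields into the value and leaves the key unchanged. The Python model below reproduces that behavior. This page does not say which side wins when a field name appears in both the key and the value, so letting the value win here is an assumption. `KeyValue` and `merge_key_value` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class KeyValue:
    key: dict
    value: dict


def merge_key_value(record: KeyValue) -> KeyValue:
    """Hypothetical model of {"type": "merge-key-value"}: key fields are merged
    into the value. On a name clash the value's field is kept (assumption)."""
    return KeyValue(key=record.key, value={**record.key, **record.value})


kv = KeyValue(key={"keyField3": "key3"}, value={"valueField1": "value1"})
print(merge_key_value(kv).value)  # {'keyField3': 'key3', 'valueField1': 'value1'}
```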

Unwrap KeyValue

If the record is a KeyValue, the unwrap-key-value transform function extracts its key or value and makes that the record value.
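The following minimal model of unwrap-key-value extracts the value by default, as in the Configuration example. The `unwrap_key` flag that extracts the key instead is a hypothetical stand-in. Check the function's parameter reference for the actual option name. Passing non-KeyValue records through unchanged follows the "if the record is a KeyValue" condition above.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class KeyValue:
    key: Any
    value: Any


def unwrap_key_value(record: Any, unwrap_key: bool = False) -> Any:
    """Hypothetical model of {"type": "unwrap-key-value"}: a KeyValue becomes its
    value (or its key if the hypothetical unwrap_key flag is set); any other
    record passes through unchanged."""
    if isinstance(record, KeyValue):
        return record.key if unwrap_key else record.value
    return record


kv = KeyValue(key={"keyField3": "key3"}, value={"valueField1": "value1"})
print(unwrap_key_value(kv))                   # {'valueField1': 'value1'}
print(unwrap_key_value(kv, unwrap_key=True))  # {'keyField3': 'key3'}
```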

Configuration

The TransformFunction reads its configuration as JSON from the function's userConfig parameter, in the following format:

{
  "steps": [
    {
      "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
    },
    {
      "type": "merge-key-value"
    },
    {
      "type": "unwrap-key-value"
    },
    {
      "type": "cast", "schema-type": "STRING"
    }
  ]
}

Steps are applied in the order in which they appear in the steps array. Each step is identified by its type and takes its own arguments. Any step can be dynamically toggled on or off by supplying a when condition that evaluates to true or false.
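The ordering and `when` semantics can be modeled as a loop over the `steps` array. Each step runs only if its condition holds, and its output becomes the next step's input. In this sketch, Python predicates stand in for `when` expressions, whose syntax this page does not cover. `Step` and `run_pipeline` are hypothetical names, and `None` marks a dropped record.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Step:
    apply: Callable[[Any], Any]                   # the transform itself
    when: Optional[Callable[[Any], bool]] = None  # None means "always run"


def run_pipeline(record: Any, steps: list[Step]) -> Any:
    """Apply steps in array order and skip a step whose condition is false.
    A step returning None drops the record and stops the pipeline."""
    for step in steps:
        if step.when is not None and not step.when(record):
            continue  # step toggled off for this record
        record = step.apply(record)
        if record is None:
            return None
    return record


steps = [
    Step(apply=lambda r: {k: v for k, v in r.items() if k != "password"}),
    Step(apply=lambda r: {**r, "vip": True}, when=lambda r: r.get("tier") == "gold"),
]
print(run_pipeline({"user": "a", "password": "x", "tier": "gold"}, steps))
# {'user': 'a', 'tier': 'gold', 'vip': True}
print(run_pipeline({"user": "b", "tier": "free"}, steps))
# {'user': 'b', 'tier': 'free'}
```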

For example, applying the configuration above to a KeyValue<AVRO, AVRO> input record with the value {key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} produces the following result after each step:

{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
           |
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "merge-key-value"
           |
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "unwrap-key-value"
           |
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
           |
           | "type": "cast", "schema-type": "STRING"
           |
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
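To check the walkthrough above, the following Python script reproduces it, using dicts to stand in for AVRO records. Every helper is a hypothetical model written for this page, not the TransformFunction implementation. The final cast uses JSON serialization to produce the string shown.

```python
import json
from dataclasses import dataclass


@dataclass
class KeyValue:
    key: dict
    value: dict


def drop_fields(kv: KeyValue, fields: str, part: str) -> KeyValue:
    """Remove the comma-separated fields from the given part ("key" or "value")."""
    names = set(fields.split(","))

    def strip(d: dict) -> dict:
        return {k: v for k, v in d.items() if k not in names}

    if part == "key":
        return KeyValue(strip(kv.key), kv.value)
    return KeyValue(kv.key, strip(kv.value))


def merge_key_value(kv: KeyValue) -> KeyValue:
    # Key fields are copied into the value; the key itself is kept.
    return KeyValue(kv.key, {**kv.key, **kv.value})


def unwrap_key_value(kv: KeyValue) -> dict:
    # The value becomes the whole record.
    return kv.value


def cast_to_string(record: dict) -> str:
    return json.dumps(record)


record = KeyValue(
    key={"keyField1": "key1", "keyField2": "key2", "keyField3": "key3"},
    value={"valueField1": "value1", "valueField2": "value2", "valueField3": "value3"},
)
record = drop_fields(record, "keyField1,keyField2", part="key")
record = merge_key_value(record)
result = cast_to_string(unwrap_key_value(record))
print(result)
# {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"}
```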

Deploy with Pulsar CLI

Luna Streaming 2.10+ is required to deploy custom functions in Pulsar.

The transform function .nar file is located in the functions/ directory of your Pulsar deployment.

Pulsar standalone

To deploy the built-in transform function locally in Pulsar standalone:

  1. Start Pulsar standalone:

    ./bin/pulsar standalone
  2. Run the transform function in localrun mode:

    ./bin/pulsar-admin functions localrun \
    --jar functions/pulsar-transformations-2.0.1.nar \
    --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
    --inputs my-input-topic \
    --output my-output-topic \
    --user-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}'
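Writing the `--user-config` JSON by hand inside shell quotes is error-prone. One option is to build the step list as data and serialize it. The Python sketch below assembles the same `localrun` argument list shown above. It only builds the list, which you could then pass to `subprocess.run` to execute. The helper name `localrun_args` is hypothetical.

```python
import json


def localrun_args(steps: list[dict], input_topic: str, output_topic: str) -> list[str]:
    """Build the pulsar-admin localrun command shown above (hypothetical helper)."""
    return [
        "./bin/pulsar-admin", "functions", "localrun",
        "--jar", "functions/pulsar-transformations-2.0.1.nar",
        "--classname", "com.datastax.oss.pulsar.functions.transforms.TransformFunction",
        "--inputs", input_topic,
        "--output", output_topic,
        # json.dumps guarantees valid JSON; no shell quoting is needed because
        # subprocess passes each list element as a single argument.
        "--user-config", json.dumps({"steps": steps}),
    ]


steps = [
    {"type": "drop-fields", "fields": "password"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"},
    {"type": "cast", "schema-type": "STRING"},
]
args = localrun_args(steps, "my-input-topic", "my-output-topic")
print(args[-1])
# {"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}
```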

Pulsar cluster

To deploy a built-in transform function to a Pulsar cluster:

  1. Create a built-in transform function:

    ./bin/pulsar-admin functions create \
    --tenant $TENANT \
    --namespace $NAMESPACE \
    --name transform-function \
    --inputs persistent://$TENANT/$NAMESPACE/$INPUT_TOPIC \
    --output persistent://$TENANT/$NAMESPACE/$OUTPUT_TOPIC \
    --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
    --jar functions/pulsar-transformations-2.0.1.nar

    Result:

    Created successfully
  2. Confirm your function has been created with the Pulsar CLI:

    ./bin/pulsar-admin functions list --tenant $TENANT --namespace $NAMESPACE

    Result:

    cast-function
    flatten-function
    transform-function
    transform-function-2

Deploy with Astra Streaming

Deploy transform functions in the Functions tab of the Astra Portal.

The process is similar to creating a function in the Astra Portal, but with a few additional steps.

  1. After naming your new function, select the Use DataStax transform function option.

  2. Select a transform function from the list of available functions.

  3. Select the transform function’s namespace and input topic(s).

  4. Select the transform function’s namespace, output topic, and log topic. The log topic is a separate output topic for messages containing additional loglevel, fqn, and instance properties.

  5. Specify advanced configuration options.

  6. Pass JSON configuration values with your function, if applicable. For details, see Configuration.

  7. Select Create. The transform function will initialize and begin processing data changes.

  8. Confirm your function has been created with the Pulsar CLI:

    ./bin/pulsar-admin functions list --tenant $TENANT --namespace $NAMESPACE

    Result:

    cast-function
    flatten-function
    transform-function
    transform-function-2
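Before you paste JSON into the configuration field in step 6, a quick sanity check can help. The Python sketch below parses a `userConfig` document and checks that it has a `steps` array whose entries each name one of the transform types on this page. It is a local pre-flight check written for this page, not a validator provided by Astra Streaming, and it does not check per-step arguments. The type strings for compute, drop, and flatten are inferred from the section names above, which is an assumption.

```python
import json

# Step types from the Transforms section (compute, drop, flatten inferred from headings).
KNOWN_TYPES = {"cast", "compute", "drop-fields", "drop", "flatten",
               "merge-key-value", "unwrap-key-value"}


def check_user_config(text: str) -> list[str]:
    """Return a list of problems found in a transform userConfig JSON string."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as err:
        return [f"invalid JSON: {err.msg}"]
    steps = config.get("steps") if isinstance(config, dict) else None
    if not isinstance(steps, list):
        return ['missing "steps" array']
    problems = []
    for i, step in enumerate(steps):
        step_type = step.get("type") if isinstance(step, dict) else None
        if step_type not in KNOWN_TYPES:
            problems.append(f"step {i}: unknown type {step_type!r}")
    return problems


print(check_user_config('{"steps": [{"type": "flatten"}, {"type": "flaten"}]}'))
# ["step 1: unknown type 'flaten'"]
```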

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com