Transform Functions

A Pulsar transform function is a low-code implementation of Pulsar functions.
Functions receive data from one or more input topics, apply user-supplied processing, and publish the results to another topic.
Custom functions are a powerful feature, but for common data transformations you can use the built-in transform functions instead.
They let you drop fields, flatten structures, compute new values, and more, without writing code or needing deep schema knowledge.
DataStax has created the following transform functions for common data transforms, and more are in development. Check back as the list grows, or let us know which functions you’d find helpful in your deployment.

Transforms

Cast

The cast transform function modifies the key or value schema to a target compatible schema.

Cast documentation.

Compute

The compute transform function computes new field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten.

Compute documentation.

Drop-fields

The drop-fields transform function drops fields from structured data.

Drop-fields documentation.

Drop

The drop transform function drops a record from further processing.

Drop documentation.

Flatten

The flatten transform function flattens structured data.

Flatten documentation.
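
As a rough illustration of what flattening structured data means, the following Python sketch collapses nested fields into a single level. It is not the transform function's implementation: the `flatten` helper, the sample record, and the underscore delimiter are assumptions made for this example. See the Flatten documentation for the actual options and naming rules.

```python
def flatten(record, delimiter="_", prefix=""):
    """Collapse nested dicts into one level, joining field names with `delimiter`.

    The delimiter and naming scheme are assumptions for illustration only.
    """
    flat = {}
    for name, value in record.items():
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, dict):
            # Recurse into nested structures, carrying the parent name along.
            flat.update(flatten(value, delimiter, full_name))
        else:
            flat[full_name] = value
    return flat


nested = {"name": "ada", "address": {"city": "London", "geo": {"lat": 51.5}}}
print(flatten(nested))
# {'name': 'ada', 'address_city': 'London', 'address_geo_lat': 51.5}
```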

Merge KeyValue

The merge-key-value transform function merges the fields of KeyValue records where both the key and value are structured data with the same schema type.

Merge KeyValue documentation.

Unwrap KeyValue

The unwrap-key-value transform function extracts the KeyValue’s key or value and makes it the record value (if the record is a KeyValue).

Unwrap KeyValue documentation.

Configuration

The TransformFunction reads its configuration as JSON from the Function userConfig parameter in the format:

{
  "steps": [
    {
      "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
    },
    {
      "type": "merge-key-value"
    },
    {
      "type": "unwrap-key-value"
    },
    {
      "type": "cast", "schema-type": "STRING"
    }
  ]
}

Transform functions are performed in the order in which they appear in the steps array. Each step is defined by its type and uses its own arguments. Each step can be dynamically toggled on or off by supplying a when condition that evaluates to true or false.

For example, applying the above configuration to a KeyValue<AVRO, AVRO> input record with value {key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} produces the following record after each step:

{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
           |
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "merge-key-value"
           |
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "unwrap-key-value"
           |
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
           |
           | "type": "cast", "schema-type": "STRING"
           |
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
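
The step-by-step trace above can be mimicked with a small Python sketch that applies the same four steps, in order, to a plain dictionary. This is only a model of the ordering, not the TransformFunction itself: the helper names are hypothetical, records are ordinary dicts rather than AVRO, and letting value fields win on a name clash during the merge is an assumption.

```python
import json


def drop_fields(record, fields, part=None):
    """Remove the named fields from the key, the value, or both (part=None)."""
    names = set(fields.split(","))
    out = dict(record)
    for side in ("key", "value"):
        if part in (None, side) and isinstance(out.get(side), dict):
            out[side] = {k: v for k, v in out[side].items() if k not in names}
    return out


def merge_key_value(record):
    """Copy the key fields into the value (value wins on clashes: an assumption)."""
    return {"key": record["key"], "value": {**record["key"], **record["value"]}}


def unwrap_key_value(record):
    """Make the KeyValue's value the whole record."""
    return record["value"]


def apply_steps(record, steps):
    """Apply each step in the order it appears in the steps array."""
    for step in steps:
        kind = step["type"]
        if kind == "drop-fields":
            record = drop_fields(record, step["fields"], step.get("part"))
        elif kind == "merge-key-value":
            record = merge_key_value(record)
        elif kind == "unwrap-key-value":
            record = unwrap_key_value(record)
        elif kind == "cast" and step.get("schema-type") == "STRING":
            record = json.dumps(record)
        else:
            raise ValueError(f"unsupported step in this sketch: {step}")
    return record


steps = [
    {"type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"},
    {"type": "cast", "schema-type": "STRING"},
]
record = {
    "key": {"keyField1": "key1", "keyField2": "key2", "keyField3": "key3"},
    "value": {"valueField1": "value1", "valueField2": "value2", "valueField3": "value3"},
}
result = apply_steps(record, steps)
print(result)
```

Reordering the list changes the outcome. For instance, moving unwrap-key-value ahead of merge-key-value would discard the key before it could be merged into the value.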

Deploy with Pulsar CLI

Luna Streaming 2.10+ is required to deploy custom functions in Pulsar.

The transform function .nar lives in the /functions directory of your Pulsar deployment.

Pulsar standalone

To deploy the built-in transform function locally in Pulsar standalone:

  1. Start Pulsar standalone:

    ./bin/pulsar standalone
  2. Create a transform function in localrun mode:

    ./bin/pulsar-admin functions localrun \
    --jar functions/pulsar-transformations-2.0.1.nar \
    --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
    --inputs my-input-topic \
    --output my-output-topic \
    --user-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}'
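
Hand-writing the JSON for --user-config inside shell quotes is easy to get wrong. One option, sketched below in Python, is to build the steps as data, serialize them with `json.dumps`, and let `shlex.join` handle the quoting. The flags, paths, and topic names are copied from the command above; the script itself is an illustrative helper and not part of the Pulsar or DataStax tooling.

```python
import json
import shlex

# The same steps as in the localrun command above, expressed as data.
steps = [
    {"type": "drop-fields", "fields": "password"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"},
    {"type": "cast", "schema-type": "STRING"},
]
user_config = json.dumps({"steps": steps})

command = [
    "./bin/pulsar-admin", "functions", "localrun",
    "--jar", "functions/pulsar-transformations-2.0.1.nar",
    "--classname", "com.datastax.oss.pulsar.functions.transforms.TransformFunction",
    "--inputs", "my-input-topic",
    "--output", "my-output-topic",
    "--user-config", user_config,
]

# Print a shell-safe command line that can be pasted into a terminal.
print(shlex.join(command))
```

The same list can be passed to `subprocess.run(command)` directly, in which case no shell quoting is involved at all.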

Pulsar cluster

To deploy a built-in transform function to a Pulsar cluster:

  1. Create a built-in transform function:

    Pulsar Admin:

    ./bin/pulsar-admin functions create \
    --tenant $TENANT \
    --namespace $NAMESPACE \
    --name transform-function \
    --inputs persistent://$TENANT/$NAMESPACE/$INPUT_TOPIC \
    --output persistent://$TENANT/$NAMESPACE/$OUTPUT_TOPIC \
    --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
    --jar functions/pulsar-transformations-2.0.1.nar

    Result:

    Created successfully
  2. Confirm your function has been created with the Pulsar CLI:

    Pulsar Admin:

    ./bin/pulsar-admin functions list --tenant $TENANT

    Result:

    cast-function
    flatten-function
    transform-function
    transform-function-2

Deploy with Astra Streaming

Deploy transform functions in the Functions tab of the Astra Portal.

The process is similar to creating a function in the Astra Portal, but with a few additional steps.

  1. After naming your new function, select the Use DataStax transform function option.

  2. Select a transform function from the list of available functions.

  3. Select the transform function’s namespace and input topic(s).

  4. Select the transform function’s namespace, output topic, and log topic. The log topic is a separate output topic for messages containing additional loglevel, fqn, and instance properties.

  5. Specify advanced configuration options.

  6. Pass JSON configuration values with your function, if applicable. For more, see the Configuration section above.

  7. Select Create. The transform function will initialize and begin processing data changes.

  8. Confirm your function has been created with the Pulsar CLI:

    Pulsar Admin:

    ./bin/pulsar-admin functions list --tenant $TENANT

    Result:

    cast-function
    flatten-function
    transform-function
    transform-function-2

What’s next?

For more, see Astra Streaming Functions or the Pulsar documentation.
