Deploy transform function in sink

As of Luna Streaming version 2.10.1.6, transform functions can be deployed inside a sink process.
Before this update, functions transformed data either after it was written to a topic by a source connector, or before it was read from a topic by a sink connector.
This required either an intermediate topic, with additional storage, IO, and latency, or a custom connector.
Now, a transform function can be deployed when the sink is created. It preprocesses messages read from the sink's input topics before the sink writes them to the external system.

Create sink function in Astra Streaming

Creating a sink function is similar to creating a sink in the Astra Portal, but with a few additional steps.

  1. Create a sink as described in the Astra Streaming documentation.

  2. During sink creation, select the transform function you want to run inside the sink.

  3. When the sink is up and running, inspect the sink connector’s log.
    The function is loaded at sink creation:

    2022-11-14T15:01:02.398190413Z 2022-11-14T15:01:02,397+0000 [main] INFO  org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer starting function with instanceId 0 functionId f584ae69-2eda-449b-9759-2d19fd7c4da5 namespace astracdc
  4. The function then applies preprocessing to outgoing messages. In this example, it casts an Avro record from your selected topic to a String:

    {"field1": "value1", "field2": "value2"}
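If you have a plain-text copy of the sink's log, a quick shell check can confirm that the function was loaded. This is a minimal sketch. The helper names `function_loaded` and `loaded_function_id` and the file `my-sink.log` are hypothetical. The sketch also assumes that the load message matches the `ThreadRuntime` line shown in step 3.

```shell
#!/usr/bin/env bash
# Minimal sketch; helper names and the log file name are hypothetical.

# Succeeds (exit 0) if the log on stdin contains the ThreadRuntime message
# printed when the function instance starts, as in the example log line.
function_loaded() {
  grep -q 'ThreadContainer starting function with instanceId'
}

# Prints the functionId from each matching load message on stdin.
loaded_function_id() {
  sed -n 's/.*ThreadContainer starting function.* functionId \([^ ]*\).*/\1/p'
}

# Example usage against a saved copy of the log (hypothetical file name):
#   function_loaded < my-sink.log && echo "transform function loaded"
#   loaded_function_id < my-sink.log
```

Matching on the message text rather than on timestamps keeps the check independent of the log's date format. The exact wording of the message could still differ between releases.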

Create sink function in pulsar-admin

Luna Streaming 2.10+ is required to deploy custom functions in Pulsar.

Create a sink connector and include the path to the transform function and its configuration at creation:

pulsar-admin sinks create \
--sink-type elastic-sink \
--inputs my-input-topic \
--tenant public \
--namespace default \
--name my-sink \
--transform-function "builtin://transforms" \
--transform-function-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}]}'
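Shell quoting makes it easy to drop a closing bracket from the `--transform-function-config` value, so a pre-flight check before calling `pulsar-admin` can help. The following dependency-free sketch only checks that braces and brackets are balanced outside of quoted strings. It is not a full JSON validator, and a tool such as `jq` is better suited for that job. The function names `json_balanced` and `create_sink_with_transform` are hypothetical. The wrapper reuses the flags from the command above.

```shell
#!/usr/bin/env bash
# Minimal sketch; json_balanced and create_sink_with_transform are hypothetical names.

# Returns 0 if every { and [ in $1 is closed in the right order, ignoring
# characters inside double-quoted strings. Rough structural check only.
json_balanced() {
  local s=$1 stack="" c i in_str=0 esc=0
  for (( i = 0; i < ${#s}; i++ )); do
    c=${s:i:1}
    if (( in_str )); then
      # Inside a string: only track escapes and the closing quote.
      if (( esc )); then
        esc=0
      elif [[ $c == '\' ]]; then
        esc=1
      elif [[ $c == '"' ]]; then
        in_str=0
      fi
      continue
    fi
    case $c in
      '"') in_str=1 ;;
      '{'|'[') stack+=$c ;;
      '}') [[ ${stack: -1} == '{' ]] || return 1; stack=${stack%?} ;;
      ']') [[ ${stack: -1} == '[' ]] || return 1; stack=${stack%?} ;;
    esac
  done
  # Balanced only if nothing is left open and no string is unterminated.
  [[ -z $stack && $in_str -eq 0 ]]
}

# Runs the sink creation from the example above only if the config passes the check.
create_sink_with_transform() {
  local config=$1
  if ! json_balanced "$config"; then
    echo "transform config looks malformed: $config" >&2
    return 1
  fi
  pulsar-admin sinks create \
    --sink-type elastic-sink \
    --inputs my-input-topic \
    --tenant public \
    --namespace default \
    --name my-sink \
    --transform-function "builtin://transforms" \
    --transform-function-config "$config"
}

# Example usage:
#   create_sink_with_transform '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}]}'
```

Passing the configuration as a single-quoted argument avoids shell expansion inside the JSON. The check fails fast on an unclosed `[` or `{` instead of sending a malformed config to `pulsar-admin`.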

© 2024 DataStax