
Deploy transform function in sink

As of Luna Streaming version 2.10.1.6, transform functions can be deployed inside of a sink process.
Before this update, functions transformed data either after it was written to a topic by a source connector, or before it was read from a topic by a sink connector.
This required either an intermediate topic, with additional storage, IO, and latency, or a custom connector.
Now, a function can be deployed when the sink is created. It preprocesses messages inside the sink process before the sink writes them out, so no intermediate topic is needed.

Create sink function in Astra Streaming

Creating a sink function is similar to creating a sink in the Astra Portal, but with a few additional steps.

  1. Create a sink as described in the Astra Streaming documentation.

  2. During sink creation, select the transform function you want to run inside the sink.

  3. When the sink is up and running, inspect the sink connector’s log.
    The function is loaded at sink creation:

    2022-11-14T15:01:02.398190413Z 2022-11-14T15:01:02,397+0000 [main] INFO  org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer starting function with instanceId 0 functionId f584ae69-2eda-449b-9759-2d19fd7c4da5 namespace astracdc
  4. The function then applies preprocessing to outgoing messages. In this case, it casts Avro records from your selected topic to String:

    {"field1": "value1", "field2": "value2"}
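To confirm from a saved copy of the sink log that the function was loaded (step 3), the start-up message can be searched for directly. The following is a minimal sketch, not an official procedure. The file name `sink.log` is hypothetical, and the single log line is the sample from this page rather than output from your cluster:

```shell
#!/bin/sh
# Hypothetical local copy of the sink connector log. The line below is the
# sample shown on this page, used here so the sketch is self-contained.
log_file="sink.log"
cat > "$log_file" <<'EOF'
2022-11-14T15:01:02.398190413Z 2022-11-14T15:01:02,397+0000 [main] INFO  org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer starting function with instanceId 0 functionId f584ae69-2eda-449b-9759-2d19fd7c4da5 namespace astracdc
EOF

# Count the start-up messages logged by the function runtime.
start_count=$(grep -c 'ThreadContainer starting function' "$log_file")

# Extract the functionId from the first matching line.
function_id=$(grep 'ThreadContainer starting function' "$log_file" \
  | head -n 1 \
  | sed -n 's/.* functionId \([^ ]*\) .*/\1/p')

echo "start messages: $start_count"
echo "functionId: $function_id"
```

A count of zero would suggest that the function was not loaded when the sink was created.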

Create sink function in pulsar-admin

Luna Streaming 2.10+ is required to deploy custom functions in Pulsar.

Create a sink connector, and include the path to the transform function and configuration at creation:

pulsar-admin sinks create \
--sink-type elastic-sink \
--inputs my-input-topic \
--tenant public \
--namespace default \
--name my-sink \
--transform-function "builtin://transforms" \
--transform-function-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}]}'
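
The transform configuration is passed as a single JSON string, so a missing bracket is easy to overlook. As a rough sketch, the configuration can be kept in a shell variable and checked before it is handed to `pulsar-admin`. The variable names are illustrative, `python3` is assumed to be installed and is used only as a JSON validator, and the command is printed rather than executed so the sketch runs without a cluster:

```shell
#!/bin/sh
# Transform steps from the sink example, written out for readability.
transform_config='{
  "steps": [
    {"type": "drop-fields", "fields": "password"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"}
  ]
}'

# Validate the JSON if python3 is on PATH (assumption); otherwise skip.
config_status="unchecked"
if command -v python3 >/dev/null 2>&1; then
  if printf '%s' "$transform_config" | python3 -m json.tool >/dev/null 2>&1; then
    config_status="valid"
  else
    config_status="invalid"
  fi
fi
echo "transform config: $config_status"

# Dry run: print the command instead of executing it.
# Remove the leading "echo" to create the sink on a real cluster.
echo pulsar-admin sinks create \
  --sink-type elastic-sink \
  --inputs my-input-topic \
  --tenant public \
  --namespace default \
  --name my-sink \
  --transform-function "builtin://transforms" \
  --transform-function-config "$transform_config"
```

After the sink is created, `pulsar-admin sinks status --tenant public --namespace default --name my-sink` should report whether its instances are running.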

What’s next?

For more, see Transform Functions or the Pulsar documentation.

