Deploy transform function in sink
As of Luna Streaming version 2.10.1.6, transform functions can be deployed inside of a sink process.
Before this update, functions transformed data either after it was written to a topic by a source connector, or before it was read from a topic by a sink connector.
This required either an intermediate topic, with additional storage, IO, and latency, or a custom connector.
Now, a function can be deployed when the sink is created, where it preprocesses messages before the sink writes them.
Create sink function in Astra Streaming
Creating a sink function is similar to creating a sink in the Astra Portal, but with a few additional steps.
- Create a sink as described in the Astra Streaming documentation.
- During sink creation, select the transform function you want to run inside the sink.
- When the sink is up and running, inspect the sink connector's log. The function is loaded at sink creation:

  2022-11-14T15:01:02.398190413Z 2022-11-14T15:01:02,397+0000 [main] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer starting function with instanceId 0 functionId f584ae69-2eda-449b-9759-2d19fd7c4da5 namespace astracdc

- The function then applies preprocessing to outgoing messages, in this case casting an Avro record to String before it is written to your selected topic:

  {{"field1": "value1", "field2": "value2"}}
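If you save the sink connector's log to a file, a quick search can confirm that the function was loaded. The following is a minimal sketch, not an official procedure: the `LOG_FILE` and `STARTED` variable names are hypothetical, and the file is seeded with the log line shown above only so that the example runs on its own.

```shell
# LOG_FILE is a hypothetical location for a saved copy of the sink connector's log.
# It is seeded here with the log line from this page so the sketch is self-contained.
LOG_FILE=$(mktemp)
cat > "$LOG_FILE" <<'EOF'
2022-11-14T15:01:02.398190413Z 2022-11-14T15:01:02,397+0000 [main] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer starting function with instanceId 0 functionId f584ae69-2eda-449b-9759-2d19fd7c4da5 namespace astracdc
EOF

# Count the log entries that report a function instance being started.
STARTED=$(grep -c 'ThreadContainer starting function' "$LOG_FILE")
echo "function start entries: $STARTED"
```

A count of zero would suggest the function was not loaded, or that the saved log does not cover sink startup.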
Create sink function in pulsar-admin
Luna Streaming 2.10+ is required to deploy custom functions in Pulsar.
Create a sink connector, and include the path to the transform function and configuration at creation:
pulsar-admin sinks create \
--sink-type elastic-sink \
--inputs my-input-topic \
--tenant public \
--namespace default \
--name my-sink \
--transform-function "builtin://transforms" \
--transform-function-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}]}'
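Because the transform configuration is passed as a single JSON string, an unbalanced bracket or brace is easy to miss. The following sketch checks that the string is well-formed JSON before you create the sink. It assumes `python3` is available on the machine where you run `pulsar-admin`; the `TRANSFORM_CONFIG` and `CONFIG_STATUS` variable names are hypothetical, and the check only covers JSON syntax, not whether the steps are valid for the transform function.

```shell
# TRANSFORM_CONFIG is a hypothetical variable holding the same steps used in the command above.
TRANSFORM_CONFIG='{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}]}'

# python3 -m json.tool exits with a non-zero status when its input is not well-formed JSON.
if printf '%s' "$TRANSFORM_CONFIG" | python3 -m json.tool > /dev/null 2>&1; then
  CONFIG_STATUS="valid"
else
  CONFIG_STATUS="invalid"
fi
echo "transform config is $CONFIG_STATUS"
```

If the check passes, the variable can be supplied to the create command as `--transform-function-config "$TRANSFORM_CONFIG"`.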
What’s next?
For more, see Transform Functions or the Pulsar documentation.