Deploy and manage Luna Streaming Pulsar functions
Functions are lightweight compute processes that you can run on each message a topic receives. You can apply custom logic to a message, transforming or enriching it, and then output it to a different topic.
Functions run inside Luna Streaming, which makes them serverless. You write the code for your function in Java, Python, or Go, and then upload the code to the Pulsar cluster and deploy the function. The function automatically runs for each message published to the specified input topic.
Functions are implemented using Apache Pulsar™ functions.
Luna Streaming 2.10 or later is required to deploy custom functions in Pulsar.
Manage functions using Pulsar Admin CLI
There are multiple ways to add functions using the Pulsar Admin CLI.
Deploy custom function code
-
Create a file that contains your function code.
For example purposes, you can use the following sample function that consumes a message from one topic, adds an exclamation point, and then publishes the result to another topic.
function.py
from pulsar import Function

class ExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        return input + '!'
-
Deploy your function file to your Pulsar cluster using the Pulsar Admin CLI:
./pulsar-admin functions create \
--py function.py \
--classname function.ExclamationFunction \
--tenant $TENANT \
--namespace $NAMESPACE \
--name exclamation \
--inputs persistent://$TENANT/$NAMESPACE/$INPUT_TOPIC \
--output persistent://$TENANT/$NAMESPACE/$OUTPUT_TOPIC
If the function is set up and ready to accept messages, then the output is Created Successfully.
-
Confirm the function was created by listing the functions in the tenant:
./pulsar-admin functions list --tenant $TENANT
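Before deploying, it can help to check the function logic locally. The following is a minimal sketch, not part of the Luna Streaming tooling. It calls process directly and passes None as the context, which works only because the sample function never reads the context. The fallback Function stub is an assumption added so that the sketch also runs on a machine where the Pulsar Python client is not installed.

```python
try:
    from pulsar import Function  # available when the Pulsar Python client is installed
except ImportError:
    # Assumption: a stand-in base class so the sketch runs without the client.
    class Function:
        pass


class ExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        return input + '!'


# Call the function logic directly; no cluster or topics are involved.
fn = ExclamationFunction()
result = fn.process("Hello world", None)
print(result)  # prints: Hello world!
```

A local check like this only exercises the Python logic. It does not confirm that the topics, tenant, or namespace used in the deploy command exist.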
Deploy DataStax transform functions
Transform functions are built-in functions that perform common data transformation tasks, such as casting data types, dropping fields, flattening nested structures, and merging or unwrapping KeyValue pairs. For more information, see the transform functions configuration reference.
You can find the transform function .nar in the /functions directory of your Pulsar deployment.
Deploy transform functions in Pulsar standalone
To deploy the built-in transform function locally in Pulsar standalone, do the following:
-
Start Pulsar standalone:
./bin/pulsar standalone
-
Create a transform function in localrun mode:
./bin/pulsar-admin functions localrun \
--jar functions/pulsar-transformations-2.0.1.nar \
--classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
--inputs $INPUT_TOPIC \
--output $OUTPUT_TOPIC \
--user-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}'
For the user-config parameters, see the transform functions configuration reference.
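Hand-writing the JSON for --user-config makes it easy to drop a bracket or a quote. As a minimal sketch, the same steps can be written as Python data and serialized, which produces a string that is well formed by construction. The variable names are illustrative. The step values are copied from the localrun example.

```python
import json
import shlex

# Transform steps from the localrun example, expressed as Python data.
steps = [
    {"type": "drop-fields", "fields": "password"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"},
    {"type": "cast", "schema-type": "STRING"},
]

# json.dumps produces balanced brackets and correctly quoted strings.
user_config = json.dumps({"steps": steps})

# shlex.quote wraps the JSON so that a POSIX shell passes it as one argument.
print("--user-config " + shlex.quote(user_config))
```

The printed text can be pasted into the localrun command in place of the hand-written --user-config argument.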
Deploy transform functions in a Pulsar cluster
To deploy a built-in transform function to a Pulsar cluster, do the following:
-
Create a built-in transform function with the Pulsar CLI:
./bin/pulsar-admin functions create \
--tenant $TENANT \
--namespace $NAMESPACE \
--name transform-function \
--inputs persistent://$TENANT/$NAMESPACE/$INPUT_TOPIC \
--output persistent://$TENANT/$NAMESPACE/$OUTPUT_TOPIC \
--classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
--jar functions/pulsar-transformations-2.0.1.nar
This option deploys the functions using the transform function .nar. You can also deploy individual functions by passing the function configuration in the --user-config parameter. For more information, see the transform functions configuration reference.
-
Confirm the function was created by listing the functions in the tenant:
./pulsar-admin functions list --tenant $TENANT
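If you script deployments, the fully qualified topic names and the argument list can be assembled in code instead of through shell variables. The following sketch only builds and prints the command. The helper names topic_url and create_command and the placeholder tenant, namespace, and topic values are hypothetical. The flags, class name, and .nar path are the ones shown in the command above.

```python
import shlex


def topic_url(tenant, namespace, topic):
    """Build a topic name in the persistent://tenant/namespace/topic form."""
    return f"persistent://{tenant}/{namespace}/{topic}"


def create_command(tenant, namespace, name, input_topic, output_topic):
    """Return the pulsar-admin argument list. Nothing is executed here."""
    return [
        "./bin/pulsar-admin", "functions", "create",
        "--tenant", tenant,
        "--namespace", namespace,
        "--name", name,
        "--inputs", topic_url(tenant, namespace, input_topic),
        "--output", topic_url(tenant, namespace, output_topic),
        "--classname",
        "com.datastax.oss.pulsar.functions.transforms.TransformFunction",
        "--jar", "functions/pulsar-transformations-2.0.1.nar",
    ]


# Placeholder values; substitute your own tenant, namespace, and topics.
args = create_command("my-tenant", "my-namespace", "transform-function", "in", "out")
print(shlex.join(args))
```

To run the command rather than print it, the same list could be passed to subprocess.run(args, check=True) from the root directory of the Pulsar deployment.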
Deploy a function in a Pulsar sink
By default, functions modify data either after it is written to a topic by a source connector, or before it is read from a topic by a sink connector. This requires either a custom connector or an intermediate topic (with additional storage, IO, and latency).
Alternatively, you can deploy functions in a sink to apply preprocessing to sink topic writes (outgoing messages from the sink).
To deploy a function in a sink, include the function path and configuration when you create the sink connector. For example:
pulsar-admin sinks create \
--sink-type elastic-sink \
--inputs my-input-topic \
--tenant public \
--namespace default \
--name my-sink \
--transform-function "builtin://transforms" \
--transform-function-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}]}'
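Because the transform configuration is passed as a JSON string, a truncated or unbalanced value is easy to miss in a long command. The following is a minimal local sanity check, assuming only that the configuration is a JSON object with a steps list in which every step has a type. The function name check_transform_config is hypothetical, and the check does not know which step types the transform function actually accepts.

```python
import json


def check_transform_config(raw):
    """Parse a transform config string and return its list of step types.

    Raises ValueError if the JSON is malformed or a step has no "type".
    """
    try:
        config = json.loads(raw)
    except json.JSONDecodeError as err:
        raise ValueError(f"config is not valid JSON: {err}") from err
    if not isinstance(config, dict) or not isinstance(config.get("steps"), list):
        raise ValueError('config must be an object with a "steps" list')
    for i, step in enumerate(config["steps"]):
        if not isinstance(step, dict) or "type" not in step:
            raise ValueError(f"step {i} is missing a type")
    return [step["type"] for step in config["steps"]]


good = '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "unwrap-key-value"}]}'
print(check_transform_config(good))

# A string that was cut off before its closing brackets is rejected.
truncated = '{"steps": [{"type": "drop-fields", "fields": "password"}'
try:
    check_transform_config(truncated)
except ValueError as err:
    print("rejected:", err)
```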
Trigger functions with the Pulsar CLI
Triggering a function is a convenient way to test that the function is working. When you trigger a function, you are publishing a message on the function’s input topic, which triggers the function to run.
To test a function with the Pulsar CLI, send a test value with Pulsar CLI’s functions trigger command:
-
Listen for messages on the output topic:
bin/pulsar-client consume persistent://$TENANT/$NAMESPACE/$TOPIC \
--subscription-name my-subscription --num-messages 0 # Listen indefinitely
-
Test the function with the functions trigger command and a test message.
The following example triggers a function named exclamation:
./pulsar-admin functions trigger \
--name exclamation \
--tenant $TENANT \
--namespace $NAMESPACE \
--trigger-value "Hello world"
-
Make sure the output shows that the message was processed as expected based on your function logic. For example, the exclamation sample function adds an exclamation point to the input string. Therefore, for the test message Hello world, the expected output is Hello world!
Add functions using Pulsar Admin Console
If the Pulsar Admin Console is deployed, you can also add and manage the Pulsar functions in the Functions tab of the Admin Console web UI.
-
Select Choose File to choose a local function file, and then choose which function in that file you want to use. This example uses exclamation_function.py.
How you specify the class name depends on the programming language or runtime of the function. Python, Java, and Go functions are each added differently.
Python functions are added by loading a Python file (.py) or a zipped Python file (.zip). When adding Python files, the Class Name is the name of the Python file without the extension, followed by a period and the class you want to execute. For example, if the Python file is called function.py and the class is exclamation, then the class name would be function.exclamation. The file can contain multiple classes, but only one will be used. If there is no class in the Python file (when using a basic function, for example), specify only the filename without the extension (for example, function).
Java functions are added by loading a Java jar file (.jar). When adding Java files, also specify the name of the class to execute as the function.
Go functions are added by loading a Go file (.go). For more information on packaging a Go function, see Packaging Functions in Go.
-
Choose your function name and namespace.
Your input topics, output topics, log topics, and processing guarantees will auto-populate.
-
Provide a Configuration Key in the dropdown menu.
For a list of configuration keys, see the Pulsar Functions API Docs.
-
Select Add to add your function.
You have created a function for this namespace. You can confirm your function was created in the Manage tab.
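The Python class name rule from the first step can be sketched in a few lines. This is an illustration of the naming convention described above for .py files, not code used by the Admin Console, and the helper name python_class_name is hypothetical.

```python
from pathlib import Path


def python_class_name(file_name, class_name=None):
    """Return the Class Name value for a Python function file.

    The value is the file name without its extension, followed by a period
    and the class name when the file defines a class to execute.
    """
    module = Path(file_name).stem
    return f"{module}.{class_name}" if class_name else module


print(python_class_name("function.py", "exclamation"))  # prints: function.exclamation
print(python_class_name("function.py"))                 # prints: function
```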
Manage functions in Admin Console
You can start, stop, and restart your function by selecting Manage in the Functions dashboard.
Monitor your function
Functions produce logs to help you in debugging. To view your function’s logs, select Logs in the Manage dashboard.
Update your function
A function that is already running can be updated with a new configuration. The following settings can be updated:
-
Function code
-
Output topic
-
Log topic
-
Number of instances
-
Configuration keys
If you need to update any other setting of the function, delete and then re-add the function.
To update your function, select Update in the Manage dashboard.
Delete your function
To delete a function, select Delete in the Manage dashboard.
A Function-name Deleted Successfully! flag will appear to let you know you’ve deleted your function.
Trigger your function
To trigger a function in the Pulsar Admin Console, select Trigger in the Manage dashboard.
Enter your message in the Message to Send field, and select the output topic. In this case, the trigger sends the string Hello world! to your exclamation function with no output topic. If the function has an output topic and the function returns data to that topic, the returned data is displayed.