Luna Streaming Functions

Functions are lightweight compute processes that enable you to process each message received on a topic or multiple topics. You can apply custom logic to that message, transforming or enriching it, and then output it to a different topic.

Functions run inside Luna Streaming and are therefore serverless. Write the code for your function in Java, Python, or Go, then upload the code to the Pulsar cluster and deploy the function. The function will be automatically run for each message published to the specified input topic. See Pulsar Functions overview for more information about Apache Pulsar® functions.

Manage functions using Pulsar Admin CLI

Add functions using the Pulsar Admin CLI. Create a new Python function to consume a message from one topic, add an exclamation point, and publish the results to another topic.

  1. Create the following Python function in a new file:

    from pulsar import Function

    class ExclamationFunction(Function):
      def __init__(self):
        pass  # No per-instance setup is needed

      def process(self, input, context):
        # Append an exclamation point to each incoming message
        return input + '!'

  2. Deploy to your Pulsar cluster using the Pulsar Admin CLI:

    $ ./pulsar-admin functions create \
      --py \
      --classname function.ExclamationFunction \
      --tenant <tenant-name> \
      --namespace default \
      --name exclamation \
      --inputs persistent:///default/ \
      --output persistent:///default/

    If the function is set up and ready to accept messages, you should see "Created Successfully!"

  3. Use ./pulsar-admin functions list --tenant <tenant-name> to list the functions in your tenant and confirm your new function was created.
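
Before deploying, it can help to check the function's logic locally. The following is a minimal sketch, not part of the Luna Streaming tooling. It assumes that a hypothetical stand-in base class is acceptable when the pulsar client package is not installed, and it passes None as the context because this function never reads it.

```python
try:
    from pulsar import Function
except ImportError:
    # Hypothetical stand-in so the sketch runs without the pulsar package
    class Function:
        pass


class ExclamationFunction(Function):
    def __init__(self):
        pass  # No per-instance setup is needed

    def process(self, input, context):
        # Append an exclamation point to each incoming message
        return input + '!'


# Call process() directly; the context argument is unused here
result = ExclamationFunction().process("Hello world", None)
print(result)
```

A local check like this only exercises the function body. It does not confirm that topics, tenants, or namespaces are configured correctly on the cluster.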

Trigger with CLI

Triggering a function is a convenient way to test that the function is working. When you trigger a function, you are publishing a message on the function’s input topic, which triggers the function to run.

To test a function with the Pulsar CLI, send a test value with Pulsar CLI’s trigger.

  1. Listen for messages on the output topic:

    $ bin/pulsar-client consume persistent://<tenant-name>/default/<topic-name> \
      --subscription-name my-subscription \
      --num-messages 0 # Listen indefinitely
  2. Test your exclamation function with trigger:

    $ ./pulsar-admin functions trigger \
      --name exclamation \
      --tenant <tenant-name> \
      --namespace default \
      --trigger-value "Hello world"

    The trigger sends the string Hello world to your exclamation function. Your function should output Hello world! to your consumed output.
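
If you trigger functions repeatedly while testing, you may prefer to assemble the command in a script. The sketch below builds the same argument list as the trigger command above. The helper name build_trigger_command and the tenant value my-tenant are hypothetical placeholders, and the sketch only prints the command rather than running it.

```python
import shlex


def build_trigger_command(name, tenant, namespace, value, admin="./pulsar-admin"):
    """Return the argument list for a functions trigger call (hypothetical helper)."""
    return [
        admin, "functions", "trigger",
        "--name", name,
        "--tenant", tenant,
        "--namespace", namespace,
        "--trigger-value", value,
    ]


# "my-tenant" is a placeholder; substitute your own tenant name
command = build_trigger_command("exclamation", "my-tenant", "default", "Hello world")

# shlex.join quotes arguments that contain spaces, so the output is safe to paste into a shell
print(shlex.join(command))
```

To run the command against a real cluster, the list could be passed to subprocess.run(command, check=True). Passing a list avoids shell quoting problems with trigger values that contain spaces.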

Add Functions using Pulsar Admin Console

If the Pulsar Admin Console is deployed, you can also add and manage the Pulsar functions in the Functions tab of the Admin Console web UI.

  1. Select Choose File to choose a local function file. Choose the file you want to pull the function from and which function you want to use within that file.

    How you specify the class name depends on the programming language or runtime of the function. Python, Java, and Go functions are each added differently.

    Python functions are added by loading a Python file (.py) or a zipped Python file (.zip). When adding Python files, specify the Class Name as the name of the Python file without its extension, followed by a period and the class you want to execute. For example, for a class named exclamation, the class name would be function.exclamation, where function is the file name without its extension. The file can contain multiple classes, but only one is used. If the Python file contains no class (when using a basic function, for example), specify just the file name without the extension (for example, function).

    Java functions are added by loading a Java jar file (.jar). When adding Java files, also specify the name of the class to execute as the function.

    Go functions are added by loading a Go file (.go). For more information on packaging a Go function, see Packaging Functions in Go.

  2. Choose your function name and namespace.

    Your input topics, output topics, log topics, and processing guarantees will auto-populate.

  3. Provide a Configuration Key in the dropdown menu.

    For a list of configuration keys, see the Pulsar Functions API Docs.

  4. Select Add to add your function.

You have created a function for this namespace. You can confirm your function was created in the Manage tab.
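
Step 1 mentions Python files that contain no class. As a minimal sketch, assuming the file follows Pulsar's convention of a module-level process function for such basic functions, the exclamation logic might look like this:

```python
def process(input):
    # Append an exclamation point to the incoming message
    return input + '!'


# Local check only; on the cluster, Pulsar calls process() for each message
print(process("Hello world"))
```

With a file like this, the Class Name field takes only the file name without its extension, as described in step 1.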

Manage functions in Admin Console

You can start, stop, and restart your function by selecting Manage in the Functions dashboard.

Monitor your function

Functions produce logs to help you in debugging. To view your function’s logs, select Logs in the Manage dashboard.

Update your function

A function that is already running can be updated with a new configuration. The following settings can be updated:

  • Function code

  • Output topic

  • Log topic

  • Number of instances

  • Configuration keys

If you need to update any other setting of the function, delete and then re-add the function.
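
The rule above can be expressed as a small check. This is an illustrative sketch only: the setting keys and the helper name requires_recreate are hypothetical labels for the items in the list, not identifiers used by the Admin Console or the Pulsar API.

```python
# Hypothetical keys for the settings that the list above says can be updated in place
UPDATABLE_SETTINGS = {
    "function_code",
    "output_topic",
    "log_topic",
    "instances",
    "configuration_keys",
}


def requires_recreate(changed_settings):
    """Return the changed settings that cannot be updated in place, sorted by name."""
    return sorted(set(changed_settings) - UPDATABLE_SETTINGS)


# "input_topic" is not in the updatable list, so it is reported back
blocked = requires_recreate(["output_topic", "input_topic"])
print(blocked)
```

An empty result means every requested change can go through Update. Anything returned means the function has to be deleted and re-added.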

To update your function, select Update in the Manage dashboard.

Delete your function

To delete a function, select Delete in the Manage dashboard.

A "<function-name> Deleted Successfully!" notification appears to confirm that the function was deleted.

Trigger your function

To trigger a function in the Pulsar Admin Console, select Trigger in the Manage dashboard.

Enter your message in the Message to Send field, and select the output topic. In this case, the trigger sends the string Hello world! to your exclamation function with no output topic selected. If the function has an output topic and returns data to it, the returned data is displayed.


For more about developing functions for Luna Streaming and Pulsar, see here.

© 2024 DataStax | Privacy policy | Terms of use
