Apache Pulsar™ functions

Functions are lightweight compute processes that enable you to process each message received on a topic. You can apply custom logic to that message, transforming or enriching it, and then output it to a different topic.

Functions run inside Astra Streaming and are therefore serverless. You write the code for your function in Java, Python, or Go, and then upload it. Astra Streaming runs the code automatically for each message published to the specified input topic.

Functions are implemented using Apache Pulsar™ functions.
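
As a rough mental model, a function is a callable that is applied once to each message arriving on the input topic, and each return value is published to the output topic. The sketch below is plain Python, not the Pulsar runtime: topics are modeled as simple lists, and the names exclaim and run_function are illustrative only.

```python
from typing import Callable, List


def exclaim(message: str) -> str:
    """Example transformation: append an exclamation point."""
    return message + "!"


def run_function(func: Callable[[str], str], input_topic: List[str]) -> List[str]:
    """Apply func once per input message and collect the outputs.

    This only imitates the per-message behavior. In Astra Streaming the
    platform invokes your function, so you never write this loop yourself.
    """
    output_topic = []
    for message in input_topic:
        output_topic.append(func(message))
    return output_topic


print(run_function(exclaim, ["hello", "world"]))  # prints ['hello!', 'world!']
```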

Custom functions require a paid Astra Streaming plan.

Organizations on the Free plan can use transform functions only.

Deploy Python functions in a zip file

Astra Streaming supports Python-based Pulsar functions. These functions can be packaged in a zip file and deployed to Astra Streaming or Pulsar. The same zip file can be deployed to either environment.

To demonstrate this, the following steps create a function configuration YAML file, package all necessary function files as a zip archive, and then use the pulsar-admin CLI to deploy the zip. The configuration file defines the Pulsar function options and parameters.

For video demos of a Pulsar Python function, see the Five Minutes About Pulsar series.

  1. Create a directory and subdirectories for your function zip archive with the following structure:

    /parent-directory
       /python-code
          /deps
          /src

    For example, a function called my-python-function could have the following structure:

    /my-python-function
       python-code/my-python-function.zip
       python-code/deps/sh-1.12.14-py2.py3-none-any.whl
       python-code/src/my-python-function.py

    The following commands create the necessary directories for a function called demo-function:

    mkdir demo-function
    mkdir demo-function/python-code
    mkdir demo-function/python-code/deps/
    mkdir demo-function/python-code/src/
  2. Create a Python file in the /src directory. For example:

    touch demo-function/python-code/src/demo-function.py
  3. Add your function code to your Python file. This example function adds an exclamation point to the end of each message:

    from pulsar import Function
    
    class ExclamationFunction(Function):
      def __init__(self):
        pass
    
      def process(self, input, context):
        return input + '!'
  4. Add your function’s dependencies to the demo-function/python-code/deps directory. This example downloads the pulsar-client library into that directory:

    cd demo-function/python-code/deps
    pip download pulsar-client==2.10.0
  5. Create the zip archive for your function in the python-code directory.

    For example, the following command is run from within the /deps directory and creates the demo-function.zip file in the parent python-code directory.

    zip -r ../demo-function.zip .

    Wait while the archive is packaged.

  6. Verify that the zip file is in the python-code directory. For example, run the following command from within the python-code directory:

    ls -al
    Result
    deps
    demo-function.zip
    src
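
The packaging steps above can also be scripted. The following is a minimal sketch using only the Python standard library; the helper names create_layout and zip_deps are hypothetical, and the archive step simply mirrors the zip -r ../demo-function.zip . command run from inside the deps directory. It works in a temporary directory so that it does not touch your project.

```python
import tempfile
import zipfile
from pathlib import Path

# Same function source as in step 3.
FUNCTION_SOURCE = '''from pulsar import Function

class ExclamationFunction(Function):
  def __init__(self):
    pass

  def process(self, input, context):
    return input + '!'
'''


def create_layout(parent: Path, name: str = "demo-function") -> Path:
    """Create NAME/python-code/{deps,src} and write the function source file."""
    python_code = parent / name / "python-code"
    (python_code / "deps").mkdir(parents=True, exist_ok=True)
    (python_code / "src").mkdir(parents=True, exist_ok=True)
    (python_code / "src" / f"{name}.py").write_text(FUNCTION_SOURCE)
    return python_code


def zip_deps(python_code: Path, name: str = "demo-function") -> Path:
    """Archive the contents of deps/ into python-code/NAME.zip."""
    archive = python_code / f"{name}.zip"
    deps = python_code / "deps"
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(deps.rglob("*")):
            # Store paths relative to deps/, as `zip -r ../NAME.zip .` does.
            zf.write(path, str(path.relative_to(deps)))
    return archive


with tempfile.TemporaryDirectory() as tmp:
    python_code = create_layout(Path(tmp))
    zip_deps(python_code)
    # Lists the entries of python-code/, like the verification in step 6.
    print(sorted(p.name for p in python_code.iterdir()))
```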

Deploy a Python function with configuration file

  1. Create a deployment configuration file named func-create-config.yaml with the following contents. This file is passed to the pulsar-admin functions create command.

    py: /absolute/path/to/demo-function.zip
    className: pythonfunc.ExclamationFunction
    parallelism: 1
    inputs:
     - persistent://TENANT_NAME/NAMESPACE_NAME/INPUT_TOPIC_NAME
    output: persistent://TENANT_NAME/NAMESPACE_NAME/OUTPUT_TOPIC_NAME
    autoAck: true
    tenant: TENANT_NAME
    namespace: NAMESPACE_NAME
    name: demofunction
    logTopic:
    userConfig:
     logging_level: ERROR

    Replace the following:

    • TENANT_NAME: The tenant where you want to deploy the function

    • NAMESPACE_NAME: The namespace where you want to deploy the function

    • INPUT_TOPIC_NAME: The input topic for the function

    • OUTPUT_TOPIC_NAME: The output topic for the function

  2. Use pulsar-admin to deploy the Python zip to Astra Streaming or Pulsar. The command below assumes you’ve properly configured the client.conf file for pulsar-admin commands against your Pulsar cluster. If you are using Astra Streaming, see Use Apache Pulsar™ binaries with Astra Streaming for more information.

    bin/pulsar-admin functions create --function-config-file /absolute/path/to/func-create-config.yaml
  3. Verify that the function was deployed:

    • Go to the Astra Portal to see your newly deployed function listed under the Functions tab for your tenant. See Controlling your function for more information on testing and monitoring your function in Astra Streaming.

    • Use the pulsar-admin CLI to list functions for a specific tenant and namespace:

      bin/pulsar-admin functions list --tenant TENANT_NAME --namespace NAMESPACE_NAME
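
If you deploy the same function to several tenants or namespaces, it can help to generate the configuration file rather than edit the placeholders by hand. The sketch below renders the same keys as the example configuration using plain string formatting; the helper names topic and render_config, and all values passed to them, are hypothetical.

```python
def topic(tenant: str, namespace: str, name: str) -> str:
    """Build a topic name in the persistent://TENANT/NAMESPACE/TOPIC form."""
    return f"persistent://{tenant}/{namespace}/{name}"


def render_config(zip_path: str, class_name: str, tenant: str, namespace: str,
                  input_topic: str, output_topic: str, name: str) -> str:
    """Render a function configuration with the keys shown in step 1."""
    lines = [
        f"py: {zip_path}",
        f"className: {class_name}",
        "parallelism: 1",
        "inputs:",
        f" - {topic(tenant, namespace, input_topic)}",
        f"output: {topic(tenant, namespace, output_topic)}",
        "autoAck: true",
        f"tenant: {tenant}",
        f"namespace: {namespace}",
        f"name: {name}",
        "logTopic:",
        "userConfig:",
        " logging_level: ERROR",
    ]
    return "\n".join(lines) + "\n"


# All values below are placeholders for illustration.
config_text = render_config(
    zip_path="/absolute/path/to/demo-function.zip",
    class_name="pythonfunc.ExclamationFunction",
    tenant="my-tenant",
    namespace="my-namespace",
    input_topic="in",
    output_topic="out",
    name="demofunction",
)
print(config_text)
```

Writing config_text to func-create-config.yaml then gives you a file that you can pass to the --function-config-file option.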

Deploy Java functions in a JAR file

Astra Streaming supports Java-based Pulsar functions which are packaged in a JAR file. The JAR can be deployed to Astra Streaming or Pulsar. The same JAR file can be deployed to either environment.

In this example, you’ll create a function JAR file using Maven, then use the pulsar-admin CLI to deploy the JAR. You’ll also create a function configuration YAML file that defines the Pulsar function options and parameters.

  1. Create a properly-structured JAR with your function’s Java code. For example:

    Example: Function pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>java-function</groupId>
        <artifactId>java-function</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.pulsar</groupId>
                <artifactId>pulsar-functions-api</artifactId>
                <version>3.0.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <appendAssemblyId>false</appendAssemblyId>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                            <archive>
                                <manifest>
                                    <mainClass>org.example.test.ExclamationFunction</mainClass>
                                </manifest>
                            </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.11.0</version>
                    <configuration>
                        <release>17</release>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
  2. Package the JAR file with Maven:

    mvn package
    Result
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  10.989 s
    [INFO] Finished at: 2023-05-16T16:19:05-04:00
    [INFO] ------------------------------------------------------------------------
  3. Create a deployment configuration file named func-create-config.yaml with the following contents. This file is passed to the pulsar-admin functions create command.

    jar: /absolute/path/to/java-function.jar
    className: com.example.pulsar.ExclamationFunction
    parallelism: 1
    inputs:
      - persistent://mytenant/ns0/t1
    output: persistent://mytenant/ns0/t2
    autoAck: true
    tenant: mytenant
    namespace: ns0
    name: testjarfunction
    logTopic:
    userConfig:
      logging_level: ERROR

    Astra Streaming requires each input topic to have a message schema defined before you deploy the function. Otherwise, deployment errors may occur. Use the Astra Portal to define the message schema for a topic.

  4. Use the pulsar-admin CLI to deploy your function JAR to Astra Streaming or Pulsar.

    The following command assumes you’ve properly configured the client.conf file for pulsar-admin commands against your Pulsar cluster. If you are using Astra Streaming, see Use Apache Pulsar™ binaries with Astra Streaming for more information.

    bin/pulsar-admin functions create --function-config-file /absolute/path/to/func-create-config.yaml
  5. Verify that the function was deployed:

    • Go to the Astra Portal to see your newly deployed function listed under the Functions tab for your tenant. See Controlling your function for more information on testing and monitoring your function in Astra Streaming.

    • Use the pulsar-admin CLI to list functions for a specific tenant and namespace:

      bin/pulsar-admin functions list --tenant TENANT_NAME --namespace NAMESPACE_NAME
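
A common source of deployment errors is a topic name whose tenant or namespace does not match the tenant and namespace fields of the configuration. The following sketch checks for that before you deploy. It assumes the configuration has already been loaded into a Python dictionary; the function name find_mismatched_topics and the sample values are hypothetical. A topic in a different namespace may be intentional, so treat the result as a prompt to double-check rather than as an error.

```python
import re
from typing import Dict, List

# Matches persistent://TENANT/NAMESPACE/TOPIC (and the non-persistent form).
TOPIC_PATTERN = re.compile(r"^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$")


def find_mismatched_topics(config: Dict) -> List[str]:
    """Return topics whose tenant/namespace differ from the config's own."""
    expected = (config["tenant"], config["namespace"])
    topics = list(config.get("inputs", []))
    if config.get("output"):
        topics.append(config["output"])
    mismatched = []
    for name in topics:
        match = TOPIC_PATTERN.match(name)
        # Unparseable names are reported too, since they are likely typos.
        if match is None or (match.group(2), match.group(3)) != expected:
            mismatched.append(name)
    return mismatched


# Sample values for illustration: the output namespace is misspelled.
sample = {
    "tenant": "mytenant",
    "namespace": "ns0",
    "inputs": ["persistent://mytenant/ns0/t1"],
    "output": "persistent://mytenant/ns/t2",
}
print(find_mismatched_topics(sample))  # prints ['persistent://mytenant/ns/t2']
```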

Add functions in Astra Streaming dashboard

Add functions in the Functions tab of the Astra Streaming dashboard.

  1. Select Create Function to get started.

  2. Choose your function name and namespace.

  3. Select the file you want to pull the function from and which function you want to use within that file. Astra Streaming generates a list of acceptable classes.

    There are differences depending on the function language:

    • Python functions are added by loading a Python file (.py) or a zipped Python file (.zip).

      When adding Python files, the Class Name is specified as the name of the Python file without the extension plus the class you want to execute. For example, if the Python file is called testfunction.py and the class is ExclamationFunction, then the class name is testfunction.ExclamationFunction.

      The file can contain multiple classes, but only one is used. If there is no class in the Python file (when using a basic function, for example), specify the filename without the extension, such as testfunction.

    • Java functions are added by loading a Java jar file (.jar). When adding Java files, you must specify the name of the class to execute as the function.

  4. Select your input topics.

  5. Select Optional Destination Topics for output and logging.

  6. If applicable, configure the Advanced Options.

  7. Run at least one function instance.

  8. Select an option for Processing Guarantee:

    • ATLEAST_ONCE (default): Each message sent to the function is processed at least once, so a message can be processed more than once.

    • ATMOST_ONCE: The message sent to the function is processed at most once. Therefore, there is a chance that the message is not processed.

    • EFFECTIVELY_ONCE: Each message sent to the function will have one output associated with it.

  9. Provide an optional Configuration Key. See the Pulsar documentation for a list of configuration keys.

  10. Click Create.

  11. To verify that the function was created, review the list of functions on the Functions tab.
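
The Class Name rule for Python files described in step 3 can be expressed as a small helper. This is a sketch of the naming rule only; python_class_name is a hypothetical name and is not part of Astra Streaming or Pulsar.

```python
from pathlib import Path
from typing import Optional


def python_class_name(file_path: str, class_name: Optional[str] = None) -> str:
    """Return the Class Name value for a Python function file.

    The value is the file name without its extension, followed by the class
    to execute. If the file has no class, the file name alone is used.
    """
    module = Path(file_path).stem
    return f"{module}.{class_name}" if class_name else module


print(python_class_name("testfunction.py", "ExclamationFunction"))
# prints testfunction.ExclamationFunction
print(python_class_name("testfunction.py"))
# prints testfunction
```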

Add function with Pulsar CLI

You can add functions using the Pulsar CLI.

The following example creates a Python function that consumes a message from one topic, adds an exclamation point, and then publishes the results to another topic.

  1. Add the following Python function code to a file named testfunction.py:

    testfunction.py
    from pulsar import Function
    
    class ExclamationFunction(Function):
      def __init__(self):
        pass
    
      def process(self, input, context):
        return input + '!'
  2. Deploy testfunction.py to your Pulsar cluster using the Pulsar CLI:

    $ ./pulsar-admin functions create \
      --py /absolute/path/to/testfunction.py \
      --classname testfunction.ExclamationFunction \
      --tenant TENANT_NAME \
      --namespace default \
      --name exclamation \
      --auto-ack true \
      --inputs persistent://TENANT_NAME/default/in \
      --output persistent://TENANT_NAME/default/out \
      --log-topic persistent://TENANT_NAME/default/log

    Replace TENANT_NAME with the name of the tenant where you want to deploy the function. If you want to use a different namespace, replace default with another namespace name. If you want to use different topics, change in, out, and log accordingly.

  3. Verify that the response is Created Successfully!. This indicates that the function was deployed and is ready to run when triggered by incoming messages.

    If the response is 402 Payment Required with Reason: only qualified organizations can create functions, then you must upgrade to a paid Astra Streaming plan. Organizations on the Free plan can use transform functions only.

    You can also verify that a function was created by checking the Functions tab or by running ./pulsar-admin functions list --tenant TENANT_NAME.
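
Before deploying, you can exercise the function's process method locally. The sketch below is one way to do that, assuming that the function ignores its context argument, as ExclamationFunction does. The fallback Function class is a stand-in that lets the example run on a machine without the pulsar-client library installed; it is not the real Pulsar base class.

```python
try:
    from pulsar import Function  # available when pulsar-client is installed
except ImportError:
    class Function:  # minimal stand-in so the example runs without Pulsar
        pass


class ExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        return input + '!'


# This function never reads the context argument, so None is enough here.
result = ExclamationFunction().process("Hello world", None)
print(result)  # prints Hello world!
```

A function that reads from its context, for example to get user configuration, would need a mock context object instead of None.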

Testing your function

Triggering a function is a convenient way to test that the function is working. When you trigger a function, you publish a message on the function’s input topic, which triggers the function.

  1. Listen for messages on the output topic:

    $ ./pulsar-client consume persistent://TENANT_NAME/default/out \
      --subscription-name my-subscription \
      --num-messages 0 # Listen indefinitely

    Replace TENANT_NAME with the name of the tenant where you deployed the function. If your function uses a different namespace and output topic name, replace default and out accordingly.

    If the function has an output topic, and the function returns data to the output topic, then that data is returned by the listener when you run the function.

  2. Send a test value with the Pulsar CLI trigger command:

    $ ./pulsar-admin functions trigger \
      --name exclamation \
      --tenant TENANT_NAME \
      --namespace default \
      --trigger-value "Hello world"

    This command sends the string Hello world to the exclamation function. If deployed and configured correctly, the function should output Hello world! to the out topic.
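
If you trigger functions from a script, you can assemble the same command as an argument list. The sketch below only builds and prints the command and uses the flags shown in the example above. The helper name trigger_command and the tenant value are hypothetical, and the path to pulsar-admin is an assumption about your installation.

```python
import shlex
from typing import List


def trigger_command(name: str, tenant: str, namespace: str, value: str,
                    admin: str = "./pulsar-admin") -> List[str]:
    """Build the argument list for `pulsar-admin functions trigger`."""
    return [
        admin, "functions", "trigger",
        "--name", name,
        "--tenant", tenant,
        "--namespace", namespace,
        "--trigger-value", value,
    ]


cmd = trigger_command("exclamation", "my-tenant", "default", "Hello world")
# shlex.join quotes arguments that contain spaces, so the line is copy-pasteable.
print(shlex.join(cmd))
```

To run the command from Python, pass the list to subprocess.run(cmd, check=True). Passing a list avoids shell quoting problems with trigger values that contain spaces.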

Controlling your function

You can start, stop, and restart your function by selecting it in the Functions dashboard.

Monitoring your function

Functions produce logs to help you in debugging. To view your function’s logs, open your function in the Functions dashboard.

In the upper right corner of the function log are controls to Refresh, Copy to Clipboard, and Save your function log.

Updating your function

A function that is already running can be updated with new configuration. The following settings can be updated:

  • Function code

  • Output topic

  • Log topic

  • Number of instances

  • Configuration keys

If you need to update any other setting of the function, delete and then re-add the function.
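
The rule above can be sketched as a check that compares the current and desired settings. The key names used here (py, jar, output, logTopic, parallelism, userConfig) are borrowed from the configuration file examples on this page, and mapping them to the updatable settings listed above is an assumption made for illustration. The function names are hypothetical.

```python
from typing import Dict, Set

# Assumed mapping: function code (py/jar), output topic, log topic,
# number of instances (parallelism), and configuration keys (userConfig).
UPDATABLE_KEYS = {"py", "jar", "output", "logTopic", "parallelism", "userConfig"}


def changed_keys(current: Dict, desired: Dict) -> Set[str]:
    """Return the settings whose values differ between the two configs."""
    keys = set(current) | set(desired)
    return {key for key in keys if current.get(key) != desired.get(key)}


def requires_recreate(current: Dict, desired: Dict) -> bool:
    """True if any changed setting is outside the updatable set."""
    return bool(changed_keys(current, desired) - UPDATABLE_KEYS)


current = {"name": "exclamation", "parallelism": 1, "inputs": ["in"]}
print(requires_recreate(current, dict(current, parallelism=2)))      # prints False
print(requires_recreate(current, dict(current, inputs=["other"])))  # prints True
```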

  1. To update your function, select the function in the Functions dashboard.

  2. Click Change File to select a local function file, and then click Open.

  3. Update your function’s Instances and Timeout.

  4. Click Update.

    An Updates Submitted Successfully message confirms that the function was updated.

Deleting your function

  1. Select the function to be deleted in the Functions dashboard.

  2. Click Delete.

  3. To confirm the deletion, enter the function’s name, and then click Delete.

    A Function-name Deleted Successfully! message confirms the function was permanently deleted.

Next steps

Learn more about developing functions for Astra Streaming and Pulsar here.
