Astra Streaming functions

Functions are lightweight compute processes that you can run on each message a topic receives. You can apply custom logic to a message, transforming or enriching it, and then output it to a different topic.

Functions run inside Astra Streaming, which makes them serverless. You write the code for your function in Java, Python, or Go, and then upload the code. It automatically runs for each message published to the specified input topic.

Functions are implemented using Apache Pulsar® functions. See the Pulsar Functions overview for more information about Pulsar functions.

Custom functions require a paid Astra Streaming plan.

Organizations on the Free plan can use transform functions only.

Deploy a function archive

You can write Pulsar functions for Astra Streaming in Python, Java, or Go.

To deploy a function to Astra Streaming or Pulsar, you can package the project into an archive, including the function code and any dependencies. You can deploy the same archive to either environment.

This is recommended for complex functions with long scripts, multiple scripts, or many dependencies. You can also use this approach to deploy multiple function classes with a single package.

  • Python functions

  • Java functions

To create a Python function archive, the .zip file must have the correct structure. For example, for a project named my-python-function, the extracted archive has the following structure:

/my-python-function
   python-code/my-python-function.zip
   python-code/deps/sh-1.12.14-py2.py3-none-any.whl
   python-code/src/my-python-function.py

  1. Prepare the directory structure:

    mkdir FUNCTION_NAME
    mkdir FUNCTION_NAME/python-code
    mkdir FUNCTION_NAME/python-code/deps/
    mkdir FUNCTION_NAME/python-code/src/
    
    touch FUNCTION_NAME/python-code/src/FUNCTION_NAME.py

  2. Write your function code in a .py file. This example appends an exclamation point to each message.

    from pulsar import Function
    
    class ExclamationFunction(Function):
      def __init__(self):
        pass
    
      def process(self, input, context):
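        return input + '!'

  3. Add dependencies to the deps directory. This example downloads the pulsar-client package into deps:

    cd FUNCTION_NAME/python-code/deps
    pip download pulsar-client==2.10.0

  4. Create a .zip file at the root of the project. Run zip from the python-code directory so that the archive includes both deps and src:

    cd ..
    zip -r ../my-python-function.zip .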
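Before packaging, it can help to exercise the function logic locally. The following is a minimal sketch, not part of the Astra Streaming tooling: it reuses the ExclamationFunction class from step 2, falls back to a local stand-in for the Function base class when pulsar-client is not installed, and adds a hypothetical run_locally helper that calls process with no context.

```python
try:
    from pulsar import Function  # available when pulsar-client is installed
except ImportError:
    # Local stand-in (an assumption for offline testing only).
    class Function:
        pass


class ExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        return input + '!'


def run_locally(function, messages):
    """Hypothetical helper: pass each message to the function and collect the outputs.

    The context argument is None because this example function never uses it.
    """
    return [function.process(message, None) for message in messages]


if __name__ == "__main__":
    print(run_locally(ExclamationFunction(), ["hello", "world"]))
```

This only checks the function's own logic. It doesn't validate the archive layout or how the function behaves once deployed.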

To deploy a Java function, you must create a .jar file.

  1. Create a Java project for your function.

  2. Declare dependencies in pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>java-function</groupId>
        <artifactId>java-function</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.pulsar</groupId>
                <artifactId>pulsar-functions-api</artifactId>
                <version>3.0.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <appendAssemblyId>false</appendAssemblyId>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass>org.example.test.ExclamationFunction</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.11.0</version>
                    <configuration>
                        <release>17</release>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

  3. Write your function code in your project.

  4. Package the .jar file with Maven:

    mvn package

    Result:
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  10.989 s
    [INFO] Finished at: 2023-05-16T16:19:05-04:00
    [INFO] ------------------------------------------------------------------------

After you create the archive, deploy the function:

  1. If you haven’t done so already, set up your environment for the Pulsar binaries.

  2. Create a deployment configuration YAML file, such as func-create-config.yaml, that defines the function metadata and associated topics:

    py: PATH_TO_FUNCTION_ARCHIVE
    className: FILE_NAME.CLASS_NAME
    parallelism: 1
    inputs:
     - persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME
    output: persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME
    autoAck: true
    tenant: TENANT_NAME
    namespace: NAMESPACE_NAME
    name: DISPLAY_NAME
    logTopic:
    userConfig:
     logging_level: ERROR

    Replace the following:

    • PATH_TO_FUNCTION_ARCHIVE: The path to the function archive. For a Java .jar archive, use the jar key instead of py.

    • FILE_NAME.CLASS_NAME: The class to execute. An archive can contain multiple classes, but only one is used per deployment.

      • For Python functions, the className is the Python filename (without the extension) and the class to execute, such as pythonfunc.ExclamationFunction. If there isn’t a class in the file, the className is the filename without the extension, such as pythonfunc.

      • For Java functions, the className is the fully qualified name (package and class) of the class to execute, such as com.example.pulsar.ExclamationFunction.

    • TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME: Define the tenant, namespace, and topic for both input (incoming messages passed to the function) and output (the results of the function).

      To avoid errors, make sure the inputs topic (declared in your configuration YAML file) has a defined message schema before you deploy the function. You can define a topic’s message schema in the Astra Portal.

      Optionally, you can declare a logTopic in the same way: persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME.

    • DISPLAY_NAME: The function display name in Astra Streaming, such as testpythonfunction.

  3. Use pulsar-admin to deploy the function to Astra Streaming or Pulsar using your configuration file:

    bin/pulsar-admin functions create --function-config-file PATH_TO_FUNCTION_CONFIG_YAML

    A response of Created Successfully! indicates the function is deployed and ready to accept messages.

    If the response is 402 Payment Required with Reason: only qualified organizations can create functions, then you must upgrade to a paid Astra Streaming plan. Organizations on the Free plan can use transform functions only.

    If your Python function contains only a single script and no dependencies, you can deploy the .py file directly, without packaging it into a .zip file or creating a configuration file:

    $ ./pulsar-admin functions create \
      --py PATH_TO_PYTHON_FILE \
      --classname FILE_NAME.CLASS_NAME \
      --tenant TENANT_NAME \
      --namespace NAMESPACE_NAME \
      --name DISPLAY_NAME \
      --auto-ack true \
      --inputs persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME \
      --output persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME \
      --log-topic persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME

    If there isn’t a class in the file, the className is only the filename without the extension.

  4. Verify the deployment:

    bin/pulsar-admin functions list --tenant TENANT_NAME --namespace NAMESPACE_NAME

    You can also check the Astra Portal to confirm the function is listed on the tenant’s Functions tab.

    See Manage deployed functions for more information on testing and monitoring your function in Astra Streaming.
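The naming rules for className and topic values in the configuration file can be sketched in a few lines of Python. This is an illustration only: class_name_for and topic_url are hypothetical helper names, not part of pulsar-admin or Astra Streaming.

```python
from pathlib import Path


def class_name_for(script_path, class_name=None):
    """Build the className value for a Python function.

    The value is the filename without its extension, followed by the class
    name if the file defines one.
    """
    module = Path(script_path).stem
    return f"{module}.{class_name}" if class_name else module


def topic_url(tenant, namespace, topic):
    """Build a persistent topic URL in the form used for inputs, output, and logTopic."""
    return f"persistent://{tenant}/{namespace}/{topic}"


if __name__ == "__main__":
    print(class_name_for("src/pythonfunc.py", "ExclamationFunction"))
    print(class_name_for("src/pythonfunc.py"))
    print(topic_url("TENANT_NAME", "NAMESPACE_NAME", "TOPIC_NAME"))
```

A helper like this might be useful when generating configuration files for several functions from one script, because each deployment uses only one class.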

Deploy functions in the Astra Portal

  1. In the Astra Portal navigation menu, click Streaming, and then select a tenant.

  2. On the Functions tab, click Create Function.

  3. Enter a function name, and then select the namespace within the tenant.

  4. Upload function code using one of the following options:

    • Upload your own code:

      1. Select Upload my own code.

      2. Select your function file:

        • .py: A single, independent Python script

        • .zip: A Python script with dependencies

        • .jar: A Java function

        • .go: A Go function

      3. Based on the uploaded file, select the specific class (function) to deploy.

        Astra Streaming generates a list of acceptable classes detected in the code. A file can contain multiple classes, but only one is used per deployment.

        For Python scripts, the class name is the Python filename (without the extension) and the class to execute. For example, if the Python file is called testfunction.py and the class is ExclamationFunction, then the class name is testfunction.ExclamationFunction. If there isn’t a class in the Python file, the class name is the filename without the extension, such as testfunction.

        For Java functions, the class name is the class to execute.

    • Use a DataStax transform function:

      1. Select Use DataStax transform function.

        This is the only function option available on the Astra Streaming Free plan.

        For more information, see Transform Functions and Astra Streaming pricing.

  5. Select input topics.

  6. (Optional) Select output and log topics.

  7. (Optional) Configure advanced settings:

    • Instances: Enter the number of function instances to run.

    • Processing Guarantee: Select one of the following:

      • ATLEAST_ONCE (default): Each message sent to the function can be processed one or more times.

      • ATMOST_ONCE: Each message sent to the function is processed 0 or 1 times. This means there is a chance that a message is not processed.

      • EFFECTIVELY_ONCE: Each message sent to the function has only one output associated with it.

    • Timeout: Set a timeout limit.

    • Auto Acknowledge: Enable or disable automatic message acknowledgment.

  8. (Optional) Provide a config key, if required. For more information, see the Pulsar documentation.

  9. Click Create.

  10. Confirm your function was created on the Functions tab.
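The difference between the ATLEAST_ONCE and ATMOST_ONCE processing guarantees in step 7 can be illustrated with a toy model. This sketch is an assumption-laden simplification, not how Pulsar implements delivery: it treats the guarantee as the order of "process" and "acknowledge", and simulates a single crash between those two steps for one message. The simulate function and its parameters are hypothetical. EFFECTIVELY_ONCE is omitted.

```python
def simulate(messages, guarantee, crash_on):
    """Toy model of a function that crashes once, between its two steps, on one message.

    ATMOST_ONCE: acknowledge first, then process. A crash loses the message.
    ATLEAST_ONCE: process first, then acknowledge. A crash causes redelivery.
    """
    outputs = []
    queue = list(messages)
    crashed = False
    while queue:
        message = queue.pop(0)
        crash_now = message == crash_on and not crashed
        if guarantee == "ATMOST_ONCE":
            # The message is already acknowledged, so a crash here drops it.
            if crash_now:
                crashed = True
                continue
            outputs.append(message + "!")
        elif guarantee == "ATLEAST_ONCE":
            # Output is produced before the acknowledgment is sent.
            outputs.append(message + "!")
            if crash_now:
                crashed = True
                queue.insert(0, message)  # never acknowledged, so it is redelivered
        else:
            raise ValueError(f"unsupported guarantee in this sketch: {guarantee}")
    return outputs


if __name__ == "__main__":
    print(simulate(["a", "b", "c"], "ATMOST_ONCE", crash_on="b"))
    print(simulate(["a", "b", "c"], "ATLEAST_ONCE", crash_on="b"))
```

In this model, ATMOST_ONCE drops the message that was in flight during the crash, and ATLEAST_ONCE produces its output twice. For this reason, functions deployed with ATLEAST_ONCE are usually written so that reprocessing a message is harmless.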

Manage deployed functions

After you deploy a function, you can test, start, stop, monitor, edit, and delete it.

Test functions

To test the function, publish a message to the function’s input topic or use pulsar-admin functions trigger. If the function produces output and it has an output topic, the output data is returned.

  1. Listen for messages on the output topic:

    $ ./pulsar-client consume persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME \
      --subscription-name my-subscription \
      --num-messages 0 # Listen indefinitely

  2. Test your function:

    $ ./pulsar-admin functions trigger \
      --name FUNCTION_DISPLAY_NAME \
      --tenant TENANT_NAME \
      --namespace NAMESPACE_NAME \
      --trigger-value "MESSAGE"

    The trigger sends the message string to the function. Your function should output the result of processing the message.

Stop and start functions

In the Astra Portal, on your tenant’s Functions tab, you can use Function Controls to start, stop, and restart functions.

Monitor functions

Functions produce logs to help you debug them. In the Astra Portal, on your tenant’s Functions tab, you can view, refresh, copy, and download your functions' logs.


If you specified a log topic when deploying your function, function logs also output to that topic.
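A function can write its own log lines through the context object. The following is a minimal sketch, assuming the get_logger and get_user_config_value methods of the Pulsar Python functions context and a userConfig entry named logging_level, as in the configuration file example. LoggingExclamationFunction is an illustrative name, and FakeContext is a hypothetical stand-in used only to run the function locally.

```python
import logging

try:
    from pulsar import Function  # available when pulsar-client is installed
except ImportError:
    # Local stand-in (an assumption for offline testing only).
    class Function:
        pass


class LoggingExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        logger = context.get_logger()
        # Read the level from userConfig; fall back to ERROR if it is unset.
        level = context.get_user_config_value("logging_level") or "ERROR"
        logger.setLevel(str(level).upper())
        logger.info("Processing message: %s", input)
        return input + '!'


class FakeContext:
    """Hypothetical stand-in for the runtime context, for local testing only."""

    def __init__(self, user_config):
        self._user_config = user_config
        self._logger = logging.getLogger("local-function-test")

    def get_logger(self):
        return self._logger

    def get_user_config_value(self, key):
        return self._user_config.get(key)


if __name__ == "__main__":
    context = FakeContext({"logging_level": "ERROR"})
    print(LoggingExclamationFunction().process("hello", context))
```

With logging_level set to ERROR, the info line in this sketch is suppressed. Setting the level to INFO in userConfig would let it through.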

Edit functions

  1. In the Astra Portal, on your tenant’s Functions tab, click Update Function.

  2. Edit the following settings as needed, and then click Update.

    • Function code

    • Output topic

    • Log topic

    • Number of instances

    • Configuration keys

If you need to change any other function settings, you must delete and redeploy the function with the desired settings.

Delete functions

Deleting a function is permanent.

  1. In the Astra Portal, on your tenant’s Functions tab, select the function to delete.

  2. Click Delete.

  3. To confirm deletion, enter the function’s name, and then click Delete.

© 2024 DataStax | Privacy policy | Terms of use
