Astra Streaming Functions
Functions are lightweight compute processes that enable you to process each message received on a topic. You can apply custom logic to that message, transforming or enriching it, and then output it to a different topic.
Functions run inside Astra Streaming and are therefore serverless. You write the code for your function in Java, Python, or Go, then upload the code. It will be automatically run for each message published to the specified input topic.
Functions are implemented using Apache Pulsar® functions. See Pulsar Functions overview for more information about Pulsar functions.
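Custom functions can only be created by qualified organizations. A qualified organization is an organization on the Pay As You Go plan with a payment method on file. Upgrade your organization to a qualified organization by enrolling in the Pay As You Go plan in the Astra Portal with a payment method, contacting the sales team, or opening a support ticket.
Unqualified organizations can still use transform functions.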
Deploy Python functions in a ZIP file
Astra Streaming supports Python-based Pulsar Functions. These functions can be packaged in a ZIP file, and the same ZIP file can be deployed to either Astra Streaming or Pulsar. We’ll create a ZIP file in the proper format, then use the pulsar-admin command to deploy the ZIP. We’ll pass a "create function" configuration file (a .yaml file) as a parameter to pulsar-admin, which defines the Pulsar Function options and parameters.
Assuming the ZIP file is named testpythonfunction.zip, an unzipped testpythonfunction.zip folder looks like this:
/my-python-function
python-code/my-python-function.zip
python-code/deps/sh-1.12.14-py2.py3-none-any.whl
python-code/src/my-python-function.py
-
To deploy a ZIP, first create the proper directory structure for the ZIP file. The following commands create that layout:
mkdir my-python-function
mkdir my-python-function/python-code
mkdir my-python-function/python-code/deps/
mkdir my-python-function/python-code/src/
touch my-python-function/python-code/src/my-python-function.py
-
Add your code to my-python-function.py. For this example, we’ll just use a basic exclamation function:
from pulsar import Function

class ExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        return input + '!'
-
Add your dependencies to the /deps folder. For this example, we’ll use the pulsar-client library.
cd deps
pip install pulsar-client==2.10.0
-
Run the following command to add my-python-function.zip to the root of the file structure:
cd deps
zip -r ../my-python-function.zip .
adding: sh-1.12.14-py2.py3-none-any.whl (deflated 2%)
-
Ensure your package has the ZIP file at the root of the file structure. From the python-code directory:
ls -al
deps my-python-function.zip src
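The packaging steps above can also be scripted. The following is a minimal sketch using only the Python standard library; it mirrors the mkdir, touch, and zip commands from this section, and the dependency file name example-dep.whl is a placeholder rather than a real package.

```python
import tempfile
import zipfile
from pathlib import Path


def build_package(root: Path) -> Path:
    """Create the example directory layout and zip the deps folder."""
    code_dir = root / "my-python-function" / "python-code"
    deps_dir = code_dir / "deps"
    src_dir = code_dir / "src"
    deps_dir.mkdir(parents=True)
    src_dir.mkdir(parents=True)
    (src_dir / "my-python-function.py").touch()

    # Placeholder standing in for a real dependency file (hypothetical name).
    (deps_dir / "example-dep.whl").write_bytes(b"placeholder")

    # Equivalent of: cd deps && zip -r ../my-python-function.zip .
    archive = code_dir / "my-python-function.zip"
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(deps_dir.rglob("*")):
            if path.is_file():
                zf.write(path, path.relative_to(deps_dir).as_posix())
    return archive


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        archive = build_package(Path(tmp))
        # List the python-code directory, like the ls check above.
        print(sorted(p.name for p in archive.parent.iterdir()))
```

The script works in a temporary directory so it can be run safely; pointing build_package at a real project directory is left to the reader.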
Deploy a Python function with configuration file
-
Create a deployment configuration file. In this example we’ll call this file “func-create-config.yaml”. This file will be passed to the pulsar-admin create function command.
The contents of the YAML file should be:
py: </absolute/path/to/my/testpythonfunction.zip>
className: pythonfunc.ExclamationFunction
parallelism: 1
inputs:
  - persistent://mytenant/n0/t1
output: persistent://mytenant/ns/t2
autoAck: true
tenant: mytenant
namespace: ns0
name: testpythonfunction
logTopic:
userConfig:
  logging_level: ERROR
-
Use pulsar-admin to deploy the Python ZIP to Astra Streaming or Pulsar. The command below assumes you’ve properly configured the client.conf file for pulsar-admin commands against your Pulsar cluster. See the documentation here for more information.
bin/pulsar-admin functions create --function-config-file </absolute/path/to/your/python/func-create-config.yaml>
-
Check results: Go to the Astra Portal to see your newly deployed function listed under the “Functions” tab for your Tenant. See Controlling your function for more information on testing and monitoring your function in Astra Streaming.
-
You can also use the pulsar-admin command to list your functions:
bin/pulsar-admin functions list --tenant <mytenant> --namespace <my-namespace>
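Full topic names in the configuration file follow the form persistent://tenant/namespace/topic. As a quick pre-deployment check, the sketch below splits topic names into their parts and collects every tenant/namespace pair a configuration mentions. The helper names parse_topic and namespaces_used are hypothetical, and the configuration is assumed to be already loaded into a dictionary. A function may read from or write to other namespaces, so more than one pair is not necessarily an error, but near-identical names such as n0, ns, and ns0 are worth a second look.

```python
from typing import NamedTuple


class Topic(NamedTuple):
    domain: str
    tenant: str
    namespace: str
    name: str


def parse_topic(full_name: str) -> Topic:
    """Split 'persistent://tenant/namespace/topic' into its parts."""
    domain, sep, rest = full_name.partition("://")
    parts = rest.split("/")
    if not sep or len(parts) != 3 or not all(parts):
        raise ValueError(f"not a full topic name: {full_name!r}")
    return Topic(domain, *parts)


def namespaces_used(config: dict) -> set:
    """Collect every (tenant, namespace) pair that a function config mentions."""
    found = {(config["tenant"], config["namespace"])}
    for topic in list(config.get("inputs", [])) + [config["output"]]:
        parsed = parse_topic(topic)
        found.add((parsed.tenant, parsed.namespace))
    return found


if __name__ == "__main__":
    # Values copied from the example configuration file in this section.
    example = {
        "tenant": "mytenant",
        "namespace": "ns0",
        "inputs": ["persistent://mytenant/n0/t1"],
        "output": "persistent://mytenant/ns/t2",
    }
    for tenant, namespace in sorted(namespaces_used(example)):
        print(f"{tenant}/{namespace}")
```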
Deploy Java functions in a JAR file
Astra Streaming supports Java-based Pulsar Functions which are packaged in a JAR file. The JAR can be deployed to Astra Streaming or Pulsar. The same JAR file can be deployed to either environment.
We’ll create a JAR file using Maven, then use the pulsar-admin command to deploy the JAR. We’ll pass a "create function" configuration file (a .yaml file) as a parameter to pulsar-admin, which defines the Pulsar function options and parameters.
-
To deploy a JAR, first create the proper JAR with the Java code of the Pulsar Function. An example pom.xml file is shown below:
Function pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>java-function</groupId>
  <artifactId>java-function</artifactId>
  <version>1.0-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-functions-api</artifactId>
      <version>3.0.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>org.example.test.ExclamationFunction</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <!-- "single" is the goal to bind to a lifecycle phase; the older "assembly" goal is not available in maven-assembly-plugin 3.x -->
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.11.0</version>
        <configuration>
          <release>17</release>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
-
Package the JAR file with Maven:
mvn package
A successful build ends with output like this:
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10.989 s
[INFO] Finished at: 2023-05-16T16:19:05-04:00
[INFO] ------------------------------------------------------------------------
-
Create a deployment configuration file. In this example we’ll call this file “func-create-config.yaml”. This file will be passed to the pulsar-admin create function command.
The contents of the YAML file should be:
jar: </absolute/path/to/my/testpythonfunction.jar>
className: com.example.pulsar.ExclamationFunction
parallelism: 1
inputs:
  - persistent://mytenant/n0/t1
output: persistent://mytenant/ns/t2
autoAck: true
tenant: mytenant
namespace: ns0
name: testjarfunction
logTopic:
userConfig:
  logging_level: ERROR
Astra Streaming requires the “inputs” topic to have a message schema defined before deploying the function. Otherwise, deployment errors may occur. Use the Astra Portal to define the message schema for a topic.
-
Use pulsar-admin to deploy your new JAR to Astra Streaming or Pulsar. The command below assumes you’ve properly configured the client.conf file for pulsar-admin commands against your Pulsar cluster. See the documentation here for more information.
bin/pulsar-admin functions create --function-config-file </absolute/path/to/your/javajar/func-create-config.yaml>
-
Check results: Go to the Astra Portal to see your newly deployed function listed under the “Functions” tab for your Tenant. See Controlling your function for more information on testing and monitoring your function in Astra Streaming.
-
You can also use the pulsar-admin command to list your functions:
bin/pulsar-admin functions list --tenant <mytenant> --namespace <my-namespace>
Add functions in Astra Streaming dashboard
Add functions in the Functions tab of the Astra Streaming dashboard.
-
Select Create Function to get started.
-
Choose your function name and namespace.
-
Select the file you want to pull the function from and which function you want to use within that file.
Astra generates a list of acceptable classes. Python and Java functions are added a little differently from each other.
Python functions are added by loading a Python file (.py) or a zipped Python file (.zip). When adding Python files, the Class Name is specified as the name of the Python file without the extension plus the class you want to execute.
For example, if the Python file is called testfunction.py and the class is ExclamationFunction, then the class name is testfunction.ExclamationFunction. The file can contain multiple classes, but only one is used. If there is no class in the Python file (when using a basic function, for example), specify the filename without the extension (for example, function).
Java functions are added by loading a Java jar file (.jar). When adding Java files, you also need to specify the name of the class to execute as the function.
-
Choose your input topics.
-
Choose Optional Destination Topics for output and logging.
-
Choose Advanced Options and run at least one sink instance.
-
Choose your Processing Guarantee. The default value is ATLEAST_ONCE. Processing Guarantee offers three options:
-
ATLEAST_ONCE: Each message sent to the function is processed at least once, so a message can be processed more than once.
-
ATMOST_ONCE: The message sent to the function is processed at most once. Therefore, there is a chance that the message is not processed.
-
EFFECTIVELY_ONCE: Each message sent to the function will have one output associated with it.
-
Provide an Optional Configuration Key. See the Pulsar Docs for a list of configuration keys.
-
Select Create.
You have created a function for this namespace. You can confirm your function was created in the Functions tab.
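The Class Name rule for Python files described above can be sketched as a small helper. The name class_name_for is hypothetical and not part of any Astra or Pulsar tooling. The sketch also shows a class-less "basic function", which is just a module-level process function.

```python
from pathlib import Path
from typing import Optional


def class_name_for(file_name: str, class_name: Optional[str] = None) -> str:
    """Return the Class Name for a Python file: file stem plus optional class."""
    module = Path(file_name).stem  # file name without the extension
    return f"{module}.{class_name}" if class_name else module


# A class-less basic function: the whole file is one process() function,
# so the Class Name is just the file name without the extension.
def process(input):
    return input + "!"


if __name__ == "__main__":
    print(class_name_for("testfunction.py", "ExclamationFunction"))
    print(class_name_for("function.py"))
    print(process("Hello world"))
```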
Add function with Pulsar CLI
You can also add functions using the Pulsar CLI. We will create a new Python function to consume a message from one topic, add an exclamation point, and publish the results to another topic.
-
Create the following Python function in testfunction.py:
from pulsar import Function

class ExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        return input + '!'
-
Deploy testfunction.py to your Pulsar cluster using the Pulsar CLI:
$ ./pulsar-admin functions create \
  --py /full/path/to/testfunction.py \
  --classname testfunction.ExclamationFunction \
  --tenant <tenant-name> \
  --namespace default \
  --name exclamation \
  --auto-ack true \
  --inputs persistent://<tenant-name>/default/in \
  --output persistent://<tenant-name>/default/out \
  --log-topic persistent://<tenant-name>/default/log
You will see "Created Successfully!" if the function is set up and ready to accept messages.
If you receive a 402 error with "Reason: only qualified organizations can create functions", this means your organization needs to be upgraded to a Pay As You Go plan with a payment method. A qualified organization is an organization on the Pay As You Go plan with a payment method on file. Upgrade your organization to a qualified organization by:
-
Enrolling in the Pay As You Go plan in the Astra Portal with a payment method. For more, see Plan Options.
-
Contacting our sales team to see how we can help.
-
Opening a support ticket.
-
Use ./pulsar-admin functions list --tenant <tenant-name> to list the functions in your tenant and confirm your new function was created.
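If you deploy the same function to several tenants, it can help to assemble the create command programmatically. The following is a rough sketch, assuming the same flags used in the CLI example in this section. The helper name build_create_command and the tenant name my-tenant are hypothetical, and the sketch only builds and prints the command rather than running it.

```python
import shlex
from typing import List


def build_create_command(py_file: str, class_name: str, tenant: str,
                         namespace: str, name: str,
                         inputs: List[str], output: str) -> List[str]:
    """Build the argument list for './pulsar-admin functions create'."""
    return [
        "./pulsar-admin", "functions", "create",
        "--py", py_file,
        "--classname", class_name,
        "--tenant", tenant,
        "--namespace", namespace,
        "--name", name,
        # Multiple input topics are passed as one comma-separated value.
        "--inputs", ",".join(inputs),
        "--output", output,
    ]


if __name__ == "__main__":
    tenant = "my-tenant"  # hypothetical tenant name
    command = build_create_command(
        "/full/path/to/testfunction.py",
        "testfunction.ExclamationFunction",
        tenant,
        "default",
        "exclamation",
        [f"persistent://{tenant}/default/in"],
        f"persistent://{tenant}/default/out",
    )
    # Print a copy-pasteable command line; pass the list to subprocess.run to execute it.
    print(shlex.join(command))
```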
Testing Your Function
Triggering a function is a convenient way to test that the function is working. When you trigger a function, you are publishing a message on the function’s input topic, which triggers the function to run. If the function has an output topic and the function returns data to the output topic, that data is displayed.
Send a test value with Pulsar CLI’s trigger to test a function you’ve set up.
-
Listen for messages on the output topic:
$ ./pulsar-client consume persistent://<tenant-name>/default/<topic-name> \
  --subscription-name my-subscription \
  --num-messages 0 # Listen indefinitely
-
Test your exclamation function with trigger:
$ ./pulsar-admin functions trigger \
  --name exclamation \
  --tenant <tenant-name> \
  --namespace default \
  --trigger-value "Hello world"
The trigger sends the string Hello world to your exclamation function. Your function should output Hello world! on the output topic you are consuming.
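The function's logic can also be checked locally before it is deployed or triggered. The sketch below calls process directly with the same test value. The fallback Function class is a stand-in introduced here so the check runs even when the pulsar-client library is not installed; it is not part of Pulsar. Passing None as the context works only because this function never uses it.

```python
try:
    from pulsar import Function  # real base class when pulsar-client is installed
except ImportError:
    class Function:  # minimal stand-in so the logic can run without the library
        def process(self, input, context):
            raise NotImplementedError


class ExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        return input + '!'


if __name__ == "__main__":
    # Same value as the trigger example; the context is unused, so None is enough here.
    result = ExclamationFunction().process("Hello world", None)
    assert result == "Hello world!"
    print(result)
```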
Controlling Your Function
You can start, stop, and restart your function by selecting it in the Functions dashboard.
Monitoring Your Function
Functions produce logs to help you in debugging. To view your function’s logs, open your function in the Functions dashboard.
In the upper right corner of the function log are controls to Refresh, Copy to Clipboard, and Save your function log.
Updating Your Function
A function that is already running can be updated with new configuration. The following settings can be updated:
-
Function code
-
Output topic
-
Log topic
-
Number of instances
-
Configuration keys
If you need to update any other setting of the function, delete and then re-add the function.
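As an illustration of the rule above, the following sketch checks a set of planned changes against the updatable settings listed in this section. The setting labels and the helper name requires_recreate are hypothetical; they are not identifiers from the Astra Streaming API.

```python
from typing import Iterable, List

# Settings this section lists as updatable on a running function.
UPDATABLE = {
    "function code",
    "output topic",
    "log topic",
    "number of instances",
    "configuration keys",
}


def requires_recreate(changed: Iterable[str]) -> List[str]:
    """Return the changed settings that need a delete and re-add."""
    return sorted(set(changed) - UPDATABLE)


if __name__ == "__main__":
    planned = ["output topic", "input topic"]
    blocked = requires_recreate(planned)
    if blocked:
        print("Delete and re-add the function to change:", ", ".join(blocked))
    else:
        print("All planned changes can be applied with an update.")
```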
To update your function, select your function in the Functions dashboard.
-
Select Change File to find your function locally and click Open.
-
Update your function’s Instances and Timeout. When you’re done, click Update.
-
An Updates Submitted Successfully flag will appear to let you know your function has been updated.
Deleting Your Function
To delete a function, select the function to be deleted in the Functions dashboard.
-
Click Delete.
-
A popup will ask you to confirm deletion by entering the function’s name and clicking Delete.
-
A Function-name Deleted Successfully! flag will appear to let you know you’ve deleted your function.
Pulsar functions video
Follow along with this video from our Five Minutes About Pulsar series to see a Pulsar Python function in action.