Astra Streaming functions
Functions are lightweight compute processes that you can run on each message a topic receives. You can apply custom logic to a message, transforming or enriching it, and then output it to a different topic.
Functions run inside Astra Streaming, which makes them serverless. You write the code for your function in Java, Python, or Go, and then upload the code. It automatically runs for each message published to the specified input topic.
Functions are implemented using Apache Pulsar® functions. See [Pulsar Functions overview] for more information about Pulsar functions.
Custom functions require a paid Astra Streaming plan. Organizations on the Free plan can use transform functions only.
Deploy a function archive
You can write Pulsar functions for Astra Streaming in Python, Java, or Go.
To deploy a function to Astra Streaming or Pulsar, you can package the project into an archive, including the function code and any dependencies. You can deploy the same archive to either environment.
This is recommended for complex functions with long scripts, multiple scripts, or many dependencies. You can also use this approach to deploy multiple function classes with a single package.
-
Python functions
-
Java functions
To create a Python function, the .zip file must have the correct structure.
For example, assuming a project named my-python-function, the extracted archive has the following structure:
/my-python-function
python-code/my-python-function.zip
python-code/deps/sh-1.12.14-py2.py3-none-any.whl
python-code/src/my-python-function.py
-
Prepare the directory structure:
mkdir FUNCTION_NAME
mkdir FUNCTION_NAME/python-code
mkdir FUNCTION_NAME/python-code/deps/
mkdir FUNCTION_NAME/python-code/src/
touch FUNCTION_NAME/python-code/src/FUNCTION_NAME.py
-
Write your function code in a .py file. This example adds an exclamation point to the end of each message.

from pulsar import Function

class ExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        return input + '!'
-
Add dependencies to /deps. This example uses the pulsar-client library.

cd deps
pip install pulsar-client==2.10.0
-
Create a .zip file at the root of the project:

cd deps
zip -r ../my-python-function.zip .
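Before packaging, it can help to exercise the function logic locally. The following is a minimal sketch, not part of the Astra Streaming tooling: it assumes that a stand-in base class is acceptable when pulsar-client is not installed on your machine, and the `Function` fallback defined here is hypothetical.

```python
# Use the real base class when pulsar-client is installed; otherwise fall back
# to a stand-in so the logic can still be exercised locally (assumption).
try:
    from pulsar import Function
except ImportError:
    class Function:
        """Hypothetical stand-in for pulsar.Function, for local checks only."""


class ExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        # Append an exclamation point to the incoming message.
        return input + "!"


if __name__ == "__main__":
    # This example ignores the context argument, so None is enough here.
    print(ExclamationFunction().process("hello", None))
```

A check like this only covers the message transformation. It does not confirm that the archive layout or dependencies are correct.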
To deploy a Java function, you must create a .jar file.
-
Create a Java project for your function.
-
Declare dependencies in pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>java-function</groupId>
  <artifactId>java-function</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-functions-api</artifactId>
      <version>3.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Bundle the function and its dependencies into a single .jar -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>org.example.test.ExclamationFunction</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <!-- "single" replaces the "assembly" goal, which was removed in maven-assembly-plugin 3.x -->
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.11.0</version>
        <configuration>
          <release>17</release>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
-
Write your function code in your project.
-
Package the .jar file with Maven:

mvn package

Result

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10.989 s
[INFO] Finished at: 2023-05-16T16:19:05-04:00
[INFO] ------------------------------------------------------------------------
-
If you haven’t done so already, set up your environment for the Pulsar binaries.
-
Create a deployment configuration YAML file that defines the function metadata and associated topics:
func-create-config.yaml

py: PATH_TO_FUNCTION_ARCHIVE
className: FILE_NAME.CLASS_NAME
parallelism: 1
inputs:
  - persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME
output: persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME
autoAck: true
tenant: TENANT_NAME
namespace: NAMESPACE_NAME
name: DISPLAY_NAME
logTopic:
userConfig:
  logging_level: ERROR
Replace the following:
-
PATH_TO_FUNCTION_ARCHIVE: The path to the function archive.
-
FILE_NAME.CLASS_NAME: The class to execute. An archive can contain multiple classes, but only one is used per deployment.
  -
  For Python scripts, the className is the Python filename (without the extension) and the class to execute, such as pythonfunc.ExclamationFunction. If there isn’t a class in the file, the className is the filename without the extension, such as pythonfunc.
  -
  For Java scripts, the className is the path and the class to execute, such as com.example.pulsar.ExclamationFunction.
-
TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME: Define the tenant, namespace, and topic for both input (incoming messages passed to the function) and output (the results of the function).
To avoid errors, make sure the inputs topic (declared in your configuration YAML file) has a defined message schema before you deploy the function. You can define a topic’s message schema in the Astra Portal.
Optionally, you can declare a logTopic in the same way: persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME.
-
DISPLAY_NAME: The function display name in Astra Streaming, such as testpythonfunction.
-
-
Use pulsar-admin to deploy the function to Astra Streaming or Pulsar using your configuration file:

bin/pulsar-admin functions create --function-config-file PATH_TO_FUNCTION_CONFIG_YAML

Result
A response of Created Successfully! indicates the function is deployed and ready to accept messages.
If the response is 402 Payment Required with Reason: only qualified organizations can create functions, then you must upgrade to a paid Astra Streaming plan. Organizations on the Free plan can use transform functions only.
If your Python function contains only a single script and no dependencies, you can deploy the .py file directly, without packaging it into a .zip file or creating a configuration file:

$ ./pulsar-admin functions create \
  --py PATH_TO_PYTHON_FILE \
  --classname FILE_NAME.CLASS_NAME \
  --tenant TENANT_NAME \
  --namespace NAMESPACE_NAME \
  --name DISPLAY_NAME \
  --auto-ack true \
  --inputs persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME \
  --output persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME \
  --log-topic persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME

If there isn’t a class in the file, the className is only the filename without the extension.
-
Verify the deployment:
bin/pulsar-admin functions list --tenant TENANT_NAME --namespace NAMESPACE_NAME
You can also check the Astra Portal to confirm the function is listed on the tenant’s Functions tab.
See Controlling your function for more information on testing and monitoring your function in Astra Streaming.
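The deployment steps note that a Python file without a class uses the filename alone as its className. As an illustration, a class-less function could look like the following sketch. The filename pythonfunc.py is hypothetical and is borrowed from the className example above.

```python
# pythonfunc.py (hypothetical filename): no class, only a module-level function.
def process(input):
    # The message payload arrives as the only argument.
    # The returned value is what gets published to the output topic.
    return "{}!".format(input)


if __name__ == "__main__":
    # Local check only; the Pulsar runtime calls process() for each message.
    print(process("hello"))
```

With this layout, the className would be pythonfunc, with no class suffix.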
Deploy functions in the Astra Portal
-
In the Astra Portal navigation menu, click Streaming, and then select a tenant.
-
On the Functions tab, click Create Function.
-
Enter a function name, and then select the namespace within the tenant.
-
Upload function code:
-
Upload your own code
-
Use DataStax transform function
-
Select Upload my own code.
-
Select your function file:
-
.py: A single, independent Python script
-
.zip: A Python script with dependencies
-
.jar: A Java function
-
.go: A Go function
-
-
Based on the uploaded file, select the specific class (function) to deploy.
Astra Streaming generates a list of acceptable classes detected in the code. A file can contain multiple classes, but only one is used per deployment.
For Python scripts, the class name is the Python filename (without the extension) and the class to execute. For example, if the Python file is called testfunction.py and the class is ExclamationFunction, then the class name is testfunction.ExclamationFunction. If there isn’t a class in the Python file, the class name is the filename without the extension, such as testfunction.
For Java scripts, the class name is the class to execute.
-
Select Use DataStax transform function.
This is the only function option available on the Astra Streaming Free plan.
For more information, see Transform Functions and Astra Streaming pricing.
-
-
Select input topics.
-
(Optional) Select output and log topics.
-
(Optional) Configure advanced settings:
-
Instances: Enter the number of function instances to run.
-
Processing Guarantee: Select one of the following:
-
ATLEAST_ONCE (default): Each message sent to the function can be processed one or more times.
-
ATMOST_ONCE: Each message sent to the function is processed 0 or 1 times. This means there is a chance that a message is not processed.
-
EFFECTIVELY_ONCE: Each message sent to the function has only one output associated with it.
-
-
Timeout: Set a timeout limit.
-
Auto Acknowledge: Enable or disable automatic message acknowledgment.
-
-
(Optional) Provide a config key, if required. For more information, see the Pulsar documentation.
-
Click Create.
-
Confirm your function was created on the Functions tab.
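The processing guarantees in the advanced settings can be easier to compare with a toy model. The sketch below is an illustration only, not how Pulsar implements delivery: the `run` helper and the `crash_before` and `crash_after` parameters are hypothetical names, and each simulated crash happens only on the first attempt for a message.

```python
def run(messages, guarantee, crash_before=(), crash_after=()):
    """Toy model of the three processing guarantees (not Pulsar's implementation)."""
    outputs = []
    crashed = set()   # messages whose first attempt already failed
    emitted = set()   # messages that already produced an output
    for msg in messages:
        while True:
            first_try = msg not in crashed
            if first_try and msg in crash_before:
                # Failure before any output was produced.
                crashed.add(msg)
                if guarantee == "ATMOST_ONCE":
                    break      # no redelivery: the message is lost
                continue       # redelivered and processed again
            if not (guarantee == "EFFECTIVELY_ONCE" and msg in emitted):
                outputs.append(msg + "!")
                emitted.add(msg)
            if first_try and msg in crash_after:
                # Failure after the output was produced but before the ack.
                crashed.add(msg)
                if guarantee == "ATMOST_ONCE":
                    break
                continue       # redelivered: may duplicate the output
            break
    return outputs


if __name__ == "__main__":
    msgs = ["a", "b", "c"]
    print(run(msgs, "ATMOST_ONCE", crash_before={"b"}))
    print(run(msgs, "ATLEAST_ONCE", crash_after={"b"}))
    print(run(msgs, "EFFECTIVELY_ONCE", crash_after={"b"}))
```

In this model, ATMOST_ONCE drops the message that failed, ATLEAST_ONCE emits a duplicate for the message that was redelivered, and EFFECTIVELY_ONCE suppresses the duplicate.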
Manage deployed functions
After you deploy a function, you can test, start, stop, monitor, edit, and delete it.
Test functions
To test the function, publish a message to the function’s input topic or use pulsar-admin functions trigger.
If the function produces output and it has an output topic, the output data is returned.
-
Listen for messages on the output topic:

$ ./pulsar-client consume persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME \
  --subscription-name my-subscription \
  --num-messages 0 # Listen indefinitely
-
Test your function:

$ ./pulsar-admin functions trigger \
  --name FUNCTION_DISPLAY_NAME \
  --tenant TENANT_NAME \
  --namespace NAMESPACE_NAME \
  --trigger-value "MESSAGE"
The trigger sends the message string to the function. Your function should output the result of processing the message.
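If you trigger a function repeatedly while testing, you might wrap the command in a small script. The following is a minimal sketch that only assembles the trigger command shown above and optionally runs it. The helper names and the default `./pulsar-admin` path are assumptions, and running it requires a configured Pulsar binary.

```python
import subprocess


def build_trigger_command(name, tenant, namespace, message,
                          admin_path="./pulsar-admin"):
    """Assemble the `functions trigger` command as an argument list."""
    return [
        admin_path, "functions", "trigger",
        "--name", name,
        "--tenant", tenant,
        "--namespace", namespace,
        "--trigger-value", message,
    ]


def trigger(name, tenant, namespace, message, admin_path="./pulsar-admin"):
    """Run the command and return whatever pulsar-admin prints to stdout."""
    cmd = build_trigger_command(name, tenant, namespace, message, admin_path)
    # check=True raises CalledProcessError if pulsar-admin exits with an error.
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout


if __name__ == "__main__":
    # Print the command instead of running it; placeholders are not real values.
    print(build_trigger_command("DISPLAY_NAME", "TENANT_NAME",
                                "NAMESPACE_NAME", "hello"))
```

Passing the arguments as a list avoids shell quoting issues when the message contains spaces or quotes.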
Stop and start functions
In the Astra Portal, on your tenant’s Functions tab, you can use Function Controls to start, stop, and restart functions.
Monitor functions
Functions produce logs to help you debug them. In the Astra Portal, on your tenant’s Functions tab, you can view, refresh, copy, and download your functions' logs.
If you specified a log topic when deploying your function, function logs also output to that topic.
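To produce log entries from your own code, a Python function can request a logger from the context object. The following is a minimal sketch: `LocalContext` and the fallback `Function` class are hypothetical stand-ins for local checks, and the log message format is an assumption.

```python
import logging

# Use the real base class when pulsar-client is installed; otherwise fall back
# to a stand-in so the example can run locally (assumption).
try:
    from pulsar import Function
except ImportError:
    class Function:
        """Hypothetical stand-in for pulsar.Function, for local checks only."""


class LoggingExclamationFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        # The runtime context provides a logger for the function.
        logger = context.get_logger()
        logger.info("Received message: %s", input)
        return input + "!"


class LocalContext:
    """Hypothetical stand-in for the runtime context, for local checks only."""

    def get_logger(self):
        return logging.getLogger("local-function-test")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(LoggingExclamationFunction().process("hello", LocalContext()))
```

When deployed, the runtime supplies the real context, so `LocalContext` is only needed for local checks.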
Edit functions
-
In the Astra Portal, on your tenant’s Functions tab, click Update Function.
-
Edit the following settings as needed, and then click Update.
-
Function code
-
Output topic
-
Log topic
-
Number of instances
-
Configuration keys
-
If you need to change any other function settings, you must delete and redeploy the function with the desired settings.
Delete functions
Deleting a function is permanent.
-
In the Astra Portal, on your tenant’s Functions tab, select the function to delete.
-
Click Delete.
-
To confirm deletion, enter the function’s name, and then click Delete.