Astra Streaming Functions

Functions are lightweight compute processes that enable you to process each message received on a topic. You can apply custom logic to that message, transforming or enriching it, and then output it to a different topic.

Functions run inside Astra Streaming, so they are serverless. You write your function in Java or Python and upload the code, and Astra Streaming runs it automatically for each message published to the specified input topic.

Astra Streaming functions are built on Apache Pulsar® Functions. For more information, see the Pulsar Functions overview.

Add functions in dashboard

Add functions in the Functions tab of the Astra Streaming dashboard.

  1. Select Create Function to get started.

  2. Choose your function name and namespace.

  3. Select the file you want to pull the function from and which function you want to use within that file.

Astra Streaming generates a list of acceptable classes from the file. Python and Java functions are added slightly differently.

Python functions are added by loading a Python file (.py) or a zipped Python file (.zip). For Python files, the Class Name is the file name without its extension, followed by a dot and the name of the class you want to execute.

For example, if the Python file is called testfunction.py and the class is ExclamationFunction, the class name is testfunction.ExclamationFunction. The file can contain multiple classes, but only one is used. If the Python file contains no class (for example, when you use a basic function), specify only the file name without the extension (for example, function).
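A basic function needs no class at all. The sketch below assumes Pulsar's native Python function convention, in which the module defines a top-level process(input) function that is called once per message. If the file were saved as exclamation.py (a hypothetical name), you would enter exclamation as the Class Name.

```python
# exclamation.py (hypothetical file name)
# A basic Pulsar Python function: no class and no Pulsar SDK import.
# Per Pulsar's native-function convention, the runtime calls process()
# once per input message and publishes the return value to the output
# topic, if one is configured.


def process(input):
    return input + "!"


if __name__ == "__main__":
    # Local sanity check only; Astra Streaming does not execute this block.
    print(process("Hello world"))
```

The __main__ block only exercises the function on your machine; it is not needed for deployment.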

Java functions are added by loading a Java JAR file (.jar). For Java files, you must also specify the fully qualified name of the class to execute as the function.

  4. Choose your input topics.

  5. Optionally, choose destination topics for output and logging.

  6. Under Advanced Options, set Instances to at least one.

  7. Choose your Processing Guarantee. The default is ATLEAST_ONCE. Three options are available:

    • ATLEAST_ONCE: Each message sent to the function is processed at least once, so it can be processed more than once.

    • ATMOST_ONCE: Each message sent to the function is processed at most once, so there is a chance that it is not processed at all.

    • EFFECTIVELY_ONCE: Each message sent to the function has exactly one output associated with it.

  8. Optionally, provide configuration keys. See the Pulsar documentation for a list of configuration keys.

  9. Select Create.

You have created a function for this namespace. You can confirm your function was created in the Functions tab.
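The difference between the ATMOST_ONCE and ATLEAST_ONCE processing guarantees comes down to whether the input message is acknowledged before or after the output is published. The following is a toy model of that ordering, not Pulsar's implementation: simulate is a hypothetical helper that fails once between the two steps while handling one chosen message, and unacknowledged messages are redelivered. EFFECTIVELY_ONCE is not modelled. When deploying with the Pulsar CLI, the guarantee is set with the --processing-guarantees option of pulsar-admin functions create.

```python
# Toy model of acknowledgement order; not Pulsar's actual implementation.
from collections import deque


def simulate(guarantee, messages, fail_on):
    """Return the outputs published when handling `fail_on` fails once.

    The runtime does two steps per message: publish the output and
    acknowledge the input. The simulated failure happens between them.
    Unacknowledged messages stay in the backlog and are redelivered.
    """
    if guarantee == "ATMOST_ONCE":
        steps = ("ack", "publish")  # acknowledge first, then publish
    elif guarantee == "ATLEAST_ONCE":
        steps = ("publish", "ack")  # publish first, then acknowledge
    else:
        raise ValueError("only ATMOST_ONCE and ATLEAST_ONCE are modelled")

    backlog = deque(messages)  # messages not yet acknowledged
    outputs = []
    failed = False
    while backlog:
        msg = backlog[0]
        for step in steps:
            if step == "publish":
                outputs.append(msg + "!")
            else:
                backlog.popleft()
            if msg == fail_on and not failed:
                failed = True
                break  # crash after the first step; the second never runs
    return outputs


if __name__ == "__main__":
    print(simulate("ATMOST_ONCE", ["a", "b", "c"], "b"))
    print(simulate("ATLEAST_ONCE", ["a", "b", "c"], "b"))
```

With ATMOST_ONCE the result is ['a!', 'c!']: message b was acknowledged before the failure, so its output is lost. With ATLEAST_ONCE the result is ['a!', 'b!', 'b!', 'c!']: b was not acknowledged, so it is redelivered and its output is published twice.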

Add function with Pulsar CLI

You can also add functions with the Pulsar CLI. The following example creates a Python function that consumes a message from one topic, appends an exclamation point, and publishes the result to another topic.

  1. Create the following Python function in testfunction.py:

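    from pulsar import Function

    class ExclamationFunction(Function):
        def __init__(self):
            pass

        def process(self, input, context):
            # Called once per message on the input topic; the returned
            # value is published to the function's output topic.
            return input + '!'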
  2. Deploy testfunction.py to your Pulsar cluster using the Pulsar CLI:

    $ ./pulsar-admin functions create \
      --py /full/path/to/testfunction.py \
      --classname testfunction.ExclamationFunction \
      --tenant <tenant-name> \
      --namespace default \
      --name exclamation \
      --auto-ack true \
      --inputs persistent://<tenant-name>/default/in \
      --output persistent://<tenant-name>/default/out \
      --log-topic persistent://<tenant-name>/default/log

    If the function is set up and ready to accept messages, you should see "Created Successfully!"

  3. Run ./pulsar-admin functions list --tenant <tenant-name> --namespace default to list the functions in the default namespace of your tenant and confirm that your new function was created.

Testing Your Function

Triggering a function is a convenient way to test that it works. When you trigger a function, you publish a message on the function’s input topic, which causes the function to run. If the function has an output topic and returns a value, that value is displayed.

To test a function you’ve set up, send it a test value with the Pulsar CLI trigger command.

  1. Listen for messages on the output topic:

    $ ./pulsar-client consume persistent://<tenant-name>/default/out \
      --subscription-name my-subscription \
      --num-messages 0 # Listen indefinitely
  2. Test your exclamation function with trigger:

    $ ./pulsar-admin functions trigger \
      --name exclamation \
      --tenant <tenant-name> \
      --namespace default \
      --trigger-value "Hello world"

    The trigger sends the string Hello world to your exclamation function. The function should output Hello world!, which appears in the consumer’s output.
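The same round trip can be scripted with the Pulsar Python client (the pulsar-client package) instead of the CLI. This is a sketch under several assumptions: the service URL, token, and tenant are read from environment variables whose names (PULSAR_SERVICE_URL, PULSAR_TOKEN, ASTRA_TENANT) are placeholders chosen for this example, and the subscription name python-check is hypothetical. It differs from my-subscription so that it does not collide with an exclusive CLI consumer that may still be attached.

```python
# Hypothetical end-to-end check of the exclamation function using the
# Pulsar Python client. Environment variable names are placeholders.
import os


def topic(tenant, name, namespace="default"):
    """Build a fully qualified persistent topic name."""
    return f"persistent://{tenant}/{namespace}/{name}"


def main():
    import pulsar  # imported here so topic() works without the client

    tenant = os.environ["ASTRA_TENANT"]
    client = pulsar.Client(
        os.environ["PULSAR_SERVICE_URL"],
        authentication=pulsar.AuthenticationToken(os.environ["PULSAR_TOKEN"]),
    )
    try:
        # Subscribe to the output topic first so the result is not missed.
        consumer = client.subscribe(topic(tenant, "out"), "python-check")
        producer = client.create_producer(topic(tenant, "in"))
        producer.send("Hello world".encode("utf-8"))
        # The exclamation function should turn this into "Hello world!".
        msg = consumer.receive(timeout_millis=30000)
        print(msg.data().decode("utf-8"))
        consumer.acknowledge(msg)
    finally:
        client.close()


if __name__ == "__main__":
    main()
```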

Controlling Your Function

You can start, stop, and restart your function by selecting it in the Functions dashboard.


Monitoring Your Function

Functions produce logs to help you in debugging. To view your function’s logs, open your function in the Functions dashboard.


In the upper right corner of the function log are controls to Refresh, Copy to Clipboard, and Save your function log.
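To add your own messages to these logs, a Python function can write through the logger that its context provides. The sketch below assumes the Pulsar context method get_logger(), which returns a standard logging logger; Pulsar sends these lines to the function's log topic when one is configured (for example with --log-topic), and this sketch assumes they also appear in the dashboard log. LoggingExclamationFunction and StubContext are hypothetical names, and StubContext exists only so the file runs locally.

```python
# Class-based function that logs through the Pulsar function context.
import logging

try:
    from pulsar import Function
except ImportError:
    Function = object  # allows local runs without the Pulsar client


class LoggingExclamationFunction(Function):
    def process(self, input, context):
        log = context.get_logger()
        log.info("received %r", input)
        if not input:
            log.warning("empty message; passing it through unchanged")
            return input
        return input + "!"


class StubContext:
    """Hypothetical local stand-in that returns a standard logger."""

    def get_logger(self):
        return logging.getLogger("stub-function")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    fn = LoggingExclamationFunction()
    print(fn.process("Hello world", StubContext()))
```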

Updating Your Function

A function that is already running can be updated with new configuration. The following settings can be updated:

  • Function code

  • Output topic

  • Log topic

  • Number of instances

  • Configuration keys

If you need to update any other setting of the function, delete and then re-add the function.

To update your function, select your function in the Functions dashboard.

  1. Select Change File to find your function locally and click Open.

  2. Update your function’s Instances and Timeout. When you’re done, click Update.

  3. An Updates Submitted Successfully flag will appear to let you know your function has been updated.
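Configuration keys are among the settings you can change on a running function. As a sketch, assuming each key you enter is passed to the function as user configuration (the Pulsar CLI counterpart is --user-config with a JSON map, for example --user-config '{"suffix": "?"}' on functions create or functions update), a Python function can read the value through its context instead of hard-coding it. SuffixFunction, the suffix key, and StubContext are hypothetical names.

```python
# Class-based function that reads a user configuration value.
# Assumption: the hypothetical key "suffix" is supplied as user config;
# if it is missing, "!" is used.
try:
    from pulsar import Function
except ImportError:
    Function = object  # allows local runs without the Pulsar client


class SuffixFunction(Function):
    def process(self, input, context):
        suffix = context.get_user_config_value("suffix")
        if suffix is None:
            suffix = "!"
        return input + suffix


class StubContext:
    """Hypothetical local stand-in for the Pulsar context."""

    def __init__(self, user_config):
        self._user_config = user_config

    def get_user_config_value(self, key):
        return self._user_config.get(key)


if __name__ == "__main__":
    fn = SuffixFunction()
    print(fn.process("Hello world", StubContext({"suffix": "?"})))
    print(fn.process("Hello world", StubContext({})))
```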

Deleting Your Function

To delete a function, select the function to be deleted in the Functions dashboard.

  1. Click Delete.

  2. A popup will ask you to confirm deletion by entering the function’s name and clicking Delete.

  3. A Function-name Deleted Successfully! flag will appear to let you know you’ve deleted your function.

Pulsar functions video

Follow along with this video from our Five Minutes About Pulsar series to see a Pulsar Python function in action.

Next

Learn more about developing functions for Astra Streaming and Pulsar here.



© DataStax | Privacy policy | Terms of use

