Pulsar Beam with Luna Streaming

The Pulsar Beam project is an HTTP-based streaming and queueing system for use with Apache Pulsar.

With Pulsar Beam, you can send messages over HTTP, push messages to a webhook or cloud function, chain webhooks and functions together, or stream messages via server-sent events.

In this guide we will install a minimal DataStax Pulsar Helm chart that includes Pulsar Beam.

Prerequisites

You will need the following prerequisites in place to complete this guide:

  • Helm 3 CLI (this example uses version 3.8.0)

  • Enough access to a K8s cluster to create a namespace, deployments, and pods

  • Kubectl CLI (this example uses version 1.23.4)

Install Luna Streaming Helm chart

  1. Add the DataStax Helm chart repo to your Helm store.

    helm repo add datastax-pulsar https://datastax.github.io/pulsar-helm-chart
  2. Install the Helm chart using a minimalist values file.
    This command creates a Helm release named "my-pulsar-cluster" from the DataStax Luna Helm chart in the K8s namespace "datastax-pulsar". The minimal cluster creates only the essential components and has no ingress or load-balanced services.

    VALUES_URL="https://raw.githubusercontent.com/datastaxdevs/luna-streaming-examples/main/beam/values.yaml"
    helm install \
      --namespace datastax-pulsar \
      --create-namespace \
      --values "$VALUES_URL" \
      --version 3.0.4 \
      my-pulsar-cluster \
      datastax-pulsar/pulsar
  3. Wait for the broker pod to be in a running state. You might see a few restarts as your components start up.

    kubectl -n datastax-pulsar wait --for=condition=Ready pod/pulsar-broker-0 --timeout=120s

Forward service port

In a separate terminal window, port forward the Beam endpoint service.

kubectl port-forward -n datastax-pulsar service/pulsar-proxy 8085:8085

The port forward maps http://127.0.0.1:8085 to the Pulsar proxy running in the new cluster. Because Beam is enabled, the proxy passes these requests on to the Beam service.

Forwarding from 127.0.0.1:8085 -> 8085
Forwarding from [::1]:8085 -> 8085

Start a message consumer

In a new terminal window, run the following curl command to begin streaming server-sent events. See configuring your local environment for Astra Streaming.

TOPIC="my-beam-topic"
curl "http://127.0.0.1:8085/v2/sse/persistent/public/default/$TOPIC?SubscriptionInitialPosition=earliest&SubscriptionType=exclusive&SubscriptionName=my-beam-subscription"

Note the use of SubscriptionInitialPosition=earliest in the message consumer. This instructs Beam to create a subscription on the topic starting at the earliest message. Try changing the value to latest to only receive new messages that arrive.
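
If you script the consumer, the query string can be assembled from named values rather than typed by hand. The following is a minimal sketch, assuming the port-forwarded address and the public/default namespace used in this guide. The helper name sse_url is hypothetical and is not part of Beam.

```python
from urllib.parse import urlencode

BEAM_URL = "http://127.0.0.1:8085"  # port-forwarded address from this guide


def sse_url(topic, initial_position="earliest",
            subscription_type="exclusive",
            subscription_name="my-beam-subscription"):
    """Build the Beam SSE URL for a topic in the public/default namespace.

    Hypothetical helper for illustration; the parameter names mirror the
    query string used in the curl command.
    """
    query = urlencode({
        "SubscriptionInitialPosition": initial_position,
        "SubscriptionType": subscription_type,
        "SubscriptionName": subscription_name,
    })
    return f"{BEAM_URL}/v2/sse/persistent/public/default/{topic}?{query}"


# Same URL as the curl command
print(sse_url("my-beam-topic"))
# Only receive messages that arrive after the subscription is created
print(sse_url("my-beam-topic", initial_position="latest"))
```

Passing initial_position="latest" is the scripted equivalent of changing the value in the curl command.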

Produce a new message

Return to the original terminal window and run the following script to produce a new message. You don't need to create the topic first: if it doesn't exist, it is created automatically.

TOPIC="my-beam-topic"
curl --request POST \
  -d "Hi there" \
  http://127.0.0.1:8085/v2/firehose/persistent/public/default/$TOPIC

Consume the message

The message consumer will output the data of the new message just produced.

id: {9 0 0 0 <nil> 0xc002287ad0}
data: Hi there
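
The output above follows the server-sent events text format: "field: value" lines, with a blank line closing each event. If you read the stream from a program instead of curl, the lines could be grouped into events along these lines. This is a simplified sketch, not a complete SSE client, and the function name parse_sse is hypothetical.

```python
def parse_sse(lines):
    """Group 'field: value' lines into one dict per event.

    A blank line ends an event. Repeated 'data' lines within one event
    are joined with a newline. Lines starting with ':' are comments.
    """
    events, current = [], {}
    for line in lines:
        line = line.rstrip("\r\n")
        if not line:
            if current:
                events.append(current)
                current = {}
            continue
        if line.startswith(":"):
            continue
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # drop the single space after the colon
        if field == "data" and "data" in current:
            current["data"] += "\n" + value
        else:
            current[field] = value
    if current:
        events.append(current)
    return events


# The two lines printed by the consumer, followed by the closing blank line
sample = ["id: {9 0 0 0 <nil> 0xc002287ad0}", "data: Hi there", ""]
for event in parse_sse(sample):
    print(event["data"])
```

With the requests library, the lines could come from response.iter_lines(decode_unicode=True) on a request made with stream=True.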

You have now completed the basics of using Beam in a Pulsar cluster. Refer to the project’s readme to see all the possibilities!

A Python producer and consumer

This is another example of producing and consuming messages using Beam. Instead of using curl, this example will use the "requests" Python library to issue HTTP requests.

Create project

  1. Create a new file to hold the Python code.

    touch beam.py
  2. Paste the following code into "beam.py".

    import json

    import requests

    beam_url = "http://127.0.0.1:8085"

    tenant_name = "public"
    namespace = "default"
    topic_name = "my-beam-topic"

    # Path segment shared by the produce and consume endpoints
    topic_path = f"persistent/{tenant_name}/{namespace}/{topic_name}"

    # Produce a message
    response = requests.post(f"{beam_url}/v2/firehose/{topic_path}", data="Hi there!")
    print(f"Published 1 message ({response.status_code})")

    # Consume a message
    params = {
        "SubscriptionType": "shared",
        "SubscriptionInitialPosition": "earliest",
        "SubscriptionName": "my-beam-subscription",
    }
    response = requests.get(f"{beam_url}/v2/poll/{topic_path}", params=params)
    print(f"Consumed messages ({response.status_code}):\n" + json.dumps(response.json(), indent=2))

Run example Python code

  1. Run the Python example (this example uses Python version 3.8.10).

    python3 beam.py
  2. The output should show 1 message was published and 1 message was consumed.

    Published 1 message (200)
    Consumed messages (200):
    {
      "limit": 10,
      "size": 1,
      "messages": [
        {
          "payload": "SGkgdGhlcmUh",
          "topic": "persistent://public/default/my-beam-topic",
          "eventTime": "2023-01-03T15:52:24.054Z",
          "publishTime": "2023-01-03T15:52:24.054Z",
          "messageId": "&{ledgerID:16 entryID:0 batchIdx:0 partitionIdx:0 tracker:<nil> consumer:0xc002287e10}",
          "key": ""
        }
      ]
    }

The payload is base64 encoded. Decoding the string "SGkgdGhlcmUh" yields the original message data, "Hi there!".
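
As an illustration, the decoding can be done with Python's standard base64 module. This sketch assumes the payloads are UTF-8 text, which holds for the message produced in this guide but not necessarily for other producers. The helper name decode_payloads is hypothetical.

```python
import base64


def decode_payloads(poll_response):
    """Return the decoded text of every message in a Beam poll response.

    Assumes each payload is base64-encoded UTF-8 text.
    """
    return [
        base64.b64decode(message["payload"]).decode("utf-8")
        for message in poll_response.get("messages", [])
    ]


# Trimmed copy of the sample response from this guide
sample = {"size": 1, "messages": [{"payload": "SGkgdGhlcmUh"}]}
print(decode_payloads(sample))  # ['Hi there!']
```

In beam.py, the same function could be called on response.json() after the poll request.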

Clean up

Return to each terminal window that is running a process and press Ctrl+C to end it.
To completely remove all traces of the Helm chart, delete the namespace.

kubectl delete namespace datastax-pulsar

If you want to keep the data, uninstall only the chart.

helm --namespace datastax-pulsar uninstall my-pulsar-cluster

