Pulsar Beam with Luna Streaming

The Pulsar Beam project is an HTTP-based streaming and queueing system for use with Apache Pulsar.

With Pulsar Beam, you can send messages over HTTP, push messages to a webhook or cloud function, chain webhooks and functions together, or stream messages via server-sent events.

In this guide we will install a minimal DataStax Pulsar Helm chart that includes Pulsar Beam.

Prerequisites

You will need the following prerequisites in place to complete this guide:

  • Helm 3 CLI (this example uses version 3.8.0)

  • Enough access to a K8s cluster to create a namespace, deployments, and pods

  • Kubectl CLI (this example uses version 1.23.4)

Install Luna Streaming Helm chart

  1. Add the DataStax Helm chart repo to your Helm store.

    helm repo add datastax-pulsar https://datastax.github.io/pulsar-helm-chart
  2. Install the Helm chart using a minimalist values file.
    This command creates a Helm release named "my-pulsar-cluster" from the DataStax Luna Helm chart in the K8s namespace "datastax-pulsar". The minimal cluster creates only the essential components and has no ingress or load-balanced services.

    VALUES_URL="https://raw.githubusercontent.com/datastaxdevs/luna-streaming-examples/main/beam/values.yaml"
    helm install \
      --namespace datastax-pulsar \
      --create-namespace \
      --values $VALUES_URL \
      --version 3.0.4 \
      my-pulsar-cluster \
      datastax-pulsar/pulsar
  3. Wait for the broker pod to be in a running state. You might see a few restarts as your components start up.

    kubectl -n datastax-pulsar wait --for=condition=Ready pod/pulsar-broker-0 --timeout=120s

Forward service port

In a separate terminal window, port forward the Beam endpoint service.

kubectl port-forward -n datastax-pulsar service/pulsar-proxy 8085:8085

The port forward maps http://127.0.0.1:8085 to the Pulsar Proxy running in the new cluster. Because Beam is enabled, the proxy forwards requests on to the Beam service.

Forwarding from 127.0.0.1:8085 -> 8085
Forwarding from [::1]:8085 -> 8085

Start a message consumer

In a new terminal window, run the following curl command to begin streaming server-sent events. See configuring your local environment for Astra Streaming.

TOPIC="my-beam-topic"
# Quote the URL so the shell does not treat "&" as a background operator
curl "http://127.0.0.1:8085/v2/sse/persistent/public/default/$TOPIC?SubscriptionInitialPosition=earliest&SubscriptionType=exclusive&SubscriptionName=my-beam-subscription"

Note the use of SubscriptionInitialPosition=earliest in the message consumer. This instructs Beam to create a subscription on the topic starting at the earliest message. Try changing the value to latest to receive only messages that arrive after the subscription is created.
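To make the query parameters easier to experiment with, the consumer URL can be assembled in code. The following is a minimal sketch using only the Python standard library; the helper name sse_url and its defaults are illustrative assumptions based on the curl command above, not part of the Beam project.

```python
from urllib.parse import quote, urlencode

# Assumption: the port forward from this guide is listening on this address.
BEAM_BASE_URL = "http://127.0.0.1:8085"


def sse_url(topic, initial_position="earliest", subscription_type="exclusive",
            subscription_name="my-beam-subscription",
            tenant="public", namespace="default"):
    """Build the server-sent events URL for a persistent topic (hypothetical helper)."""
    path = "/v2/sse/persistent/{0}/{1}/{2}".format(
        quote(tenant, safe=""), quote(namespace, safe=""), quote(topic, safe=""))
    # urlencode escapes the values and joins the pairs with "&".
    query = urlencode({
        "SubscriptionInitialPosition": initial_position,
        "SubscriptionType": subscription_type,
        "SubscriptionName": subscription_name,
    })
    return BEAM_BASE_URL + path + "?" + query


# Same URL as the curl command, then the variant that skips older messages.
print(sse_url("my-beam-topic"))
print(sse_url("my-beam-topic", initial_position="latest"))
```

Building the query string with urlencode also avoids the shell-quoting concerns that come with typing "&" on the command line.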

Produce a new message

Return to the original terminal window and run the following script to produce a new message. You don't need to create the topic first; if it doesn't exist, it is created automatically.

TOPIC="my-beam-topic"
curl --request POST \
  -d "Hi there" \
  http://127.0.0.1:8085/v2/firehose/persistent/public/default/$TOPIC

Consume the message

The message consumer will output the data of the new message just produced.

id: {9 0 0 0 <nil> 0xc002287ad0}
data: Hi there
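The output above follows the server-sent events text format: each event is a group of "field: value" lines terminated by a blank line. If you want to process the stream in code rather than read it in a terminal, a small parser is enough. This is a simplified sketch, not a full implementation of the SSE specification; the function name parse_sse is hypothetical, and the sample lines are copied from the output shown above.

```python
def parse_sse(lines):
    """Yield one dict per event from an iterable of text lines (simplified sketch)."""
    event = {}
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line terminates the current event.
            if event:
                yield event
                event = {}
            continue
        if line.startswith(":"):
            continue  # lines starting with ":" are comments
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "data" and "data" in event:
            event["data"] += "\n" + value  # repeated data lines are joined
        else:
            event[field] = value
    if event:
        # Convenience only: browsers discard an event that lacks the final blank line.
        yield event


sample = [
    "id: {9 0 0 0 <nil> 0xc002287ad0}\n",
    "data: Hi there\n",
    "\n",
]
for event in parse_sse(sample):
    print(event["data"])
```

In practice, the lines would come from a streaming HTTP response for the /v2/sse endpoint instead of a list.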

You have now completed the basics of using Beam in a Pulsar cluster. Refer to the project's readme to see all the possibilities!

A Python producer and consumer

This is another example of producing and consuming messages using Beam. Instead of using curl, this example will use the "requests" Python library to issue HTTP requests.

Create project

  1. Create a new file to hold the Python code.

    touch beam.py
  2. Paste the following code into "beam.py".

    import json

    import requests

    # Beam endpoint exposed by the port forward
    beam_url = "http://127.0.0.1:8085"

    tenant_name = "public"
    namespace = "default"
    topic_name = "my-beam-topic"

    # Path shared by the firehose (produce) and poll (consume) endpoints
    topic_path = "persistent/{0}/{1}/{2}".format(tenant_name, namespace, topic_name)

    # Produce a message
    response = requests.post(beam_url + "/v2/firehose/" + topic_path, data="Hi there!")
    print("Published 1 message ({0})".format(response.status_code))

    # Consume a message
    subscription = {
        "SubscriptionType": "shared",
        "SubscriptionInitialPosition": "earliest",
        "SubscriptionName": "my-beam-subscription",
    }
    response = requests.get(beam_url + "/v2/poll/" + topic_path, params=subscription)
    print("Consumed messages ({0}):\n".format(response.status_code) + json.dumps(response.json(), indent=2))

Run example Python code

  1. Run the Python example (this example uses Python version 3.8.10).

    python3 beam.py
  2. The output should show 1 message was published and 1 message was consumed.

    Published 1 message (200)
    Consumed messages (200):
    {
      "limit": 10,
      "size": 1,
      "messages": [
        {
          "payload": "SGkgdGhlcmUh",
          "topic": "persistent://public/default/my-beam-topic",
          "eventTime": "2023-01-03T15:52:24.054Z",
          "publishTime": "2023-01-03T15:52:24.054Z",
          "messageId": "&{ledgerID:16 entryID:0 batchIdx:0 partitionIdx:0 tracker:<nil> consumer:0xc002287e10}",
          "key": ""
        }
      ]
    }

The payload is Base64-encoded. Decode the string to see the original message data; here, SGkgdGhlcmUh decodes to "Hi there!".
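As an illustration, the payloads in a poll response can be decoded with Python's standard base64 module. This is a minimal sketch: the helper name decode_payloads is hypothetical, the sample dictionary is a trimmed copy of the output shown above, and it assumes the message bodies are UTF-8 text.

```python
import base64


def decode_payloads(poll_response):
    """Return the decoded text of every message in a poll response (hypothetical helper)."""
    return [
        base64.b64decode(message["payload"]).decode("utf-8")
        for message in poll_response.get("messages", [])
    ]


# Trimmed copy of the example response from this guide.
sample_response = {
    "limit": 10,
    "size": 1,
    "messages": [
        {
            "payload": "SGkgdGhlcmUh",
            "topic": "persistent://public/default/my-beam-topic",
        }
    ],
}

print(decode_payloads(sample_response))  # ['Hi there!']
```

In beam.py, the same helper could be applied to response.json() after the poll request.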

Clean up

Return to each terminal window that is running an open process and press Ctrl-C to end the process.
To completely remove all traces of the Helm chart, remove the namespace.

kubectl delete namespace datastax-pulsar

If you want to keep the data, uninstall only the chart.

helm --namespace datastax-pulsar uninstall my-pulsar-cluster

Next steps

Here are links to resources and guides you might be interested in:

  • Learn more about the Pulsar Beam project

  • Pulsar Beam API

  • Pulsar SQL with DataStax Luna Streaming
