Pulsar Beam with Luna Streaming
The Pulsar Beam project is an HTTP-based streaming and queueing system for use with Apache Pulsar.
With Pulsar Beam, you can send messages over HTTP, push messages to a webhook or cloud function, chain webhooks and functions together, or stream messages through server-sent events (SSE).
In this guide, you’ll install a minimal DataStax Pulsar Helm chart that includes Pulsar Beam.
Prerequisites
-
Helm 3 CLI (this example uses version 3.8.0)
-
Enough access to a K8s cluster to create a namespace, deployments, and pods
-
Kubectl CLI (this example uses version 1.23.4)
Install Luna Streaming Helm chart
-
Add the DataStax Helm chart repo to your Helm store:
helm repo add datastax-pulsar https://datastax.github.io/pulsar-helm-chart
-
Install the Helm chart using a minimalist values file. This command creates a Helm release named "my-pulsar-cluster" using the DataStax Luna Helm chart, within the K8s namespace "datastax-pulsar". The minimal cluster creates only the essential components and has no ingress or load balanced services.
VALUES_URL="https://raw.githubusercontent.com/datastaxdevs/luna-streaming-examples/main/beam/values.yaml" helm install \ --namespace datastax-pulsar \ --create-namespace \ --values $VALUES_URL \ --version 3.0.4 \ my-pulsar-cluster \ datastax-pulsar/pulsar
-
Wait for the broker pod to be in a running state. You might see a few restarts as your components start up.
kubectl -n datastax-pulsar wait --for=condition=Ready pod/pulsar-broker-0 --timeout=120s
Forward service port
In a separate terminal window, port forward the Beam endpoint service:
kubectl port-forward -n datastax-pulsar service/pulsar-proxy 8085:8085
The forwarding service will map the URL:PORT https://127.0.0.1:8085 to Pulsar Proxy running in the new cluster. Because Beam was enabled, the Proxy knows to forward on to the Beam service.
Forwarding from 127.0.0.1:8085 -> 8085
Forwarding from [::1]:8085 -> 8085
Start a message consumer
In a new terminal window, run the following curl command to begin streaming server-sent events. See configuring your local environment for Astra Streaming.
TOPIC="my-beam-topic"
curl http://127.0.0.1:8085/v2/sse/persistent/public/default/$TOPIC?SubscriptionInitialPosition=earliest&SubscriptionType=exclusive&SubscriptionName=my-beam-subscription
Note the use of SubscriptionInitialPosition=earliest
in the message consumer.
This instructs Beam to create a subscription on the topic starting at the earliest message.
Try changing the value to latest
to only receive new messages that arrive.
Produce a new message
Return to the original terminal window and run the following script to produce a new message. Don’t worry about creating the new topic - if a topic doesn’t exist, it is created automatically.
TOPIC="my-beam-topic"
curl --request POST \
-d "Hi there" \
http://127.0.0.1:8085/v2/firehose/persistent/public/default/$TOPIC
Consume the message
The message consumer will output the data of the new message just produced.
id: {9 0 0 0 <nil> 0xc002287ad0}
data: Hi there
You have now completed the basics of using Beam in a Pulsar Cluster. Refer to the project’s readme to see all the possibilities!
A Python producer and consumer
This is another example of producing and consuming messages using Beam. Instead of using curl, this example will use the "requests" Python library to issue HTTP requests.
Create project
-
Create a new file to hold the Python code.
touch beam.py
-
Paste the following code into "beam.py".
import requests import json beamUrl = "http://127.0.0.1:8085"; tenantName = "public"; namespace = "default"; topicName = "my-beam-topic"; topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName) # Produce a message response = requests.post(beamUrl + "/v2/firehose/persistent/{0}/{1}/{2}".format(tenantName, namespace, topicName), data="Hi there!") print("Published 1 message ({0})".format(response.status_code)) # Consume a message response = requests.get(beamUrl + "/v2/poll/persistent/{0}/{1}/{2}?SubscriptionType=shared&SubscriptionInitialPosition=earliest&SubscriptionName=my-beam-subscription".format(tenantName, namespace, topicName)) print("Consumed messages ({0}):\n".format(response.status_code) + json.dumps(response.json(),indent=2))
Run example Python code
-
Run the Python example (we used Python version 3.8.10).
python3 beam.py
-
The output should show 1 message was published and 1 message was consumed.
Published 1 message (200) Consumed messages (200): { "limit": 10, "size": 1, "messages": [ { "payload": "SGkgdGhlcmUh", "topic": "persistent://public/default/my-beam-topic", "eventTime": "2023-01-03T15:52:24.054Z", "publishTime": "2023-01-03T15:52:24.054Z", "messageId": "&{ledgerID:16 entryID:0 batchIdx:0 partitionIdx:0 tracker:<nil> consumer:0xc002287e10}", "key": "" } ] }
The payload is base64 encoded. Simply decode the string to see the actual message data. |
Clean up
Return to each window running an open process and enter ctrl-c
to end the process.
To completely remove all traces of the Helm chart, remove the namespace:
kubectl delete namespace datastax-pulsar
If you want to keep the data, uninstall only the chart:
helm --namespace datastax-pulsar uninstall my-pulsar-cluster
Next steps
Here are links to resources and guides you might be interested in:
-
Learn more about the Pulsar Beam project