Use the Python Pulsar client with Astra Streaming
This example demonstrates how you can use the Python Pulsar client with Astra Streaming to produce and consume messages.
Prerequisites
-
A supported Python version:
-
For Linux, versions 3.4 to 3.7 are supported
-
For macOS, version 3.7 is supported
-
-
An Apache Pulsar™ topic in Astra Streaming
-
A text editor or IDE
Create a project
Create a directory for your Python project, create an index.py file, and then install the Pulsar client library with pip:
mkdir SimpleProducerConsumer && cd SimpleProducerConsumer
touch index.py
pip install pulsar-client==2.10
Write the script
In the following steps you will add code to index.py to create a complete script that produces and consumes messages with the Python Pulsar client and Astra Streaming.
Your IDE might show errors until you have completed the script.
-
Open
index.pyin a text editor or IDE. -
Import the required libraries, and then create a Pulsar client instance with the topic URL and token authentication:
index.pyimport pulsar import time serviceUrl = "<REPLACE_WITH_SERVICE_URL>"; pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"; tenantName = "<REPLACE_WITH_TENANT_NAME>"; namespace = "<REPLACE_WITH_NAMESPACE>"; topicName = "<REPLACE_WITH_TOPIC>"; topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName) client = pulsar.Client(serviceUrl, authentication=pulsar.AuthenticationToken(pulsarToken)) -
Provide values for the following variables:
Parameter Definition Where to find the value serviceUrlThe URL to connect to the Pulsar cluster
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Connect tab.
-
In the Tenant Details section, get the Broker Service URL.
pulsarTokenThe token for Pulsar cluster authentication
For information about creating Pulsar tokens, see Manage tokens.
tenantNameThe name of your streaming tenant
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Get the tenant name from the Astra Streaming dashboard.
namespaceThe segmented area for certain topics in your streaming tenant
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Choose the target namespace from the list of namespaces.
topicNameTopic name (not the full name)
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.
-
-
Use the client to create a producer:
index.pyproducer = client.create_producer(topic) -
Send a message:
index.pyproducer.send('Hello World'.encode('utf-8')) -
Use the Python client instance to create a consumer subscription to the same topic that you sent a message to:
index.pyconsumer = client.subscribe(topic, 'my-subscription') -
Iterate through messages and write their data:
index.pywaitingForMsg = True while waitingForMsg: try: msg = consumer.receive(2000) print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # Acknowledging the message to remove from message backlog consumer.acknowledge(msg) waitingForMsg = False except: print("Still waiting for a message..."); time.sleep(1) -
Clean up:
client.close()
The complete index.py script is as follows:
import pulsar
import time
serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
tenantName = "<REPLACE_WITH_TENANT_NAME>";
namespace = "<REPLACE_WITH_NAMESPACE>";
topicName = "<REPLACE_WITH_TOPIC>";
topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName)
client = pulsar.Client(serviceUrl, authentication=pulsar.AuthenticationToken(pulsarToken))
producer = client.create_producer(topic)
producer.send('Hello World'.encode('utf-8'))
consumer = client.subscribe(topic, 'my-subscription')
waitingForMsg = True
while waitingForMsg:
try:
msg = consumer.receive(2000)
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# Acknowledging the message to remove from message backlog
consumer.acknowledge(msg)
waitingForMsg = False
except:
print("Still waiting for a message...");
time.sleep(1)
client.close()
Run the script
In your Python project directory, run the script:
python3 index.py
The output includes a lot of logs.
Received message confirms that the script succeeded:
Received message 'Hello World' id='(422529,5,-1,0)'