Producing and consuming messages using the Python Pulsar client on Astra Streaming
Prerequisites
You will need the following prerequisites in place to complete this guide:
- On Linux, Python versions 3.4 to 3.7 are supported
- On macOS, Python version 3.7 is supported
- A working Pulsar topic (get started here if you don’t have a topic)
- A basic text editor or IDE
Visit our examples repo to see the complete source of this example.
Create a project
Create a folder, change directory into it, and start a new Python project.
Install the Pulsar client library with pip.
mkdir SimpleProducerConsumer && cd SimpleProducerConsumer
touch index.py
pip install pulsar-client==2.10
Add the client
Create a new Pulsar client instance using the service URL and token authentication. The full topic name is built here too, for use in the later steps.
Open "index.py" in your favorite text editor or IDE and copy in the following code.
import pulsar
import time
serviceUrl = "<REPLACE_WITH_SERVICE_URL>"
pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"
tenantName = "<REPLACE_WITH_TENANT_NAME>"
namespace = "<REPLACE_WITH_NAMESPACE>"
topicName = "<REPLACE_WITH_TOPIC>"
topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName)
client = pulsar.Client(serviceUrl, authentication=pulsar.AuthenticationToken(pulsarToken))
This isn’t complete code (yet), so don’t be alarmed if your editor shows errors.
Notice there are a few variables waiting for replacement values. You can find those values here:
serviceUrl
This is the URL for connecting to the Pulsar cluster. In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".
pulsarToken
This is the token used for Pulsar cluster authentication. In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.
tenantName
The name of your streaming tenant. In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".
namespace
Within your streaming tenant, this is the segmented area for certain topics. In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.
topicName
Expanding the namespace chosen above will list the topics within it. This value is just the topic name (not the "Full Name").
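If you would rather not paste the token straight into "index.py", one option is to read the five values from environment variables. This is only a minimal sketch and not part of the original example: the variable names (ASTRA_SERVICE_URL and so on) and the load_settings helper are hypothetical, and the sample values are placeholders.

```python
import os

# Hypothetical environment variable names; pick whatever suits your setup.
REQUIRED_VARS = (
    "ASTRA_SERVICE_URL",
    "ASTRA_PULSAR_TOKEN",
    "ASTRA_TENANT_NAME",
    "ASTRA_NAMESPACE",
    "ASTRA_TOPIC_NAME",
)


def load_settings(env=None):
    """Return (service_url, token, topic) built from a mapping of settings."""
    env = os.environ if env is None else env
    # Fail early with one clear message if anything is missing or empty.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("Missing settings: " + ", ".join(missing))
    # Same topic format as in index.py: persistent://tenant/namespace/topic
    topic = "persistent://{0}/{1}/{2}".format(
        env["ASTRA_TENANT_NAME"], env["ASTRA_NAMESPACE"], env["ASTRA_TOPIC_NAME"]
    )
    return env["ASTRA_SERVICE_URL"], env["ASTRA_PULSAR_TOKEN"], topic


# Placeholder values, only to show the shape of the result.
sample = {
    "ASTRA_SERVICE_URL": "pulsar+ssl://broker.example.invalid:6651",
    "ASTRA_PULSAR_TOKEN": "not-a-real-token",
    "ASTRA_TENANT_NAME": "my-tenant",
    "ASTRA_NAMESPACE": "my-namespace",
    "ASTRA_TOPIC_NAME": "my-topic",
}
service_url, token, topic = load_settings(sample)
print(topic)
```

Calling load_settings() with no argument reads from the real environment, and the three returned values can then be passed to pulsar.Client and the later steps in place of the hard-coded strings.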
Create a producer
Use the client to create a producer.
Add the following code to "index.py".
producer = client.create_producer(topic)
With the producer created, add this code to "index.py" to send a message. The payload is sent as bytes, so the string is encoded as UTF-8 first.
producer.send('Hello World'.encode('utf-8'))
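The message body in this guide is a plain string encoded to bytes. If you want to send something more structured, one common approach (an assumption here, not something this guide requires) is to serialize a dictionary to JSON and encode the result. The helper names encode_event and decode_event below are hypothetical.

```python
import json


def encode_event(event):
    """Serialize a dict to UTF-8 encoded JSON bytes, ready to send."""
    return json.dumps(event, sort_keys=True).encode("utf-8")


def decode_event(data):
    """Turn the bytes of a received message back into a dict."""
    return json.loads(data.decode("utf-8"))


payload = encode_event({"greeting": "Hello World", "count": 1})
print(payload)
print(decode_event(payload))
```

In "index.py" this would look like producer.send(encode_event(...)) on the sending side and decode_event(msg.data()) on the receiving side.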
Create a consumer
Just like the producer, use the Python client instance to create a consumer subscription to the same topic we sent a message to.
Add this code to "index.py".
consumer = client.subscribe(topic, 'my-subscription')
And now for the good stuff! Let’s iterate through messages and write their data.
Add this code to "index.py".
waitingForMsg = True
while waitingForMsg:
    try:
        # Wait up to 2000 ms for a message; a timeout raises an exception
        msg = consumer.receive(2000)
        print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id()))
        # Acknowledge the message to remove it from the message backlog
        consumer.acknowledge(msg)
        waitingForMsg = False
    except Exception:
        print("Still waiting for a message...")
        time.sleep(1)
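The loop above keeps retrying until a message arrives. If you would prefer the script to give up after a while, a bounded variant could look like the sketch below. The receive_one helper and its parameters are hypothetical; it only assumes the consumer has the receive and acknowledge methods used above. FakeConsumer is a stand-in so the sketch runs without a Pulsar connection.

```python
import time


def receive_one(consumer, attempts=5, timeout_ms=2000, pause_s=1.0):
    """Try to receive and acknowledge one message; return None if none arrives."""
    for attempt in range(1, attempts + 1):
        try:
            msg = consumer.receive(timeout_ms)
        except Exception:
            # A receive timeout surfaces as an exception, so wait and retry.
            print("Still waiting for a message... (attempt {} of {})".format(attempt, attempts))
            time.sleep(pause_s)
            continue
        consumer.acknowledge(msg)
        return msg
    return None


class FakeConsumer:
    """Stand-in for a real consumer: times out twice, then returns a message."""

    def __init__(self):
        self.calls = 0
        self.acked = []

    def receive(self, timeout_ms):
        self.calls += 1
        if self.calls < 3:
            raise Exception("timed out")
        return "Hello World"

    def acknowledge(self, msg):
        self.acked.append(msg)


fake = FakeConsumer()
result = receive_one(fake, attempts=5, pause_s=0)
print(result)
```

With the real client you would pass the consumer created earlier, for example receive_one(consumer), and check whether the result is None before using it.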
Finally, add a little clean-up.
client.close()
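As written, client.close() only runs if every line before it succeeds. One way to make the clean-up more robust is a try/finally wrapper, sketched below. The run_and_close helper is hypothetical and only assumes the client object has a close method; FakeClient is a stand-in so the sketch runs on its own.

```python
def run_and_close(client, work):
    """Call work(client) and always close the client afterwards."""
    try:
        return work(client)
    finally:
        client.close()


class FakeClient:
    """Stand-in for a real client that just records whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def failing_work(client):
    raise RuntimeError("something went wrong mid-script")


fake_client = FakeClient()
try:
    run_and_close(fake_client, failing_work)
except RuntimeError as err:
    print("Work failed:", err)
print("Client closed:", fake_client.closed)
```

In "index.py" the producer and consumer steps would go inside the function passed as work.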
Run the example
Here it comes: your greatest Python creation yet!
Return to the terminal where you created your Python project and run the following command.
python3 index.py
There will likely be a lot of logging in the output, but look closely and you should see the following message.
Received message 'Hello World' id='(422529,5,-1,0)'
You did it🎉!
Next steps
You’re one step closer to being a messaging ninja. Let’s continue the learning with these guides.