Producing and consuming messages using the Python Pulsar client on Astra Streaming

Prerequisites

You will need the following prerequisites in place to complete this guide:

  • On Linux: Python 3.4 to 3.7 (the supported versions)

  • On macOS: Python 3.7 (the supported version)

  • A working Pulsar topic (get started here if you don’t have a topic)

  • A basic text editor or IDE

Visit our examples repo to see the complete source of this example.

Create a project

Create a folder, change directory into it, and start a new Python project.

Install the Pulsar client library with pip.

mkdir SimpleProducerConsumer && cd SimpleProducerConsumer

touch index.py

pip install pulsar-client==2.10

Add the client

Create a new Pulsar client instance using the service URL and token authentication.

Open "index.py" in your favorite text editor or IDE and copy in the following code.

import pulsar
import time

serviceUrl = "<REPLACE_WITH_SERVICE_URL>"
pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"

tenantName = "<REPLACE_WITH_TENANT_NAME>"
namespace = "<REPLACE_WITH_NAMESPACE>"
topicName = "<REPLACE_WITH_TOPIC>"

topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName)

client = pulsar.Client(serviceUrl, authentication=pulsar.AuthenticationToken(pulsarToken))

This isn’t complete code (yet), so don’t be alarmed if your editor shows errors.

Notice there are a few variables waiting for replacement values. You can find those values here:

serviceUrl

This is the URL for connecting to the Pulsar cluster.

In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".

pulsarToken

This is the token used to authenticate with the Pulsar cluster.

In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.

tenantName

The name of your streaming tenant.

In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".

namespace

The namespace within your streaming tenant that groups a set of related topics.

In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.

topicName

Expand the namespace you chose above to list its topics. This value is just the topic name (not the "Full Name").
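If you would rather not paste the token and other values directly into "index.py", one option is to read them from environment variables. The sketch below is only an illustration: the variable names (ASTRA_SERVICE_URL and so on), the helper functions, and the placeholder values are assumptions made for this example, not names defined by Astra Streaming or the Pulsar client.

```python
import os

# Hypothetical environment variable names; choose whatever fits your setup.
REQUIRED_KEYS = (
    "ASTRA_SERVICE_URL",
    "ASTRA_PULSAR_TOKEN",
    "ASTRA_TENANT",
    "ASTRA_NAMESPACE",
    "ASTRA_TOPIC",
)


def load_settings(env=None):
    """Return the five connection values, failing early if any is missing."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError("Missing settings: " + ", ".join(missing))
    return {key: env[key] for key in REQUIRED_KEYS}


def build_topic(tenant, namespace, topic_name):
    """Build the full topic name in the same format this guide uses."""
    return "persistent://{0}/{1}/{2}".format(tenant, namespace, topic_name)


# Placeholder values for demonstration only (not real credentials).
example_env = {
    "ASTRA_SERVICE_URL": "pulsar+ssl://example.invalid:6651",
    "ASTRA_PULSAR_TOKEN": "placeholder-token",
    "ASTRA_TENANT": "my-tenant",
    "ASTRA_NAMESPACE": "default",
    "ASTRA_TOPIC": "my-topic",
}

settings = load_settings(example_env)
topic = build_topic(
    settings["ASTRA_TENANT"],
    settings["ASTRA_NAMESPACE"],
    settings["ASTRA_TOPIC"],
)
print(topic)
```

Calling load_settings() with no argument reads from the real environment, and the resulting values can be used in place of the hard-coded placeholders in "index.py".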

Create a producer

Use the client to create a producer.

Add the following code to "index.py".

producer = client.create_producer(topic)

With the producer created, add this code to "index.py" to send a message. The string is encoded to UTF-8 bytes before it is sent.

producer.send('Hello World'.encode('utf-8'))
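The message above is a plain string encoded to bytes. If you want to send structured data instead, one common approach is to serialize it to JSON and encode the result. The sketch below shows that round trip without needing a broker; the helper names to_payload and from_payload are hypothetical and are not part of the Pulsar client.

```python
import json


def to_payload(record):
    """Serialize a dict to UTF-8 encoded JSON bytes."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def from_payload(data):
    """Decode UTF-8 JSON bytes back into a dict."""
    return json.loads(data.decode("utf-8"))


record = {"greeting": "Hello World", "attempt": 1}
payload = to_payload(record)

print(type(payload).__name__)
print(from_payload(payload))
```

In "index.py" this would look like producer.send(to_payload(record)) on the producing side and from_payload(msg.data()) on the consuming side.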

Create a consumer

Just like the producer, use the Python client instance to create a consumer subscription to the same topic we sent a message to.

Add this code to "index.py".

consumer = client.subscribe(topic, 'my-subscription')

And now for the good stuff! Let’s iterate through messages and write their data.

Add this code to "index.py".

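waitingForMsg = True
while waitingForMsg:
    try:
        # Wait up to 2000 ms for a message; an exception is raised on timeout
        msg = consumer.receive(2000)
        # msg.data() returns bytes, so decode it before printing
        print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id()))

        # Acknowledge the message to remove it from the message backlog
        consumer.acknowledge(msg)

        waitingForMsg = False
    except Exception:
        # Catch Exception (not a bare except) so Ctrl+C still stops the script
        print("Still waiting for a message...")
        time.sleep(1)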
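The loop above keeps retrying until a message arrives. If you would prefer the script to give up after a fixed number of attempts, a bounded retry is one way to do it. This is a minimal sketch: receive_with_retries and FakeReceiver are hypothetical names introduced for illustration, and FakeReceiver stands in for consumer.receive so the example runs without a broker.

```python
import time


def receive_with_retries(receive, max_attempts=5, delay_seconds=1.0, sleep=time.sleep):
    """Call receive() until it returns a message or the attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            return receive()
        except Exception:
            print("Still waiting for a message... (attempt {})".format(attempt))
            if attempt < max_attempts:
                sleep(delay_seconds)
    return None  # No message arrived within max_attempts


class FakeReceiver:
    """Stand-in for consumer.receive: fails a set number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise RuntimeError("no message yet")
        return b"Hello World"


fake = FakeReceiver(failures=2)
result = receive_with_retries(fake, max_attempts=5, delay_seconds=0)
print(result)
```

With a real consumer, the call would be receive_with_retries(lambda: consumer.receive(2000)), followed by consumer.acknowledge(msg) when the result is not None.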

Finally, add a little clean-up.

client.close()
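If an error occurs before the script reaches client.close(), the connection is left open until the process exits. One way to guard against that is contextlib.closing from the standard library, which calls close() on any object when the block ends, even on failure. The sketch below uses a hypothetical FakeClient as a stand-in so it runs without a broker; it assumes only that the real client exposes a close() method, as used above.

```python
from contextlib import closing


class FakeClient:
    """Stand-in for the Pulsar client; it only records whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


fake_client = FakeClient()

try:
    with closing(fake_client) as client:
        # Produce and consume here; this error simulates a failure mid-script.
        raise RuntimeError("simulated failure while producing")
except RuntimeError:
    pass

print(fake_client.closed)
```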

Run the example

Here it comes: your greatest Python creation yet!

Return to the terminal where you created your Python project and run the following command.

python3 index.py

There will likely be a lot of logging in the output, but look closely and you should see the following message.

Received message 'Hello World' id='(422529,5,-1,0)'

You did it🎉!

Next steps

You’re one step closer to being a messaging ninja. Let’s continue the learning with these guides.


© 2024 DataStax | Privacy policy | Terms of use
