Use the Python Pulsar client with Astra Streaming

This example shows how to use the Python Pulsar client with Astra Streaming to produce and consume messages.

Prerequisites

  • A supported Python version:

    • For Linux, versions 3.4 to 3.7 are supported

    • For macOS, version 3.7 is supported

  • An Apache Pulsar™ topic in Astra Streaming

  • A text editor or IDE

Create a project

Create a directory for your Python project, create an index.py file, and then install the Pulsar client library with pip:

mkdir SimpleProducerConsumer && cd SimpleProducerConsumer

touch index.py

pip install pulsar-client==2.10
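
Before writing the script, it can help to confirm that the client library is importable from the interpreter you plan to use. The following is a minimal sketch that is not part of the original example. The helper name module_available is hypothetical, and it uses only the Python standard library.

```python
import importlib.util


def module_available(name):
    """Return True if a top-level module with this name can be found."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    # The pulsar-client package provides the module imported as "pulsar".
    if module_available("pulsar"):
        print("pulsar module found")
    else:
        print("pulsar module not found; re-run the pip install step")
```

If the check reports that the module is missing, the pip command might have installed the package for a different Python interpreter than the one running the check.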

Write the script

In the following steps, you add code to index.py to build a complete script that produces and consumes messages with the Python Pulsar client and Astra Streaming. Your IDE might show errors until you have completed the script.

  1. Open index.py in a text editor or IDE.

  2. Import the required libraries, define the connection variables, and then create a Pulsar client instance with the service URL and token authentication:

    index.py
    import pulsar
    import time
    
    serviceUrl = "<REPLACE_WITH_SERVICE_URL>"
    pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"
    
    tenantName = "<REPLACE_WITH_TENANT_NAME>"
    namespace = "<REPLACE_WITH_NAMESPACE>"
    topicName = "<REPLACE_WITH_TOPIC>"
    
    topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName)
    
    client = pulsar.Client(serviceUrl, authentication=pulsar.AuthenticationToken(pulsarToken))
  3. Provide values for the following variables:

    Parameter: serviceUrl
    Definition: The URL to connect to the Pulsar cluster
    Where to find the value:

      1. In the Astra Portal header, click Applications, and then select Streaming.

      2. Click the name of your tenant, and then click the Connect tab.

      3. In the Tenant Details section, get the Broker Service URL.

    Parameter: pulsarToken
    Definition: The token for Pulsar cluster authentication
    Where to find the value:

      For information about creating Pulsar tokens, see Manage tokens.

    Parameter: tenantName
    Definition: The name of your streaming tenant
    Where to find the value:

      1. In the Astra Portal header, click Applications, and then select Streaming.

      2. Get the tenant name from the Astra Streaming dashboard.

    Parameter: namespace
    Definition: The segmented area for certain topics in your streaming tenant
    Where to find the value:

      1. In the Astra Portal header, click Applications, and then select Streaming.

      2. Click the name of your tenant, and then click the Namespace and Topics tab.

      3. Choose the target namespace from the list of namespaces.

    Parameter: topicName
    Definition: The short topic name (not the Full Name)
    Where to find the value:

      1. In the Astra Portal header, click Applications, and then select Streaming.

      2. Click the name of your tenant, and then click the Namespace and Topics tab.

      3. Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.

  4. Use the client to create a producer:

    index.py
    producer = client.create_producer(topic)
  5. Send a message:

    index.py
    producer.send('Hello World'.encode('utf-8'))
  6. Use the Python client instance to create a consumer subscription to the same topic that you sent a message to:

    index.py
    consumer = client.subscribe(topic, 'my-subscription')
  7. Wait for the message, print its data, and then acknowledge it:

    index.py
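    waitingForMsg = True
    while waitingForMsg:
        try:
            # Wait up to 2000 ms; receive() raises an exception on timeout
            msg = consumer.receive(2000)
            # msg.data() returns bytes, so decode it before printing
            print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id()))
    
            # Acknowledging the message to remove from message backlog
            consumer.acknowledge(msg)
    
            waitingForMsg = False
        except Exception:
            # Catch Exception rather than everything, so Ctrl+C still stops the script
            print("Still waiting for a message...")
    
        time.sleep(1)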
  8. Clean up:

    client.close()
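
The variables from step 3 can also be supplied from outside the script so that the token is not stored in index.py. The following is a minimal sketch under stated assumptions, not part of the original example: the helper names build_topic and load_settings and the ASTRA_STREAMING_* environment variable names are hypothetical. It builds the same persistent://tenant/namespace/topic string as the script and rejects a value that contains a slash, which usually means the Full Name was pasted instead of the short topic name.

```python
import os


def build_topic(tenant_name, namespace, topic_name):
    """Build the full persistent topic name from its three parts."""
    parts = (("tenant", tenant_name),
             ("namespace", namespace),
             ("topic", topic_name))
    for label, value in parts:
        # Each part must be a short name; a "/" suggests a full name was used.
        if not value or "/" in value:
            raise ValueError("invalid {} name: {!r}".format(label, value))
    return "persistent://{0}/{1}/{2}".format(tenant_name, namespace, topic_name)


def load_settings(env=os.environ):
    """Read connection settings from hypothetical environment variables."""
    return {
        "service_url": env["ASTRA_STREAMING_SERVICE_URL"],
        "token": env["ASTRA_STREAMING_TOKEN"],
        "topic": build_topic(
            env["ASTRA_STREAMING_TENANT"],
            env["ASTRA_STREAMING_NAMESPACE"],
            env["ASTRA_STREAMING_TOPIC"],
        ),
    }
```

With these helpers, the script could call settings = load_settings() and then pass settings["service_url"], settings["token"], and settings["topic"] to the same pulsar.Client, pulsar.AuthenticationToken, and create_producer calls shown in the steps.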

The complete index.py script is as follows:

index.py
import pulsar
import time

serviceUrl = "<REPLACE_WITH_SERVICE_URL>"
pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"

tenantName = "<REPLACE_WITH_TENANT_NAME>"
namespace = "<REPLACE_WITH_NAMESPACE>"
topicName = "<REPLACE_WITH_TOPIC>"

topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName)

client = pulsar.Client(serviceUrl, authentication=pulsar.AuthenticationToken(pulsarToken))

producer = client.create_producer(topic)

producer.send('Hello World'.encode('utf-8'))

consumer = client.subscribe(topic, 'my-subscription')

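waitingForMsg = True
while waitingForMsg:
    try:
        # Wait up to 2000 ms; receive() raises an exception on timeout
        msg = consumer.receive(2000)
        # msg.data() returns bytes, so decode it before printing
        print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id()))

        # Acknowledging the message to remove from message backlog
        consumer.acknowledge(msg)

        waitingForMsg = False
    except Exception:
        # Catch Exception rather than everything, so Ctrl+C still stops the script
        print("Still waiting for a message...")

    time.sleep(1)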

client.close()
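
The receive loop in the script keeps retrying until a message arrives, so it never exits if the topic stays empty. One possible variation, shown here as a minimal sketch and not as part of the original example, is to cap the number of attempts. The function name receive_one and its parameters are hypothetical. It relies only on the consumer.receive, consumer.acknowledge, and msg.data calls already used in the script, so it accepts any consumer object that provides them.

```python
import time


def receive_one(consumer, attempts=5, timeout_millis=2000, pause_seconds=1.0):
    """Return the first message's text, or None if nothing arrives in time."""
    for attempt in range(1, attempts + 1):
        try:
            # receive() raises an exception when the timeout elapses.
            msg = consumer.receive(timeout_millis)
        except Exception:
            # This sketch treats every failure as "no message yet" and retries.
            print("Still waiting for a message... (attempt {})".format(attempt))
            time.sleep(pause_seconds)
            continue
        # Acknowledge so the message is removed from the backlog.
        consumer.acknowledge(msg)
        return msg.data().decode("utf-8")
    return None
```

In index.py, this could replace the while loop with text = receive_one(consumer), followed by a check for None before printing. Placing client.close() in a finally block would also ensure that the connection is closed when an unexpected error occurs.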

Run the script

In your Python project directory, run the script:

python3 index.py

The output includes many log lines from the client. A line that begins with Received message confirms that the script succeeded:

Received message 'Hello World' id='(422529,5,-1,0)'

© Copyright IBM Corporation 2026

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.
