Use the Python Pulsar client with Astra Streaming

You can use the Python Pulsar client with Astra Streaming to produce and consume messages.

Go to the examples repo for the complete source of this example.

Prerequisites

  • A supported Python version:

    • For Linux, versions 3.4 to 3.7 are supported

    • For macOS, version 3.7 is supported

  • A Pulsar topic in Astra Streaming

  • A text editor or IDE
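The version requirements above can be checked from the interpreter itself. The following is a minimal sketch: the helper name `is_supported_python` is hypothetical, and the version ranges are copied from the list above rather than queried from the client library.

```python
import platform
import sys


def is_supported_python(system, version):
    """Return True if the (major, minor) version matches the prerequisites list.

    `system` is a platform.system() value such as "Linux" or "Darwin" (macOS).
    """
    major_minor = tuple(version[:2])
    if system == "Linux":
        return (3, 4) <= major_minor <= (3, 7)
    if system == "Darwin":
        return major_minor == (3, 7)
    # Other platforms are not listed in the prerequisites.
    return False


if __name__ == "__main__":
    ok = is_supported_python(platform.system(), sys.version_info)
    print("Python {}.{} on {}: {}".format(
        sys.version_info[0], sys.version_info[1], platform.system(),
        "listed as supported" if ok else "not listed as supported"))
```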

Create a project

  1. Create a folder for a new Python project, and then create an empty index.py file in it.

  2. In your new directory, install the Pulsar client library with pip. The following commands perform both steps:

    mkdir SimpleProducerConsumer && cd SimpleProducerConsumer
    
    touch index.py
    
    pip install pulsar-client==2.10
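Before writing the script, it can help to confirm that the library is visible to the interpreter you plan to use. This is a small sketch using only the standard library. The helper name `module_available` is hypothetical. The check relies on the fact that the script in this guide imports the client as `pulsar`.

```python
import importlib.util


def module_available(name):
    """Return True if a top-level module with this name can be found."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    if module_available("pulsar"):
        print("pulsar module found")
    else:
        print("pulsar module not found; rerun the pip install command")
```

If the module is not found even though pip reported success, pip and python3 might point to different Python installations.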

Write the script

  1. Add the following code to index.py. This code builds the full topic name and creates a Pulsar client instance with the service URL and token authentication.

    index.py
    import pulsar
    import time
    
    serviceUrl = "<REPLACE_WITH_SERVICE_URL>"
    pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"
    
    tenantName = "<REPLACE_WITH_TENANT_NAME>"
    namespace = "<REPLACE_WITH_NAMESPACE>"
    topicName = "<REPLACE_WITH_TOPIC>"
    
    topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName)
    
    client = pulsar.Client(serviceUrl, authentication=pulsar.AuthenticationToken(pulsarToken))

    This script is intentionally incomplete. Your IDE might show errors until you complete the next steps.

  2. Provide values for the following variables:

    serviceUrl
      Definition: The URL to connect to the Pulsar cluster.
      Where to find the value: In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Connect tab. In the Details section, get the Broker Service URL.

    pulsarToken
      Definition: The token for Pulsar cluster authentication.
      Where to find the value: For information about creating Pulsar tokens, see Manage tokens.

    tenantName
      Definition: The name of your streaming tenant.
      Where to find the value: In the Astra Portal navigation menu, click Streaming, and then select your streaming tenant. In the Details section, get the Name.

    namespace
      Definition: The segmented area for certain topics in your streaming tenant.
      Where to find the value: In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Choose the target namespace from the list of namespaces.

    topicName
      Definition: The topic name (not the full name).
      Where to find the value: In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.

  3. Use the client to create a producer:

    index.py
    producer = client.create_producer(topic)
  4. Send a message:

    index.py
    producer.send('Hello World'.encode('utf-8'))
  5. Use the Python client instance to create a consumer subscription to the same topic that you sent a message to:

    index.py
    consumer = client.subscribe(topic, 'my-subscription')
  6. Wait for a message, print its data, and acknowledge it:

    index.py
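    waitingForMsg = True
    while waitingForMsg:
        try:
            # Wait up to 2000 ms for a message
            msg = consumer.receive(2000)
            # msg.data() returns bytes, so decode it before printing
            print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id()))
    
            # Acknowledge the message to remove it from the message backlog
            consumer.acknowledge(msg)
    
            waitingForMsg = False
        except Exception:
            # receive() raises an exception if no message arrives before the timeout
            print("Still waiting for a message...")
    
        time.sleep(1)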
  7. Clean up:

    client.close()
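The receive loop in this guide retries until a message arrives. One possible variation, sketched here under stated assumptions, is to cap the number of attempts so the script exits on its own if the topic stays empty. The function name `receive_one` and its `max_attempts` parameter are hypothetical and are not part of the Pulsar client. The function only assumes that the consumer object has the `receive` and `acknowledge` methods used in the steps above, and that a failed `receive` raises an exception.

```python
def receive_one(consumer, timeout_ms=2000, max_attempts=5):
    """Try to receive and acknowledge a single message.

    Returns the payload decoded as UTF-8, or None if no message
    arrived within max_attempts tries.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            msg = consumer.receive(timeout_ms)
        except Exception:
            # Assumption: a receive that fails (for example, on timeout) raises
            print("Attempt {} of {}: still waiting for a message...".format(
                attempt, max_attempts))
            continue
        # Acknowledge so the message is removed from the backlog
        consumer.acknowledge(msg)
        return msg.data().decode("utf-8")
    return None


# Possible use in index.py, after the consumer has been created:
#
#     try:
#         text = receive_one(consumer)
#         print("Received: {}".format(text))
#     finally:
#         client.close()
```

Wrapping the call in try/finally, as in the commented usage, means `client.close()` runs even if the receive step fails unexpectedly.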

Run the script

In your Python project directory, run the script:

python3 index.py

The output includes many log lines from the client. A line that starts with Received message confirms that the script succeeded:

Received message 'Hello World' id='(422529,5,-1,0)'
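If you want to work with the printed message ID programmatically, the following sketch splits it into its four numbers. It assumes the fields are, in order, the ledger ID, entry ID, partition index, and batch index. That ordering is an assumption about the client's string form and is not stated in this guide. The names `MessageIdParts` and `parse_message_id` are hypothetical.

```python
from collections import namedtuple

# Field names reflect the assumed ordering described above.
MessageIdParts = namedtuple(
    "MessageIdParts", ["ledger_id", "entry_id", "partition", "batch_index"])


def parse_message_id(text):
    """Parse a string such as '(422529,5,-1,0)' into four integers."""
    fields = text.strip().strip("()").split(",")
    if len(fields) != 4:
        raise ValueError(
            "expected four comma-separated fields, got: {!r}".format(text))
    return MessageIdParts(*(int(field) for field in fields))


if __name__ == "__main__":
    print(parse_message_id("(422529,5,-1,0)"))
```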


© 2024 DataStax | Privacy policy | Terms of use
