
Python Pulsar client on Astra Streaming

Prerequisites

You will need the following prerequisites in place to complete this guide:

  • On Linux, Python versions 3.4 to 3.7 are supported

  • On macOS, Python version 3.7 is supported

  • A working Pulsar topic (get started here if you don’t have a topic)

  • A basic text editor or IDE

Visit our examples repo to see the complete source of this example.

Create a project

Create a folder, change directory into it, and start a new Python project.

Install the Pulsar client library with pip.

mkdir SimpleProducerConsumer && cd SimpleProducerConsumer

touch index.py

pip install pulsar-client==2.10.2

Add the client

Create a new Pulsar client instance using the service URL and token authentication.

Open "index.py" in your favorite text editor or IDE and copy in the following code.

import pulsar
import time

serviceUrl = "<REPLACE_WITH_SERVICE_URL>"
pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"

tenantName = "<REPLACE_WITH_TENANT_NAME>";
namespace = "<REPLACE_WITH_NAMESPACE>";
topicName = "<REPLACE_WITH_TOPIC>";

topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName)

client = pulsar.Client(serviceUrl, authentication=pulsar.AuthenticationToken(pulsarToken))

This isn’t complete code (yet), so don’t be alarmed if your editor shows errors.

Notice there are a few variables waiting for replacement values. You can find those values here:

serviceUrl

This is the URL for connecting to the Pulsar cluster.

In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".

pulsarToken

This is the token used to authenticate with the Pulsar cluster.

In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.

tenantName

The name of your streaming tenant.

In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".

namespace

Within your streaming tenant, this is the segmented area for certain topics.

In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.

topicName

Expanding the namespace chosen above lists the topics within it. This value is just the topic name (not the "Full Name").
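Rather than pasting the token into the source file, the five values could be read from environment variables. The following is a minimal sketch under that assumption. The environment variable names and the helpers `load_settings` and `build_topic` are hypothetical; they are not part of the Pulsar client or Astra Streaming.

```python
import os

# Hypothetical environment variable names; pick whatever suits your setup.
REQUIRED_VARS = (
    "PULSAR_SERVICE_URL",
    "PULSAR_TOKEN",
    "PULSAR_TENANT",
    "PULSAR_NAMESPACE",
    "PULSAR_TOPIC",
)


def load_settings(env=None):
    """Return the five connection values, failing early if any is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError("Missing settings: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}


def build_topic(tenant_name, namespace, topic_name):
    """Build the full topic name from the short topic name."""
    if "/" in topic_name:
        # A slash suggests the "Full Name" was pasted instead of the short name.
        raise ValueError("Expected the short topic name, got: " + topic_name)
    return "persistent://{0}/{1}/{2}".format(tenant_name, namespace, topic_name)
```

With this in place, the hard-coded assignments could be replaced by a call to `load_settings()`, and `topic` could be built by passing the tenant, namespace, and topic values to `build_topic`.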

Create a producer

Use the client to create a producer.

Add the following code to "index.py".

producer = client.create_producer(topic)

Once the code above creates a producer, add this code to "index.py" to actually send the message.

producer.send('Hello World'.encode('utf-8'))
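The message is encoded because `producer.send` expects bytes rather than a string. The same idea extends to structured data. The sketch below assumes JSON as the payload format, which is an illustration only (this guide sends a plain string), and the helper names `encode_payload` and `decode_payload` are hypothetical.

```python
import json


def encode_payload(data):
    """Serialize a dict to UTF-8 JSON bytes, ready to pass to producer.send()."""
    return json.dumps(data, sort_keys=True).encode("utf-8")


def decode_payload(raw):
    """Turn bytes (for example, the result of msg.data()) back into a dict."""
    return json.loads(raw.decode("utf-8"))


payload = encode_payload({"greeting": "Hello World"})
print(payload)                  # b'{"greeting": "Hello World"}'
print(decode_payload(payload))  # {'greeting': 'Hello World'}
```

Sending would then look like `producer.send(encode_payload({"greeting": "Hello World"}))`.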

Create a consumer

Just like the producer, use the Python client instance to create a consumer subscription to the same topic we sent a message to.

Add this code to "index.py".

consumer = client.subscribe(topic, 'my-subscription')

And now for the good stuff! Let’s iterate through messages and write their data.

Add this code to "index.py".

waitingForMsg = True
while waitingForMsg:
    try:
        # Wait up to 2000 ms for a message; an exception is raised on timeout
        msg = consumer.receive(2000)
        # msg.data() returns bytes, so decode it before printing
        print("Received message '{}' id='{}'".format(msg.data().decode('utf-8'), msg.message_id()))

        # Acknowledging the message to remove from message backlog
        consumer.acknowledge(msg)

        waitingForMsg = False
    except Exception:
        print("Still waiting for a message...")

    time.sleep(1)
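The loop above keeps polling until a message arrives. A bounded variant that gives up after a fixed number of attempts can be sketched as follows. The name `receive_one` and its parameters are hypothetical; the sketch only assumes the consumer exposes `receive(timeout_millis)` and `acknowledge(msg)` as used in this guide.

```python
def receive_one(consumer, attempts=5, timeout_millis=2000):
    """Try to receive and acknowledge one message; return its bytes, or None."""
    for attempt in range(1, attempts + 1):
        try:
            msg = consumer.receive(timeout_millis)
        except Exception:
            # The client raises when no message arrives within the timeout.
            print("Attempt {} of {}: still waiting...".format(attempt, attempts))
            continue
        # Acknowledge so the message is removed from the backlog.
        consumer.acknowledge(msg)
        return msg.data()
    return None
```

A caller could then write `data = receive_one(consumer)` and treat a `None` result as "no message arrived in time".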

Finally, add a little clean-up.

client.close()
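If an exception is raised before `client.close()` is reached, the client may not be closed cleanly. One common way to guard against that is `contextlib.closing` from the Python standard library, which calls `close()` when the block exits. This is a minimal sketch with `run_with_client` as a hypothetical helper name; it works with any object that has a `close()` method.

```python
from contextlib import closing


def run_with_client(client, work):
    """Call work(client) and close the client afterwards, even on error."""
    with closing(client):
        return work(client)
```

For example, the producer and consumer steps could be moved into a function that takes the client as its argument and is passed as `work`.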

Run the example

Here it comes: your greatest Python creation yet!

Return to the terminal where you created your Python project and run the following command.

python3 index.py

There will likely be a lot of logging in the output, but look closely and you should see the following message.

Received message 'Hello World' id='(422529,5,-1,0)'

You did it🎉!

Next steps

You’re one step closer to being a messaging ninja. Let’s continue the learning with these guides.

  • A guide to configuring Pulsar binaries with Astra Streaming

  • Astra Streaming Functions

  • Connectors
