Use the Python Pulsar client with Astra Streaming
You can use the Python Pulsar client with Astra Streaming to produce and consume messages.
For a complete source code example, see the Astra Streaming Examples repository.
Prerequisites
-
A supported Python version:
-
For Linux, versions 3.4 to 3.7 are supported
-
For macOS, version 3.7 is supported
-
-
An Apache Pulsar™ topic in Astra Streaming
-
A text editor or IDE
Create a project
-
Create a folder for a new Python project.
-
In your new directory, install the Pulsar client library with pip:
mkdir SimpleProducerConsumer && cd SimpleProducerConsumer touch index.py pip install pulsar-client==2.10
Write the script
-
Create an
index.pyfile containing the following code. This code creates a Pulsar client instance with the topic URL and token authentication.index.pyimport pulsar import time serviceUrl = "<REPLACE_WITH_SERVICE_URL>"; pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"; tenantName = "<REPLACE_WITH_TENANT_NAME>"; namespace = "<REPLACE_WITH_NAMESPACE>"; topicName = "<REPLACE_WITH_TOPIC>"; topic = "persistent://{0}/{1}/{2}".format(tenantName, namespace, topicName) client = pulsar.Client(serviceUrl, authentication=pulsar.AuthenticationToken(pulsarToken))This script is intentionally incomplete. Your IDE might show errors until you complete the next steps.
-
Provide values for the following variables:
Parameter Definition Where to find the value serviceUrlThe URL to connect to the Pulsar cluster
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Connect tab.
-
In the Tenant Details section, get the Broker Service URL.
pulsarTokenThe token for Pulsar cluster authentication
For information about creating Pulsar tokens, see Manage tokens.
tenantNameThe name of your streaming tenant
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Get the tenant name from the Astra Streaming dashboard.
namespaceThe segmented area for certain topics in your streaming tenant
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Choose the target namespace from the list of namespaces.
topicNameTopic name (not the full name)
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.
-
-
Use the client to create a producer:
index.pyproducer = client.create_producer(topic) -
Send a message:
index.pyproducer.send('Hello World'.encode('utf-8')) -
Use the Python client instance to create a consumer subscription to the same topic that you sent a message to:
index.pyconsumer = client.subscribe(topic, 'my-subscription') -
Iterate through messages and write their data:
index.pywaitingForMsg = True while waitingForMsg: try: msg = consumer.receive(2000) print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # Acknowledging the message to remove from message backlog consumer.acknowledge(msg) waitingForMsg = False except: print("Still waiting for a message..."); time.sleep(1) -
Clean up:
client.close()
Run the script
In your Python project directory, run the script:
python3 index.py
The output includes a lot of logs.
Received message confirms that the script succeeded:
Received message 'Hello World' id='(422529,5,-1,0)'