Python code examples

Astra Streaming is currently in private beta. Join the Astra Streaming Waitlist.

Astra Streaming is powered by Apache Pulsar. To connect to your service, use the open-source client APIs provided by the Apache Pulsar project.

The Python client APIs are distributed through the Python Package Index (PyPI). On Linux, Python 2.7 and Python 3.4 through 3.7 are supported. On macOS, Python 2.7 and 3.7 are supported. For details, see the Apache Pulsar documentation for the Python client.

Astra Streaming runs Pulsar version 2.6.3. Use a client API of this version or later.

In the code examples, you’ll need to make the following substitutions:

  • <service-url>: The Pulsar service URL from the Astra Streaming Console Connect tab.

  • <streaming-token>: Your access token from the Astra Streaming Console Connect tab.

  • <topic-url>: A topic URL of your choice from the Astra Streaming Console Topics tab.

  • <subscription-name>: A subscription name of your choice, or one that you’ve configured in the Details tab of an existing topic.
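One way to avoid pasting these values directly into each sample is to read them from the environment and fail early if any are missing. The following is a minimal sketch, not part of the Pulsar client: the environment variable names and the load_settings helper are hypothetical and chosen only for illustration.

```python
import os

# Hypothetical environment variable names; use whatever fits your setup.
REQUIRED_SETTINGS = {
    "service_url": "ASTRA_STREAMING_SERVICE_URL",
    "token": "ASTRA_STREAMING_TOKEN",
    "topic": "ASTRA_STREAMING_TOPIC",
    "subscription": "ASTRA_STREAMING_SUBSCRIPTION",
}


def load_settings(environ=None):
    """Return the four substitution values as a dict.

    Raises ValueError if a value is unset, empty, or still looks like
    an unreplaced <placeholder>.
    """
    environ = os.environ if environ is None else environ
    settings = {}
    problems = []
    for key, var in REQUIRED_SETTINGS.items():
        value = environ.get(var, "").strip()
        if not value or (value.startswith("<") and value.endswith(">")):
            problems.append(var)
        settings[key] = value
    if problems:
        raise ValueError("Missing or placeholder values: " + ", ".join(problems))
    return settings
```

With the variables exported in your shell, calling load_settings() returns a dictionary whose values can be passed to the samples in place of the placeholder strings.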

To do a native install using pip:

pip install pulsar-client==2.6.3
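If you are unsure whether an already-installed client meets the 2.6.3 minimum, a simple version comparison can help. This is an illustrative sketch only: parse_version and meets_minimum are hypothetical helpers, and the comparison assumes plain dotted version strings.

```python
def parse_version(text):
    """Turn '2.6.3' into (2, 6, 3).

    Parsing stops at the first component that does not start with a
    digit, and any non-numeric suffix such as 'rc1' is ignored.
    """
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


def meets_minimum(installed, minimum="2.6.3"):
    """Return True if the installed version is at least the minimum."""
    return parse_version(installed) >= parse_version(minimum)
```

Pass it the version string reported by pip show pulsar-client, for example meets_minimum("2.7.0").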

This sample creates a producer on a topic, sends 10 messages, then closes the client, which also closes the producer.

import pulsar

service_url = '<service-url>'

# Use default CA certs for your environment
# RHEL/CentOS:
trust_certs='/etc/ssl/certs/ca-bundle.crt'
# Debian/Ubuntu:
# trust_certs='/etc/ssl/certs/ca-certificates.crt'
# OSX:
# Export the default certificates to a file, then use that file:
#    security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
# trust_certs='./ca-certificates.crt'

token='<streaming-token>'

client = pulsar.Client(service_url,
                        authentication=pulsar.AuthenticationToken(token),
                        tls_trust_certs_file_path=trust_certs)


producer = client.create_producer('<topic-url>')

for i in range(10):
    producer.send(('Hello World! %d' % i).encode('utf-8'))

client.close()
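The samples select the CA bundle by commenting lines in and out. As an alternative, a script could pick the first bundle that exists on the current machine. The sketch below uses only the paths mentioned in the sample comments; find_ca_bundle is a hypothetical helper, not part of the Pulsar client.

```python
import os

# Candidate CA bundle locations, taken from the comments in the samples.
CANDIDATE_CA_BUNDLES = [
    "/etc/ssl/certs/ca-bundle.crt",        # RHEL/CentOS
    "/etc/ssl/certs/ca-certificates.crt",  # Debian/Ubuntu
    "./ca-certificates.crt",               # exported manually on macOS
]


def find_ca_bundle(candidates=CANDIDATE_CA_BUNDLES):
    """Return the first candidate path that exists as a regular file."""
    for path in candidates:
        if os.path.isfile(path):
            return path
    raise FileNotFoundError(
        "No CA bundle found; tried: " + ", ".join(candidates)
    )
```

The returned path can then be assigned to trust_certs before the client is created.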

This sample creates a consumer on a topic, waits for a message, acknowledges it, then closes the client, which also closes the consumer.

import time

import pulsar

service_url = '<service-url>'

# Use default CA certs for your environment
# RHEL/CentOS:
trust_certs='/etc/ssl/certs/ca-bundle.crt'
# Debian/Ubuntu:
# trust_certs='/etc/ssl/certs/ca-certificates.crt'
# OSX:
# Export the default certificates to a file, then use that file:
#    security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
# trust_certs='./ca-certificates.crt'

token='<streaming-token>'

client = pulsar.Client(service_url,
                        authentication=pulsar.AuthenticationToken(token),
                        tls_trust_certs_file_path=trust_certs)

consumer = client.subscribe('<topic-url>', '<subscription-name>')

waiting_for_msg = True
while waiting_for_msg:
    try:
        # Wait up to 2000 ms for a message; a timeout raises an exception
        msg = consumer.receive(2000)
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))

        # Acknowledge the message to remove it from the message backlog
        consumer.acknowledge(msg)

        waiting_for_msg = False
    except Exception:
        print("Still waiting for a message...")
        time.sleep(1)

client.close()
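The loop above retries forever if no message ever arrives. A bounded variant might look like the following sketch (Python 3). The helper receive_with_retries is hypothetical; it accepts any callable that takes a timeout in milliseconds, such as consumer.receive, and it assumes, as the sample does, that a timeout surfaces as an exception.

```python
import time


def receive_with_retries(receive, attempts=5, timeout_ms=2000, pause_s=1.0):
    """Call receive(timeout_ms) until it returns, at most `attempts` times.

    Any exception from receive is treated as "no message yet", mirroring
    the sample loop. Raises TimeoutError once every attempt has failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            return receive(timeout_ms)
        except Exception:
            # Pause between attempts, but not after the final one.
            if attempt < attempts:
                time.sleep(pause_s)
    raise TimeoutError("No message after {} attempts".format(attempts))
```

In the consumer sample this would be used as msg = receive_with_retries(consumer.receive), followed by consumer.acknowledge(msg).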

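This sample creates a reader on a topic, reads the earliest available message, then closes the client, which also closes the reader. To start from the latest message instead, pass pulsar.MessageId.latest when creating the reader.

import time

import pulsar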

service_url = '<service-url>'

# Use default CA certs for your environment
# RHEL/CentOS:
trust_certs='/etc/ssl/certs/ca-bundle.crt'
# Debian/Ubuntu:
# trust_certs='/etc/ssl/certs/ca-certificates.crt'
# OSX:
# Export the default certificates to a file, then use that file:
#    security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
# trust_certs='./ca-certificates.crt'

token='<streaming-token>'

client = pulsar.Client(service_url,
                        authentication=pulsar.AuthenticationToken(token),
                        tls_trust_certs_file_path=trust_certs)

reader = client.create_reader('<topic-url>', pulsar.MessageId.earliest)

waiting_for_msg = True
while waiting_for_msg:
    try:
        # Wait up to 2000 ms for the next message; a timeout raises an exception
        msg = reader.read_next(2000)
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        waiting_for_msg = False
    except Exception:
        print("Still waiting for a message...")
        time.sleep(1)

client.close()
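The reader sample stops after one message. To read everything currently available on the topic, a loop along these lines could be used. This is a sketch under assumptions: read_available is a hypothetical helper, and it assumes the reader object offers has_message_available() alongside read_next(), which should be checked against the client version you install.

```python
def read_available(reader, timeout_ms=2000, limit=100):
    """Read messages until the reader reports none are left.

    Stops after `limit` messages so that a busy topic cannot keep the
    loop running indefinitely. Returns the messages in the order read.
    """
    messages = []
    while len(messages) < limit and reader.has_message_available():
        messages.append(reader.read_next(timeout_ms))
    return messages
```

Called with the reader from the sample, each returned message exposes data() and message_id() as shown above.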