Astra Streaming code examples

Astra Streaming is powered by Apache Pulsar. To connect to your service, use the open-source client APIs provided by the Apache Pulsar project.

Astra Streaming is running Pulsar version 2.6.3. You should use this API version or higher.

Choose the language that you would like to use:

java icon

python icon

golang icon

node icon

Java

Python

Golang

Node.js

Java

The Java client APIs are distributed through Maven Central. They require Java version 8. You can find the Javadocs here.

In the code examples, you’ll need to make the following substitutions:

  • <service-url>: The Pulsar Broker Service URL from the Astra Streaming Console Connect tab.

  • <streaming-token>: Your access token from the Astra Streaming Console Connect tab.

  • <topic-url>: A topic URL of your choice from the Astra Streaming Console Topics tab.

  • <subscription-name>: A subscription name of your choice, or one that you’ve configured in the Details tab of an existing topic.

  • Maven/Gradle

  • Producer

  • Consumer

  • Reader

If you are using Maven in your project, add this to the <properties> section of your pom.xml file:

<pulsar.version>2.6.3</pulsar.version>

And add the following to the <dependencies> section:

<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>${pulsar.version}</version>
</dependency>

This sample creates a producer on a topic, sends a message, then closes the producer.

import org.apache.pulsar.client.api.*;
import java.io.IOException;

public class simpleProducer {

    private static final String SERVICE_URL = "<service-url>";

    public static void main(String[] args) throws IOException
    {
        // Create client object
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .authentication(
                    AuthenticationFactory.token("<streaming-token>")
                )
                .build();

        // Create producer on a topic
        Producer<byte[]> producer = client.newProducer()
                .topic("<topic-url>")
                .create();

        // Send a message to the topic
        producer.send("Hello World".getBytes());

        //Close the producer
        producer.close();

        // Close the client
        client.close();
    }
}

This sample creates a consumer on a topic, consumes a message, then closes the consumer.

import org.apache.pulsar.client.api.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class simpleConsumer {

        private static final String SERVICE_URL = "<service-url>";

        public static void main(String[] args) throws IOException
        {

            // Create client object
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(SERVICE_URL)
                    .authentication(
                        AuthenticationFactory.token("<streaming-token>")
                    )
                    .build();

            // Create consumer on a topic with a subscription
            Consumer consumer = client.newConsumer()
                    .topic("<topic-url>")
                    .subscriptionName("<subscription-name>")
                    .subscribe();

            boolean receivedMsg = false;
            // Loop until a message is received
            do {
                // Block for up to 1 second for a message
                Message msg = consumer.receive(1, TimeUnit.SECONDS);

                if(msg != null){
                    System.out.printf("Message received: %s", new String(msg.getData()));

                    // Acknowledge the message to remove it from the message backlog
                    consumer.acknowledge(msg);

                    receivedMsg = true;
                }

            } while (!receivedMsg);

            //Close the consumer
            consumer.close();

            // Close the client
            client.close();
        }
}

This sample creates a reader on a topic and reads the earliest or latest message, then closes the reader.

import org.apache.pulsar.client.api.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class simpleReader {

    private static final String SERVICE_URL = "<service-url>";

    public static void main(String[] args) throws IOException {

        // Create client object
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .authentication(
                    AuthenticationFactory.token("<streaming-token>")
                )
                .build();

        // Create a reader on a topic starting at the earliest retained message
        // No subscription is necessary. Depending on retention policy, the
        // earliest message may be days old
        Reader<byte[]> reader = client.newReader()
                .topic("<topic-url>")
                .startMessageId(MessageId.earliest)
                .create();

        boolean receivedMsg = false;
        // Loop until a message is received
        do {
            // Block for up to 1 second for a message
            Message msg = reader.readNext(1, TimeUnit.SECONDS);

            if(msg != null){
                System.out.printf("Message received: %s%n",  new String(msg.getData()));

                receivedMsg = true;
            }

        } while (!receivedMsg);

        //Close the reader
        reader.close();

        // Close the client
        client.close();

    }
}