Producing and consuming messages using the Java Pulsar client with Astra Streaming

Prerequisites

You need the following to complete this guide:

  • JDK 11 or later installed (install now); the compiler settings used below target Java 11

  • Maven or Gradle installed

  • A working Pulsar topic (get started here if you don’t have a topic)

  • A basic text editor or IDE

Visit our examples repo to see the complete source of this example.

Create a project

Create a new Maven project. The archetype creates the SimpleProducerConsumer directory for you, so change into it afterwards.

mvn archetype:generate \
    -DgroupId=org.example \
    -DartifactId=SimpleProducerConsumer \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false

cd SimpleProducerConsumer

Add the Pulsar client dependency inside the <dependencies> element of pom.xml.

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.10.2</version>
</dependency>

This example is packaged as a single executable JAR that bundles its dependencies. Add the following build configuration to pom.xml.

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>org.example.App</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>

You may also need to set the Java compiler source and target versions in pom.xml.

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
</properties>

Add the client

Let’s set up the main Java class and have some fun!

Open the "src/main/java/org/example/App.java" file in your favorite text editor or IDE. Clear the contents of the file and add the following to bring in needed dependencies, create a client instance, and configure it to use your Astra Streaming tenant.

package org.example;

import org.apache.pulsar.client.api.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class App
{
  private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
  private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
  private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
  private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
  private static final String topicName = "<REPLACE_WITH_TOPIC>";

  private static final String topic = String.format("persistent://%s/%s/%s", tenantName, namespace, topicName);

  public static void main( String[] args ) throws IOException
  {
    PulsarClient client = PulsarClient.builder()
                                      .serviceUrl(serviceUrl)
                                      .authentication(
                                          AuthenticationFactory.token(pulsarToken)
                                      )
                                      .build();

This isn’t a complete class, so don’t be alarmed if your editor shows errors.

Notice there are a few variables waiting for replacement values. You can find those values here:

serviceUrl

This is the URL for connecting to the Pulsar cluster.

In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".

pulsarToken

This is the token used to authenticate with the Pulsar cluster.

In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.

tenantName

The name of your streaming tenant.

In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".

namespace

A namespace is a logical grouping of topics within your streaming tenant.

In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.

topicName

Expand the namespace chosen above to list its topics. This value is just the topic name (not the "Full Name").
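
As an alternative to pasting these values into the source file, the sketch below reads them from environment variables and assembles the full topic name in the same persistent://tenant/namespace/topic form that App.java uses. The AstraStreamingConfig class and the variable names (PULSAR_SERVICE_URL and so on) are hypothetical and chosen for illustration; they are not part of the Pulsar client or Astra Streaming.

```java
import java.util.Map;

/** Illustrative helper (not part of the Pulsar client): loads settings and builds the topic name. */
final class AstraStreamingConfig {
  final String serviceUrl;
  final String pulsarToken;
  final String topic;

  private AstraStreamingConfig(String serviceUrl, String pulsarToken, String topic) {
    this.serviceUrl = serviceUrl;
    this.pulsarToken = pulsarToken;
    this.topic = topic;
  }

  /** Same format string as App.java: persistent://tenant/namespace/topic. */
  static String fullTopicName(String tenantName, String namespace, String topicName) {
    return String.format("persistent://%s/%s/%s", tenantName, namespace, topicName);
  }

  /** Reads the five values from a map such as System.getenv(). The key names are assumptions. */
  static AstraStreamingConfig fromEnv(Map<String, String> env) {
    return new AstraStreamingConfig(
        require(env, "PULSAR_SERVICE_URL"),
        require(env, "PULSAR_TOKEN"),
        fullTopicName(
            require(env, "PULSAR_TENANT"),
            require(env, "PULSAR_NAMESPACE"),
            require(env, "PULSAR_TOPIC")));
  }

  /** Fails fast when a value is missing, blank, or still a <REPLACE_WITH_...> placeholder. */
  private static String require(Map<String, String> env, String key) {
    String value = env.get(key);
    if (value == null || value.trim().isEmpty() || value.startsWith("<REPLACE_WITH_")) {
      throw new IllegalStateException("Missing configuration value: " + key);
    }
    return value.trim();
  }

  public static void main(String[] args) {
    try {
      AstraStreamingConfig config = fromEnv(System.getenv());
      System.out.println("Topic: " + config.topic);
    } catch (IllegalStateException e) {
      System.err.println(e.getMessage());
    }
  }
}
```

With a helper like this, the constants in App.java could be replaced by config.serviceUrl, config.pulsarToken, and config.topic, which keeps the token out of version control.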

Create a producer

Use the client to create a producer.

The producer uses the client's connection settings and is told which topic to publish messages to.

Add this code to "App.java" to create a new producer.

    Producer<String> producer = client.newProducer(Schema.STRING)
                              .topic(topic)
                              .create();

Add this code to "App.java" to send a single message to the broker. The send() call is synchronous: it blocks until the broker acknowledges the message.

    producer.send("Hello World");

Create a consumer

Create a new consumer instance in "App.java".

This code subscribes the consumer to the topic under a named subscription.

Add the following code to "App.java".

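    Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic(topic)
        .subscriptionName("my-subscription")
        // A new subscription starts at the latest message by default; start from the
        // earliest so the message produced above is delivered on the first run.
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();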

With the consumer and subscription in place, we can receive the unacknowledged message that the producer sent earlier.

Add the following to "App.java".

    boolean receivedMsg = false;

    do {
      // Block for up to 1 second for a message
      Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);

      if(msg != null){
        // The consumer uses Schema.STRING, so getValue() returns the decoded String
        System.out.printf("Message received: %s%n", msg.getValue());

        // Acknowledge the message to remove it from the message backlog
        consumer.acknowledge(msg);

        receivedMsg = true;
      }

    } while (!receivedMsg);
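
The loop above keeps polling until a message arrives, so it never exits if nothing is delivered. One way to bound the wait is sketched below with only the standard library: a small helper that polls until a value arrives or a deadline passes. BoundedReceive and pollUntil are hypothetical names, and the Supplier stands in for a call such as consumer.receive(1, TimeUnit.SECONDS); in App.java the lambda would also need to handle the checked PulsarClientException.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Illustrative helper (not part of the Pulsar client): polling with an overall deadline. */
final class BoundedReceive {

  /**
   * Calls poll at least once, then repeatedly until it returns a non-null value
   * or the timeout has elapsed. Returns Optional.empty() if the deadline passes first.
   */
  static <T> Optional<T> pollUntil(Supplier<T> poll, Duration timeout) {
    long deadline = System.nanoTime() + timeout.toNanos();
    do {
      T value = poll.get();
      if (value != null) {
        return Optional.of(value);
      }
    } while (System.nanoTime() - deadline < 0);
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Stand-in for a consumer: returns null twice (as a timed-out receive would), then a message.
    int[] calls = {0};
    Optional<String> msg =
        pollUntil(() -> ++calls[0] < 3 ? null : "Hello World", Duration.ofSeconds(5));
    System.out.println(msg.orElse("No message received before the deadline"));
  }
}
```

Because each poll is expected to block for its own short timeout (one second in the loop above), the helper does not sleep between attempts.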

Finally, add a little clean up and close out the Java class.

    consumer.close();
    producer.close();
    client.close();
  }
}
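
The explicit close() calls above are skipped if an exception is thrown earlier in main. PulsarClient, Producer, and Consumer all implement Closeable, so they could instead be declared in a try-with-resources statement, which closes them in reverse order even on failure. The sketch below demonstrates that ordering using stand-in resources rather than real Pulsar objects; CloseOrderSketch and Resource are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: shows the order in which try-with-resources closes resources. */
final class CloseOrderSketch {

  /** Stand-in for a Pulsar client, producer, or consumer; records its name when closed. */
  static final class Resource implements AutoCloseable {
    private final String name;
    private final List<String> closeLog;

    Resource(String name, List<String> closeLog) {
      this.name = name;
      this.closeLog = closeLog;
    }

    @Override
    public void close() {
      closeLog.add(name);
    }
  }

  /** Opens three resources, optionally fails in the body, and returns the close order. */
  static List<String> run(boolean failMidway) {
    List<String> closeLog = new ArrayList<>();
    try (Resource client = new Resource("client", closeLog);
         Resource producer = new Resource("producer", closeLog);
         Resource consumer = new Resource("consumer", closeLog)) {
      if (failMidway) {
        throw new IllegalStateException("simulated failure while sending");
      }
    } catch (IllegalStateException e) {
      // By the time this handler runs, all three resources are already closed.
    }
    return closeLog;
  }

  public static void main(String[] args) {
    System.out.println(run(true)); // prints [consumer, producer, client]
  }
}
```

The client is declared first so that it is closed last, after the producer and consumer that depend on it.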

Run the example

Your Java class is ready for the big time! Let’s build the app and run.

mvn clean package assembly:single
java -jar target/SimpleProducerConsumer-1.0-SNAPSHOT-jar-with-dependencies.jar

You should see output similar to the following:

Message received: Hello World

Next steps

Woo-hoo🎉! You did it! You’re on your way to messaging glory. Let’s continue learning.

© 2024 DataStax | Privacy policy | Terms of use
