Use the Java Pulsar client with Astra Streaming

You can produce and consume messages with the Java Pulsar client and Astra Streaming.

Go to the examples repo for the complete source of this example.

Prerequisites

For this example, you need the following:

  • A JDK, version 8 or later (Maven needs a JDK, not only a JRE, to compile the project)

  • Maven

  • A Pulsar topic in Astra Streaming

  • A text editor or IDE

Create a project

  1. Create a new Maven project:

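    # The archetype creates a SimpleProducerConsumer directory named after the artifactId
    mvn archetype:generate \
        -DgroupId=org.example \
        -DartifactId=SimpleProducerConsumer \
        -DarchetypeArtifactId=maven-archetype-quickstart \
        -DinteractiveMode=false
    
    # Run the remaining commands from the generated project directory
    cd SimpleProducerConsumer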
  2. Add the Pulsar client dependency to the <dependencies> section of pom.xml:

    pom.xml
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>2.10.2</version>
    </dependency>
  3. Add the following build configuration to pom.xml. It uses the Maven Assembly Plugin to package the app and its dependencies into a single executable JAR.

    pom.xml
    <build>
      <plugins>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <archive>
              <manifest>
                <mainClass>org.example.App</mainClass>
              </manifest>
            </archive>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
        </plugin>
      </plugins>
    </build>
  4. If necessary, specify the compiler versions in pom.xml:

    pom.xml
    <properties>
      <maven.compiler.source>11</maven.compiler.source>
      <maven.compiler.target>11</maven.compiler.target>
    </properties>

Write the script

  1. In your project, navigate to src/main/java/org/example, and then open App.java.

  2. Remove any existing content from the file, and then add the following code that imports dependencies, creates a client instance, and configures the instance to use your Astra Streaming tenant:

    App.java
    package org.example;
    
    import org.apache.pulsar.client.api.*;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    public class App
    {
      private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
      private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
      private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
      private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
      private static final String topicName = "<REPLACE_WITH_TOPIC>";
    
      private static final String topic = String.format("persistent://%s/%s/%s", tenantName,namespace,topicName);
    
      public static void main( String[] args ) throws IOException
      {
        PulsarClient client = PulsarClient.builder()
                                          .serviceUrl(serviceUrl)
                                          .authentication(
                                              AuthenticationFactory.token(pulsarToken)
                                          )
                                          .build();

    This class is incomplete. Your editor might show errors until you complete the next steps.

  3. Provide values for the following variables:

    Each variable is listed with its definition, followed by where to find the value.

      • serviceUrl: The URL to connect to the Pulsar cluster.
        Where to find the value: In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Connect tab. In the Details section, get the Broker Service URL.

      • pulsarToken: The token for Pulsar cluster authentication.
        Where to find the value: For information about creating Pulsar tokens, see Manage tokens.

      • tenantName: The name of your streaming tenant.
        Where to find the value: In the Astra Portal navigation menu, click Streaming, and then select your streaming tenant. In the Details section, get the Name.

      • namespace: The segmented area for certain topics in your streaming tenant.
        Where to find the value: In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Choose the target namespace from the list of namespaces.

      • topicName: The short topic name (not the full name).
        Where to find the value: In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.

  4. Use the client to create a producer. The producer reuses the client's connection and authentication settings, and the topic() call specifies which topic it publishes messages to.

    App.java
        Producer<String> producer = client.newProducer(Schema.STRING)
                                  .topic(topic)
                                  .create();
  5. Send a single message to the broker. The send() method is synchronous: it blocks until the broker acknowledges the message.

    App.java
        producer.send("Hello World");
  6. Create a consumer. This code specifies the topic to consume from, names the subscription, and then subscribes, which creates the subscription if it doesn't already exist.

    App.java
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("my-subscription")
            .subscribe();
  7. Receive the messages added by the producer:

    App.java
        boolean receivedMsg = false;
    
        do {
          // Block for up to 1 second for a message
          Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
    
          if(msg != null){
            // getValue() decodes the payload with the consumer's STRING schema
            System.out.printf("Message received: %s%n", msg.getValue());
    
            // Acknowledge the message to remove it from the message backlog
            consumer.acknowledge(msg);
    
            receivedMsg = true;
          }
    
        } while (!receivedMsg);
  8. Close the consumer, producer, and client to release their resources, and then close the method and the class:

    App.java
        consumer.close();
        producer.close();
        client.close();
      }
    }
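
The table of variables warns against using the Full Name for topicName because App.java builds the full topic name itself from the tenant, namespace, and short topic name. The following is a minimal sketch of that composition with a guard against pasting a full name by mistake. The TopicNames class and its fullTopicName method are hypothetical helpers for illustration and are not part of the Pulsar client API.

```java
// Hypothetical helper for illustration; not part of the Pulsar client API.
final class TopicNames {
  private TopicNames() {}

  /** Builds persistent://<tenant>/<namespace>/<topic> from short names. */
  static String fullTopicName(String tenant, String namespace, String topic) {
    for (String part : new String[] {tenant, namespace, topic}) {
      if (part == null || part.isEmpty()) {
        throw new IllegalArgumentException("Tenant, namespace, and topic must be non-empty");
      }
      // A slash suggests a Full Name was pasted where a short name belongs.
      if (part.contains("/")) {
        throw new IllegalArgumentException("Expected a short name but got: " + part);
      }
    }
    return String.format("persistent://%s/%s/%s", tenant, namespace, topic);
  }

  public static void main(String[] args) {
    // Placeholder values; replace them with your own tenant, namespace, and topic.
    System.out.println(fullTopicName("my-tenant", "default", "my-topic"));
    // prints persistent://my-tenant/default/my-topic
  }
}
```

Failing fast on a slash is a design choice for this sketch: it turns a doubled prefix such as persistent://tenant/ns/persistent://tenant/ns/topic into an immediate error instead of a harder-to-diagnose failure at connection time.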

Run the script

Build and run the app:

mvn clean package assembly:single
java -jar target/SimpleProducerConsumer-1.0-SNAPSHOT-jar-with-dependencies.jar

If the script succeeds, it prints the received message:

Message received: Hello World
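
If that line never appears, the do-while loop in App.java keeps polling indefinitely. This can happen, for example, when the topic or token is wrong, or possibly when the subscription did not exist yet at the time the message was sent. One way to bound the wait is to cap the number of receive attempts. The following is a minimal sketch of that idea, not the method used in the example repo. BoundedReceive and pollUpTo are hypothetical names, and the Supplier stands in for a call such as consumer.receive(1, TimeUnit.SECONDS), which returns null when the timeout elapses.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class BoundedReceive {
  private BoundedReceive() {}

  /**
   * Calls receiveOnce up to maxAttempts times and returns the first
   * non-null result, or an empty Optional if every attempt timed out.
   */
  static <T> Optional<T> pollUpTo(int maxAttempts, Supplier<T> receiveOnce) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      T msg = receiveOnce.get();
      if (msg != null) {
        return Optional.of(msg);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Simulated receiver: times out twice, then yields a message.
    int[] calls = {0};
    Optional<String> result =
        pollUpTo(5, () -> ++calls[0] < 3 ? null : "Hello World");

    System.out.println(
        result.map(m -> "Message received: " + m)
              .orElse("No message received; check the topic, token, and service URL"));
    // prints Message received: Hello World
  }
}
```

Because consumer.receive declares a checked PulsarClientException, wiring this into App.java would require handling that exception inside the lambda or replacing Supplier with a functional interface that permits checked exceptions.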
