Producing and consuming messages using the Java Pulsar client with Astra Streaming and Spring

Prerequisites

You will need the following prerequisites in place to complete this guide:

  • JDK 17 or later installed (install now)

  • Maven or Gradle installed

  • A working Pulsar topic (get started here if you don’t have a topic)

  • A basic text editor or IDE

Visit our examples repo to see the complete source of this example.

Create a Maven project in Spring Initializr

Spring Initializr is a great tool to quickly create a project with the dependencies you need. Let’s use it to create a project with the Pulsar client dependency.

  1. Go to Spring Initializr to initialize a new project.

  2. Select Maven, Java 17, a non-SNAPSHOT version of Spring Boot, and the Apache Pulsar dependency.

  3. Select Generate Project and download the zip file.

  4. Unzip the file and open the project in your IDE. In src/main/java/com/example/demo/DemoApplication.java, you’ll find the main method that runs your application, with the dependencies helpfully injected by Spring. This is where we’ll add our code.

    Java application code
    package com.example.demo;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.apache.pulsar.client.api.*;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootApplication
    public class DemoApplication
    {
    	private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
    	private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
    	private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
    	private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
    	private static final String topicName = "<REPLACE_WITH_TOPIC>";
    
    	private static final String topic = String.format("persistent://%s/%s/%s", tenantName,namespace,topicName);
    
    	public static void main( String[] args ) throws IOException
    	{
    		// Build a client that authenticates with the Astra Streaming token
    		PulsarClient client = PulsarClient.builder()
    				.serviceUrl(serviceUrl)
    				.authentication(
    						AuthenticationFactory.token(pulsarToken)
    				)
    				.build();
    
    		// Build a producer that sends String messages to the topic
    		Producer<String> producer = client.newProducer(Schema.STRING)
    				.topic(topic)
    				.create();
    
    		// Send a single message; send() blocks until the broker acknowledges it
    		producer.send("Hello World");
    
    		// Close the producer once the message is sent
    		producer.close();
    
    		// Build a consumer on the same topic; the subscription is created if it doesn't exist
    		Consumer<String> consumer = client.newConsumer(Schema.STRING)
    				.topic(topic)
    				.subscriptionName("my-subscription")
    				.subscribe();
    
    		// Poll until one message has been received
    		boolean receivedMsg = false;
    
    		do {
    			// Block for up to 1 second for a message; receive() returns null on timeout
    			Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
    
    			if(msg != null){
    				// getValue() decodes the payload using the consumer's String schema
    				System.out.printf("Message received: %s%n", msg.getValue());
    
    				// Acknowledge the message to remove it from the message backlog
    				consumer.acknowledge(msg);
    
    				receivedMsg = true;
    			}
    
    		} while (!receivedMsg);
    
    		// Close the consumer, then the client, to release connections
    		consumer.close();
    		client.close();
    	}
    }
  5. Replace the values in your Java application with values from the Connect tab of your Astra Streaming instance.

    	private static final String serviceUrl = "pulsar+ssl://pulsar-aws-useast1.streaming.datastax.com:6651";
    	private static final String pulsarToken = "ey...";
    	private static final String tenantName = "homelab";
    	private static final String namespace = "default";
    	private static final String topicName = "clue-sensors";

    Alternatively, set these values in the "application.properties" file (or "application.yml", for the YAML format shown here) in the src/main/resources folder instead of in code, which is a better practice for production applications. The Connect tab in Astra Streaming has a link to download a properties file with the values your application needs to connect to your Astra Streaming instance.

    Spring application properties
    spring:
        pulsar:
            administration:
                service-url: https://pulsar-aws-useast1.api.streaming.datastax.com
                tls-hostname-verification-enable: true
                auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
                authentication:
                    token: ***
            client:
                service-url: pulsar+ssl://pulsar-aws-useast1.streaming.datastax.com:6651
                tls-hostname-verification-enable: true
                auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
                authentication:
                    token: ***
  6. Change directory to the project root and run the following command to compile the project:

    Maven:

    mvn clean compile

    Result:

    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  0.819 s
    [INFO] Finished at: 2023-05-09T15:36:33-04:00
    [INFO] ------------------------------------------------------------------------
  7. Maven has compiled your Java application with the Pulsar client dependency. Run the project to send a message to your Astra Streaming cluster and read it back:

    Maven:

    mvn spring-boot:run

    Result:

    Message received: Hello World
  8. Check your Astra Streaming instance to see the newly created subscription "my-subscription" on the topic that received your message.
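
Step 5 notes that keeping connection values out of the source code is better practice for production. As a minimal sketch of one way to do that without any Spring configuration, the helper below reads the five values from environment variables and builds the same persistent://tenant/namespace/topic string used in the application. The class name PulsarSettings and the variable names (PULSAR_SERVICE_URL, PULSAR_TOKEN, PULSAR_TENANT, PULSAR_NAMESPACE, PULSAR_TOPIC) are assumptions made for this illustration; they are not defined by the Pulsar client, Spring, or Astra Streaming.

```java
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Pulsar client or Spring.
final class PulsarSettings {
    final String serviceUrl;
    final String token;
    final String topic;

    private PulsarSettings(String serviceUrl, String token, String topic) {
        this.serviceUrl = serviceUrl;
        this.token = token;
        this.topic = topic;
    }

    // Reads the assumed variable names from the given map.
    // In a real application, pass System.getenv() here.
    static PulsarSettings fromEnv(Map<String, String> env) {
        String tenant = require(env, "PULSAR_TENANT");
        String namespace = require(env, "PULSAR_NAMESPACE");
        String topicName = require(env, "PULSAR_TOPIC");
        return new PulsarSettings(
                require(env, "PULSAR_SERVICE_URL"),
                require(env, "PULSAR_TOKEN"),
                // Same format string as the topic constant in DemoApplication
                String.format("persistent://%s/%s/%s", tenant, namespace, topicName));
    }

    // Fails fast with a clear message instead of connecting with a missing value.
    private static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        // Sample values taken from step 5; the token is left elided as in the guide.
        PulsarSettings settings = PulsarSettings.fromEnv(Map.of(
                "PULSAR_SERVICE_URL", "pulsar+ssl://pulsar-aws-useast1.streaming.datastax.com:6651",
                "PULSAR_TOKEN", "ey...",
                "PULSAR_TENANT", "homelab",
                "PULSAR_NAMESPACE", "default",
                "PULSAR_TOPIC", "clue-sensors"));
        System.out.println(settings.topic); // prints persistent://homelab/default/clue-sensors
    }
}
```

With a helper like this, the main method in DemoApplication could pass settings.serviceUrl, settings.token, and settings.topic to the client, producer, and consumer builders in place of the hard-coded constants. Taking a Map rather than calling System.getenv() directly keeps the lookup easy to exercise with sample values.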


© 2024 DataStax | Privacy policy | Terms of use
