Use the Java Pulsar client with Astra Streaming and Spring

You can produce and consume messages with the Java Pulsar client, Astra Streaming, and Spring.

Go to the examples repo for the complete source of this example.

Prerequisites

For this example, you need the following:

  • JRE 8

  • Maven

  • A Pulsar topic in Astra Streaming

  • A text editor or IDE

Create a Maven project in Spring Initializr

You can use Spring Initializr to quickly create a Java project with the required dependencies, including the Pulsar client dependency.

  1. Go to Spring Initializr to initialize a new project.

  2. Select Maven, Java 17, a non-SNAPSHOT version of Spring Boot, and the Apache Pulsar dependency:

    Spring Initializr
  3. Click Generate Project, download the zip file, and then extract it.

  4. Navigate to src/main/java, and then open the DemoApplication.java file. This file contains the main method that will run your application with the specified dependencies.

    DemoApplication.java
    package com.example.demo;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.apache.pulsar.client.api.*;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootApplication
    public class DemoApplication
    {
    	private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
    	private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
    	private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
    	private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
    	private static final String topicName = "<REPLACE_WITH_TOPIC>";
    
    	private static final String topic = String.format("persistent://%s/%s/%s", tenantName,namespace,topicName);
    
    	public static void main( String[] args ) throws IOException
    	{
    		PulsarClient client = PulsarClient.builder()
    				.serviceUrl(serviceUrl)
    				.authentication(
    						AuthenticationFactory.token(pulsarToken)
    				)
    				.build();
    // end::build-client[]
    
    // tag::build-producer[]
    		Producer<String> producer = client.newProducer(Schema.STRING)
    				.topic(topic)
    				.create();
    // end::build-producer[]
    
    // tag::produce-message[]
    		producer.send("Hello World");
    // end::produce-message[]
    
    // tag::close-producer[]
    		producer.close();
    // end::close-producer[]
    
    // tag::build-consumer[]
    		Consumer<String> consumer = client.newConsumer(Schema.STRING)
    				.topic(topic)
    				.subscriptionName("my-subscription")
    				.subscribe();
    // end::build-consumer[]
    
    // tag::consumer-loop[]
    		boolean receivedMsg = false;
    
    		do {
    			// Block for up to 1 second for a message
    			Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
    
    			if(msg != null){
    				System.out.printf("Message received: %s", new String(msg.getData()));
    
    				// Acknowledge the message to remove it from the message backlog
    				consumer.acknowledge(msg);
    
    				receivedMsg = true;
    			}
    
    		} while (!receivedMsg);
    // end::consumer-loop[]
    
    // tag::close-consumer[]
    		consumer.close();
    // end::close-consumer[]
    
    // tag::close-client[]
    		client.close();
    	}
    }
  5. Replace the following values in DemoApplication.java with values from your tenant’s Connect tab in the Astra Portal:

    DemoApplication.java
    	private static final String serviceUrl = "pulsar+ssl://pulsar-PROVIDER-REGION.streaming.datastax.com:PORT";
    	private static final String pulsarToken = "PULSAR_JTW_TOKEN";
    	private static final String tenantName = "TENANT_NAME";
    	private static final String namespace = "NAMESPACE_NAME";
    	private static final String topicName = "TOPIC_NAME";

    You can also modify the values in the application.properties file in the /resources directory, which is recommended for production applications. On your tenant’s Connect tab, you can download a properties file with the values your application needs to connect to your Astra Streaming instance.

    Spring application properties
    spring:
        pulsar:
    administration:
    service-url: https://pulsar-aws-useast1.api.streaming.datastax.com
    tls-hostname-verification-enable: true
    auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
    authentication:
    token: ***
    client:
    service-url: pulsar+ssl://pulsar-aws-useast1.streaming.datastax.com:6651
    tls-hostname-verification-enable: true
    auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
    authentication:
    token: ***
  6. From your project’s root directory, compile the Java application with the Pulsar client dependency:

    mvn clean compile
    Result
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  0.819 s
    [INFO] Finished at: 2023-05-09T15:36:33-04:00
    [INFO] ------------------------------------------------------------------------
  7. Run the project to send a message to your Astra Streaming cluster:

    mvn spring-boot:run
    Result
    Message received: Hello World
  8. In the Astra Portal, go to your tenant to verify that the message appears in the specified namespace and topic.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com