Java Pulsar client on Astra Streaming with Spring
Prerequisites
You will need the following prerequisites in place to complete this guide:
- JRE 8 installed (install now↗)
- A working Pulsar topic (get started here if you don’t have a topic)
- A basic text editor or IDE
Visit our examples repo↗ to see the complete source of this example.
Create a Maven project in Spring Initializr
Spring Initializr is a great tool to quickly create a project with the dependencies you need. Let’s use it to create a project with the Pulsar client dependency.
- Go to Spring Initializr↗ to initialize a new project.
- Select Maven, Java 17, a non-SNAPSHOT version of Spring Boot, and the Apache Pulsar dependency.
- Select Generate Project and download the zip file.
- Unzip the file and open the project in your IDE. In src/main/java/DemoApplication, you’ll find the main method that will run your application, with the dependencies helpfully injected by Spring. This is where we’ll add our code.
Java application code
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.apache.pulsar.client.api.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class DemoApplication {

    private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
    private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
    private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
    private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
    private static final String topicName = "<REPLACE_WITH_TOPIC>";
    private static final String topic =
            String.format("persistent://%s/%s/%s", tenantName, namespace, topicName);

    public static void main(String[] args) throws IOException {
        // Build a client that authenticates with the Astra Streaming token
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .authentication(AuthenticationFactory.token(pulsarToken))
                .build();

        // Create a producer for string messages on the topic
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(topic)
                .create();

        // Send one message, then close the producer
        producer.send("Hello World");
        producer.close();

        // Subscribe to the same topic
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscribe();

        // Poll until one message has been received
        boolean receivedMsg = false;
        do {
            // Block for up to 1 second for a message
            Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
            if (msg != null) {
                System.out.printf("Message received: %s%n", msg.getValue());
                // Acknowledge the message to remove it from the message backlog
                consumer.acknowledge(msg);
                receivedMsg = true;
            }
        } while (!receivedMsg);

        // Release the consumer and the client connection
        consumer.close();
        client.close();
    }
}
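The consumer loop above keeps polling until a message arrives, so it never exits if the topic stays empty. The following is a minimal sketch of a bounded variant, not part of the original example. It uses a `java.util.concurrent.BlockingQueue` as a stand-in for the Pulsar consumer so that it runs without a broker: `poll(timeout, unit)` plays the role of `consumer.receive(1, TimeUnit.SECONDS)`. The class name, method name, and `maxAttempts` parameter are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedReceiveExample {

    // Polls the source up to maxAttempts times, waiting timeoutMillis each time.
    // Returns the first message received, or null if none arrived or the
    // thread was interrupted. The queue stands in for a Pulsar consumer.
    static String receiveWithLimit(BlockingQueue<String> source, int maxAttempts, long timeoutMillis) {
        try {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                String msg = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    return msg;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up
            Thread.currentThread().interrupt();
        }
        return null;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Hello World");
        System.out.println("Message received: " + receiveWithLimit(queue, 5, 1000));
    }
}
```

With a real consumer, the same shape would apply: replace the `poll` call with `consumer.receive(...)` and acknowledge the message before returning.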
- Replace the values in your Java application with values from the Connect tab of your Astra Streaming instance.
private static final String serviceUrl = "pulsar+ssl://pulsar-aws-useast1.streaming.datastax.com:6651";
private static final String pulsarToken = "ey...";
private static final String tenantName = "homelab";
private static final String namespace = "default";
private static final String topicName = "clue-sensors";
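To see how these values combine into the topic the client connects to, the following minimal sketch reproduces the `String.format` call from the application code. The `TopicNameExample` class and `fullTopicName` method are hypothetical names used only for illustration; they are not part of the Pulsar client.

```java
public class TopicNameExample {

    // Builds a fully qualified persistent topic name, mirroring the
    // String.format call in DemoApplication.
    static String fullTopicName(String tenant, String namespace, String topic) {
        return String.format("persistent://%s/%s/%s", tenant, namespace, topic);
    }

    public static void main(String[] args) {
        // Example tenant, namespace, and topic values from this guide
        System.out.println(fullTopicName("homelab", "default", "clue-sensors"));
        // prints persistent://homelab/default/clue-sensors
    }
}
```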
You could instead modify the values in the "application.properties" file in the /resources folder, which is a better practice for production applications. The Connect tab in Astra Streaming has a link to download a properties file with the values your application needs to connect to your Astra Streaming instance.
Spring application properties
spring:
  pulsar:
    administration:
      service-url: https://pulsar-aws-useast1.api.streaming.datastax.com
      tls-hostname-verification-enable: true
      auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
      authentication:
        token: ***
    client:
      service-url: pulsar+ssl://pulsar-aws-useast1.streaming.datastax.com:6651
      tls-hostname-verification-enable: true
      auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
      authentication:
        token: ***
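Another way to keep the token out of source code is to read the connection values from environment variables. The following is a minimal sketch under stated assumptions, not the approach used by this guide's example: the variable names `PULSAR_SERVICE_URL` and `PULSAR_TOKEN` and the `ConnectionSettings` class are hypothetical, and the environment is passed in as a map so the lookup can be exercised without changing the real environment.

```java
import java.util.Map;

public class ConnectionSettings {

    final String serviceUrl;
    final String token;

    ConnectionSettings(String serviceUrl, String token) {
        this.serviceUrl = serviceUrl;
        this.token = token;
    }

    // Reads both settings from a map of environment variables.
    // PULSAR_SERVICE_URL and PULSAR_TOKEN are hypothetical names for this sketch.
    static ConnectionSettings fromEnv(Map<String, String> env) {
        return new ConnectionSettings(
                require(env, "PULSAR_SERVICE_URL"),
                require(env, "PULSAR_TOKEN"));
    }

    // Fails fast with a clear message when a variable is missing or blank.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        ConnectionSettings settings = fromEnv(System.getenv());
        // Print only the URL; the token should never be logged.
        System.out.println("Connecting to " + settings.serviceUrl);
    }
}
```

The resulting `serviceUrl` and `token` values could then be passed to `PulsarClient.builder()` in place of the hard-coded constants.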
- Change directory to the project root and run the following command to compile the project:
mvn clean compile
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.819 s
[INFO] Finished at: 2023-05-09T15:36:33-04:00
[INFO] ------------------------------------------------------------------------
- Maven has compiled your Java application with the Pulsar client dependency. You can now run the project and send a message to your Astra Streaming cluster.
- After the application runs, check your Astra Streaming instance to see the message you sent and the newly created subscription "my-subscription".