Use the Java Pulsar client with Astra Streaming and Spring
You can produce and consume messages with the Java Pulsar client, Astra Streaming, and Spring.
Go to the examples repo for the complete source of this example.
Prerequisites
For this example, you need the following:
-
JRE 8
-
A Pulsar topic in Astra Streaming
-
A text editor or IDE
Create a Maven project in Spring Initializr
You can use Spring Initializr to quickly create a Java project with the required dependencies, including the Pulsar client dependency.
-
Go to Spring Initializr to initialize a new project.
-
Select Maven, Java 17, a non-SNAPSHOT version of Spring Boot, and the Apache Pulsar dependency:
-
Click Generate Project, download the zip file, and then extract it.
-
Navigate to
src/main/java
, and then open theDemoApplication.java
file. This file contains the main method that will run your application with the specified dependencies.DemoApplication.java
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.apache.pulsar.client.api.*; import java.io.IOException; import java.util.concurrent.TimeUnit; @SpringBootApplication public class DemoApplication { private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>"; private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"; private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>"; private static final String namespace = "<REPLACE_WITH_NAMESPACE>"; private static final String topicName = "<REPLACE_WITH_TOPIC>"; private static final String topic = String.format("persistent://%s/%s/%s", tenantName,namespace,topicName); public static void main( String[] args ) throws IOException { PulsarClient client = PulsarClient.builder() .serviceUrl(serviceUrl) .authentication( AuthenticationFactory.token(pulsarToken) ) .build(); // end::build-client[] // tag::build-producer[] Producer<String> producer = client.newProducer(Schema.STRING) .topic(topic) .create(); // end::build-producer[] // tag::produce-message[] producer.send("Hello World"); // end::produce-message[] // tag::close-producer[] producer.close(); // end::close-producer[] // tag::build-consumer[] Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("my-subscription") .subscribe(); // end::build-consumer[] // tag::consumer-loop[] boolean receivedMsg = false; do { // Block for up to 1 second for a message Message<String> msg = consumer.receive(1, TimeUnit.SECONDS); if(msg != null){ System.out.printf("Message received: %s", new String(msg.getData())); // Acknowledge the message to remove it from the message backlog consumer.acknowledge(msg); receivedMsg = true; } } while (!receivedMsg); // end::consumer-loop[] // tag::close-consumer[] consumer.close(); // end::close-consumer[] // tag::close-client[] client.close(); } }
-
Replace the following values in
DemoApplication.java
with values from your tenant’s Connect tab in the Astra Portal:DemoApplication.javaprivate static final String serviceUrl = "pulsar+ssl://pulsar-PROVIDER-REGION.streaming.datastax.com:PORT"; private static final String pulsarToken = "PULSAR_JTW_TOKEN"; private static final String tenantName = "TENANT_NAME"; private static final String namespace = "NAMESPACE_NAME"; private static final String topicName = "TOPIC_NAME";
You can also modify the values in the
application.properties
file in the/resources
directory, which is recommended for production applications. On your tenant’s Connect tab, you can download a properties file with the values your application needs to connect to your Astra Streaming instance.Spring application properties
spring: pulsar: administration: service-url: https://pulsar-aws-useast1.api.streaming.datastax.com tls-hostname-verification-enable: true auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken authentication: token: *** client: service-url: pulsar+ssl://pulsar-aws-useast1.streaming.datastax.com:6651 tls-hostname-verification-enable: true auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken authentication: token: ***
-
From your project’s root directory, compile the Java application with the Pulsar client dependency:
mvn clean compile
Result
[INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 0.819 s [INFO] Finished at: 2023-05-09T15:36:33-04:00 [INFO] ------------------------------------------------------------------------
-
Run the project to send a message to your Astra Streaming cluster:
mvn spring-boot:run
Result
Message received: Hello World
-
In the Astra Portal, go to your tenant to verify that the message appears in the specified namespace and topic.