Use the Java Pulsar client with Astra Streaming and Spring
This example demonstrates how you can produce and consume messages with the Java Pulsar client, Astra Streaming, and Spring.
Prerequisites
For this example, you need the following:
- A JDK that matches the Java version you select in Spring Initializr (Java 17 in this example)
- An Apache Pulsar™ topic in Astra Streaming
- A text editor or IDE
Create a Maven project in Spring Initializr
You can use Spring Initializr to quickly create a Java project with the required dependencies, including the Pulsar client dependency.
- Go to Spring Initializr to initialize a new project.
- Select Maven, Java 17, a non-SNAPSHOT version of Spring Boot, and the Apache Pulsar dependency.
- Click Generate Project, download the zip file, and then extract it.
- Navigate to src/main/java, and then open the DemoApplication.java file. This file contains the main method that runs your application with the specified dependencies.

  DemoApplication.java

      package com.example.demo;

      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;
      import org.apache.pulsar.client.api.*;

      import java.io.IOException;
      import java.util.concurrent.TimeUnit;

      @SpringBootApplication
      public class DemoApplication {

          private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
          private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
          private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
          private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
          private static final String topicName = "<REPLACE_WITH_TOPIC>";
          private static final String topic =
                  String.format("persistent://%s/%s/%s", tenantName, namespace, topicName);

          public static void main(String[] args) throws IOException {
              // Connect to Astra Streaming with token authentication
              PulsarClient client = PulsarClient.builder()
                      .serviceUrl(serviceUrl)
                      .authentication(AuthenticationFactory.token(pulsarToken))
                      .build();

              // Subscribe first so that the subscription exists before the message is published
              Consumer<String> consumer = client.newConsumer(Schema.STRING)
                      .topic(topic)
                      .subscriptionName("my-subscription")
                      .subscribe();

              // Publish one message, and then release the producer
              Producer<String> producer = client.newProducer(Schema.STRING)
                      .topic(topic)
                      .create();
              producer.send("Hello World");
              producer.close();

              boolean receivedMsg = false;
              do {
                  // Block for up to 1 second for a message
                  Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
                  if (msg != null) {
                      System.out.printf("Message received: %s%n", msg.getValue());
                      // Acknowledge the message to remove it from the message backlog
                      consumer.acknowledge(msg);
                      receivedMsg = true;
                  }
              } while (!receivedMsg);

              consumer.close();
              client.close();
          }
      }

- Replace the following values in DemoApplication.java with values from your tenant's Connect tab in the Astra Portal:

  DemoApplication.java

      private static final String serviceUrl = "pulsar+ssl://pulsar-PROVIDER-REGION.streaming.datastax.com:PORT";
      private static final String pulsarToken = "PULSAR_JWT_TOKEN";
      private static final String tenantName = "TENANT_NAME";
      private static final String namespace = "NAMESPACE_NAME";
      private static final String topicName = "TOPIC_NAME";

  Alternatively, you can set the values in the application.properties file in the /resources directory, which is recommended for production applications. On your tenant's Connect tab, you can download a properties file with the values your application needs to connect to your Astra Streaming instance. For example:

      spring:
        pulsar:
          administration:
            service-url: https://pulsar-aws-useast1.api.streaming.datastax.com
            tls-hostname-verification-enable: true
            auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
            authentication:
              token: ***
          client:
            service-url: pulsar+ssl://pulsar-aws-useast1.streaming.datastax.com:6651
            tls-hostname-verification-enable: true
            auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
            authentication:
              token: ***

- From your project's root directory, compile the Java application with the Pulsar client dependency:

      mvn clean compile

  Result

      [INFO] ------------------------------------------------------------------------
      [INFO] BUILD SUCCESS
      [INFO] ------------------------------------------------------------------------
      [INFO] Total time: 0.819 s
      [INFO] Finished at: 2023-05-09T15:36:33-04:00
      [INFO] ------------------------------------------------------------------------

- Run the project to send a message to your Astra Streaming cluster:

      mvn spring-boot:run

  Result

      Message received: Hello World

- In the Astra Portal header, click Applications, and then select Streaming.
- Click the name of your tenant, and then verify that the message appears in the specified namespace and topic.
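
The connection values that DemoApplication.java hard-codes can also be supplied at run time. The following is a minimal sketch, not part of the original example, that reads them from environment variables and builds the same persistent://tenant/namespace/topic name. The class name PulsarSettings, its helper methods, and the environment variable names (PULSAR_TENANT, PULSAR_NAMESPACE, PULSAR_TOPIC) are hypothetical and chosen only for illustration.

```java
import java.util.Map;

// Hypothetical helper: not part of the Pulsar client or of Astra Streaming.
class PulsarSettings {

    // Builds the fully qualified topic name in the format DemoApplication.java uses.
    static String topicUrl(String tenant, String namespace, String topic) {
        return String.format("persistent://%s/%s/%s", tenant, namespace, topic);
    }

    // Returns the value for key, or fails fast if the value is missing or blank.
    static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        // Assumed variable names; use whatever names fit your deployment.
        Map<String, String> env = System.getenv();
        String topic = topicUrl(
                require(env, "PULSAR_TENANT"),
                require(env, "PULSAR_NAMESPACE"),
                require(env, "PULSAR_TOPIC"));
        System.out.println("Topic: " + topic);
    }
}
```

Failing fast on a missing value surfaces a configuration mistake at startup, before the application attempts to connect.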
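
The do-while loop in DemoApplication.java calls consumer.receive(1, TimeUnit.SECONDS) until a message arrives, so it never exits if no message is delivered. The following sketch shows one way to cap the number of attempts. To stay runnable without a broker, it uses a java.util.concurrent.BlockingQueue as a stand-in for the Pulsar consumer, because BlockingQueue.poll(timeout, unit) also returns null when the timeout expires. The class name BoundedReceive, the method receiveWithLimit, and the attempt limits are assumptions for illustration, not part of the Pulsar API.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the queue stands in for a Pulsar consumer.
class BoundedReceive {

    // Polls up to maxAttempts times and returns the first message, or empty if none arrives.
    static Optional<String> receiveWithLimit(BlockingQueue<String> source,
                                             int maxAttempts,
                                             long timeoutMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Blocks for up to timeoutMillis, and returns null on timeout
                String msg = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    return Optional.of(msg);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Hello World");

        // The first call finds the queued message; the second gives up after two attempts.
        System.out.println(receiveWithLimit(queue, 5, 1000).orElse("No message received"));
        System.out.println(receiveWithLimit(queue, 2, 100).orElse("No message received"));
    }
}
```

In DemoApplication.java, the poll call would correspond to consumer.receive(1, TimeUnit.SECONDS), and an empty result could be logged before closing the consumer and client.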