Use the Java Pulsar client with Astra Streaming and Spring
You can produce and consume messages with the Java Pulsar client, Astra Streaming, and Spring.
For a complete source code example, see the Astra Streaming Examples repository.
Prerequisites
For this example, you need the following:
-
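A JDK, version 17 or later (the project in this example targets Java 17, and compiling it requires a JDK rather than a JRE)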
-
An Apache Pulsar™ topic in Astra Streaming
-
A text editor or IDE
Create a Maven project in Spring Initializr
You can use Spring Initializr to quickly create a Java project with the required dependencies, including the Pulsar client dependency.
-
Go to Spring Initializr to initialize a new project.
-
Select Maven, Java 17, a non-SNAPSHOT version of Spring Boot, and the Apache Pulsar dependency.
-
Click Generate Project, download the zip file, and then extract it.
-
Navigate to src/main/java, and then open the DemoApplication.java file. This file contains the main method that runs your application with the specified dependencies.
DemoApplication.java
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.apache.pulsar.client.api.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class DemoApplication {

    private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
    private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
    private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
    private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
    private static final String topicName = "<REPLACE_WITH_TOPIC>";
    private static final String topic = String.format("persistent://%s/%s/%s", tenantName, namespace, topicName);

    public static void main(String[] args) throws IOException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .authentication(AuthenticationFactory.token(pulsarToken))
                .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(topic)
                .create();

        producer.send("Hello World");
        producer.close();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscribe();

        boolean receivedMsg = false;
        do {
            // Block for up to 1 second for a message
            Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);

            if (msg != null) {
                System.out.printf("Message received: %s", new String(msg.getData()));

                // Acknowledge the message to remove it from the message backlog
                consumer.acknowledge(msg);
                receivedMsg = true;
            }
        } while (!receivedMsg);

        consumer.close();
        client.close();
    }
}
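DemoApplication.java assembles the full topic name from the tenant, namespace, and topic with String.format. The following is a minimal sketch of the same assembly with a guard against leftover placeholder values. TopicNames and its methods are hypothetical helpers written for this illustration; they are not part of the Pulsar client, and the validation rules are assumptions of the sketch rather than Pulsar's own naming rules.

```java
import java.util.Objects;

/** Hypothetical helper (not part of the Pulsar client) for building a topic name. */
final class TopicNames {

    private TopicNames() {
    }

    /** Returns persistent://tenant/namespace/topic after basic sanity checks. */
    static String persistent(String tenant, String namespace, String topic) {
        return String.format("persistent://%s/%s/%s",
                requirePart("tenant", tenant),
                requirePart("namespace", namespace),
                requirePart("topic", topic));
    }

    private static String requirePart(String label, String value) {
        Objects.requireNonNull(value, label + " must not be null");
        String trimmed = value.trim();
        // Assumption of this sketch: reject empty values, unreplaced
        // "<REPLACE_WITH_...>" placeholders, and any '/' that would change
        // how the name splits into tenant/namespace/topic segments.
        if (trimmed.isEmpty() || trimmed.startsWith("<") || trimmed.contains("/")) {
            throw new IllegalArgumentException("Invalid " + label + ": '" + value + "'");
        }
        return trimmed;
    }
}
```

With this helper, the topic constant could be written as TopicNames.persistent(tenantName, namespace, topicName), so a forgotten placeholder fails immediately instead of surfacing later as a connection or lookup error.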
-
Replace the following values in DemoApplication.java with values from your tenant’s Connect tab in the Astra Portal:
DemoApplication.java
private static final String serviceUrl = "pulsar+ssl://pulsar-PROVIDER-REGION.streaming.datastax.com:PORT";
private static final String pulsarToken = "PULSAR_JWT_TOKEN";
private static final String tenantName = "TENANT_NAME";
private static final String namespace = "NAMESPACE_NAME";
private static final String topicName = "TOPIC_NAME";
You can also modify the values in the application.properties file in the src/main/resources directory, which is recommended for production applications. On your tenant’s Connect tab, you can download a properties file with the values your application needs to connect to your Astra Streaming instance.
Spring application properties
spring:
  pulsar:
    administration:
      service-url: https://pulsar-aws-useast1.api.streaming.datastax.com
      tls-hostname-verification-enable: true
      auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
      authentication:
        token: ***
    client:
      service-url: pulsar+ssl://pulsar-aws-useast1.streaming.datastax.com:6651
      tls-hostname-verification-enable: true
      auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
      authentication:
        token: ***
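DemoApplication.java builds its client by hand, so it does not pick up these settings on its own. As a rough illustration of keeping the connection values out of the source code, the sketch below reads them with the standard java.util.Properties class. PulsarSettings and the keys pulsar.service-url and pulsar.token are hypothetical names chosen for this example; they are not the Spring keys shown above and are not defined by Astra Streaming.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical holder for connection values read from a properties source. */
final class PulsarSettings {

    final String serviceUrl;
    final String token;

    private PulsarSettings(String serviceUrl, String token) {
        this.serviceUrl = serviceUrl;
        this.token = token;
    }

    /** Builds settings from already-loaded properties; the key names are assumptions. */
    static PulsarSettings from(Properties props) {
        return new PulsarSettings(
                required(props, "pulsar.service-url"),
                required(props, "pulsar.token"));
    }

    /** Parses key=value text, for example the contents of a properties file. */
    static PulsarSettings parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return from(props);
    }

    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value.trim();
    }
}
```

The loaded values can then be passed to the calls already used in DemoApplication.java: settings.serviceUrl to PulsarClient.builder().serviceUrl(...) and settings.token to AuthenticationFactory.token(...).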
-
From your project’s root directory, compile the Java application with the Pulsar client dependency:
mvn clean compile
Result
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.819 s
[INFO] Finished at: 2023-05-09T15:36:33-04:00
[INFO] ------------------------------------------------------------------------
-
Run the project to send a message to your Astra Streaming cluster:
mvn spring-boot:run
Result
Message received: Hello World
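If no message ever arrives, the do/while loop in DemoApplication.java keeps polling indefinitely, because it exits only after a message is received. A bounded variant of that loop might look like the following sketch. BoundedReceive is a hypothetical helper, and the Supplier stands in for a call such as consumer.receive(1, TimeUnit.SECONDS), which the example treats as returning null when the timeout elapses. The real Pulsar call throws a checked PulsarClientException, so the lambda passed in would need to handle that itself.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper that gives up after a fixed number of polls. */
final class BoundedReceive {

    private BoundedReceive() {
    }

    /**
     * Calls poll up to maxAttempts times and returns the first non-null
     * result, or Optional.empty() if every attempt returned null.
     */
    static <T> Optional<T> firstMessage(Supplier<T> poll, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T message = poll.get();
            if (message != null) {
                return Optional.of(message);
            }
        }
        return Optional.empty();
    }
}
```

With a one-second receive timeout per poll, a maxAttempts of 30 would cap the wait at roughly 30 seconds, after which the application can close the consumer and client and report that nothing was received.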
-
In the Astra Portal, click Streaming tenants.
-
Click the name of your tenant, and then verify that the message appears in the specified namespace and topic.