Use the Java Pulsar client with Astra Streaming
You can produce and consume messages with the Java Pulsar client and Astra Streaming.
Go to the examples repo for the complete source of this example.
Prerequisites
For this example, you need the following:
-
JRE 8
-
A Pulsar topic in Astra Streaming
-
A text editor or IDE
Create a project
-
Create a new Maven project:
mkdir SimpleProducerConsumer && cd SimpleProducerConsumer mvn archetype:generate \ -DgroupId=org.example \ -DartifactId=SimpleProducerConsumer \ -DarchetypeArtifactId=maven-archetype-quickstart \ -DinteractiveMode=false
-
Add the Pulsar client dependency in
pom.xml
:pom.xml<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.10.2</version> </dependency>
-
For this example, add the following build target in
pom.xml
. This example creates a single artifact.pom.xml<build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>org.example.App</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> </build>
-
If necessary, specify the compiler versions in
pom.xml
:pom.xml<properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties>
Write the script
-
In your project, navigate to
src/main/java/org/example
, and then openApp.java
. -
Remove any existing content from the file, and then add the following code that imports dependencies, creates a client instance, and configures the instance to use your Astra Streaming tenant:
App.javapackage org.example; import org.apache.pulsar.client.api.*; import java.io.IOException; import java.util.concurrent.TimeUnit; public class App { private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>"; private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"; private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>"; private static final String namespace = "<REPLACE_WITH_NAMESPACE>"; private static final String topicName = "<REPLACE_WITH_TOPIC>"; private static final String topic = String.format("persistent://%s/%s/%s", tenantName,namespace,topicName); public static void main( String[] args ) throws IOException { PulsarClient client = PulsarClient.builder() .serviceUrl(serviceUrl) .authentication( AuthenticationFactory.token(pulsarToken) ) .build();
This class is incomplete. Your editor might show errors until you complete the next steps.
-
Provide values for the following variables:
Parameter Definition Where to find the value serviceUrl
The URL to connect to the Pulsar cluster
In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Connect tab. In the Details section, get the Broker Service URL.
pulsarToken
The token for Pulsar cluster authentication
For information about creating Pulsar tokens, see Manage tokens.
tenantName
The name of your streaming tenant
In the Astra Portal navigation menu, click Streaming, select your streaming tenant. In the Details section, get the Name.
namespace
The segmented area for certain topics in your streaming tenant
In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Choose the target namespace from the list of namespaces.
topicName
Topic name (not the full name)
In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.
-
Use the client to create a producer. The producer builds on the client configuration for directions about what topic to produce messages to.
App.javaProducer<String> producer = client.newProducer(Schema.STRING) .topic(topic) .create();
-
Asynchronously send a single message to the broker and wait for acknowledgment:
App.javaproducer.send("Hello World");
-
Create a new consumer instance. This code directs the consumer to watch a certain topic, identifies the subscription for watching topics, and begins the subscription.
App.javaConsumer<String> consumer = client.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("my-subscription") .subscribe();
-
Receive the messages added by the producer:
App.javaboolean receivedMsg = false; do { // Block for up to 1 second for a message Message<String> msg = consumer.receive(1, TimeUnit.SECONDS); if(msg != null){ System.out.printf("Message received: %s", new String(msg.getData())); // Acknowledge the message to remove it from the message backlog consumer.acknowledge(msg); receivedMsg = true; } } while (!receivedMsg);
-
Clean up and close the class:
App.javaconsumer.close(); client.close(); } }
Run the script
Build and run the app:
mvn clean package assembly:single
java -jar target/SimpleProducerConsumer-1.0-SNAPSHOT-jar-with-dependencies.jar
Message received
output indicates the script succeeded:
Message received: Hello World