Producing and consuming messages using the Java Pulsar client with Astra Streaming
Prerequisites
You will need the following prerequisites in place to complete this guide:
- JRE 8 installed (install now)
- A working Pulsar topic (get started here if you don’t have a topic)
- A basic text editor or IDE
Visit our examples repo to see the complete source of this example.
Create a project
Create a new Maven project and add the Pulsar client dependency in pom.xml.
mkdir SimpleProducerConsumer && cd SimpleProducerConsumer
mvn archetype:generate \
-DgroupId=org.example \
-DartifactId=SimpleProducerConsumer \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.2</version>
</dependency>
For this example we will be creating a single artifact. Add the following build target in pom.xml.
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.example.App</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
You may also need to specify the Java compiler versions in pom.xml. The values below assume JDK 11 or later is installed; adjust them to match your JDK.
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
Add the client
Let’s set up the main Java class and have some fun!
Open the "src/main/java/org/example/App.java" file in your favorite text editor or IDE. Clear the contents of the file and add the following to bring in needed dependencies, create a client instance, and configure it to use your Astra Streaming tenant.
package org.example;
import org.apache.pulsar.client.api.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class App
{
private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
private static final String topicName = "<REPLACE_WITH_TOPIC>";
private static final String topic = String.format("persistent://%s/%s/%s", tenantName,namespace,topicName);
public static void main( String[] args ) throws IOException
{
PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(
AuthenticationFactory.token(pulsarToken)
)
.build();
This isn’t a complete class, so don’t be alarmed if your editor shows errors.
Notice there are a few variables waiting for replacement values. You can find those values here:
- serviceUrl: This is the URL for connecting to the Pulsar cluster. In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".
- pulsarToken: This is the token used for Pulsar cluster authentication. In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.
- tenantName: The name of your streaming tenant. In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".
- namespace: Within your streaming tenant, this is the segmented area for certain topics. In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.
- topicName: Expanding the above chosen namespace will list the topics within. This value is just the topic name (not the "Full Name").
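The class above combines tenantName, namespace, and topicName into a single full topic name with String.format. The following is a minimal standalone sketch of that step, plus a check for values that were never replaced. The class name TopicNameSketch, the helper names, and the sample values are hypothetical and are not part of the Pulsar client.

```java
public class TopicNameSketch {
    // Builds the full topic name the same way the `topic` constant in App.java does.
    public static String fullTopicName(String tenant, String namespace, String topic) {
        return String.format("persistent://%s/%s/%s", tenant, namespace, topic);
    }

    // Returns true while a value is empty or still looks like a "<REPLACE_WITH_...>" placeholder.
    public static boolean isPlaceholder(String value) {
        return value == null || value.trim().isEmpty() || value.startsWith("<REPLACE_WITH_");
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        String topic = fullTopicName("my-tenant", "default", "my-topic");
        System.out.println(topic); // persistent://my-tenant/default/my-topic
        System.out.println(isPlaceholder("<REPLACE_WITH_TOPIC>")); // true
        System.out.println(isPlaceholder("my-topic")); // false
    }
}
```

A check like isPlaceholder could be run before building the client so that a forgotten value fails early with a clear message instead of a connection error.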
Create a producer
Use the client to create a producer.
The producer builds on the client configuration and specifies which topic to produce messages to.
Add this code to "App.java" to create a new producer.
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();
Add this code to "App.java" to send a single message to the broker. The send call is synchronous: it blocks until the broker acknowledges the message.
producer.send("Hello World");
Create a consumer
Create a new consumer instance in "App.java".
This code directs the consumer to watch the topic, names the subscription, and starts the subscription.
Add the following code to "App.java".
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("my-subscription")
.subscribe();
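With the consumer and subscription in place, we can receive unacknowledged messages from the topic. A newly created subscription starts at the latest position by default, so a message produced before the subscription first existed may not be delivered. If the loop does not return on the first run, stop the app and run it again now that the subscription exists.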
Add the following to "App.java".
boolean receivedMsg = false;
do {
// Block for up to 1 second for a message
Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
if(msg != null){
System.out.printf("Message received: %s%n", msg.getValue());
// Acknowledge the message to remove it from the message backlog
consumer.acknowledge(msg);
receivedMsg = true;
}
} while (!receivedMsg);
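The loop above keeps polling until a message arrives, so it never exits if the topic stays empty. One possible variation is to cap the number of attempts. The sketch below shows that pattern in isolation, assuming a java.util.concurrent.BlockingQueue as a stand-in for the Pulsar consumer: poll(timeout, unit) plays the role of consumer.receive(1, TimeUnit.SECONDS), and both return null when the wait times out. The class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedReceiveSketch {
    // Polls the source up to maxAttempts times, waiting up to timeoutMs per attempt.
    // Returns the first message seen, or null if every attempt timed out.
    public static String receiveWithLimit(BlockingQueue<String> source, int maxAttempts, long timeoutMs)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String msg = source.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (msg != null) {
                return msg;
            }
        }
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        // The queue stands in for a topic that already holds one message.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("Hello World");
        System.out.println("Message received: " + receiveWithLimit(queue, 5, 100));
        // The queue is now empty, so both attempts time out and null is returned.
        System.out.println("Second call: " + receiveWithLimit(queue, 2, 50));
    }
}
```

In "App.java" the same idea would mean counting iterations of the do/while loop and leaving it after a fixed number of empty receives.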
Finally, add a little clean up and close out the Java class.
producer.close();
consumer.close();
client.close();
}
}
Run the example
Your Java class is ready for the big time! Let’s build the app and run.
mvn clean package assembly:single
java -jar target/SimpleProducerConsumer-1.0-SNAPSHOT-jar-with-dependencies.jar
You should see output similar to the following:
Message received: Hello World
Next steps
Woo-hoo🎉! You did it! You’re on your way to messaging glory. Let’s continue learning.