Use the Java Pulsar client with Astra Streaming
You can produce and consume messages with the Java Pulsar client and Astra Streaming.
For a complete source code example, see the Astra Streaming Examples repository.
Prerequisites
For this example, you need the following:
-
JRE 8
-
An Apache Pulsar™ topic in Astra Streaming
-
A text editor or IDE
Create a project
-
Create a new Maven project:
mkdir SimpleProducerConsumer && cd SimpleProducerConsumer
mvn archetype:generate \
  -DgroupId=org.example \
  -DartifactId=SimpleProducerConsumer \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
-
Add the Pulsar client dependency in pom.xml:
pom.xml
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>2.10.2</version>
</dependency>
-
Add the following build configuration to pom.xml. It uses the Maven Assembly Plugin to package the app and its dependencies into a single JAR:
pom.xml
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>org.example.App</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>
  </plugins>
</build>
-
If necessary, specify the compiler versions in pom.xml:
pom.xml
<properties>
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
</properties>
Write the script
-
In your project, navigate to src/main/java/org/example, and then open App.java.
-
Remove any existing content from the file, and then add the following code that imports dependencies, creates a client instance, and configures the instance to use your Astra Streaming tenant:
App.java
package org.example;

import org.apache.pulsar.client.api.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class App {
    private static final String serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
    private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
    private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
    private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
    private static final String topicName = "<REPLACE_WITH_TOPIC>";
    private static final String topic =
            String.format("persistent://%s/%s/%s", tenantName, namespace, topicName);

    public static void main(String[] args) throws IOException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .authentication(AuthenticationFactory.token(pulsarToken))
                .build();

This class is incomplete. Your editor might show errors until you complete the next steps.
-
Provide values for the following variables:
Parameter: serviceUrl
Definition: The URL to connect to the Pulsar cluster
Where to find the value:
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Connect tab.
-
In the Tenant Details section, get the Broker Service URL.

Parameter: pulsarToken
Definition: The token for Pulsar cluster authentication
Where to find the value:
For information about creating Pulsar tokens, see Manage tokens.

Parameter: tenantName
Definition: The name of your streaming tenant
Where to find the value:
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Get the tenant name from the Astra Streaming dashboard.

Parameter: namespace
Definition: The segmented area for certain topics in your streaming tenant
Where to find the value:
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Choose the target namespace from the list of namespaces.

Parameter: topicName
Definition: Topic name (not the full name)
Where to find the value:
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.
-
Use the client to create a producer. The producer reuses the client's connection and authentication settings, and the topic value tells it where to publish messages.
App.java
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(topic)
                .create();
-
Send a single message to the broker synchronously, which blocks until the broker acknowledges it, and then close the producer:
App.java
        producer.send("Hello World");
        producer.close();
-
Create a consumer. This code tells the consumer which topic to read, names the subscription, and then subscribes:
App.java
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscribe();
-
Receive the message sent by the producer:
App.java
        boolean receivedMsg = false;
        do {
            // Block for up to 1 second for a message
            Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
            if (msg != null) {
                // getValue() decodes the payload with the consumer's STRING schema
                System.out.printf("Message received: %s%n", msg.getValue());
                // Acknowledge the message to remove it from the message backlog
                consumer.acknowledge(msg);
                receivedMsg = true;
            }
        } while (!receivedMsg);
-
Close the consumer and the client, and then close the main method and the class:
App.java
        consumer.close();
        client.close();
    }
}
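The topic string in App.java is assembled from the tenant, namespace, and topic name, and it is easy to leave a placeholder in place or to paste a full topic name where a short name is expected. The following is a minimal sketch of a check you could run before building the client. The TopicNames class and its methods are hypothetical helpers written for this illustration; they are not part of the Pulsar client API, and the validation rules are assumptions based on the placeholders used above.

```java
import java.util.Objects;

// Hypothetical helper for illustration only; not part of the Pulsar client API.
final class TopicNames {
    private TopicNames() {}

    /** Builds persistent://tenant/namespace/topic from its three short names. */
    static String fullName(String tenant, String namespace, String topic) {
        return String.format("persistent://%s/%s/%s",
                requireSet("tenantName", tenant),
                requireSet("namespace", namespace),
                requireSet("topicName", topic));
    }

    /** Rejects null, empty, still-templated, or full-name values. */
    static String requireSet(String name, String value) {
        Objects.requireNonNull(value, name + " must not be null");
        String trimmed = value.trim();
        if (trimmed.isEmpty() || trimmed.startsWith("<REPLACE_WITH_")) {
            throw new IllegalArgumentException(name + " is not configured: " + value);
        }
        if (trimmed.contains("/")) {
            // A slash suggests a full name was pasted instead of the short name.
            throw new IllegalArgumentException(name + " must be a short name: " + value);
        }
        return trimmed;
    }

    public static void main(String[] args) {
        // Example values are made up for this sketch.
        System.out.println(fullName("my-tenant", "default", "my-topic"));
        // prints: persistent://my-tenant/default/my-topic
    }
}
```

With a check like this, a leftover value such as <REPLACE_WITH_TOPIC> fails fast with a clear message instead of surfacing later as a connection or lookup error.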
Run the script
Build and run the app:
mvn clean package assembly:single
java -jar target/SimpleProducerConsumer-1.0-SNAPSHOT-jar-with-dependencies.jar
Output like the following indicates that the script succeeded:
Message received: Hello World
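If no message ever arrives, the receive loop in App.java keeps polling and the app never exits. One possible variation is to cap the number of polling attempts. The following sketch shows only the bounded-polling pattern, using a plain Supplier as a stand-in for the call to consumer.receive(1, TimeUnit.SECONDS). The BoundedPoll class is hypothetical, and because the real receive call declares a checked exception, an actual version would need to use a counted loop in place or wrap the call.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper for illustration only; the Supplier stands in for a
// timed consumer.receive(...) call that returns null when nothing arrives.
final class BoundedPoll {
    private BoundedPoll() {}

    /** Calls poll up to maxAttempts times and returns the first non-null result. */
    static <T> Optional<T> firstNonNull(Supplier<T> poll, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            T result = poll.get();
            if (result != null) {
                return Optional.of(result);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated source: empty on the first two polls, then a message.
        Supplier<String> delayed = () -> ++calls[0] < 3 ? null : "Hello World";
        System.out.println(firstNonNull(delayed, 5).orElse("no message"));
        // prints: Hello World

        // Simulated source that never produces a message.
        Supplier<String> silent = () -> null;
        System.out.println(firstNonNull(silent, 5).orElse("no message"));
        // prints: no message
    }
}
```

With a one-second receive timeout, five attempts would bound the wait to roughly five seconds; the right limit depends on how long your producer can take to publish.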