Connect to Astra Streaming using Java
Astra Streaming is powered by Apache Pulsar. To connect to your service, use the open-source client APIs provided by the Apache Pulsar project.
Astra Streaming runs Pulsar version 2.8.1. Use a client API of this version or later.
The Java client APIs are distributed through Maven Central and require Java 8 or later. The Javadocs are published on the Apache Pulsar website.
In the code examples, you’ll need to make the following substitutions:
- <service-url>: The Pulsar Broker Service URL from the Astra Streaming Console Connect tab.
- <streaming-token>: Your access token from the Astra Streaming Console Connect tab.
- <topic-url>: A topic URL of your choice from the Astra Streaming Console Topics tab.
- <subscription-name>: A subscription name of your choice, or one that you’ve configured in the Details tab of an existing topic.
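One way to keep these four values out of source code is to read them from environment variables and fail early if any is missing or still an unreplaced placeholder. The following is a minimal sketch using only the Java standard library. The class name `ConnectionSettings` and the variable names (`ASTRA_SERVICE_URL` and so on) are hypothetical and are not defined by Astra Streaming or Pulsar.

```java
import java.util.Map;

/** Hypothetical holder for the four values substituted into the samples. */
final class ConnectionSettings {
    final String serviceUrl;
    final String token;
    final String topicUrl;
    final String subscriptionName;

    private ConnectionSettings(String serviceUrl, String token,
                               String topicUrl, String subscriptionName) {
        this.serviceUrl = serviceUrl;
        this.token = token;
        this.topicUrl = topicUrl;
        this.subscriptionName = subscriptionName;
    }

    /** Builds settings from a map such as System.getenv(); the key names are assumptions. */
    static ConnectionSettings from(Map<String, String> env) {
        return new ConnectionSettings(
                require(env, "ASTRA_SERVICE_URL"),
                require(env, "ASTRA_STREAMING_TOKEN"),
                require(env, "ASTRA_TOPIC_URL"),
                require(env, "ASTRA_SUBSCRIPTION_NAME"));
    }

    /** Rejects values that are missing, blank, or still of the form <placeholder>. */
    private static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing setting: " + key);
        }
        value = value.trim();
        if (value.startsWith("<") && value.endsWith(">")) {
            throw new IllegalStateException("Placeholder not replaced: " + key);
        }
        return value;
    }
}
```

In a sample's main method, `ConnectionSettings.from(System.getenv())` could then supply the arguments for `serviceUrl(...)`, `AuthenticationFactory.token(...)`, `topic(...)`, and `subscriptionName(...)`.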
If you are using Maven in your project, add this to the <properties> section of your pom.xml file:
<pulsar.version>2.8.1</pulsar.version>
And add the following to the <dependencies> section:
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>${pulsar.version}</version>
</dependency>
This sample creates a producer on a topic, sends a message, then closes the producer.
import org.apache.pulsar.client.api.*;
import java.io.IOException;

public class simpleProducer {
    private static final String SERVICE_URL = "<service-url>";

    public static void main(String[] args) throws IOException {
        // Create client object
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .authentication(
                        AuthenticationFactory.token("<streaming-token>")
                )
                .build();
        // Create producer on a topic
        Producer<byte[]> producer = client.newProducer()
                .topic("<topic-url>")
                .create();
        // Send a message to the topic
        producer.send("Hello World".getBytes());
        // Close the producer
        producer.close();
        // Close the client
        client.close();
    }
}
This sample creates a consumer on a topic, consumes a message, then closes the consumer.
import org.apache.pulsar.client.api.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class simpleConsumer {
    private static final String SERVICE_URL = "<service-url>";

    public static void main(String[] args) throws IOException {
        // Create client object
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .authentication(
                        AuthenticationFactory.token("<streaming-token>")
                )
                .build();
        // Create consumer on a topic with a subscription
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("<topic-url>")
                .subscriptionName("<subscription-name>")
                .subscribe();
        boolean receivedMsg = false;
        // Loop until a message is received
        do {
            // Block for up to 1 second for a message; null means the wait timed out
            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
            if (msg != null) {
                System.out.printf("Message received: %s%n", new String(msg.getData()));
                // Acknowledge the message to remove it from the message backlog
                consumer.acknowledge(msg);
                receivedMsg = true;
            }
        } while (!receivedMsg);
        // Close the consumer
        consumer.close();
        // Close the client
        client.close();
    }
}
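The producer sample encodes its payload with `getBytes()` and the consumer sample decodes it with `new String(byte[])`. Both calls use the JVM's default charset, which can differ between machines. A minimal sketch of an explicit UTF-8 round trip, using only the standard library, might look like this. `Utf8Payload` is a hypothetical helper name and is not part of the Pulsar client.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that fixes the charset used on both sides of a topic. */
final class Utf8Payload {
    private Utf8Payload() {}

    /** Encodes text as UTF-8 bytes, independent of the platform default charset. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes UTF-8 bytes back into text. */
    static String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("Hello World");
        System.out.println(decode(payload));
    }
}
```

With this helper, the producer would call `producer.send(Utf8Payload.encode("Hello World"))` and the consumer would print `Utf8Payload.decode(msg.getData())`.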
This sample creates a reader on a topic, reads the earliest retained message, then closes the reader. To start from the most recent message instead, pass MessageId.latest to startMessageId.
import org.apache.pulsar.client.api.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class simpleReader {
    private static final String SERVICE_URL = "<service-url>";

    public static void main(String[] args) throws IOException {
        // Create client object
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .authentication(
                        AuthenticationFactory.token("<streaming-token>")
                )
                .build();
        // Create a reader on a topic starting at the earliest retained message
        // No subscription is necessary. Depending on retention policy, the
        // earliest message may be days old
        Reader<byte[]> reader = client.newReader()
                .topic("<topic-url>")
                .startMessageId(MessageId.earliest)
                .create();
        boolean receivedMsg = false;
        // Loop until a message is received
        do {
            // Block for up to 1 second for a message; null means the wait timed out
            Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
            if (msg != null) {
                System.out.printf("Message received: %s%n", new String(msg.getData()));
                receivedMsg = true;
            }
        } while (!receivedMsg);
        // Close the reader
        reader.close();
        // Close the client
        client.close();
    }
}
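The consumer and reader samples loop until a message arrives, so they run indefinitely against an empty topic. The sketch below shows the same polling loop with an overall deadline. To stay runnable without a broker, a `Callable` stands in for `consumer.receive(1, TimeUnit.SECONDS)` or `reader.readNext(1, TimeUnit.SECONDS)`. The names `BoundedPoll` and `pollUntil` are hypothetical, not part of the Pulsar client.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;

/** Hypothetical helper: a polling loop that gives up after an overall deadline. */
final class BoundedPoll {
    private BoundedPoll() {}

    /**
     * Calls poll at least once, then repeatedly until it returns a non-null
     * value or maxWait has elapsed. poll is expected to block briefly itself
     * (as a timed receive does) and to return null when its own wait times out.
     */
    static <T> Optional<T> pollUntil(Callable<T> poll, Duration maxWait) throws Exception {
        long deadline = System.nanoTime() + maxWait.toNanos();
        do {
            T result = poll.call();
            if (result != null) {
                return Optional.of(result);
            }
            // Subtraction keeps the comparison correct even if nanoTime wraps around
        } while (System.nanoTime() - deadline < 0);
        return Optional.empty();
    }
}
```

In the reader sample, the loop could then be written as `Optional<Message<byte[]>> msg = BoundedPoll.pollUntil(() -> reader.readNext(1, TimeUnit.SECONDS), Duration.ofSeconds(30));`, where the 30-second limit is an arbitrary illustrative value.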