Starlight for JMS quickstart
The Starlight for JMS extension is a highly compliant JMS implementation designed to run on a modern streaming platform.
This quickstart explains how to prepare an Apache Pulsar™ streaming instance, and then use Starlight for JMS to produce and consume messages with a Java JMS client.
Prepare your Pulsar cluster
-
Astra Streaming
-
Luna Streaming
-
Self-managed
To use Starlight for JMS with an Astra Streaming Pulsar cluster, you need the following:
-
An active Astra Streaming Pulsar cluster with a tenant, namespace, and topic that you want to use with Starlight for JMS. If you don’t have these structures configured in Astra Streaming, see the Astra Streaming quickstart.
-
The connection details for your tenant, which are found on the tenant’s Connect tab under Tenant Details:
-
Name: The tenant name
-
Broker Service URL: The cluster’s broker service URL in the format pulsar+ssl://CLUSTER.streaming.datastax.com:6651
-
Web Service URL: The cluster’s web service URL in the format https://CLUSTER.streaming.datastax.com
-
Token: A Pulsar token that provides access to the tenant
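The four connection details can be gathered in one place and sanity-checked before they are handed to the client. The following is a minimal sketch, not part of the official example: the class name `AstraConnectionDetails` and its `validate` method are hypothetical, and the checks only mirror the URL formats listed above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Starlight for JMS): holds the four values
// shown on the tenant's Connect tab.
final class AstraConnectionDetails {
    final String tenantName;
    final String brokerServiceUrl;
    final String webServiceUrl;
    final String token;

    AstraConnectionDetails(String tenantName, String brokerServiceUrl,
                           String webServiceUrl, String token) {
        this.tenantName = tenantName;
        this.brokerServiceUrl = brokerServiceUrl;
        this.webServiceUrl = webServiceUrl;
        this.token = token;
    }

    // Returns a list of problems; an empty list means the values look plausible.
    List<String> validate() {
        List<String> problems = new ArrayList<>();
        if (isBlank(tenantName)) {
            problems.add("tenant name is missing");
        }
        if (brokerServiceUrl == null || !brokerServiceUrl.startsWith("pulsar+ssl://")) {
            problems.add("broker service URL should start with pulsar+ssl://");
        }
        if (webServiceUrl == null || !webServiceUrl.startsWith("https://")) {
            problems.add("web service URL should start with https://");
        }
        if (isBlank(token)) {
            problems.add("token is missing");
        }
        return problems;
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    public static void main(String[] args) {
        // Placeholder values in the documented formats; replace with your own.
        AstraConnectionDetails details = new AstraConnectionDetails(
                "TENANT_NAME",
                "pulsar+ssl://CLUSTER.streaming.datastax.com:6651",
                "https://CLUSTER.streaming.datastax.com",
                "TOKEN");
        System.out.println("Problems found: " + details.validate());
    }
}
```

A check like this only catches copy-and-paste mistakes such as a swapped URL. It does not confirm that the token is valid for the tenant.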
To use Starlight for JMS with a Luna Streaming Pulsar cluster, you need the following:
-
A Luna Streaming Pulsar cluster.
If you don’t have a Luna Streaming cluster, see the Luna Streaming installation guide.
-
Access to the cluster’s admin port 8080 and the binary port 6650.
You must modify the initialization properties in the quickstart code if your cluster’s Pulsar endpoints aren’t accessible on http://localhost:8080 and http://localhost:6650. Alternatively, enable port forwarding with kubectl to make the endpoints accessible on these URLs. For more information, see Install Luna Streaming locally.
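Before running the quickstart code, it can help to confirm that both ports accept connections from your machine. The following is a minimal sketch that uses only the Java standard library. The class name `EndpointCheck` is hypothetical, and a successful TCP connection only shows that something is listening on the port, not that it is a healthy Pulsar endpoint.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Starlight for JMS or Pulsar).
final class EndpointCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the prerequisites above: 8080 (admin) and 6650 (binary).
        System.out.println("admin port 8080 reachable:  " + isReachable("localhost", 8080, 2000));
        System.out.println("binary port 6650 reachable: " + isReachable("localhost", 6650, 2000));
    }
}
```

If either check reports `false`, set up port forwarding or adjust the initialization properties as described above.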
To use Starlight for JMS with a self-managed Pulsar cluster, you need a Pulsar cluster with access to the Pulsar admin port 8080 and binary port 6650.
This quickstart uses a local Pulsar Docker instance. You can also use a local Apache Pulsar binary installation or a remote Pulsar cluster.
You must modify the initialization properties in the quickstart code if your cluster’s Pulsar endpoints aren’t accessible on the admin port 8080 and the binary port 6650 on localhost.
To follow along with this quickstart as written, install a Pulsar instance using Docker:
-
Install Docker on your local machine.
-
Open a terminal, and then create a Pulsar Docker instance with ports 8080 and 6650 exposed:
docker run --name pulsar-jms-runner -p 8080:8080 -p 6650:6650 apachepulsar/pulsar:latest /pulsar/bin/pulsar standalone
Result
Starting the Docker container produces many status messages and output similar to the following:
14:57:50.996 [pulsar-web-68-11] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [07/May/2021:14:57:50 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false HTTP/1.1" 200 1677 "-" "Pulsar-Java-v2.7.1" 3
14:58:20.962 [pulsar-web-68-1] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [07/May/2021:14:58:20 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false HTTP/1.1" 200 1677 "-" "Pulsar-Java-v2.7.1" 3
14:58:50.926 [pulsar-web-68-12] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [07/May/2021:14:58:50 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false HTTP/1.1" 200 1677 "-" "Pulsar-Java-v2.7.1" 3
14:59:20.892 [pulsar-web-68-2] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [07/May/2021:14:59:20 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false HTTP/1.1" 200 1677 "-" "Pulsar-Java-v2.7.1" 4
14:59:50.857 [pulsar-web-68-3] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [07/May/2021:14:59:50 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false HTTP/1.1" 200 1677 "-" "Pulsar-Java-v2.7.1" 3
-
Leave this terminal running in the background while you complete the rest of this quickstart.
Produce and consume messages with Starlight for JMS
This example shows how to produce and consume messages with Starlight for JMS using a JMS client, Java OpenJDK 8 or 11, and Apache Maven. You can also use Gradle with some modifications.
For the complete source code for this example, see the DataStax streaming examples repository. This example was written for Astra Streaming but it can be used with Luna Streaming or self-managed Pulsar clusters by setting the connection properties appropriately.
-
Astra Streaming
-
Luna Streaming
-
Self-managed
Astra Streaming
-
Create a new Maven project:
mvn archetype:generate \
  -DgroupId=org.example \
  -DartifactId=StarlightForJMSClient \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
-
Change to the new project directory:
cd StarlightForJMSClient
-
Open the project in your IDE, and then add the Starlight for JMS dependency to the pom.xml file.
This quickstart uses the pulsar-jms-all package, which is a fat JAR file that includes all dependencies. DataStax recommends using the latest stable release.
pom.xml
<dependencies>
  <dependency>
    <groupId>com.datastax.oss</groupId>
    <artifactId>pulsar-jms-all</artifactId>
    <version>3.2.0</version>
  </dependency>
</dependencies>
-
Set the compiler source and target according to your Java version:
pom.xml
<properties>
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
</properties>
-
For this quickstart, include the following plugin configuration in the build section:
-
<artifactId>maven-assembly-plugin</artifactId>: The Maven Assembly Plugin that is used to compile a JAR file with all dependencies included.
-
<descriptorRef>jar-with-dependencies</descriptorRef>: An additional descriptor appended to the compiled JAR file name.
-
<mainClass>org.example.App</mainClass>: The default package and class so you can run the compiled JAR file without any additional specifications.
pom.xml
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>org.example.App</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
  </plugins>
</build>
-
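In your Maven project, create an example subdirectory at src/main/java/org if it doesn’t already exist. Run the command from the project root, and use a relative path so that the directory is created inside the project:
mkdir -p src/main/java/org/example
-
In the example directory, create an App.java file, or open the one that the Maven archetype generated and clear its contents. In the next steps, you will add code to this file to create a complete program that produces and consumes messages.
-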
Paste the following code in the file, and then replace the placeholders with the tenant connection details, namespace, and topic from Astra Streaming. Your editor will report errors because this isn’t a complete program yet.
/src/main/java/org/example/App.java
package org.example;

import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;

import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;

public class App {
    private static String webServiceUrl = "WEB_SERVICE_URL";
    private static String brokerServiceUrl = "BROKER_SERVICE_URL";
    private static String pulsarToken = "TOKEN";
    private static String tenantName = "TENANT_NAME";
    private static final String namespace = "NAMESPACE_NAME";
    private static final String topicName = "TOPIC_NAME";
    private static final String topic = String.format("persistent://%s/%s/%s", tenantName, namespace, topicName);

    public static void main(String[] args) throws Exception {
-
Add the following code to build the configuration for the producer and consumer:
/src/main/java/org/example/App.java
        Map<String, Object> properties = new HashMap<>();
        properties.put("webServiceUrl", webServiceUrl);
        properties.put("brokerServiceUrl", brokerServiceUrl);
        properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
        properties.put("authParams", pulsarToken);
-
Add the following code to define a simple PulsarConnectionFactory that creates a JMS queue using the full Pulsar topic address, creates a message listener callback function that watches the queue, and then produces a single message on the queue:
/src/main/java/org/example/App.java
        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
            JMSContext context = factory.createContext();
            Queue queue = context.createQueue(topic);

            context.createConsumer(queue).setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("Received: " + message.getBody(String.class));
                    } catch (Exception err) {
                        err.printStackTrace();
                    }
                }
            });

            String message = "Hello there!";
            System.out.println("Sending: " + message);
            context.createProducer().send(queue, message);

            Thread.sleep(4000); // wait for the message to be consumed
        }
    }
}
-
Save App.java, and then build and run the JAR file for the complete program:
mvn clean package assembly:single
java -jar target/StarlightForJMSClient-1.0-SNAPSHOT-jar-with-dependencies.jar
-
Make sure the result shows that a message was sent and received:
Sending: Hello there!
Received: Hello there!
-
In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the namespace that you used in the Java program.
If everything was configured correctly, then the namespace’s metrics should reflect that at least one message was published and consumed by your Astra Streaming Pulsar topic.
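The Astra Streaming example builds the full topic address from the tenant, namespace, and topic name with `String.format`. The following standalone sketch isolates that step so the resulting address is easy to inspect. The class name `TopicAddress` and the empty-value check are assumptions added for illustration and aren't part of Starlight for JMS.

```java
// Hypothetical helper (not part of Starlight for JMS).
final class TopicAddress {

    // Joins the three parts into a persistent Pulsar topic address,
    // using the same format string as App.java.
    static String persistent(String tenant, String namespace, String topic) {
        return String.format("persistent://%s/%s/%s",
                requireNonEmpty("tenant", tenant),
                requireNonEmpty("namespace", namespace),
                requireNonEmpty("topic", topic));
    }

    // Rejects null or empty parts so that a missing value fails early.
    private static String requireNonEmpty(String label, String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(label + " must not be empty");
        }
        return value;
    }

    public static void main(String[] args) {
        // Prints: persistent://public/default/example-topic
        System.out.println(persistent("public", "default", "example-topic"));
    }
}
```

The returned string is the value that App.java passes to `context.createQueue(topic)`.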
Luna Streaming
-
Create a new Maven project:
mvn archetype:generate \
  -DgroupId=org.example \
  -DartifactId=StarlightForJMSClient \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
-
Change to the new project directory:
cd StarlightForJMSClient
-
Open the project in your IDE, and then add the Starlight for JMS dependency to the pom.xml file.
This quickstart uses the pulsar-jms-all package, which is a fat JAR file that includes all dependencies. DataStax recommends using the latest stable release.
pom.xml
<dependencies>
  <dependency>
    <groupId>com.datastax.oss</groupId>
    <artifactId>pulsar-jms-all</artifactId>
    <version>3.2.0</version>
  </dependency>
</dependencies>
-
Set the compiler source and target according to your Java version:
pom.xml
<properties>
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
</properties>
-
For this quickstart, include the following plugin configuration in the build section:
-
<artifactId>maven-assembly-plugin</artifactId>: The Maven Assembly Plugin that is used to compile a JAR file with all dependencies included.
-
<descriptorRef>jar-with-dependencies</descriptorRef>: An additional descriptor appended to the compiled JAR file name.
-
<mainClass>org.example.App</mainClass>: The default package and class so you can run the compiled JAR file without any additional specifications.
pom.xml
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>org.example.App</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
  </plugins>
</build>
-
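In your Maven project, create an example subdirectory at src/main/java/org if it doesn’t already exist. Run the command from the project root, and use a relative path so that the directory is created inside the project:
mkdir -p src/main/java/org/example
-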
In the example directory, create an App.java file, and then paste the following code into the file:
/src/main/java/org/example/App.java
package org.example;

import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;

import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;

public class App {

    public static void main(String... args) throws Exception {
        String topic = "persistent://public/default/example-topic";
        Map<String, Object> properties = new HashMap<>();

        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
            JMSContext context = factory.createContext();
            Queue queue = context.createQueue(topic);

            // Listen for messages...
            context.createConsumer(queue).setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("Received: " + message.getBody(String.class));
                    } catch (Exception err) {
                        err.printStackTrace();
                    }
                }
            });

            for (int i = 0; i < 10; i++) {
                String message = "Hello world! " + i;
                System.out.println("Sending: " + message);
                context.createProducer().send(queue, message);
            }

            Thread.sleep(10000);
        }
    }
}
The package declaration is org.example so that it matches the file location and the mainClass value in pom.xml.
Note the following elements of the App.java program:
-
persistent://public/default/example-topic: The topic URI that the client uses to publish and consume messages. This quickstart uses the default tenant and namespace with a topic named example-topic. The tenant and namespace must already exist. If the specified topic doesn’t exist in the namespace, Starlight for JMS creates the topic automatically.
-
Map<String, Object> properties = new HashMap<>();: Creates an empty Starlight for JMS configuration properties hashmap. You must override webServiceUrl if your Pulsar cluster’s admin endpoint isn’t accessible on http://localhost:8080. For remote clusters, additional properties are required, such as the Broker Service URL and authentication credentials.
-
Queue queue = context.createQueue(topic);: Creates a Starlight for JMS queue. For more information about queues and topics, see Map Pulsar concepts to JMS specifications.
-
context.createConsumer(queue).setMessageListener(new MessageListener() {…}): Creates a Starlight for JMS consumer context using the createConsumer method. Then, public void onMessage(Message message) {…} initializes an onMessage callback to consume the messages as they arrive. For more information about consumers, see Map Pulsar concepts to JMS specifications.
-
for (int i = 0; i < 10; i++) {…}: Sends 10 messages to the queue using the createProducer method.
-
Thread.sleep(10000);: Sleeps for 10 seconds to allow time for all messages to be consumed before closing the connection.
-
Compile the sample application:
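mvn clean package assembly:single
The assembly:single goal is required to build the jar-with-dependencies file because the plugin configuration in pom.xml doesn’t bind the Assembly Plugin to a build phase.
The first time you compile the JAR file, Maven downloads all required dependencies. Subsequent runs are faster.
-
Run the sample application:
java -jar target/StarlightForJMSClient-1.0-SNAPSHOT-jar-with-dependencies.jar
-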
Make sure the result shows that messages were sent and received.
It can be easy to miss message 0 in the output because it is the first line of output, and it is followed by many status lines before the rest of the messages.
Additionally, messages are consumed asynchronously, so the messages might not be sent and received sequentially.
Sending: Hello world! 0
... TRUNCATED ...
Sending: Hello world! 1
Sending: Hello world! 2
Received: Hello world! 0
Received: Hello world! 1
Sending: Hello world! 3
Received: Hello world! 2
Sending: Hello world! 4
Received: Hello world! 3
Sending: Hello world! 5
Sending: Hello world! 6
Received: Hello world! 4
Received: Hello world! 5
Sending: Hello world! 7
Received: Hello world! 6
Sending: Hello world! 8
Received: Hello world! 7
Sending: Hello world! 9
Received: Hello world! 8
Received: Hello world! 9
[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/example-topic] [jms-queue] Closed consumer
[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://public/default/example-topic] [standalone-0-0] Closed Producer
[main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: http://localhost:8080
[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0xe43071e3, L:/127.0.0.1:65236 ! R:localhost/127.0.0.1:6650] Disconnected
-
Use the Pulsar Admin CLI or the Luna Streaming Pulsar Admin Console to inspect your tenant’s activity, and verify that messages were published and consumed on the specified topic.
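If your cluster’s endpoints aren’t on the default local addresses, the empty properties map in App.java needs entries for them. The following is a minimal sketch of building such a map with only the Java standard library. The keys webServiceUrl and brokerServiceUrl are the ones used in the Astra Streaming example in this quickstart. The class name `ConnectionProperties` and the host pulsar.example.internal are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Starlight for JMS).
final class ConnectionProperties {

    // Builds the configuration map. A null argument adds no entry,
    // which leaves the client's default for that endpoint in place.
    static Map<String, Object> forEndpoints(String webServiceUrl, String brokerServiceUrl) {
        Map<String, Object> properties = new HashMap<>();
        if (webServiceUrl != null) {
            properties.put("webServiceUrl", webServiceUrl);
        }
        if (brokerServiceUrl != null) {
            properties.put("brokerServiceUrl", brokerServiceUrl);
        }
        return properties;
    }

    public static void main(String[] args) {
        // Hypothetical host name; replace with your cluster's endpoints.
        Map<String, Object> properties = forEndpoints(
                "http://pulsar.example.internal:8080",
                "pulsar://pulsar.example.internal:6650");
        System.out.println(properties);
    }
}
```

In App.java, the resulting map would take the place of the empty `new HashMap<>()` that is passed to `new PulsarConnectionFactory(properties)`. Clusters that require authentication need additional entries, as noted above.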
Self-managed
-
Create a new Maven project:
mvn archetype:generate \
  -DgroupId=org.example \
  -DartifactId=StarlightForJMSClient \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
-
Change to the new project directory:
cd StarlightForJMSClient
-
Open the project in your IDE, and then add the Starlight for JMS dependency to the pom.xml file.
This quickstart uses the pulsar-jms-all package, which is a fat JAR file that includes all dependencies. DataStax recommends using the latest stable release.
pom.xml
<dependencies>
  <dependency>
    <groupId>com.datastax.oss</groupId>
    <artifactId>pulsar-jms-all</artifactId>
    <version>3.2.0</version>
  </dependency>
</dependencies>
-
Set the compiler source and target according to your Java version:
pom.xml
<properties>
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
</properties>
-
For this quickstart, include the following plugin configuration in the build section:
-
<artifactId>maven-assembly-plugin</artifactId>: The Maven Assembly Plugin that is used to compile a JAR file with all dependencies included.
-
<descriptorRef>jar-with-dependencies</descriptorRef>: An additional descriptor appended to the compiled JAR file name.
-
<mainClass>org.example.App</mainClass>: The default package and class so you can run the compiled JAR file without any additional specifications.
pom.xml
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>org.example.App</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
  </plugins>
</build>
-
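In your Maven project, create an example subdirectory at src/main/java/org if it doesn’t already exist. Run the command from the project root, and use a relative path so that the directory is created inside the project:
mkdir -p src/main/java/org/example
-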
In the example directory, create an App.java file, and then paste the following code into the file:
/src/main/java/org/example/App.java
package org.example;

import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;

import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;

public class App {

    public static void main(String... args) throws Exception {
        String topic = "persistent://public/default/example-topic";
        Map<String, Object> properties = new HashMap<>();

        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
            JMSContext context = factory.createContext();
            Queue queue = context.createQueue(topic);

            // Listen for messages...
            context.createConsumer(queue).setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("Received: " + message.getBody(String.class));
                    } catch (Exception err) {
                        err.printStackTrace();
                    }
                }
            });

            for (int i = 0; i < 10; i++) {
                String message = "Hello world! " + i;
                System.out.println("Sending: " + message);
                context.createProducer().send(queue, message);
            }

            Thread.sleep(10000);
        }
    }
}
The package declaration is org.example so that it matches the file location and the mainClass value in pom.xml.
Note the following elements of the App.java program:
-
persistent://public/default/example-topic: The topic URI that the client uses to publish and consume messages. This quickstart uses the default tenant and namespace with a topic named example-topic. The tenant and namespace must already exist. If the specified topic doesn’t exist in the namespace, Starlight for JMS creates the topic automatically.
-
Map<String, Object> properties = new HashMap<>();: Creates an empty Starlight for JMS configuration properties hashmap. You must override webServiceUrl if your Pulsar cluster’s admin endpoint isn’t accessible on http://localhost:8080. For remote clusters, additional properties are required, such as the Broker Service URL and authentication credentials.
-
Queue queue = context.createQueue(topic);: Creates a Starlight for JMS queue. For more information about queues and topics, see Map Pulsar concepts to JMS specifications.
-
context.createConsumer(queue).setMessageListener(new MessageListener() {…}): Creates a Starlight for JMS consumer context using the createConsumer method. Then, public void onMessage(Message message) {…} initializes an onMessage callback to consume the messages as they arrive. For more information about consumers, see Map Pulsar concepts to JMS specifications.
-
for (int i = 0; i < 10; i++) {…}: Sends 10 messages to the queue using the createProducer method.
-
Thread.sleep(10000);: Sleeps for 10 seconds to allow time for all messages to be consumed before closing the connection.
-
Compile the sample application:
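mvn clean package assembly:single
The assembly:single goal is required to build the jar-with-dependencies file because the plugin configuration in pom.xml doesn’t bind the Assembly Plugin to a build phase.
The first time you compile the JAR file, Maven downloads all required dependencies. Subsequent runs are faster.
-
Run the sample application:
java -jar target/StarlightForJMSClient-1.0-SNAPSHOT-jar-with-dependencies.jar
-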
Make sure the result shows that messages were sent and received.
It can be easy to miss message 0 in the output because it is the first line of output, and it is followed by many status lines before the rest of the messages.
Additionally, messages are consumed asynchronously, so the messages might not be sent and received sequentially.
Sending: Hello world! 0
... TRUNCATED ...
Sending: Hello world! 1
Sending: Hello world! 2
Received: Hello world! 0
Received: Hello world! 1
Sending: Hello world! 3
Received: Hello world! 2
Sending: Hello world! 4
Received: Hello world! 3
Sending: Hello world! 5
Sending: Hello world! 6
Received: Hello world! 4
Received: Hello world! 5
Sending: Hello world! 7
Received: Hello world! 6
Sending: Hello world! 8
Received: Hello world! 7
Sending: Hello world! 9
Received: Hello world! 8
Received: Hello world! 9
[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/example-topic] [jms-queue] Closed consumer
[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://public/default/example-topic] [standalone-0-0] Closed Producer
[main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: http://localhost:8080
[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0xe43071e3, L:/127.0.0.1:65236 ! R:localhost/127.0.0.1:6650] Disconnected
-
Use the Pulsar Admin CLI to inspect your tenant’s activity, and verify that messages were published and consumed on the specified topic.
-
If you used a Pulsar Docker instance for this quickstart, stop and remove the Docker container when you are done testing the quickstart program:
-
In the terminal where you started the Pulsar Docker instance, press Ctrl+C to stop the container.
-
Get a list of all Docker containers, and then note the container ID for the apachepulsar container:
docker ps --all
Result
In the following example result, the container ID is 5116f0d16eb3:
CONTAINER ID   IMAGE                        COMMAND                  CREATED       STATUS                       PORTS   NAMES
5116f0d16eb3   apachepulsar/pulsar:latest   "/pulsar/bin/pulsar …"   2 hours ago   Exited (130) 2 minutes ago           pulsar-jms-runner
7ed5fc6c9776   cassandra:latest             "docker-entrypoint.s…"   7 days ago    Exited (143) 7 days ago              my_cass
-
Delete the container by its container ID:
docker rm CONTAINER_ID
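The Luna Streaming and self-managed examples in this quickstart pause with Thread.sleep(10000) so that all messages can be consumed before the connection closes. One possible alternative, sketched here with only the Java standard library, is to count deliveries with a `java.util.concurrent.CountDownLatch` and stop waiting as soon as the expected number has arrived. The class name `LatchWait` and the simulated listener thread are assumptions made for illustration. In App.java, the `countDown()` call would sit inside `onMessage(...)`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not part of Starlight for JMS): waits for a known
// number of asynchronous deliveries instead of sleeping for a fixed time.
final class LatchWait {

    // Simulates `delivered` callbacks on a separate thread and waits until
    // `expected` of them have been counted, or until the timeout elapses.
    // Returns true only if all expected deliveries arrived in time.
    static boolean awaitDeliveries(int expected, int delivered, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(expected);
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < delivered; i++) {
                // Stands in for the body of onMessage(...) in App.java.
                listenerThread.execute(latch::countDown);
            }
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            listenerThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 10 messages expected and 10 delivered: returns without waiting the full 10 seconds.
        System.out.println("All messages consumed: " + awaitDeliveries(10, 10, 10_000));
    }
}
```

The timeout keeps the program from hanging if a message never arrives, which is the same protection that the fixed sleep provides.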