Mapping Pulsar concepts to JMS specifications
JMS specifications are built upon the concepts of Topics and Queues, but Apache Pulsar™ has only a general concept of Topics that can model both of those two domains.
In Pulsar there is no concept of a queue: Starlight for JMS treats Pulsar topics as queues when queue-related JMS APIs are in use. There is no strict, cluster-wide verification that prevents you from accessing a JMS Queue using the Topic API or vice versa.
In JMS, a Topic is written by many Producers and read by many Consumers that share one or many Subscriptions. Subscriptions may be Durable or Non-Durable (meaning the client position is not retained on restart), and Shared or Non-Shared, which determines whether the same message may be received and processed by more than one consumer.
Mappings between JMS consumers/subscriptions and Pulsar consumers for topics
JMS Concept | Pulsar Concept |
---|---|
Topic | Persistent topic |
Consumer | Exclusive non-durable subscription with random name ( |
DurableConsumer | Exclusive durable subscription with the given name + |
SharedConsumer | Shared non-durable subscription with the given name + |
SharedDurableConsumer | Shared durable subscription with the given name + |
DurableSubscriber | Exclusive durable subscription with the given name + |
As of Pulsar version 2.7.x, delayed messages don’t work with exclusive subscriptions. You can, however, force the usage of shared non-durable subscriptions for simple consumers by setting jms.useExclusiveSubscriptionsForSimpleConsumers=false .
For SharedConsumer and SharedDurableConsumer, set jms.topicSharedSubscriptionType to Key_Shared to use the Key_Shared subscription type.
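As a minimal sketch, the two settings above could be collected in the configuration map that is later passed to the PulsarConnectionFactory constructor. The class name TopicSubscriptionConfig and its build method are illustrative only; the property keys are the ones named in the text, and passing the values as strings is an assumption that follows the other examples on this page.

```java
import java.util.HashMap;
import java.util.Map;

final class TopicSubscriptionConfig {
    // Builds the client configuration described above (hypothetical helper).
    static Map<String, Object> build() {
        Map<String, Object> properties = new HashMap<>();
        // Simple consumers use shared non-durable subscriptions instead of exclusive ones.
        properties.put("jms.useExclusiveSubscriptionsForSimpleConsumers", "false");
        // SharedConsumer and SharedDurableConsumer use the Key_Shared subscription type.
        properties.put("jms.topicSharedSubscriptionType", "Key_Shared");
        return properties;
    }

    public static void main(String[] args) {
        // The resulting map would be handed to new PulsarConnectionFactory(properties).
        System.out.println(build());
    }
}
```

Keeping the settings in one place makes it easier to apply the same configuration on every client.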
In JMS a Queue is written by many Producers but only one Consumer eventually processes each message.
In order to emulate that behavior, the first time you create a consumer over a queue, Starlight for JMS creates a durable subscription named jms-queue at the beginning (initial position = Earliest) of the Pulsar topic.
Every access to the queue passes through the shared subscription and guarantees that only one consumer receives and processes each message.
Mappings between JMS consumers/subscriptions and Pulsar consumers for queues
JMS Concept | Pulsar Concept |
---|---|
Queue | Persistent topic |
Consumer | Shared durable subscription with name |
QueueBrowser | Pulsar Reader for the topic, beginning from the next message on |
You can change the name of the shared subscription using the jms.queueSubscriptionName configuration parameter, but you must ensure that you change it on every client.
In order to implement QueueBrowser, Starlight for JMS uses the Pulsar Reader API, starting from the next message available on the jms-queue subscription. In order to peek at the next message, Starlight for JMS uses the Pulsar Admin API peekMessages.
In certain cases, the peekMessages API can return the last consumed message of the subscription, so the QueueBrowser may return inaccurate results.
Overriding jms-queue
By default, the subscription created for a JMS queue is named jms-queue, which can be overridden with jms.queueSubscriptionName in the PulsarConnectionFactory constructor.
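You can instead set the subscription name at queue creation:
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties);
        Connection connection = factory.createConnection();
        Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
    // the part after the colon overrides the default jms-queue subscription name
    Queue queue = session.createQueue("pulsarQueue:subscriptionName");
}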
In the above example, the subscription will be subscriptionName, and the queue will be backed by the public/default/pulsarQueue Pulsar topic.
You can override the default public/default namespace with jms.systemNamespace in the PulsarConnectionFactory constructor:
Map<String, Object> configuration = new HashMap<>();
configuration.put("jms.systemNamespace", "<my-tenant>/<my-namespace>");
// declare the concrete type: the javax.jms.ConnectionFactory interface has no close() method
PulsarConnectionFactory factory = new PulsarConnectionFactory(configuration);
...
factory.close();
A fully-qualified namespace works similarly, but the topic destination points to a persistent topic URL:
Queue queue = session.createQueue("persistent://tenant/namespace/pulsarQueue:subscriptionName");
In the above example, the subscription will be subscriptionName, and the queue will be backed by the persistent://tenant/namespace/pulsarQueue Pulsar topic.
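The naming rule described above can be sketched as a small parser. This is only an illustration of the rule as stated in the text, not the code Starlight for JMS uses: QueueDestinationName and parse are hypothetical names, and treating any name without "://" as a short name inside the system namespace is a simplifying assumption.

```java
final class QueueDestinationName {
    final String topic;
    final String subscription;

    private QueueDestinationName(String topic, String subscription) {
        this.topic = topic;
        this.subscription = subscription;
    }

    // Splits "topic:subscription" into its two parts. Only a colon that appears
    // after the last '/' counts, so the "://" of a persistent:// URL is ignored.
    static QueueDestinationName parse(String name, String systemNamespace, String defaultSubscription) {
        int lastSlash = name.lastIndexOf('/');
        int colon = name.lastIndexOf(':');
        String topic = name;
        String subscription = defaultSubscription;
        if (colon > lastSlash) {
            topic = name.substring(0, colon);
            subscription = name.substring(colon + 1);
        }
        // Assumption: names without a scheme are resolved inside the system namespace.
        if (!topic.contains("://")) {
            topic = systemNamespace + "/" + topic;
        }
        return new QueueDestinationName(topic, subscription);
    }

    public static void main(String[] args) {
        QueueDestinationName shortName =
            parse("pulsarQueue:subscriptionName", "public/default", "jms-queue");
        System.out.println(shortName.topic + " -> " + shortName.subscription);

        QueueDestinationName fullName =
            parse("persistent://tenant/namespace/pulsarQueue:subscriptionName", "public/default", "jms-queue");
        System.out.println(fullName.topic + " -> " + fullName.subscription);
    }
}
```

A name without the colon suffix, such as pulsarQueue, falls back to the default subscription name passed in, which mirrors the jms-queue default described earlier.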
Disabling automatic subscription creation
By default, the jms.precreateQueueSubscription configuration flag is set to true, and the JMS client will pre-create the jms-queue subscription (see Overriding jms-queue above).
To disable automatic subscription creation in the JMS client, set the jms.precreateQueueSubscription parameter to false, as below:
Map<String, Object> properties = new HashMap<>();
properties.put("jms.precreateQueueSubscription", "false");
To disable automatic subscription creation on the Pulsar broker, set the allowAutoSubscriptionCreation parameter to false, as below:
public static void before() throws Exception {
    cluster =
        new PulsarCluster(
            tempDir,
            config -> {
                config.setAllowAutoSubscriptionCreation(false);
            });
    cluster.start();
}
Or modify the Pulsar broker’s broker.conf file:
allowAutoSubscriptionCreation=false
If both allowAutoSubscriptionCreation and jms.precreateQueueSubscription are set to false, automatic subscription creation is completely disabled.
Mappings between JMS consumer modes and Pulsar
Starlight for JMS supports five session acknowledgement modes. This section describes how they map to Pulsar concepts.
Session.AUTO_ACKNOWLEDGE
This mode maps to the Pulsar acknowledgeMessage API.
By default, acknowledgements in Pulsar are asynchronous and best effort, so a consumer may still receive duplicate messages.
In the example below, we create a consumer on mytopic with Session.AUTO_ACKNOWLEDGE, send 100 foo messages, and auto-acknowledge them.
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;

public class JMSExample {
    public static void main(String... args) throws Exception {
        // start pulsar standalone on localhost
        // the topic is autocreated, no additional setup is needed
        String topic = "persistent://public/default/mytopic";
        Map<String, Object> properties = new HashMap<>();
        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
            try (Connection connection = factory.createConnection()) {
                // a JMS connection is created in stopped mode; start it to enable delivery
                connection.start();
                try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
                    Queue queue = session.createQueue(topic);
                    session.createConsumer(queue).setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                // the session acknowledges the message automatically
                                System.out.println("received " + message.getBody(String.class));
                            } catch (Exception err) {
                                err.printStackTrace();
                            }
                        }
                    });
                    try (MessageProducer producer = session.createProducer(queue)) {
                        for (int i = 0; i < 100; i++) {
                            String message = "foo" + i;
                            System.out.println("sending " + message);
                            TextMessage textMessage = session.createTextMessage(message);
                            producer.send(textMessage);
                        }
                    }
                    // waiting 10 seconds, in order to see all messages consumed
                    Thread.sleep(10000);
                }
            }
        }
    }
}
Session.DUPS_OK_ACKNOWLEDGE
This mode maps to Consumer.acknowledgeAsync() without waiting for the CompletableFuture returned by the method to complete.
Starlight for JMS logs an error if the acknowledgement fails, but the application continues running and is not aware of the failure.
In the example below, we create a consumer on mytopic with Session.DUPS_OK_ACKNOWLEDGE, send 100 foo messages, and acknowledge them without waiting for the returned CompletableFuture to complete.
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;

public class JMSExample {
    public static void main(String... args) throws Exception {
        // start pulsar standalone on localhost
        // the topic is autocreated, no additional setup is needed
        String topic = "persistent://public/default/mytopic";
        Map<String, Object> properties = new HashMap<>();
        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
            try (Connection connection = factory.createConnection()) {
                // a JMS connection is created in stopped mode; start it to enable delivery
                connection.start();
                try (Session session = connection.createSession(Session.DUPS_OK_ACKNOWLEDGE)) {
                    Queue queue = session.createQueue(topic);
                    session.createConsumer(queue).setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                // the acknowledgement is sent asynchronously after this method returns
                                System.out.println("received " + message.getBody(String.class));
                            } catch (Exception err) {
                                err.printStackTrace();
                            }
                        }
                    });
                    try (MessageProducer producer = session.createProducer(queue)) {
                        for (int i = 0; i < 100; i++) {
                            String message = "foo" + i;
                            System.out.println("sending " + message);
                            TextMessage textMessage = session.createTextMessage(message);
                            producer.send(textMessage);
                        }
                    }
                    // waiting 10 seconds, in order to see all messages consumed
                    Thread.sleep(10000);
                }
            }
        }
    }
}
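The "do not wait for the future" behavior of DUPS_OK_ACKNOWLEDGE can be sketched with plain java.util.concurrent types. This is not Starlight for JMS code: FireAndForgetAck is a hypothetical class, the CompletableFuture stands in for the one returned by an asynchronous acknowledgement, and a list stands in for the error log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class FireAndForgetAck {
    // Attaches an error logger to the future and returns without waiting for it.
    static void acknowledge(CompletableFuture<Void> pendingAck, List<String> errorLog) {
        pendingAck.whenComplete((ignored, error) -> {
            if (error != null) {
                // The failure is only recorded; the caller is never notified.
                errorLog.add("acknowledgement failed: " + error.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        List<String> errorLog = new ArrayList<>();
        // Stand-in for the future returned by an asynchronous acknowledgement.
        CompletableFuture<Void> pendingAck = new CompletableFuture<>();
        acknowledge(pendingAck, errorLog);
        // The application keeps running here while the acknowledgement is in flight.
        pendingAck.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(errorLog);
    }
}
```

Because nothing joins on the future, a failed acknowledgement shows up only in the log, and the affected message may be delivered again later.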
Session.CLIENT_ACKNOWLEDGE
In this mode the JMS client acknowledges messages as in AUTO_ACKNOWLEDGE, but only when the application calls Message.acknowledge(). The call acknowledges that message and all messages received before it in the same session. To acknowledge only the individual message, use PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE.
In the example below, we create a consumer on mytopic with Session.CLIENT_ACKNOWLEDGE, send 100 foo messages, and acknowledge them by calling Message.acknowledge() in the onMessage method.
Each call acknowledges the current message and all previous messages in the session.
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;

public class JMSExample {
    public static void main(String... args) throws Exception {
        // start pulsar standalone on localhost
        // the topic is autocreated, no additional setup is needed
        String topic = "persistent://public/default/mytopic";
        Map<String, Object> properties = new HashMap<>();
        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
            try (Connection connection = factory.createConnection()) {
                // a JMS connection is created in stopped mode; start it to enable delivery
                connection.start();
                try (Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE)) {
                    Queue queue = session.createQueue(topic);
                    session.createConsumer(queue).setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                System.out.println("received " + message.getBody(String.class));
                                // acknowledges this message and all previous ones in the session
                                message.acknowledge();
                            } catch (Exception err) {
                                err.printStackTrace();
                            }
                        }
                    });
                    try (MessageProducer producer = session.createProducer(queue)) {
                        for (int i = 0; i < 100; i++) {
                            String message = "foo" + i;
                            System.out.println("sending " + message);
                            TextMessage textMessage = session.createTextMessage(message);
                            producer.send(textMessage);
                        }
                    }
                    // waiting 10 seconds, in order to see all messages consumed
                    Thread.sleep(10000);
                }
            }
        }
    }
}
PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE
In this mode the JMS client works like CLIENT_ACKNOWLEDGE, but when you call Message.acknowledge(), only the message you called it on is acknowledged.
This is a proprietary extension that is not present in the standard JMS 2.0 specification.
In the example below, we create a consumer on mytopic with PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE, send 100 foo messages, and acknowledge them by calling Message.acknowledge() in the onMessage method.
Only the individual message is acknowledged.
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
// assumption: PulsarJMSConstants lives in the same package as PulsarConnectionFactory
import com.datastax.oss.pulsar.jms.PulsarJMSConstants;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;

public class JMSExample {
    public static void main(String... args) throws Exception {
        // start pulsar standalone on localhost
        // the topic is autocreated, no additional setup is needed
        String topic = "persistent://public/default/mytopic";
        Map<String, Object> properties = new HashMap<>();
        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
            try (Connection connection = factory.createConnection()) {
                // a JMS connection is created in stopped mode; start it to enable delivery
                connection.start();
                try (Session session = connection.createSession(PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE)) {
                    Queue queue = session.createQueue(topic);
                    session.createConsumer(queue).setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                System.out.println("received " + message.getBody(String.class));
                                // acknowledges only this message
                                message.acknowledge();
                            } catch (Exception err) {
                                err.printStackTrace();
                            }
                        }
                    });
                    try (MessageProducer producer = session.createProducer(queue)) {
                        for (int i = 0; i < 100; i++) {
                            String message = "foo" + i;
                            System.out.println("sending " + message);
                            TextMessage textMessage = session.createTextMessage(message);
                            producer.send(textMessage);
                        }
                    }
                    // waiting 10 seconds, in order to see all messages consumed
                    Thread.sleep(10000);
                }
            }
        }
    }
}
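The difference between CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE can be sketched with a small in-memory model. This is not Starlight for JMS code: AckModeModel and its methods are hypothetical names, and integer ids stand in for messages delivered to one session.

```java
import java.util.ArrayList;
import java.util.List;

final class AckModeModel {
    // Messages delivered to the session and not yet acknowledged, in delivery order.
    private final List<Integer> pending = new ArrayList<>();
    private final boolean individual;

    AckModeModel(boolean individual) {
        this.individual = individual;
    }

    void deliver(int messageId) {
        pending.add(messageId);
    }

    // Models a call to Message.acknowledge() on the given message.
    void acknowledge(int messageId) {
        int index = pending.indexOf(messageId);
        if (index < 0) {
            return; // unknown or already acknowledged
        }
        if (individual) {
            pending.remove(index); // only this message
        } else {
            pending.subList(0, index + 1).clear(); // this message and all earlier ones
        }
    }

    List<Integer> pending() {
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        AckModeModel client = new AckModeModel(false);
        AckModeModel single = new AckModeModel(true);
        for (int id = 1; id <= 5; id++) {
            client.deliver(id);
            single.deliver(id);
        }
        client.acknowledge(3);
        single.acknowledge(3);
        System.out.println("CLIENT_ACKNOWLEDGE pending: " + client.pending());     // [4, 5]
        System.out.println("INDIVIDUAL_ACKNOWLEDGE pending: " + single.pending()); // [1, 2, 4, 5]
    }
}
```

In the individual mode, messages 1 and 2 stay unacknowledged until the application acknowledges each of them explicitly.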
Session.SESSION_TRANSACTED
The acknowledgement is bound to the Transaction, and it is sent to the broker only during Transaction.commit().
If you’re porting a JMS application that is using |
Interoperability between Starlight for JMS and other Pulsar clients
Starlight for JMS doesn’t deal with schema, and it treats every message as a raw array of bytes, interpreting the content of the message according to the JMS API that is used, and to a special JMSPulsarMessageType property.
JMS specs require that, on the consumer side, you receive a message of the same type sent by the producer:
- TextMessage
- BytesMessage
- StreamMessage
- MapMessage
- ObjectMessage
When the JMS consumer receives a message that has not been produced by Starlight for JMS itself and lacks the JMSPulsarMessageType property, it converts the message to a BytesMessage.
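The fallback rule above can be sketched as a lookup on the message properties. This is a simplified illustration, not the library's implementation: MessageTypeResolver is a hypothetical class, and storing the JMS interface name directly as the property value is a placeholder, since the actual encoding of JMSPulsarMessageType is not described here.

```java
import java.util.HashMap;
import java.util.Map;

final class MessageTypeResolver {
    static final String TYPE_PROPERTY = "JMSPulsarMessageType";

    // Returns the JMS message type to expose for an incoming Pulsar message.
    static String resolve(Map<String, String> properties) {
        String declared = properties.get(TYPE_PROPERTY);
        // Messages written by other Pulsar clients carry no type property,
        // so they are exposed as raw bytes.
        return declared == null ? "BytesMessage" : declared;
    }

    public static void main(String[] args) {
        Map<String, String> fromJms = new HashMap<>();
        fromJms.put(TYPE_PROPERTY, "TextMessage"); // placeholder value
        Map<String, String> fromOtherClient = new HashMap<>();

        System.out.println(resolve(fromJms));
        System.out.println(resolve(fromOtherClient));
    }
}
```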
Pulsar message keys and JMSXGroupID
The special JMSXGroupID property is defined in the JMS specs as a way to group messages and enable routing to the same destination.
Starlight for JMS maps that property to the message key in Pulsar, ensuring that JMSXGroupID is used as the routing key.
This is the same behavior implemented in Apache ActiveMQ.
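To see why a shared key keeps a group of messages together, consider a key-based dispatcher such as the Key_Shared subscription type mentioned earlier. The sketch below is a deliberately simplified model: GroupKeyRouting is a hypothetical class, and String.hashCode is used only to keep the example dependency-free; it is not the hash function Pulsar uses.

```java
final class GroupKeyRouting {
    // Picks a consumer slot from the message key; equal keys always pick the same slot.
    static int slotFor(String messageKey, int consumerCount) {
        return Math.floorMod(messageKey.hashCode(), consumerCount);
    }

    public static void main(String[] args) {
        int consumers = 4;
        // The JMSXGroupID value becomes the Pulsar message key.
        String[] groupIds = {"order-1", "order-2", "order-1", "order-3", "order-1"};
        for (String groupId : groupIds) {
            System.out.println(groupId + " -> consumer " + slotFor(groupId, consumers));
        }
    }
}
```

Every message carrying order-1 maps to the same slot, which is the property that group-based routing relies on.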
What’s next?
- Starlight for JMS standalone quickstart: Create a simple command line Java JMS client that connects to a local Pulsar installation.
- Getting started with Starlight for JMS: Create a simple command line Java JMS client that connects to an Astra Streaming instance.
- Installing Starlight for JMS: Install Starlight for JMS in your own JMS project.
- Starlight for JMS implementation details: Understand key implementation details for Starlight for JMS.
- Starlight for JMS FAQs: Frequently asked questions about Starlight for JMS.
- Starlight for JMS configuration reference: Starlight for JMS configuration reference.