Mapping Pulsar concepts to JMS specifications

JMS specifications are built upon the concepts of Topics and Queues, but Apache Pulsar™ has only a general concept of Topics that can model both of those two domains.

In Pulsar there is no concept of a queue: Starlight for JMS treats Pulsar topics as queues when queue-related JMS APIs are in use. There is no strict, cluster-wide verification that you are accessing a JMS Queue using the Topic API or vice versa.

In JMS a Topic is written by many Producers and read by many Consumers that share one or many Subscriptions. Subscriptions may be Durable or Non-Durable (meaning the client position is not retained on restart), and Shared or Non-Shared, which determines whether the same message may be received and processed by more than one consumer.

Mappings between JMS consumers/subscriptions and Pulsar consumers for topics

JMS Concept             Pulsar Concept
Topic                   Persistent topic
Consumer                Exclusive non-durable subscription with random name (UUID)
DurableConsumer         Exclusive durable subscription with the given name + clientId
SharedConsumer          Shared non-durable subscription with the given name + clientId
SharedDurableConsumer   Shared durable subscription with the given name + clientId
DurableSubscriber       Exclusive durable subscription with the given name + clientId

As of Pulsar version 2.7.x, delayed messages don’t work with exclusive subscriptions. You can, however, force the usage of shared non-durable subscriptions for simple consumers by setting jms.useExclusiveSubscriptionsForSimpleConsumers=false.

For SharedConsumer and SharedDurableConsumer, set jms.topicSharedSubscriptionType to Key_Shared to use the Key_Shared subscription type.
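The two settings above can be collected in one configuration map. The following is a minimal sketch: the class and method names are hypothetical, and the values are written as strings on the assumption that the connection factory accepts them the same way it accepts the string "false" for jms.precreateQueueSubscription elsewhere in this document.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, used only for illustration.
final class TopicSubscriptionConfigSketch {

    /** Builds a configuration map with the two topic-subscription settings described above. */
    static Map<String, Object> build() {
        Map<String, Object> configuration = new HashMap<>();
        // Simple consumers use shared non-durable subscriptions instead of exclusive ones.
        configuration.put("jms.useExclusiveSubscriptionsForSimpleConsumers", "false");
        // SharedConsumer and SharedDurableConsumer use the Key_Shared subscription type.
        configuration.put("jms.topicSharedSubscriptionType", "Key_Shared");
        return configuration;
    }
}
```

The resulting map would then be passed to the PulsarConnectionFactory constructor, as in the other examples in this document.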

In JMS a Queue is written by many Producers but only one Consumer eventually processes each message.

In order to emulate that behavior, the first time you create a consumer over a queue, Starlight for JMS creates a durable subscription named jms-queue at the beginning (initial position = Earliest) of the Pulsar topic.

Every access to the queue passes through this shared subscription, which guarantees that only one consumer receives and processes each message.

Mappings between JMS consumers/subscriptions and Pulsar consumers for queues

JMS Concept    Pulsar Concept
Queue          Persistent topic
Consumer       Shared durable subscription with name jms-queue
QueueBrowser   Pulsar Reader for the topic, beginning from the next message on jms-queue subscription

You can change the name of the shared subscription using the jms.queueSubscriptionName configuration parameter, but you must ensure that you change it on every client.

To implement QueueBrowser, Starlight for JMS uses the Pulsar Reader API, starting from the next message available on the jms-queue subscription. To peek at the next message, Starlight for JMS uses the Pulsar Admin API peekMessages.

In certain cases, the peekMessages API can return the last consumed message of the subscription, so the QueueBrowser may return inaccurate results.

Overriding jms-queue

By default, the subscription created by a JMS queue is named jms-queue, which can be overridden with jms.queueSubscriptionName in the PulsarConnectionFactory constructor.

You can instead set the subscription name at queue creation:

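try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties);
     Connection connection = factory.createConnection();
     Session session = connection.createSession()) {
    // the part after the colon sets the subscription name for this queue
    Queue queue = session.createQueue("pulsarQueue:subscriptionName");
}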

In the above example, the subscription will be subscriptionName, and the queue will be backed by the public/default/pulsarQueue Pulsar topic.

You can override the default public/default namespace with jms.systemNamespace in the PulsarConnectionFactory constructor:

Map<String, Object> configuration = new HashMap<>();
configuration.put("jms.systemNamespace", "<my-tenant>/<my-namespace>");
ConnectionFactory factory = new PulsarConnectionFactory(configuration);
...
factory.close();

A fully-qualified namespace works similarly, but the topic destination points to a persistent topic URL:

Queue queue = session.createQueue("persistent://tenant/namespace/pulsarQueue:subscriptionName");

In the above example, the subscription will be subscriptionName, and the queue will be backed by the persistent://tenant/namespace/pulsarQueue Pulsar topic.
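The topic-resolution rules described in this section can be sketched as a small function. This is an illustration of the naming convention only, not the library's own parser: the class and method names are hypothetical, and the sketch assumes that short queue names resolve to the persistent:// domain inside the configured system namespace.

```java
// Hypothetical helper, used only for illustration.
final class QueueNameSketch {

    /** Returns the Pulsar topic backing a JMS queue name, following the rules described above. */
    static String backingTopic(String queueName, String systemNamespace) {
        // Skip the "://" of a fully-qualified topic URL when looking for the subscription separator.
        int schemeEnd = queueName.indexOf("://");
        int searchFrom = schemeEnd >= 0 ? schemeEnd + 3 : 0;
        int colon = queueName.indexOf(':', searchFrom);
        // Drop the optional ":subscriptionName" suffix.
        String name = colon >= 0 ? queueName.substring(0, colon) : queueName;
        if (schemeEnd >= 0) {
            return name; // already fully qualified
        }
        // Assumption: short names live in the persistent:// domain of the system namespace.
        return "persistent://" + systemNamespace + "/" + name;
    }
}
```

For example, backingTopic("pulsarQueue:subscriptionName", "public/default") yields persistent://public/default/pulsarQueue, and a name that already starts with persistent:// is returned with only the subscription suffix removed.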

Disabling automatic subscription creation

By default, the jms.precreateQueueSubscription configuration flag is set to true, and the JMS client will pre-create the jms-queue subscription (see Overriding jms-queue above).

To disable automatic subscription creation in the JMS client, set the jms.precreateQueueSubscription parameter to false, as below:

Map<String, Object> properties = new HashMap<>();
properties.put("jms.precreateQueueSubscription", "false");

To disable automatic subscription creation on the Pulsar broker, set the allowAutoSubscriptionCreation parameter to false, as below:

public static void before() throws Exception {
    cluster =
        new PulsarCluster(
            tempDir,
            config -> {
              config.setAllowAutoSubscriptionCreation(false);
            });
    cluster.start();
}

Or modify the Pulsar broker’s broker.conf file:

allowAutoSubscriptionCreation=false

If both allowAutoSubscriptionCreation and jms.precreateQueueSubscription are set to false, automatic subscription creation is completely disabled.

Mappings between JMS consumer modes and Pulsar

Starlight for JMS supports five session acknowledgement modes for consumers. This section describes how each of them maps to Pulsar concepts.

Session.AUTO_ACKNOWLEDGE

This mode maps to the Pulsar acknowledgeMessage API.

By default, acknowledgements in Pulsar are asynchronous and best effort, so the Consumer may always receive duplicate messages.

In the example below, we create a consumer on mytopic with Session.AUTO_ACKNOWLEDGE, send 100 foo messages, and auto-acknowledge them.

import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;

public class JMSExample {

    public static void main(String ... args) throws Exception {

        // start pulsar standalone on localhost

        // the topic is autocreated, no additional setup is needed
        String topic = "persistent://public/default/mytopic";

        Map<String, Object> properties = new HashMap<>();

        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ){
            try (Connection connection = factory.createConnection()) {
                try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
                    Queue queue = session.createQueue(topic);

                    session.createConsumer(queue).setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                System.out.println("received " + message.getBody(String.class));

                            } catch (Exception err) {
                                err.printStackTrace();
                            }
                        }
                    });

                    try (MessageProducer producer = session.createProducer(queue)) {
                        for (int i = 0; i < 100; i++) {
                            String message = "foo" + i;
                            System.out.println("sending " + message);
                            TextMessage textMessage = session.createTextMessage(message);
                            producer.send(textMessage);
                        }
                    }

                    // waiting 10 seconds, in order to see all messages consumed
                    Thread.sleep(10000);
                }
            }
        }
    }
}

Session.DUPS_OK_ACKNOWLEDGE

This mode maps to Consumer.acknowledgeAsync() without waiting for the CompletableFuture returned by the method to complete.

Starlight for JMS logs an error if something goes wrong, but the application continues running and is not aware of the failure.

In the example below, we create a consumer on mytopic with Session.DUPS_OK_ACKNOWLEDGE, send 100 foo messages, and acknowledge them without waiting for the returned CompletableFuture to complete.

import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;

public class JMSExample {

    public static void main(String ... args) throws Exception {

        // start pulsar standalone on localhost

        // the topic is autocreated, no additional setup is needed
        String topic = "persistent://public/default/mytopic";

        Map<String, Object> properties = new HashMap<>();

        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ){
            try (Connection connection = factory.createConnection()) {
                try (Session session = connection.createSession(Session.DUPS_OK_ACKNOWLEDGE)) {
                    Queue queue = session.createQueue(topic);

                    session.createConsumer(queue).setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                System.out.println("received " + message.getBody(String.class));

                            } catch (Exception err) {
                                err.printStackTrace();
                            }
                        }
                    });

                    try (MessageProducer producer = session.createProducer(queue)) {
                        for (int i = 0; i < 100; i++) {
                            String message = "foo" + i;
                            System.out.println("sending " + message);
                            TextMessage textMessage = session.createTextMessage(message);
                            producer.send(textMessage);
                        }
                    }

                    // waiting 10 seconds, in order to see all messages consumed
                    Thread.sleep(10000);
                }
            }
        }
    }
}
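The fire-and-forget behavior described for DUPS_OK_ACKNOWLEDGE can be modeled with plain CompletableFuture code. This is a sketch under stated assumptions, not the Starlight for JMS implementation: the acknowledgeAsync method here is a hypothetical stand-in for Consumer.acknowledgeAsync() that fails for one message id, and the errorLog list stands in for the client's logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model, used only for illustration.
final class DupsOkSketch {

    // Stands in for the client's error log.
    static final List<String> errorLog = new ArrayList<>();

    /** Hypothetical stand-in for Consumer.acknowledgeAsync(); message id 2 simulates a failure. */
    static CompletableFuture<Void> acknowledgeAsync(int messageId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (messageId == 2) {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            future.complete(null);
        }
        return future;
    }

    /** Fire-and-forget: attach an error logger and return without waiting for the future. */
    static void acknowledgeDupsOk(int messageId) {
        acknowledgeAsync(messageId).whenComplete((ok, err) -> {
            if (err != null) {
                // The failure is only logged; the caller never sees it.
                errorLog.add("ack failed for message " + messageId + ": " + err.getMessage());
            }
        });
    }
}
```

The caller of acknowledgeDupsOk gets no return value and no exception, which is why a failed acknowledgement can later surface as a redelivered (duplicate) message.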

Session.CLIENT_ACKNOWLEDGE

In this mode, the application acknowledges messages explicitly by calling Message.acknowledge(). The call acknowledges that message and all messages received before it inside the same session. To acknowledge only the individual message, use PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE.

In the example below, we create a consumer on mytopic with Session.CLIENT_ACKNOWLEDGE, send 100 foo messages, and acknowledge them by calling Message.acknowledge() in the onMessage method.
Each call acknowledges that message and all previous messages in the session.

import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;

public class JMSExample {

    public static void main(String ... args) throws Exception {

        // start pulsar standalone on localhost

        // the topic is autocreated, no additional setup is needed
        String topic = "persistent://public/default/mytopic";

        Map<String, Object> properties = new HashMap<>();

        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ){
            try (Connection connection = factory.createConnection()) {
                try (Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE)) {
                    Queue queue = session.createQueue(topic);

                    session.createConsumer(queue).setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                System.out.println("received " + message.getBody(String.class));
                                message.acknowledge();
                            } catch (Exception err) {
                                err.printStackTrace();
                            }
                        }
                    });

                    try (MessageProducer producer = session.createProducer(queue)) {
                        for (int i = 0; i < 100; i++) {
                            String message = "foo" + i;
                            System.out.println("sending " + message);
                            TextMessage textMessage = session.createTextMessage(message);
                            producer.send(textMessage);
                        }
                    }

                    // waiting 10 seconds, in order to see all messages consumed
                    Thread.sleep(10000);
                }
            }
        }
    }
}

PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE

In this mode the JMS client works like CLIENT_ACKNOWLEDGE, but when you call Message.acknowledge(), only the single message you called for will be acknowledged.

This is a proprietary extension that is not present in the standard JMS 2.0 specifications.

In the example below, we create a consumer on mytopic with PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE, send 100 foo messages, and acknowledge them by calling Message.acknowledge() in the onMessage method.
Only the individual message is acknowledged.

import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.datastax.oss.pulsar.jms.PulsarJMSConstants;

import javax.jms.Connection;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;

public class JMSExample {

    public static void main(String ... args) throws Exception {

        // start pulsar standalone on localhost

        // the topic is autocreated, no additional setup is needed
        String topic = "persistent://public/default/mytopic";

        Map<String, Object> properties = new HashMap<>();

        try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ){
            try (Connection connection = factory.createConnection()) {
                try (Session session = connection.createSession(PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE)) {
                    Queue queue = session.createQueue(topic);

                    session.createConsumer(queue).setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                System.out.println("received " + message.getBody(String.class));
                                message.acknowledge();
                            } catch (Exception err) {
                                err.printStackTrace();
                            }
                        }
                    });

                    try (MessageProducer producer = session.createProducer(queue)) {
                        for (int i = 0; i < 100; i++) {
                            String message = "foo" + i;
                            System.out.println("sending " + message);
                            TextMessage textMessage = session.createTextMessage(message);
                            producer.send(textMessage);
                        }
                    }

                    // waiting 10 seconds, in order to see all messages consumed
                    Thread.sleep(10000);
                }
            }
        }
    }
}
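The difference between CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE can be shown with a small in-memory model. This sketch does not use the JMS or Pulsar APIs; the class and its methods are hypothetical and only mimic which messages end up acknowledged after a single Message.acknowledge() call in each mode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of a session, used only for illustration.
final class AckModeSketch {

    private final List<Integer> received = new ArrayList<>();      // message ids in delivery order
    private final TreeSet<Integer> acknowledged = new TreeSet<>(); // ids acknowledged so far

    void receive(int messageId) {
        received.add(messageId);
    }

    /** CLIENT_ACKNOWLEDGE: acknowledges the message and everything received before it. */
    void acknowledgeClient(int messageId) {
        int index = received.indexOf(messageId);
        for (int i = 0; i <= index; i++) {
            acknowledged.add(received.get(i));
        }
    }

    /** INDIVIDUAL_ACKNOWLEDGE: acknowledges only the given message. */
    void acknowledgeIndividual(int messageId) {
        if (received.contains(messageId)) {
            acknowledged.add(messageId);
        }
    }

    List<Integer> acknowledged() {
        return new ArrayList<>(acknowledged);
    }
}
```

After receiving messages 1 to 4, acknowledging message 3 in the CLIENT_ACKNOWLEDGE style leaves 1, 2, and 3 acknowledged, while the INDIVIDUAL_ACKNOWLEDGE style leaves only 3 acknowledged.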

Session.SESSION_TRANSACTED

The acknowledgement is bound to the Transaction, and the acknowledgement is sent to the broker only during Transaction.commit().

If you’re porting a JMS application that is using SESSION_TRANSACTED but don’t need to perform the transaction’s operations, enable jms.emulateTransactions. See Transaction Emulation.
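The commit-time behavior of SESSION_TRANSACTED can be pictured with a small in-memory model. This is a sketch only: the class is hypothetical and does not use the JMS or Pulsar transaction APIs. It shows acknowledgements being held back until commit, and it assumes the usual JMS behavior that a rollback discards them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a transacted session, used only for illustration.
final class TransactedAckSketch {

    private final List<String> pending = new ArrayList<>();      // acks held by the open transaction
    private final List<String> sentToBroker = new ArrayList<>(); // acks the broker has received

    /** Records an acknowledgement inside the current transaction; nothing is sent yet. */
    void acknowledge(String messageId) {
        pending.add(messageId);
    }

    /** Commit: only now do the pending acknowledgements reach the broker. */
    void commit() {
        sentToBroker.addAll(pending);
        pending.clear();
    }

    /** Rollback: pending acknowledgements are discarded, so the messages stay unacknowledged. */
    void rollback() {
        pending.clear();
    }

    List<String> sentToBroker() {
        return new ArrayList<>(sentToBroker);
    }
}
```

In this model, messages acknowledged before a commit are visible to the broker only after commit() returns, and messages acknowledged before a rollback never reach it.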

Interoperability between Starlight for JMS and other Pulsar clients

Starlight for JMS doesn’t deal with schema, and it treats every message as a raw array of bytes, interpreting the content of the message according to the JMS API that is used, and to a special JMSPulsarMessageType property.

JMS specs require that, on the consumer side, you receive a message of the same type sent by the producer:

  • TextMessage

  • BytesMessage

  • StreamMessage

  • MapMessage

  • ObjectMessage

When the JMS consumer receives a message that has not been produced by Starlight for JMS itself and lacks the JMSPulsarMessageType property, it converts the message to a BytesMessage.

Pulsar message keys and JMSXGroupID

The special JMSXGroupID property is defined in the JMS specs as a way to group messages and enable routing to the same destination.

Starlight for JMS maps that property to the message key in Pulsar, ensuring that JMSXGroupID is used as the routing key.

This is the same behavior implemented in Apache ActiveMQ.
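The effect of using JMSXGroupID as the routing key can be illustrated with a deliberately simplified model. This sketch is an assumption-laden illustration and not Pulsar's routing algorithm: the class and method are hypothetical, and a plain hash modulo the number of consumers stands in for whatever key hashing the broker applies. The point it shows is that messages with the same group id are routed consistently.

```java
// Hypothetical model of key-based routing, used only for illustration.
final class GroupRoutingSketch {

    /**
     * Picks a consumer index for a message based on its JMSXGroupID.
     * Simplification: Pulsar's real key hashing differs from String.hashCode().
     */
    static int consumerFor(String jmsxGroupId, int consumerCount) {
        return Math.floorMod(jmsxGroupId.hashCode(), consumerCount);
    }
}
```

Because the index depends only on the group id and the number of consumers, every message carrying the same JMSXGroupID lands on the same consumer in this model for as long as the set of consumers does not change.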

What’s next?