Starlight for JMS implementation details
A discussion of design and implementation decisions made to support JMS requirements.
Client identifiers
Each connection may have a client identifier that can be set programmatically or configured administratively using the jms.clientId
configuration parameters. Client identifiers must be globally unique but there is no way to enforce that constraint on a Pulsar cluster.
When you set a clientId
, the actual subscription name in Pulsar is constructed as clientId + '_' + subscriptionName
.
Dead letter topic configuration
Dead letter policy controls Pulsar’s handling of messages that can’t be processed successfully. Under normal processing conditions, these messages cannot be processed successfully and the consumer cannot move to the next message.
One solution for this situation is the dead letter topic policy. The dead-letter topic policy routes the unsuccessful messages to the dead-letter topic for storage, so consumers can keep consuming while the unsuccessful messages are stored in the dead-letter topic for processing or evaluation.
Starlight for JMS allows further configuration of deadLetterPolicy
. For more on deadLetterTopic
configuration properties, see Starlight for JMS configuration reference.
Map<String, Object> deadLetterPolicy = new HashMap<>();
consumerConfig.put("deadLetterPolicy", deadLetterPolicy);
deadLetterPolicy.put("maxRedeliverCount", 1);
deadLetterPolicy.put("deadLetterTopic", deadLetterTopic);
deadLetterPolicy.put("initialSubscriptionName", "dqlsub");
Dead letter topic triggers
A message being written to the dead-letter topic is triggered in one of two ways:
Acknowledgement timeout
In an acknowledgement timeout, the broker fails to receive an acknowledgement from a consumer in the amount of time specified in ackTimeout
.
The ackTimeoutRedeliveryBackoff
values control the time between the expiration of ackTimeout
and the client sending a redeliver unacknowledged messages
request to the broker.
Map<String, Object> ackTimeoutRedeliveryBackoff = new HashMap<>();
consumerConfig.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff);
ackTimeoutRedeliveryBackoff.put("minDelayMs", 10);
ackTimeoutRedeliveryBackoff.put("maxDelayMs", 100);
ackTimeoutRedeliveryBackoff.put("multiplier", 2.0);
Negative acknowledgement
In a negative acknowledgement, the broker receives a negative acknowledgement indicating that a consumer did not process a message, so the broker tries re-delivering the message.
When the number of re-deliveries exceeds the value specified in maxRedeliverCount
, the message is routed to the dead letter topic.
The negativeAckRedeliveryBackoff
values control the time between the broker receiving the negative acknowledgement and attempting redelivery.
Map<String, Object> negativeAckRedeliveryBackoff = new HashMap<>();
consumerConfig.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff);
negativeAckRedeliveryBackoff.put("minDelayMs", 10);
negativeAckRedeliveryBackoff.put("maxDelayMs", 100);
negativeAckRedeliveryBackoff.put("multiplier", 2.0);
A few usage notes on negative acknowledgement in Starlight for JMS:
-
Calling
Session.recover()
stops delivery in the session, restarts message delivery with the oldest unacked message, and sends negative acknowledgements to the broker for the unacked messages. -
When using client side emulation with JMS Queues and Shared Subscriptions, if a message does not match the Selector, the JMS Client negatively acknowledges the message to let another consume pick up the message.
-
When using server side filtering and Selectors, there is still a chance that a negative acknowledgement will be sent when one message in a batch does not match the filter.
Using negative acknowledgement increases the count of deliveries of the message, which can trigger routing to the dead-letter topic if the number exceedsmaxRedeliverCount
. So if you’re seeing messages in the dead-letter topic that shouldn’t be there, they might be hitting a consumer with a local selector that doesn’t match too many times.
Delayed messages
In Pulsar, delayed messages are delivered immediately, without respecting the delay, in the case of exclusive subscriptions. In order to mitigate that behavior, Starlight for JMS allows you to use shared subscriptions even where an exclusive subscription would be a better fit. The most notable case is Session.createConsumer(destination)
, which creates an unnamed subscription, and an exclusive subscription would be a better fit.
If you set jms.useExclusiveSubscriptionsForSimpleConsumers=false
the client will use a Shared subscription, and the delay is respected.
See PIP-26 for details.
Global registry of subscription names and of clientIds
The JMS specifications require a subscription name, together with the clientId
, to be globally unique in the system. In Pulsar, the subscription name is defined in the scope of the Topic, so you can use the same subscription name on two different topics, referring to distinct entities.
The most notable side effects while using Pulsar with the JMS API are:
-
Session.unsubscribe(String subscriptionName)
cannot be used, because it refers to thesubscriptionName
without a topic name. -
In instances such as changing a message selector, you must unsubscribe the old subscription and create a new subscription.
In Pulsar, Starlight for JMS can’t attach labels or metadata to subscriptions, and can’t enforce that a subscription is accessed globally using the same "message selector" and noLocal
options. Pulsar does not have the concept of clientId
, so it is not possible to prevent the existence of multiple connections with the same clientId
in the cluster. Such a check is performed only locally in the context of the JVM/Classloader execution using a static registry.
Message listeners and concurrency
The JMS specifications require a specific behavior for MessageListener
in respect to concurrency, and to support that, Starlight for JMS starts a dedicated thread per MessageListener
session.
There are also specific behaviors mandates regarding these APIs:
-
Connection/JMSContext.start()
-
Connection/JMSContext.stop()
-
Session.close()/JMSContext.close()
-
Connection/JMSContext.close()
Starlight for JMS implements its own concurrent processing model in order to obey the specs, but it cannot use the built-in facilities provider by the Pulsar client.
For CompletionListeners
, which are useful for asynchronous sending of messages, Starlight for JMS relies on the Apache Pulsar™ asynchronous API, but there are some behaviors that are yet to be enforced with respect to Session/JMSContext.close()
.
Message properties
In Pulsar properties are always of type String, but the JMS specs require support for every Java primitive type. In order to emulate that behavior for every custom property set on the message, Starlight for JMS sets an additional property that describes the original type of the property.
For instance if you set a message property my-key=1234
(integer), Starlight for JMS adds a property my-key_jmstype=integer
in order to properly reconstruct the value when the receiver calls getObjectProperty
.
The value is always serialized as string. For floating point numbers, Starlight for JMS uses Double.toString/parseString
and Float.toString/parseString
with the behavior mandated by Java specifications.
Message selectors and noLocal
subscriptions
Message selectors let you choose not to receive messages that do not meet a given condition while NoLocal
subscriptions prevents a consumer from receiving messages sent by the same connector that created the consumer itself.
Both of those features can be emulated on the client side with the following limitations:
-
For exclusive subscriptions, the message is discarded on the client and automatically acknowledged.
-
For shared subscriptions, especially on queues, the message is discarded on the client and is "negative acknowledged" in order to let other consumers receive the message.
-
For
QueueBrowsers
, the message is discarded on the client side.
Currently, the implementation of message selectors is based on Apache ActiveMQ® Java client classes, which are imported as a dependency in Starlight for JMS.
Apache ActiveMQ is licensed under Apache 2.0. |
Subscription creation
For Starlight for JMS to create subscriptions it must be granted permission, and the broker must be configured to automatically create subscriptions by setting the allowAutoSubscriptionCreation=true
parameter on the broker configuration.
For more on subscription creation, including disabling automatic subscription creation, see JMS subscriptions.
System properties and fields:
Properties processed by Starlight for JMS in a special way:
-
All properties with a name ending in
_jsmtype
: Additional properties that contain the original data type. -
JMSType
: Value for the standard fieldJMSType
. -
JMSCorrelationID
: Base64 representation of the standardJMSCorrelationID
field. -
JMSPulsarMessageType
: Type of message. -
JMSMessageId
: Logical ID of the message. -
JMSReplyTo
: Fully qualified name of the topic referred to by theJMSReplyTo
field. -
JMSReplyToType
: JMS type for theJMSReplyTo
topic. Allowed values aretopic
orqueue
(default:topic
). -
JMSDeliveryMode
: Integer value of theJMSDeliveryMode
standard field, in case it differs fromDeliveryMode.PERSISTENT
. -
JMSPriority
: Integer value of the priority requested for the message, in case it differs fromMessage.DEFAULT_PRIORITY
. -
JMSDeliveryTime
: Representation in milliseconds since the UNIX epoch of theJMSDeliveryTime
field. -
JMSXGroupID
: Mapped to thekey
of the Pulsar message. Not represented by a message property. -
JMSXGroupSeq
: Mapped to Pulsar MessagesequenceId
if it isn’t overridden with a custom value. -
JMSConnectionID
: ID of the connection.
Special message field mappings:
-
property
JMSXDeliveryCount
: Mapped to1
+ the Pulsar messageRedeliveryCount
field. -
field
JMSExpiration
: Representation in milliseconds since the UNIX epoch of the expiration date of the message. Used to emulate time to live. -
field
JMSRedelivered
: Mapped totrue
ifJMSXDeliveryCount
>1
Ignored fields:
-
JMSXUserID
-
JMSXAppID
-
JMSXProducerTXID
-
JMSXConsumerTXID
-
JMSXRcvTimestamp
-
JMSXState
For more details on JMS properties, refer to section "3.5.9. JMS defined properties" in the JMS 2.0 specifications.
Transaction Emulation
Starlight for JMS and Pulsar fully support JMS transactions, and also support emulating SESSION_TRANSACTED
behavior without actually performing the transaction’s operations.
For example, when porting a JMS application that is using SESSION_TRANSACTED
, you can emulate SESSION_TRANSACTED
behavior with the jms.emulateTransactions
feature.
In jms.emulateTransactions
mode, when a SESSION_TRANSACTED
mode is created, the Session behaves like a transacted Session but is not transactional: a produced message is sent immediately, and acknowledgements are sent during session.commit()
.
To enable transaction emulation, add "jms.emulateTransactions", "true"
, as below:
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
properties.put("enableTransaction", "false");
properties.put("jms.emulateTransactions", "true");
Unsupported and emulated features
The JMS 2.0 specifications describes broadly a generic messaging service and defines many interfaces and services. Apache Pulsar® does not support all of the required features, and Starlight for JMS is a wrapper over the Apache Pulsar Client.
Most of the features that are not natively supported by Pulsar are emulated by Starlight for JMS, which helps in porting existing JMS based applications.
If you want to use emulated features, but the emulated behavior does not fit your needs, please open an issue in order to request an improvement for the Pulsar core. |
Feature | Supported by Pulsar | Emulated by Starlight for JMS |
---|---|---|
Message selectors |
Unsupported |
Emulated |
|
Unsupported |
Emulated |
Per message TTL |
TTL supported at topic level, not per-message |
Emulated |
Global clientId registry |
Unsupported |
Partially emulated |
Global unique subscription names |
Subscription name is unique per topic |
Partially emulated |
Temporary destinations (auto deleted when the connection is closed) |
Unsupported |
Partially emulated |
Creation of subscriptions from client |
Supported (requires relevant privileges granted to the client) |
|
Delayed messages |
Unsupported for Exclusive subscriptions |
Starlight for JMS provides an option to use shared subscriptions even in cases where an exclusive subscription would be preferred |
Message Priority |
Unsupported |
Priority is stored as property and delivered to the consumer, but ignored |
Non-Persistent Messages |
Unsupported (every message is persisted) |
|
Transactions |
Supported for the BETA of Pulsar 2.7.x |
Transactions must be enabled on the client and on the server |
|
Unsupported in Pulsar |
Emulated by storing the whole stream in a single message |
Topic vs Queue |
Unsupported |
Each destination is a Pulsar Topic; the behavior of the client depends upon which API you use |
Username/password authentication |
Unsupported |
Unsupported, but you can configure Pulsar client security features |
|
Unsupported |
The behavior of the delivery counter in Pulsar follows different semantics from JMS |
Starlight for JMS, when run using Apache Pulsar 2.7.x passes most of the TCK, except for the few tests requiring globally unique subscription names. |
Temporary destinations
Temporary destinations are created using Session.createTemporaryQueue
and Session.createTemporaryTopic
and should create a destination that is automatically deleted then the connection is closed. In Pulsar, since there is no concept of a JMS Connection, that behavior cannot be implemented.
Starlight for JMS emulates the behavior by trying to delete the destination on Connection.close()
and in ConnectionFactory.close()
but there is no guarantee that that will eventually happen, if, for instance, the client application crashes or a temporary error occurs during the deletion of the destination.
Creating a temporary destination requires the client to be allowed to create the destination and also to configure the broker to allow automatic topic creation using allowAutoTopicCreation=true .
|
Username and password authentication
Starlight for JMS currently supports only JWT (JSON Web Token) authentication, but offers an alternate method of registering authParams
when connections are created.
Setting the configuration property jms.useCredentialsFromCreateConnection=true
when creating a new connection will pass the username
and password
pair from the createConnection(username, password)
or createContext(username, password, mode)
methods into authParams
.
To set the configuration when creating a new connection or context, add jms.useCredentialsFromCreateConnection=true
as below:
Map<String, Object> configuration = new HashMap<>();
configuration.put("jms.precreateQueueSubscription", "true");
ConnectionFactory factory = new PulsarConnectionFactory(configuration);
...
factory.close();
This will pass the username
and password
pair into the PulsarConnectionFactory
constructor.
A few notes on usage:
-
The values of
username
andpassword
depend on the authentication type configured in PulsarClient. Using JWT authentication, the values are:-
username
: not used -
password
:token:XXXX
, whereXXXX
is the JWT token
-
-
Once a username/password pair is used to start a connection, you must use it for all subsequent calls.
-
Only one PulsarClient can be held by a PulsarConnectionFactory.
Using the Pulsar Schema Registry
The JMS API does not have a standard way to consume schema-driven data like AVRO, but the Pulsar client can automatically apply schema and decode messages to a specific Java Object model.
Set the useSchema
flag to true
with consumerConfig
to apply schema when consuming data, as below:
Map<String, Object> consumerConfig = new HashMap<>();
properties.put("consumerConfig", consumerConfig);
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
consumerConfig.put("useSchema", true);
The consumer will return a specific JMS message depending on the schema type:
Schema type | JMS mapping |
---|---|
|
|
|
Maps to a |
|
|
For all other schema types, the consumer will return a |
Virtual destinations
Virtual destinations add support for Consumers to receive messages from multiple destinations, known as multi-topic subscriptions in Pulsar.
Multiple topics can be subscribed to using regular expressions or static multi-topic lists.
Pattern | Syntax | Example |
---|---|---|
RegEx |
|
|
List |
|
|
To embed a multi-topic subscription name using a JMS queue:
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ){
Queue queue = session.createQueue("multi:example,foo,bar:subscription");
}
For more on multi-topic subscriptions in Pulsar, see the Pulsar documentation.
JMS priority
JMS priority allows messages to be dispatched at different rates according to priority.
Priorities are mapped to Pulsar partition IDs with the jms.priorityMapping
parameter in the ConnectionFactory.
When a producer writes a message, the message is routed to a partition based on its priority mapping. By default, the higher the partition name (0 is highest priority), the higher the priority.
On the consumer side, messages are re-ordered in memory so high priority partitions are drained faster than other partitions.
Both producers and consumers advertise their priority routing behavior in metadata, so priority behavior can be detected and used by Pulsar admin for subscriptions, filters, and metrics.
Even if a consumer is not looking for a message’s priority mapping metadata, it will still consume the message.
Mapping JMS priority to partitions
The parameter jms.priorityMapping
controls how priorities are mapped to partitions.
The jms.priorityMapping
parameter can be set to linear
or non-linear
. The default is linear
.
In jms.priorityMapping = "linear"
mode, the priority level (from 0 to 9) is linearly mapped to the partition id.
If you have 10 partitions, priority 0 goes to partition 0, priority 1 to partition 1, and so on.
If you have 20 partitions, priority 0 goes to partitions 0 and 1, priority 1 goes to partitions 2 and 3, and so on.
The benefit of this approach is that you are guaranteed that each priority level is sent to different partitions.
The drawback is that if you don’t use all the priority levels, then some partitions will be left underutilized.
In jms.priorityMapping = "non-linear"
, the mapping between JMSPriority and the partition id is not linear.
The assumption is that there will be more regular messages than high/low priority messages.
Priorities are divided into 3 classes: low, default, and high priority.
-
1/4 of the partitions are for low priority messages.
-
1/2 of the partitions are for the default priority.
-
1/4 of the partitions are for high priority messages.
With non-linear partition mapping, the JMS Client will tend to:
-
use more partitions for default level priorities (especially 4, the default priority).
-
map higher level priorities to higher partition ids.
-
map lower level priorities to lower partition ids.
What’s next?
-
Starlight for JMS standalone quickstart: Create a simple command line Java JMS client that connects to a local Pulsar installation.
-
Getting started with Starlight for JMS: Create a simple command line Java JMS client that connects to an Astra Streaming instance.
-
Installing Starlight for JMS: Install Starlight for JMS in your own JMS project.
-
Mapping Pulsar concepts to JMS specifications: Understand Pulsar concepts in the context of JMS.
-
Starlight for JMS FAQs: Frequently asked questions about Starlight for JMS.
-
Starlight for JMS configuration reference: Starlight for JMS configuration reference.