Starlight for JMS implementation details

Client identifiers

Each connection may have a client identifier, which can be set programmatically or configured administratively using the jms.clientId configuration parameter. Client identifiers must be globally unique, but there is no way to enforce that constraint on a Pulsar cluster.

When you set a clientId, the actual subscription name in Pulsar is constructed as clientId + '_' + subscriptionName.
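As a minimal sketch of the naming rule above, the helper below builds the Pulsar subscription name from a clientId and a JMS subscription name. The class and method names are hypothetical and not part of the Starlight for JMS API, and returning the plain subscription name when no clientId is set is an assumption.

```java
// Hypothetical helper illustrating the naming rule; not part of Starlight for JMS.
final class SubscriptionNames {
    private SubscriptionNames() {}

    /**
     * Returns clientId + '_' + subscriptionName.
     * Assumption: with no clientId, the subscription name is used unchanged.
     */
    static String pulsarSubscriptionName(String clientId, String subscriptionName) {
        if (clientId == null || clientId.isEmpty()) {
            return subscriptionName;
        }
        return clientId + "_" + subscriptionName;
    }
}
```

For example, a clientId of "orders-app" and a subscription named "audit" would give the Pulsar subscription name "orders-app_audit".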

Dead letter topic configuration

Dead letter policy controls Pulsar’s handling of messages that can’t be processed successfully. Under normal processing conditions, such a message keeps being redelivered and the consumer cannot move on to the next message.

One solution for this situation is the dead letter topic policy. The dead-letter topic policy routes the unsuccessful messages to the dead-letter topic for storage, so consumers can keep consuming while the unsuccessful messages are stored in the dead-letter topic for processing or evaluation.

Starlight for JMS allows further configuration of deadLetterPolicy. For more on deadLetterTopic configuration properties, see Starlight for JMS configuration reference.

// Build the policy first, then attach it to the consumer configuration
Map<String, Object> deadLetterPolicy = new HashMap<>();
deadLetterPolicy.put("maxRedeliverCount", 1);
deadLetterPolicy.put("deadLetterTopic", deadLetterTopic);
deadLetterPolicy.put("initialSubscriptionName", "dqlsub");
consumerConfig.put("deadLetterPolicy", deadLetterPolicy);

Dead letter topic triggers

A message being written to the dead-letter topic is triggered in one of two ways:

Acknowledgement timeout

In an acknowledgement timeout, the broker fails to receive an acknowledgement from a consumer in the amount of time specified in ackTimeout.
The ackTimeoutRedeliveryBackoff values control the time between the expiration of ackTimeout and the client sending a redeliver unacknowledged messages request to the broker.

Map<String, Object> ackTimeoutRedeliveryBackoff = new HashMap<>();
ackTimeoutRedeliveryBackoff.put("minDelayMs", 10);
ackTimeoutRedeliveryBackoff.put("maxDelayMs", 100);
ackTimeoutRedeliveryBackoff.put("multiplier", 2.0);
consumerConfig.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff);

Negative acknowledgement

In a negative acknowledgement, the broker receives a negative acknowledgement indicating that a consumer did not process a message, so the broker tries re-delivering the message.
When the number of re-deliveries exceeds the value specified in maxRedeliverCount, the message is routed to the dead letter topic.
The negativeAckRedeliveryBackoff values control the time between the broker receiving the negative acknowledgement and attempting redelivery.

Map<String, Object> negativeAckRedeliveryBackoff = new HashMap<>();
negativeAckRedeliveryBackoff.put("minDelayMs", 10);
negativeAckRedeliveryBackoff.put("maxDelayMs", 100);
negativeAckRedeliveryBackoff.put("multiplier", 2.0);
consumerConfig.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff);
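Both backoff maps take the same three parameters. The sketch below shows one plausible reading of how they interact, assuming the delay starts at minDelayMs, is multiplied by multiplier for each further redelivery, and is capped at maxDelayMs. The class is hypothetical and is not the Pulsar client's own backoff implementation, whose exact formula may differ.

```java
// Hypothetical sketch of a multiplicative backoff; not the Pulsar client's own class.
final class BackoffSketch {
    private final long minDelayMs;
    private final long maxDelayMs;
    private final double multiplier;

    BackoffSketch(long minDelayMs, long maxDelayMs, double multiplier) {
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.multiplier = multiplier;
    }

    /** Delay before the given redelivery attempt (0-based), never above maxDelayMs. */
    long delayMs(int redeliveryCount) {
        if (redeliveryCount <= 0) {
            return minDelayMs;
        }
        // Assumed formula: minDelayMs * multiplier^redeliveryCount, capped at maxDelayMs.
        double delay = minDelayMs * Math.pow(multiplier, redeliveryCount);
        return (long) Math.min(delay, (double) maxDelayMs);
    }
}
```

Under these assumptions, the values used above (minDelayMs 10, maxDelayMs 100, multiplier 2.0) give delays of 10, 20, 40, and 80 ms, and 100 ms for every attempt after that.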

A few usage notes on negative acknowledgement in Starlight for JMS:

  • Calling Session.recover() stops delivery in the session, restarts message delivery with the oldest unacked message, and sends negative acknowledgements to the broker for the unacked messages.

  • When using client side emulation with JMS Queues and Shared Subscriptions, if a message does not match the Selector, the JMS Client negatively acknowledges the message to let another consumer pick up the message.

  • When using server side filtering and Selectors, there is still a chance that a negative acknowledgement will be sent when one message in a batch does not match the filter.
    Each negative acknowledgement increases the delivery count of the message, which can trigger routing to the dead-letter topic once the count exceeds maxRedeliverCount. If you’re seeing messages in the dead-letter topic that shouldn’t be there, they might have been delivered too many times to consumers whose local selector doesn’t match.

Delayed messages

In Pulsar, delayed messages are delivered immediately, without respecting the delay, in the case of exclusive subscriptions. In order to mitigate that behavior, Starlight for JMS allows you to use shared subscriptions even where an exclusive subscription would be a better fit. The most notable case is Session.createConsumer(destination), which creates an unnamed subscription, and an exclusive subscription would be a better fit.

If you set jms.useExclusiveSubscriptionsForSimpleConsumers=false the client will use a Shared subscription, and the delay is respected.

See PIP-26 for details.
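A minimal sketch of the configuration described above follows. The class and method names are hypothetical. The string value "false" mirrors how other jms.* flags are set elsewhere in this page and is otherwise an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the configuration fragment; not part of Starlight for JMS.
final class DelayedDeliveryConfig {
    private DelayedDeliveryConfig() {}

    /** Builds a configuration map that asks simple consumers to use Shared subscriptions. */
    static Map<String, Object> sharedSubscriptionsForSimpleConsumers() {
        Map<String, Object> configuration = new HashMap<>();
        configuration.put("jms.useExclusiveSubscriptionsForSimpleConsumers", "false");
        return configuration;
    }
}
```

The resulting map would be passed to the PulsarConnectionFactory constructor together with the rest of the connection settings.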

Global registry of subscription names and of clientIds

The JMS specifications require a subscription name, together with the clientId, to be globally unique in the system. In Pulsar, the subscription name is defined in the scope of the Topic, so you can use the same subscription name on two different topics, referring to distinct entities.

The most notable side effects while using Pulsar with the JMS API are:

  • Session.unsubscribe(String subscriptionName) cannot be used, because it refers to the subscriptionName without a topic name.

  • In instances such as changing a message selector, you must unsubscribe the old subscription and create a new subscription.

In Pulsar, Starlight for JMS can’t attach labels or metadata to subscriptions, and can’t enforce that a subscription is accessed globally using the same "message selector" and noLocal options. Pulsar does not have the concept of clientId, so it is not possible to prevent the existence of multiple connections with the same clientId in the cluster. Such a check is performed only locally in the context of the JVM/Classloader execution using a static registry.
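The JVM-local check can be pictured with the sketch below, which keeps the clientIds in use in a static set. The class is hypothetical and only illustrates the idea of a static registry. Nothing here reaches beyond the current JVM, which is why two processes can still share a clientId.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical JVM-local registry; not the Starlight for JMS implementation.
final class LocalClientIdRegistry {
    // Static, so it is shared by every connection loaded by the same classloader.
    private static final Set<String> IN_USE = ConcurrentHashMap.newKeySet();

    private LocalClientIdRegistry() {}

    /** Returns true if the clientId was free in this JVM, false if it is already taken. */
    static boolean register(String clientId) {
        return IN_USE.add(clientId);
    }

    /** Frees the clientId, for example when the connection is closed. */
    static void release(String clientId) {
        IN_USE.remove(clientId);
    }
}
```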

Message listeners and concurrency

The JMS specifications require a specific behavior for MessageListener in respect to concurrency, and to support that, Starlight for JMS starts a dedicated thread per MessageListener session.

There are also specific behavior mandates regarding these APIs:

  • Connection/JMSContext.start()

  • Connection/JMSContext.stop()

  • Session.close()/JMSContext.close()

  • Connection/JMSContext.close()

To obey the specs, Starlight for JMS implements its own concurrent processing model and cannot use the built-in facilities provided by the Pulsar client.

For CompletionListeners, which are useful for asynchronous sending of messages, Starlight for JMS relies on the Apache Pulsar™ asynchronous API, but there are some behaviors that are yet to be enforced with respect to Session/JMSContext.close().

Message properties

In Pulsar, properties are always of type String, but the JMS specs require support for every Java primitive type. To emulate that behavior, for every custom property set on the message Starlight for JMS sets an additional property that describes the original type of the property.

For instance, if you set a message property my-key=1234 (integer), Starlight for JMS adds a property my-key_jmstype=integer in order to properly reconstruct the value when the receiver calls getObjectProperty.

The value is always serialized as a string. For floating point numbers, Starlight for JMS uses Double.toString/parseDouble and Float.toString/parseFloat, with the behavior mandated by the Java specifications.
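The sketch below illustrates the idea with a plain string map standing in for Pulsar message properties. The class is hypothetical. Only the "integer" type name comes from the example above, and the other type names ("long", "double", "float", "boolean", "string") are assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical illustration of typed properties over string-only storage.
final class TypedProperties {
    static final String TYPE_SUFFIX = "_jmstype";

    private TypedProperties() {}

    /** Stores the value as a string plus a companion property naming its original type. */
    static void put(Map<String, String> props, String key, Object value) {
        props.put(key, String.valueOf(value));
        props.put(key + TYPE_SUFFIX, typeName(value));
    }

    /** Rebuilds the original object from the string value and the companion type property. */
    static Object get(Map<String, String> props, String key) {
        String raw = props.get(key);
        if (raw == null) {
            return null;
        }
        switch (props.getOrDefault(key + TYPE_SUFFIX, "string")) {
            case "integer": return Integer.parseInt(raw);
            case "long":    return Long.parseLong(raw);
            case "double":  return Double.parseDouble(raw);
            case "float":   return Float.parseFloat(raw);
            case "boolean": return Boolean.parseBoolean(raw);
            default:        return raw;
        }
    }

    // Type names other than "integer" are assumed, not taken from the documentation.
    private static String typeName(Object value) {
        if (value instanceof Integer) return "integer";
        if (value instanceof Long)    return "long";
        if (value instanceof Double)  return "double";
        if (value instanceof Float)   return "float";
        if (value instanceof Boolean) return "boolean";
        return "string";
    }
}
```

Because Double.toString and Double.parseDouble round-trip exactly, a double stored this way comes back with the same value.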

Message selectors and noLocal subscriptions

Message selectors let you choose not to receive messages that do not meet a given condition, while noLocal subscriptions prevent a consumer from receiving messages sent by the same connection that created the consumer itself.

Both of those features can be emulated on the client side with the following limitations:

  • For exclusive subscriptions, the message is discarded on the client and automatically acknowledged.

  • For shared subscriptions, especially on queues, the message is discarded on the client and is "negative acknowledged" in order to let other consumers receive the message.

  • For QueueBrowsers, the message is discarded on the client side.
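The three cases above can be summarized as a small decision function. This is a sketch with hypothetical enum and class names. It models only the decision, not the actual acknowledgement calls made by the client.

```java
// Hypothetical decision table for client-side selector emulation.
final class SelectorEmulation {
    enum ConsumerKind { EXCLUSIVE_SUBSCRIPTION, SHARED_SUBSCRIPTION, QUEUE_BROWSER }

    enum Action { DELIVER, DISCARD_AND_ACKNOWLEDGE, DISCARD_AND_NEGATIVE_ACKNOWLEDGE, DISCARD }

    private SelectorEmulation() {}

    /** Decides what the client does with a message, given whether it passed the selector/noLocal check. */
    static Action onMessage(ConsumerKind kind, boolean matches) {
        if (matches) {
            return Action.DELIVER;
        }
        switch (kind) {
            case EXCLUSIVE_SUBSCRIPTION:
                // No other consumer can take the message, so it is acknowledged and dropped.
                return Action.DISCARD_AND_ACKNOWLEDGE;
            case SHARED_SUBSCRIPTION:
                // Another consumer on the subscription may match, so the message is handed back.
                return Action.DISCARD_AND_NEGATIVE_ACKNOWLEDGE;
            default:
                // Queue browsers only skip the message.
                return Action.DISCARD;
        }
    }
}
```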

Currently, the implementation of message selectors is based on Apache ActiveMQ® Java client classes, which are imported as a dependency in Starlight for JMS.

Apache ActiveMQ is licensed under Apache 2.0.

Subscription creation

For Starlight for JMS to create subscriptions it must be granted permission, and the broker must be configured to automatically create subscriptions by setting the allowAutoSubscriptionCreation=true parameter on the broker configuration.

For more on subscription creation, including disabling automatic subscription creation, see JMS subscriptions.

System properties and fields:

Properties processed by Starlight for JMS in a special way:

  • All properties with a name ending in _jmstype: Additional properties that contain the original data type.

  • JMSType: Value for the standard field JMSType.

  • JMSCorrelationID: Base64 representation of the standard JMSCorrelationID field.

  • JMSPulsarMessageType: Type of message.

  • JMSMessageId: Logical ID of the message.

  • JMSReplyTo: Fully qualified name of the topic referred to by the JMSReplyTo field.

  • JMSReplyToType: JMS type for the JMSReplyTo topic. Allowed values are topic or queue (default: topic).

  • JMSDeliveryMode: Integer value of the JMSDeliveryMode standard field, in case it differs from DeliveryMode.PERSISTENT.

  • JMSPriority: Integer value of the priority requested for the message, in case it differs from Message.DEFAULT_PRIORITY.

  • JMSDeliveryTime: Representation in milliseconds since the UNIX epoch of the JMSDeliveryTime field.

  • JMSXGroupID: Mapped to the key of the Pulsar message. Not represented by a message property.

  • JMSXGroupSeq: Mapped to Pulsar Message sequenceId if it isn’t overridden with a custom value.

  • JMSConnectionID: ID of the connection.

Special message field mappings:

  • property JMSXDeliveryCount: Mapped to 1 + the Pulsar message RedeliveryCount field.

  • field JMSExpiration: Representation in milliseconds since the UNIX epoch of the expiration date of the message. Used to emulate time to live.

  • field JMSRedelivered: Mapped to true if JMSXDeliveryCount > 1
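The field mappings above reduce to simple arithmetic, sketched below with hypothetical helper names. The delivery count rules come from the list above. Treating an expiration of 0 as "never expires" follows the JMS convention, and the exact expiry comparison used by the client is an assumption.

```java
// Hypothetical helpers for the special field mappings; not part of Starlight for JMS.
final class FieldMappings {
    private FieldMappings() {}

    /** JMSXDeliveryCount is 1 + the Pulsar redelivery count. */
    static int jmsxDeliveryCount(int pulsarRedeliveryCount) {
        return 1 + pulsarRedeliveryCount;
    }

    /** JMSRedelivered is true when JMSXDeliveryCount is greater than 1. */
    static boolean jmsRedelivered(int pulsarRedeliveryCount) {
        return jmsxDeliveryCount(pulsarRedeliveryCount) > 1;
    }

    /** JMSExpiration in epoch milliseconds; 0 means the message never expires. */
    static long jmsExpiration(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis <= 0 ? 0L : sendTimeMillis + timeToLiveMillis;
    }

    /** Assumed time-to-live check: expired once the current time reaches JMSExpiration. */
    static boolean isExpired(long jmsExpiration, long nowMillis) {
        return jmsExpiration > 0 && nowMillis >= jmsExpiration;
    }
}
```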

Ignored fields:

  • JMSXUserID

  • JMSXAppID

  • JMSXProducerTXID

  • JMSXConsumerTXID

  • JMSXRcvTimestamp

  • JMSXState

For more details on JMS properties, refer to section "3.5.9. JMS defined properties" in the JMS 2.0 specifications.

Transaction Emulation

Starlight for JMS and Pulsar fully support JMS transactions, and also support emulating SESSION_TRANSACTED behavior without actually performing the transaction’s operations.

For example, when porting a JMS application that is using SESSION_TRANSACTED, you can emulate SESSION_TRANSACTED behavior with the jms.emulateTransactions feature.

In jms.emulateTransactions mode, a Session created in SESSION_TRANSACTED mode behaves like a transacted Session but is not transactional: a produced message is sent immediately, and acknowledgements are sent during session.commit().

To enable transaction emulation, add "jms.emulateTransactions", "true", as below:

Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
properties.put("enableTransaction", "false");
properties.put("jms.emulateTransactions", "true");
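To make the emulated semantics concrete, here is a small in-memory model of such a session. It is a sketch, not the Starlight for JMS implementation: the class and its methods are hypothetical, and messages and message IDs are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of an emulated transacted session.
final class EmulatedTransactedSession {
    private final List<String> sent = new ArrayList<>();
    private final List<String> pendingAcks = new ArrayList<>();
    private final List<String> acknowledged = new ArrayList<>();

    /** Produced messages are sent immediately, without waiting for commit(). */
    void send(String message) {
        sent.add(message);
    }

    /** Received messages are remembered; their acknowledgement is deferred. */
    void receive(String messageId) {
        pendingAcks.add(messageId);
    }

    /** commit() is the point where the deferred acknowledgements are sent. */
    void commit() {
        acknowledged.addAll(pendingAcks);
        pendingAcks.clear();
    }

    List<String> sent() {
        return new ArrayList<>(sent);
    }

    List<String> acknowledged() {
        return new ArrayList<>(acknowledged);
    }
}
```

The model shows why the session is not transactional: a message passed to send() is already visible to consumers even if commit() is never called.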

Unsupported and emulated features

The JMS 2.0 specification broadly describes a generic messaging service and defines many interfaces and services. Apache Pulsar® does not support all of the required features, and Starlight for JMS is a wrapper over the Apache Pulsar Client.

Most of the features that are not natively supported by Pulsar are emulated by Starlight for JMS, which helps in porting existing JMS based applications.

If you want to use emulated features, but the emulated behavior does not fit your needs, please open an issue in order to request an improvement for the Pulsar core.

Feature | Supported by Pulsar | Emulated by Starlight for JMS
Message selectors | Unsupported | Emulated
NoLocal subscriptions | Unsupported | Emulated
Per message TTL | TTL supported at topic level, not per-message | Emulated
Global clientId registry | Unsupported | Partially emulated
Global unique subscription names | Subscription name is unique per topic | Partially emulated
Temporary destinations (auto deleted when the connection is closed) | Unsupported | Partially emulated
Creation of subscriptions from client | Supported (requires relevant privileges granted to the client) |
Delayed messages | Unsupported for Exclusive subscriptions | Starlight for JMS provides an option to use shared subscriptions even in cases where an exclusive subscription would be preferred
Message Priority | Unsupported | Priority is stored as property and delivered to the consumer, but ignored
Non-Persistent Messages | Unsupported (every message is persisted) | DeliveryMode.NON_PERSISTENT is stored as property and delivered to the consumer, but ignored
Transactions | Supported for the BETA of Pulsar 2.7.x | Transactions must be enabled on the client and on the server
StreamMessage | Unsupported in Pulsar | Emulated by storing the whole stream in a single message
Topic vs Queue | Unsupported | Each destination is a Pulsar Topic; the behavior of the client depends upon which API you use
Username/password authentication | Unsupported | Unsupported, but you can configure Pulsar client security features
JMSXDeliveryCount/JMSRedelivered | Unsupported | The behavior of the delivery counter in Pulsar follows different semantics from JMS

Starlight for JMS, when run using Apache Pulsar 2.7.x, passes most of the TCK, except for the few tests requiring globally unique subscription names.

Temporary destinations

Temporary destinations are created using Session.createTemporaryQueue and Session.createTemporaryTopic and should create a destination that is automatically deleted when the connection is closed. In Pulsar, since there is no concept of a JMS Connection, that behavior cannot be implemented.

Starlight for JMS emulates the behavior by trying to delete the destination on Connection.close() and in ConnectionFactory.close(), but there is no guarantee that the deletion will happen if, for instance, the client application crashes or a temporary error occurs during the deletion of the destination.

Creating a temporary destination requires the client to be allowed to create the destination and also to configure the broker to allow automatic topic creation using allowAutoTopicCreation=true.

Username and password authentication

Starlight for JMS currently supports only JWT (JSON Web Token) authentication, but offers an alternate method of registering authParams when connections are created.

Setting the configuration property jms.useCredentialsFromCreateConnection=true when creating a new connection will pass the username and password pair from the createConnection(username, password) or createContext(username, password, mode) methods into authParams.

To set the configuration when creating a new connection or context, add jms.useCredentialsFromCreateConnection=true as below:

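Map<String, Object> configuration = new HashMap<>();
// Forward the username/password given to createConnection or createContext into authParams
configuration.put("jms.useCredentialsFromCreateConnection", "true");
PulsarConnectionFactory factory = new PulsarConnectionFactory(configuration);
...
factory.close();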

This will pass the username and password pair into the PulsarConnectionFactory constructor.

A few notes on usage:

  • The values of username and password depend on the authentication type configured in PulsarClient. Using JWT authentication, the values are:

    • username: not used

    • password: token:XXXX, where XXXX is the JWT token

  • Once a username/password pair is used to start a connection, you must use it for all subsequent calls.

  • Only one PulsarClient can be held by a PulsarConnectionFactory.

Using the Pulsar Schema Registry

The JMS API does not have a standard way to consume schema-driven data like AVRO, but the Pulsar client can automatically apply schema and decode messages to a specific Java Object model.

Set the useSchema flag to true with consumerConfig to apply schema when consuming data, as below:

Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("useSchema", true);

Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
properties.put("consumerConfig", consumerConfig);

The consumer will return a specific JMS message depending on the schema type:

Schema type | JMS mapping
AVRO | MapMessage (nesting is supported)
KeyValue | KeyValue<AVRO, AVRO> maps to a MapMessage with two entries, 'key' and 'value'
STRING | TextMessage

For all other schema types, the consumer will return a StringObject.

Virtual destinations

Virtual destinations add support for Consumers to receive messages from multiple destinations, known as multi-topic subscriptions in Pulsar.

Multiple topics can be subscribed to using regular expressions or static multi-topic lists.

Pattern | Syntax | Example
RegEx | regex:topicnamepattern | regex:example.* subscribes to all the topics with a name that starts with "example"
List | multi:topic1,topic2,topic3 | multi:example,foo,bar subscribes to the topics named "example", "foo", and "bar"

To embed a multi-topic subscription name using a JMS queue:

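try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties);
     Connection connection = factory.createConnection();
     Session session = connection.createSession()) {
    // "multi:example,foo,bar" lists the topics; the trailing ":subscription" is the subscription name
    Queue queue = session.createQueue("multi:example,foo,bar:subscription");
}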
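To show how such a destination name breaks down, the sketch below splits a multi: name into its topic list and optional embedded subscription name. The parser is hypothetical and is not the one used by Starlight for JMS. It assumes short topic names that contain no colon, so fully qualified names such as persistent://tenant/namespace/topic are out of scope.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical parser for the "multi:" syntax; not part of Starlight for JMS.
final class MultiTopicName {
    final List<String> topics;
    final String subscription; // null when no subscription name is embedded

    private MultiTopicName(List<String> topics, String subscription) {
        this.topics = topics;
        this.subscription = subscription;
    }

    /** Parses "multi:topic1,topic2[:subscription]" under the assumptions stated above. */
    static MultiTopicName parse(String destination) {
        String prefix = "multi:";
        if (!destination.startsWith(prefix)) {
            throw new IllegalArgumentException("not a multi-topic destination: " + destination);
        }
        String rest = destination.substring(prefix.length());
        String subscription = null;
        // Assumption: any remaining colon separates the topic list from the subscription name.
        int colon = rest.lastIndexOf(':');
        if (colon >= 0) {
            subscription = rest.substring(colon + 1);
            rest = rest.substring(0, colon);
        }
        return new MultiTopicName(Arrays.asList(rest.split(",")), subscription);
    }
}
```

With this sketch, "multi:example,foo,bar:subscription" yields the topics example, foo, and bar with the subscription name "subscription".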

For more on multi-topic subscriptions in Pulsar, see the Pulsar documentation.

JMS priority

JMS priority allows messages to be dispatched at different rates according to priority.
Priorities are mapped to Pulsar partition IDs with the jms.priorityMapping parameter in the ConnectionFactory.

When a producer writes a message, the message is routed to a partition based on its priority mapping. By default, the higher the partition ID, the higher the priority, so priority 0 (the lowest JMS priority) maps to the lowest partitions.
On the consumer side, messages are re-ordered in memory so high priority partitions are drained faster than other partitions.
Both producers and consumers advertise their priority routing behavior in metadata, so priority behavior can be detected and used by Pulsar admin for subscriptions, filters, and metrics.
Even if a consumer is not looking for a message’s priority mapping metadata, it will still consume the message.

Mapping JMS priority to partitions

The parameter jms.priorityMapping controls how priorities are mapped to partitions.

The jms.priorityMapping parameter can be set to linear or non-linear. The default is linear.

In jms.priorityMapping = "linear" mode, the priority level (from 0 to 9) is linearly mapped to the partition id.
If you have 10 partitions, priority 0 goes to partition 0, priority 1 to partition 1, and so on.
If you have 20 partitions, priority 0 goes to partitions 0 and 1, priority 1 goes to partitions 2 and 3, and so on.
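The two examples above are consistent with the simple integer arithmetic sketched below. The class is hypothetical. The results for 10 and 20 partitions match the text, while the behavior for partition counts that are not a multiple of 10 is a guess.

```java
// Hypothetical sketch of the linear mapping; not the Starlight for JMS implementation.
final class LinearPriorityMapping {
    private LinearPriorityMapping() {}

    /** Returns {firstPartition, lastPartition} (both inclusive) for a JMS priority from 0 to 9. */
    static int[] partitionRange(int priority, int numPartitions) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("priority must be between 0 and 9");
        }
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int first = priority * numPartitions / 10;
        // Guess: every priority level gets at least one partition, even with fewer than 10 partitions.
        int endExclusive = Math.max(first + 1, (priority + 1) * numPartitions / 10);
        return new int[] { first, Math.min(endExclusive, numPartitions) - 1 };
    }
}
```

For instance, partitionRange(1, 20) covers partitions 2 and 3, and partitionRange(3, 10) covers partition 3 only.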

The benefit of this approach is that you are guaranteed that each priority level is sent to different partitions.
The drawback is that if you don’t use all the priority levels, then some partitions will be left underutilized.

In jms.priorityMapping = "non-linear", the mapping between JMSPriority and the partition id is not linear. The assumption is that there will be more regular messages than high/low priority messages.

Priorities are divided into 3 classes: low, default, and high priority.

  • 1/4 of the partitions are for low priority messages.

  • 1/2 of the partitions are for the default priority.

  • 1/4 of the partitions are for high priority messages.

With non-linear partition mapping, the JMS Client will tend to:

  • use more partitions for default level priorities (especially 4, the default priority).

  • map higher level priorities to higher partition ids.

  • map lower level priorities to lower partition ids.
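A rough sketch of the non-linear split follows. The quarter/half/quarter partition ranges and the ordering from low to high come from the description above. Which priorities count as low or high is an assumption (below 4 is low, 4 is default, above 4 is high), as are the class name and the rounding for partition counts that are not a multiple of 4.

```java
// Hypothetical sketch of the non-linear mapping; not the Starlight for JMS implementation.
final class NonLinearPriorityMapping {
    enum PriorityClass { LOW, DEFAULT, HIGH }

    private NonLinearPriorityMapping() {}

    /** Assumption: priorities below 4 are low, 4 is the default, and above 4 are high. */
    static PriorityClass classify(int priority) {
        if (priority < 4) {
            return PriorityClass.LOW;
        }
        return priority == 4 ? PriorityClass.DEFAULT : PriorityClass.HIGH;
    }

    /** Returns {firstPartition, lastPartition} (both inclusive) for a priority class. */
    static int[] partitionRange(PriorityClass priorityClass, int numPartitions) {
        if (numPartitions < 4) {
            throw new IllegalArgumentException("this sketch needs at least 4 partitions");
        }
        int quarter = numPartitions / 4;
        switch (priorityClass) {
            case LOW:
                return new int[] { 0, quarter - 1 };
            case DEFAULT:
                // The middle range absorbs any remainder from the integer division.
                return new int[] { quarter, numPartitions - quarter - 1 };
            default:
                return new int[] { numPartitions - quarter, numPartitions - 1 };
        }
    }
}
```

With 8 partitions, this sketch gives partitions 0 and 1 to low priorities, 2 through 5 to the default priority, and 6 and 7 to high priorities.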


© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com