DataStax Fast JMS for Apache Pulsar implementation details

A discussion of design and implementation decisions made to support JMS requirements.

Message listeners and concurrency

The JMS specifications require a specific behavior for MessageListener in respect to concurrency, and to support that, DataStax Fast JMS for Apache Pulsar starts a dedicated thread per MessageListener session.

There are also specific behaviors mandates regarding these APIs:

  • Connection/JMSContext.start()

  • Connection/JMSContext.stop()

  • Session.close()/JMSContext.close()

  • Connection/JMSContext.close()

{product_name]} implements its own concurrent processing model in order to obey the specs, but it cannot use the built-in facilities provider by the Pulsar client.

For CompletionListeners, which are useful for asynchronous sending of messages, DataStax Fast JMS for Apache Pulsar relies on the Pulsar asynchronous API, but there are some behaviors that are yet to be enforced with respect to Session/JMSContext.close().

Message properties

In Pulsar properties are always of type String, but the JMS specs require support for every Java primitive type. In order to emulate that behavior for every custom property set on the message, DataStax Fast JMS for Apache Pulsar sets an additional property that describes the original type of the property.

For instance if you set a message property my-key=1234 (integer), DataStax Fast JMS for Apache Pulsar adds a property my-key_jmstype=integer in order to properly reconstruct the value when the receiver calls getObjectProperty.

The value is always serialized as string. For floating point numbers, DataStax Fast JMS for Apache Pulsar uses Double.toString/parseString and Float.toString/parseString with the behavior mandated by Java specifications.

System properties and fields:

Properties processed by DataStax Fast JMS for Apache Pulsar in a special way:

  • All properties with a name ending in _jsmtype: Additional properties that contain the original data type.

  • JMSType: Value for the standard field JMSType.

  • JMSCorrelationID: Base64 representation of the standard JMSCorrelationID field.

  • JMSPulsarMessageType: Type of message.

  • JMSMessageId: Logical ID of the message.

  • JMSReplyTo: Fully qualified name of the topic referred to by the JMSReplyTo field.

  • JMSReplyToType: JMS type for the JMSReplyTo topic. Allowed values are topic or queue (default: topic).

  • JMSDeliveryMode: Integer value of the JMSDeliveryMode standard field, in case it differs from DeliveryMode.PERSISTENT.

  • JMSPriority: Integer value of the priority requested for the message, in case it differs from Message.DEFAULT_PRIORITY.

  • JMSDeliveryTime: Representation in milliseconds since the UNIX epoch of the JMSDeliveryTime field.

  • JMSXGroupID: Mapped to the key of the Pulsar message. Not represented by a message property.

  • JMSXGroupSeq: Mapped to Pulsar Message sequenceId if it isn’t overridden with a custom value.

  • JMSConnectionID: ID of the connection.

Special message field mappings:

  • property JMSXDeliveryCount: Mapped to 1 + the Pulsar message RedeliveryCount field.

  • field JMSExpiration: Representation in milliseconds since the UNIX epoch of the expiration date of the message. Used to emulate time to live.

  • field JMSRedelivered: Mapped to true if JMSXDeliveryCount > 1

Ignored fields:

  • JMSXUserID

  • JMSXAppID

  • JMSXProducerTXID

  • JMSXConsumerTXID

  • JMSXRcvTimestamp

  • JMSXState

For more details on JMS properties, refer to section "3.5.9. JMS defined properties" in the JMS 2.0 specifications.

Client identifiers

Each connection may have a client identifier that can be set programmatically or configured administratively using the jms.clientId configuration parameters. Client identifiers must be globally unique but there is no way to enforce that constraint on a Pulsar cluster.

When you set a clientId, the actual subscription name in Pulsar is constructed as clientId + '_' + subscriptionName.

Unsupported and emulated features

The JMS 2.0 specifications describes broadly a generic messaging service and defines many interfaces and services. Apache Pulsar does not support all of the required features, and DataStax Fast JMS for Apache Pulsar is a wrapper over the Apache Pulsar Client.

Most of the features that are not natively supported by Pulsar are emulated by DataStax Fast JMS for Apache Pulsar, which helps in porting existing JMS based applications.

If you want to use emulated features, but the emulated behavior does not fit your needs, please open an issue in order to request an improvement for the Pulsar core.
Feature Supported by Pulsar Emulated by {product_name}

Message selectors

Unsupported

Emulated

NoLocal subscriptions

Unsupported

Emulated

Per message TTL

TTL supported at topic level, not per-message

Emulated

Global clientId registry

Unsupported

Partially emulated

Global unique subscription names

Subscription name is unique per topic

Partially emulated

Temporary destinations (auto deleted when the connection is closed)

Unsupported

Partially emulated

Creation of subscriptions from client

Supported (requires relevant privileges granted to the client)

Delayed messages

Unsupported for Exclusive subscriptions

DataStax Fast JMS for Apache Pulsar provides an option to use shared subscriptions even in cases where an exclusive subscription would be preferred

Message Priority

Unsupported

Priority is stored as property and delivered to the consumer, but ignored

Non-Persistent Messages

Unsupported (every message is persisted)

DeliveryMode.NON_PERSISTENT is stored as property and delivered to the consumer, but ignored

Transactions

Supported for the BETA of Pulsar 2.7.x

Transactions must be enabled on the client and on the server

StreamMessage

Unsupported in Pulsar

Emulated by storing the whole stream in a single message

Topic vs Queue

Unsupported

Each destination is a Pulsar Topic; the behavior of the client depends upon which API you use

Username/password authentication

Unsupported

Unsupported, but you can configure Pulsar client security features

JMSXDeliveryCount/JMSRedelivered

Unsupported

The behavior of the delivery counter in Pulsar follows different semantics from JMS

DataStax Fast JMS for Apache Pulsar, when run using Apache Pulsar 2.7.x passes most of the TCK, except for the few tests requiring globally unique subscription names.

Message selectors and noLocal subscriptions

Message selectors let you choose not to receive messages that do not meet a given condition while NoLocal subscriptions prevents a consumer from receiving messages sent by the same connector that created the consumer itself.

Both of those features can be emulated on the client side with the following limitations:

  • For exclusive subscriptions, the message is discarded on the client and automatically acknowledged.

  • For shared subscriptions, especially on queues, the message is discarded on the client and is "negative acknowledged" in order to let other consumers receive the message.

  • For QueueBrowsers, the message is discarded on the client side.

Currently, the implementation of message selectors is based on Apache ActiveMQ® Java client classes, which are imported as dependency in DataStax Fast JMS for Apache Pulsar.

Apache ActiveMQ is licensed under Apache 2.0.

Global registry of subscription names and of clientIds

The JMS specifications require a subscription name, together with the clientId, to be globally unique in the system. In Pulsar, the subscription name is defined in the scope of the Topic, so you can use the same subscription name on two different topics, referring to distinct entities.

The most notable side effects while using Pulsar with the JMS API are:

  • Session.unsubscribe(String subscriptionName) cannot be used, because it refers to the subscriptionName without a topic name.

  • In instances such as changing a message selector, you must unsubscribe the old subscription and create a new subscription.

In Pulsar, DataStax Fast JMS for Apache Pulsar can’t attach labels or metadata to subscriptions, and can’t enforce that a subscription is accessed globally using the same "message selector" and noLocal options. Pulsar does not have the concept of clientId, so it is not possible to prevent the existence of multiple connections with the same clientId in the cluster. Such a check is performed only locally in the context of the JVM/Classloader execution using a static registry.

Delayed messages

In Pulsar 2.7.x, delayed messages are delivered immediately, without respecting the delay, in the case of exclusive subscriptions. In order to mitigate that behavior, DataStax Fast JMS for Apache Pulsar allows you to use shared subscriptions even where an exclusive subscription would be a better fit. The most notable case is Session.createConsumer(destination), which creates an unnamed subscription, and an exclusive subscription would be a better fit.

If you set jms.useExclusiveSubscriptionsForSimpleConsumers=false the client will use a Shared subscription, and the delay is respected.

See PIP-26 for details.

Temporary destinations

Temporary destinations are created using Session.createTemporaryQueue and Session.createTemporaryTopic and should create a destination that is automatically deleted then the connection is closed. In Pulsar, since there is no concept of a JMS Connection, that behavior cannot be implemented.

DataStax Fast JMS for Apache Pulsar emulates the behavior by trying to delete the destination on Connection.close() and in ConnectionFactory.close() but there is no guarantee that that will eventually happen, if, for instance, the client application crashes or a temporary error occurs during the deletion of the destination.

Creating a temporary destination requires the client to be allowed to create the destination and also to configure the broker to allow automatic topic creation using allowAutoTopicCreation=true.

Subscription creation

In order for DataStax Fast JMS for Apache Pulsar to create subscriptions it must be granted permission, and the broker must be configured to automatically create subscriptions by setting the allowAutoSubscriptionCreation=true parameter on the broker configuration.

Next