Key shared subscriptions in Pulsar
Subscriptions in Pulsar describe which consumers are consuming data from a topic and how they want to consume that data.
Pulsar’s shared subscription model can increase the message processing rate, but it risks losing message ordering guarantees. With round-robin delivery, the broker hands each message to the next available consumer, so related messages can land on different consumers and be processed out of order.
Key shared subscriptions allow multiple consumers to subscribe to a topic, and provide additional metadata in the form of keys that link messages to specific consumers.
Keys are generated with hashing that converts arbitrary values, like topic-name or JSON blobs, into fixed integer values, and then the hashed values are assigned to subscribed consumers in one of two ways:
- Auto hash: Uses consistent hashing to balance range values across available consumers without requiring manual setup of hash ranges.
- Sticky hash: The client manually assigns consumer range values, and then all hashes within a configured range go to one consumer.
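The following is a minimal sketch of the idea behind both modes: a key is hashed to a slot in a fixed range, and the slot decides which consumer receives the message. It assumes the 0-65535 range used later on this page. The class name `KeyHashSketch`, its methods, and the use of `String.hashCode()` are illustrative stand-ins, not Pulsar's actual hash function or API.

```java
import java.util.List;

/** Illustrative only: String.hashCode() stands in for the real hash function. */
final class KeyHashSketch {
    /** Number of slots in the hash range 0-65535. */
    static final int HASH_RANGE_SIZE = 65536;

    /** Maps an arbitrary key to a slot between 0 and 65535. */
    static int slotFor(String key) {
        // Clear the sign bit so the modulo result is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % HASH_RANGE_SIZE;
    }

    /** Returns the index of the inclusive {start, end} range holding the slot, or -1. */
    static int consumerFor(int slot, int[][] ranges) {
        for (int i = 0; i < ranges.length; i++) {
            if (slot >= ranges[i][0] && slot <= ranges[i][1]) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Two hypothetical consumers, each owning half of the hash range.
        int[][] ranges = { {0, 32767}, {32768, 65535} };
        for (String key : List.of("abc", "user-42", "{\"id\":7}")) {
            int slot = slotFor(key);
            System.out.println(key + " -> slot " + slot
                + " -> consumer " + consumerFor(slot, ranges));
        }
    }
}
```

Because the slot depends only on the key, every message with the same key reaches the same consumer, which is what preserves per-key ordering.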
This page explains how to use Pulsar’s key shared subscription model to manage your topic consumption.
Prerequisites
This example requires the following:
- Astra Streaming access with at least one streaming tenant and one topic
- A local clone of the DataStax Pulsar Subscription Example repository
- In the pulsar-subscription-example repo, navigate to src/main/resources, and then edit application.properties to connect to your Astra Streaming cluster:

  application.properties

      service_url=BROKER_SERVICE_URL
      namespace=default
      tenant_name=my-tenant
      authentication_token=ASTRA_DB_APPLICATION_TOKEN
      topic_name=my-topic
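As a rough illustration of how these properties feed the topic URL used by the consumer and producer in the steps below, here is a self-contained sketch. The class `TopicNameSketch` and its methods are hypothetical; the example repository's own configuration class may load and expose these values differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper, not part of the example repository. */
final class TopicNameSketch {
    /** Parses key=value lines in the same format as application.properties. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Builds persistent://<tenant_name>/<namespace>/<topic_name>. */
    static String topicUrl(Properties props) {
        return "persistent://"
            + props.getProperty("tenant_name") + "/"
            + props.getProperty("namespace") + "/"
            + props.getProperty("topic_name");
    }

    public static void main(String[] args) {
        String text = "namespace=default\n"
            + "tenant_name=my-tenant\n"
            + "topic_name=my-topic\n";
        // Prints persistent://my-tenant/default/my-topic
        System.out.println(topicUrl(parse(text)));
    }
}
```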
Key shared subscription example
- To try out a Pulsar key shared subscription, add .subscriptionType(SubscriptionType.Key_Shared) to the pulsarConsumer in SimplePulsarConsumer.java:

  SimplePulsarConsumer.java

      pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
          .topic("persistent://"
              + conf.getTenantName() + "/"
              + conf.getNamespace() + "/"
              + conf.getTopicName())
          .startMessageIdInclusive()
          .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
          .subscriptionName("SimplePulsarConsumer")
          .subscriptionType(SubscriptionType.Key_Shared)
          .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
          .subscribe();

  The keySharedPolicy defines how hashed values are assigned to subscribed consumers.

  The above example uses autoSplitHashRange, which is an auto-hashing policy. Running multiple consumers with auto-hashing balances the messaging load across all available consumers, like a shared subscription.

  If you want to set a fixed hash range, use KeySharedPolicy.stickyHashRange(), as demonstrated in the following steps.
- To use a sticky hashed key shared subscription, import the following classes to SimplePulsarConsumer.java:

  SimplePulsarConsumer.java

      import org.apache.pulsar.client.api.Range;
      import org.apache.pulsar.client.api.KeySharedPolicy;
      import org.apache.pulsar.client.api.SubscriptionType;
- Add the following imports to SimplePulsarProducer.java:

  SimplePulsarProducer.java

      import org.apache.pulsar.client.api.BatcherBuilder;
      import org.apache.pulsar.client.api.HashingScheme;
- In SimplePulsarProducer.java, modify the pulsarProducer to use the JavaStringHash hashing scheme:

  SimplePulsarProducer.java

      pulsarProducer = pulsarClient
          .newProducer(Schema.JSON(DemoBean.class))
          .topic("persistent://"
              + conf.getTenantName() + "/"
              + conf.getNamespace() + "/"
              + conf.getTopicName())
          .batcherBuilder(BatcherBuilder.KEY_BASED)
          .hashingScheme(HashingScheme.JavaStringHash)
          .create();
- In SimplePulsarConsumer.java, modify the pulsarConsumer to use sticky hashing. This example assigns the entire hash range (0-65535) on this subscription to one consumer.

  SimplePulsarConsumer.java

      .subscriptionType(SubscriptionType.Key_Shared)
      .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65535)))
- In the pulsar-examples project, run SimplePulsarConsumer.java to begin consuming messages.

  The confirmation message and a cursor appear to indicate the consumer is ready:

  Result

      [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
      ...
      [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
- In a new terminal window, run SimplePulsarProducer.java to begin producing messages:

  Result

      [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 55794190 sent
      [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 41791865 sent
      [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 74840732 sent
      [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 57467766 sent

  In the SimplePulsarConsumer terminal, the consumer begins receiving messages:

  Result

      [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":55794190,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
      [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":41791865,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
- In a new terminal window, try to run a new instance of SimplePulsarConsumer.java.

  The new consumer can’t subscribe to the topic because the SimplePulsarConsumer configuration reserved the entire hash range for the first consumer:

  Result

      [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
      [main] INFO com.datastax.pulsar.SimplePulsarConsumer - SimplePulsarProducer has been stopped.
      Exception in thread "main" java.lang.IllegalStateException: Cannot connect to pulsar
          at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:59)
      Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerAssignException: {"errorMsg":"Range conflict with consumer Consumer{subscription=PersistentSubscription{topic=persistent://<tenant>/<namespace>/in, name=SimplePulsarConsumer}, consumerId=0, consumerName=5825b, address=/...}","reqId":1243883448178735299, "remote":"<service_url>", "local":"/192.168.0.95:56512"}
          at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1060)
          at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101)
          at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:47)
- To run multiple consumers, modify the SimplePulsarConsumer.java configuration so that each consumer is assigned its own non-overlapping portion of the hash range, or switch back to auto-hashing. Then, you can launch multiple instances of SimplePulsarConsumer.java to consume messages from different hash ranges.
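
One way to split the hash range between consumers is to compute an even, non-overlapping slice for each instance. The sketch below is an assumption about how that could be done, not the example repository's method. `HashRangeSplitSketch`, `rangeFor`, and `overlaps` are hypothetical names, and how each instance learns its own index (for example, from a command-line argument) is left open.

```java
/** Hypothetical helper for dividing the 0-65535 hash range evenly. */
final class HashRangeSplitSketch {
    static final int HASH_RANGE_SIZE = 65536;

    /** Returns the inclusive {start, end} slice for consumer `index` of `total`. */
    static int[] rangeFor(int index, int total) {
        if (total <= 0 || total > HASH_RANGE_SIZE || index < 0 || index >= total) {
            throw new IllegalArgumentException("invalid index/total: " + index + "/" + total);
        }
        // Use long arithmetic so the multiplication cannot overflow.
        int start = (int) ((long) index * HASH_RANGE_SIZE / total);
        int end = (int) ((long) (index + 1) * HASH_RANGE_SIZE / total) - 1;
        return new int[] { start, end };
    }

    /** True when two inclusive ranges share at least one hash value. */
    static boolean overlaps(int[] a, int[] b) {
        return a[0] <= b[1] && b[0] <= a[1];
    }

    public static void main(String[] args) {
        int total = 3;
        for (int i = 0; i < total; i++) {
            int[] r = rangeFor(i, total);
            // Each instance would pass its own pair to
            // KeySharedPolicy.stickyHashRange().ranges(Range.of(start, end)).
            System.out.println("consumer " + i + ": " + r[0] + "-" + r[1]);
        }
    }
}
```

Two consumers that both request 0-65535 overlap completely, which matches the range conflict shown in the previous step. Slices produced by `rangeFor` for different indexes never overlap.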