Key_Shared subscriptions in Pulsar
Subscriptions in Pulsar describe which consumers are consuming data from a topic and how they want to consume that data.
Pulsar’s shared subscription model trades an increased message processing rate for ordering guarantees. In a round-robin delivery, there’s no way for the broker to know which messages are going to which consumer.
Key_Shared subscriptions allow multiple consumers to subscribe to a topic, but with additional metadata in the form of keys which link messages to specific consumers.
Keys are generated with hashing, which converts arbitrary values like "topic-name" or JSON blobs into fixed integer values. These hashed values are then assigned to subscribed consumers in one of two ways:
-
Auto hash - uses consistent hashing to balance range values across available consumers, without requiring manual setup of hash ranges.
-
Sticky hash - the client manually assigns consumer range values. All hashes within a configured range go to one consumer.
This document explains how to use Pulsar’s Key_Shared subscription model to manage your topic consumption.
Prerequisites
To run this example, you’ll need:
-
A configured Astra Streaming instance with at least one streaming tenant and one topic. See the Astra Streaming quick start for instructions.
-
A local clone of the DataStax Pulsar Subscription Example repository
-
Modify the
src/main/resources/application.properties
in thepulsar-subscription-example
repo to connect to your Astra Streaming cluster, as below:service_url={broker-service-url} namespace=default tenant_name=my-tenant authentication_token={astra-auth-token} topic_name=my-topic
Key_Shared subscription example
To try out a Pulsar Key_Shared subscription, add .subscriptionType(SubscriptionType.Key_Shared)
to the pulsarConsumer
in SimplePulsarConsumer.java
.
You must also tell Pulsar what keySharedPolicy
this subscription will use. The example below uses the auto-hashing keySharedPolicy
.
pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
.topic("persistent://"
+ conf.getTenantName() + "/"
+ conf.getNamespace() + "/"
+ conf.getTopicName())
.startMessageIdInclusive()
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("SimplePulsarConsumer")
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
.subscribe();
Running multiple consumers with auto-hashing balances the messaging load across all available consumers.
Manually set stickyHashRange
You can manually set a hash range with KeySharedPolicy.stickyHashRange()
.
To test out sticky hashed Key_Shared subscriptions, you need to first import some additional classes.
-
Add the following classes to
SimplePulsarConsumer.java
:import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.SubscriptionType;
-
Add the following classes to
SimplePulsarProducer.java
:import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.HashingScheme;
-
Modify the
pulsarProducer
inSimplePulsarProducer.java
to use theJavaStringHash
hashing scheme.pulsarProducer = pulsarClient .newProducer(Schema.JSON(DemoBean.class)) .topic("persistent://" + conf.getTenantName() + "/" + conf.getNamespace() + "/" + conf.getTopicName()) .batcherBuilder(BatcherBuilder.KEY_BASED) .hashingScheme(HashingScheme.JavaStringHash) .create();
-
Modify the
pulsarConsumer
inSimplePulsarConsumer.java
to use sticky hashing. This example sets all possible hashes (0-65535) on this subscription to this consumer..subscriptionType(SubscriptionType.Key_Shared) .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0,65535)))
-
Open
pulsar-examples
in the IDE of your choice and runSimplePulsarConsumer.java
to begin consuming messages.
The confirmation message and a cursor appear to indicate the consumer is ready.[main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully ... [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
-
In a new terminal window, run
SimplePulsarProducer.java
to begin producing messages.[main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 55794190 sent [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 41791865 sent [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 74840732 sent [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 57467766 sent
-
The consumer begins receiving messages.
[main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":55794190,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"} [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":41791865,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
-
Open a new terminal window and try to run
SimplePulsarConsumer.java
.[main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully [main] INFO com.datastax.pulsar.SimplePulsarConsumer - SimplePulsarProducer has been stopped. Exception in thread "main" java.lang.IllegalStateException: Cannot connect to pulsar at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:59) Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerAssignException: {"errorMsg":"Range conflict with consumer Consumer{subscription=PersistentSubscription{topic=persistent://<tenant>/<namespace>/in, name=SimplePulsarConsumer}, consumerId=0, consumerName=5825b, address=/...}","reqId":1243883448178735299, "remote":"<service_url>", "local":"/192.168.0.95:56512"} at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1060) at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101) at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:47)
The new consumer can’t subscribe to the topic because you reserved the entire hash range for the first consumer.
Key_Shared subscription video
Follow along with this video from our Five Minutes About Pulsar series to see Key_Shared subscriptions in action.
What’s next?
For more on subscriptions, see: