Key_Shared subscriptions in Pulsar

Subscriptions in Pulsar describe which consumers are consuming data from a topic and how they want to consume that data.

Pulsar’s shared subscription model trades ordering guarantees for a higher message processing rate. Because the broker delivers messages to consumers in round-robin fashion, there’s no way to control which consumer receives which message, so related messages can be processed out of order.
Key_Shared subscriptions also allow multiple consumers to subscribe to a topic, but they use additional metadata in the form of message keys to link messages to specific consumers: all messages with the same key are delivered to the same consumer.
Each key is hashed, which converts an arbitrary value like "topic-name" or a JSON blob into a fixed integer in the range 0-65535. These hash values are then assigned to subscribed consumers in one of two ways:

  • Auto hash - uses consistent hashing to balance hash ranges across available consumers, without requiring you to set up hash ranges manually.

  • Sticky hash - the client manually assigns a hash range to each consumer. All hashes within a configured range go to that one consumer.
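The mapping from key to consumer can be sketched in plain Java. The example below hashes a key into one of the 65,536 hash values (0-65535) and returns the consumer whose inclusive range contains it, which is the behavior sticky ranges describe. The `KeyHashSketch` class, the consumer names, and the FNV-1a hash are hypothetical stand-ins chosen for illustration; Pulsar uses its own hash function, so real hash values will differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Illustration only: hashes message keys into slots 0..65535 and routes each
 * slot to the consumer whose inclusive range contains it. FNV-1a is a
 * hypothetical stand-in, NOT the hash function the Pulsar broker uses.
 */
final class KeyHashSketch {
    static final int HASH_SLOTS = 65536; // hash values 0..65535

    /** Inclusive range of hash slots owned by one consumer. */
    static final class SlotRange {
        final int start;
        final int end;
        final String consumer;

        SlotRange(int start, int end, String consumer) {
            this.start = start;
            this.end = end;
            this.consumer = consumer;
        }

        boolean contains(int slot) {
            return slot >= start && slot <= end;
        }
    }

    /** Stand-in 32-bit FNV-1a hash of the key's UTF-8 bytes, folded into 0..65535. */
    static int slotFor(String key) {
        int h = 0x811C9DC5;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return Math.floorMod(h, HASH_SLOTS);
    }

    /** Returns the consumer whose range contains the key's slot, or null if none does. */
    static String consumerFor(String key, List<SlotRange> ranges) {
        int slot = slotFor(key);
        for (SlotRange r : ranges) {
            if (r.contains(slot)) {
                return r.consumer;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<SlotRange> ranges = List.of(
                new SlotRange(0, 32767, "consumer-a"),
                new SlotRange(32768, 65535, "consumer-b"));
        // The same key always lands on the same slot, so it always reaches the same consumer.
        for (String key : List.of("topic-name", "{\"show_id\":55794190}", "topic-name")) {
            System.out.println(key + " -> slot " + slotFor(key) + " -> " + consumerFor(key, ranges));
        }
    }
}
```

Because the hash is deterministic, repeated keys such as "topic-name" always resolve to the same consumer, which is what preserves per-key ordering.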

This document explains how to use Pulsar’s Key_Shared subscription model to manage your topic consumption.

Prerequisites

To run this example, you’ll need:

  • Apache Maven

  • Java OpenJDK 11

  • A configured Astra Streaming instance with at least one streaming tenant and one topic. See the Astra Streaming quick start for instructions.

  • A local clone of the DataStax Pulsar Subscription Example repository

  • The src/main/resources/application.properties file in the pulsar-subscription-example repo, modified to connect to your Astra Streaming cluster as shown below:

    service_url={broker-service-url}
    namespace=default
    tenant_name=my-tenant
    authentication_token={astra-auth-token}
    topic_name=my-topic
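As a rough sketch of how these properties fit together, the code below loads them with `java.util.Properties` and builds the fully qualified topic name (`persistent://<tenant>/<namespace>/<topic>`) that the consumer and producer examples pass to `.topic(...)`. The `TopicNameSketch` class and its `fullTopicName` method are hypothetical; the repository's own `Configuration` class may load and expose these values differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/**
 * Hypothetical sketch: reads the application.properties keys shown above and
 * builds the fully qualified topic name. The example repository's own
 * Configuration class may load these values differently.
 */
final class TopicNameSketch {

    static String fullTopicName(Properties props) {
        String tenant = require(props, "tenant_name");
        String namespace = require(props, "namespace");
        String topic = require(props, "topic_name");
        // Persistent topics use the form persistent://<tenant>/<namespace>/<topic>
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    // Fails fast when a required key is missing or empty.
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(String.join("\n",
                "namespace=default",
                "tenant_name=my-tenant",
                "topic_name=my-topic")));
        System.out.println(fullTopicName(props)); // persistent://my-tenant/default/my-topic
    }
}
```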

Key_Shared subscription example

To try out a Pulsar Key_Shared subscription, add .subscriptionType(SubscriptionType.Key_Shared) to the pulsarConsumer in SimplePulsarConsumer.java.
You must also tell Pulsar what keySharedPolicy this subscription will use. The example below uses the auto-hashing keySharedPolicy.

pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
    .topic("persistent://"
        + conf.getTenantName() + "/"
        + conf.getNamespace() + "/"
        + conf.getTopicName())
    .startMessageIdInclusive()
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    .subscriptionName("SimplePulsarConsumer")
    .subscriptionType(SubscriptionType.Key_Shared)
    .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
    .subscribe();

Running multiple consumers with auto-hashing balances the messaging load across all available consumers, while messages with the same key still go to the same consumer.
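Auto hashing is described above in terms of consistent hashing. A generic consistent-hash ring, sketched below, shows the property that makes this useful: when a new consumer joins, each key either stays with its current consumer or moves to the new one, so existing assignments are mostly preserved. This is a textbook illustration under assumed parameters (a hypothetical `HashRingSketch` class, a chosen number of ring points per consumer, and a stand-in FNV-1a hash), not Pulsar's implementation; the broker decides the actual assignment.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/**
 * Generic consistent-hashing illustration, NOT Pulsar's implementation.
 * Each consumer is placed at several points on a ring of 65,536 hash slots;
 * a key belongs to the first consumer point at or after the key's slot,
 * wrapping around to the start of the ring.
 */
final class HashRingSketch {
    private static final int HASH_SLOTS = 65536;
    private final int pointsPerConsumer;
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    HashRingSketch(int pointsPerConsumer) {
        this.pointsPerConsumer = pointsPerConsumer;
    }

    // Stand-in FNV-1a hash folded into 0..65535 (not the broker's hash function).
    static int slot(String s) {
        int h = 0x811C9DC5;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return Math.floorMod(h, HASH_SLOTS);
    }

    // Places the consumer at several "virtual" points to even out the load.
    // A rare collision simply hands that point to the newer consumer.
    void addConsumer(String name) {
        for (int i = 0; i < pointsPerConsumer; i++) {
            ring.put(slot(name + "#" + i), name);
        }
    }

    String consumerFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no consumers connected");
        }
        Map.Entry<Integer, String> e = ring.ceilingEntry(slot(key));
        // Past the last point: wrap around to the first point on the ring.
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        HashRingSketch sketch = new HashRingSketch(100);
        sketch.addConsumer("consumer-a");
        sketch.addConsumer("consumer-b");
        System.out.println("key-1 -> " + sketch.consumerFor("key-1"));
        sketch.addConsumer("consumer-c");
        // key-1 either stays where it was or moves to consumer-c.
        System.out.println("key-1 -> " + sketch.consumerFor("key-1"));
    }
}
```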

Manually set stickyHashRange

You can manually set a hash range with KeySharedPolicy.stickyHashRange().
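When you assign sticky ranges to several consumers yourself, each consumer needs its own slice of the 0-65535 hash space. The hypothetical helper below splits that space into `n` contiguous, non-overlapping inclusive ranges; each `{start, end}` pair is the kind of value you might pass to `Range.of(start, end)`. The `StickyRangeSplitter` class and its `split` method are illustrative names, not part of the Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: divides the full Key_Shared hash space (0..65535) into
 * n contiguous, non-overlapping inclusive ranges, one per consumer. Each pair
 * could then be passed to Range.of(start, end) in that consumer's
 * KeySharedPolicy.stickyHashRange().ranges(...) call.
 */
final class StickyRangeSplitter {
    static final int MAX_HASH = 65535;

    // Returns n ranges as {start, end} pairs, inclusive on both ends.
    static List<int[]> split(int n) {
        if (n < 1 || n > MAX_HASH + 1) {
            throw new IllegalArgumentException("n must be between 1 and " + (MAX_HASH + 1));
        }
        int total = MAX_HASH + 1;
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < n; i++) {
            // Spread the remainder over the first (total % n) ranges.
            int size = total / n + (i < total % n ? 1 : 0);
            ranges.add(new int[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : split(2)) {
            System.out.println(r[0] + "-" + r[1]); // 0-32767, then 32768-65535
        }
    }
}
```

Equal slices are only one option; what matters for sticky hashing is that the ranges given to consumers on the same subscription do not overlap.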

To test sticky-hashed Key_Shared subscriptions, you first need to import some additional classes.

  1. Add the following imports to SimplePulsarConsumer.java:

    import org.apache.pulsar.client.api.Range;
    import org.apache.pulsar.client.api.KeySharedPolicy;
    import org.apache.pulsar.client.api.SubscriptionType;
  2. Add the following imports to SimplePulsarProducer.java:

    import org.apache.pulsar.client.api.BatcherBuilder;
    import org.apache.pulsar.client.api.HashingScheme;
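  3. Modify the pulsarProducer in SimplePulsarProducer.java to use key-based batching and the JavaStringHash hashing scheme. Key-based batching puts only messages with the same key into each batch, so a batch never mixes keys that belong to different consumers.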

    pulsarProducer = pulsarClient
        .newProducer(Schema.JSON(DemoBean.class))
        .topic("persistent://"
                    + conf.getTenantName() + "/"
                    + conf.getNamespace() + "/"
                    + conf.getTopicName())
        .batcherBuilder(BatcherBuilder.KEY_BASED)
        .hashingScheme(HashingScheme.JavaStringHash)
        .create();
  4. Modify the pulsarConsumer in SimplePulsarConsumer.java to use sticky hashing. This example assigns every possible hash value (0-65535) in this subscription to this consumer.

    .subscriptionType(SubscriptionType.Key_Shared)
    .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0,65535)))
  5. Open pulsar-examples in the IDE of your choice and run SimplePulsarConsumer.java to begin consuming messages.
    The confirmation message and a cursor appear to indicate the consumer is ready.

    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    ...
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
  6. In a new terminal window, run SimplePulsarProducer.java to begin producing messages.

    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 55794190 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 41791865 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 74840732 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 57467766 sent
  7. The consumer begins receiving messages.

    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":55794190,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":41791865,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
  8. Open a new terminal window and try to run a second instance of SimplePulsarConsumer.java. The subscription fails with a range conflict:

    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - SimplePulsarProducer has been stopped.
    Exception in thread "main" java.lang.IllegalStateException: Cannot connect to pulsar
    	at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:59)
    Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerAssignException: {"errorMsg":"Range conflict with consumer Consumer{subscription=PersistentSubscription{topic=persistent://<tenant>/<namespace>/in, name=SimplePulsarConsumer}, consumerId=0, consumerName=5825b, address=/...}","reqId":1243883448178735299, "remote":"<service_url>", "local":"/192.168.0.95:56512"}
    	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1060)
    	at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101)
    	at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:47)

The second consumer can’t subscribe to the topic because the first consumer already holds the entire hash range (0-65535), and sticky hash ranges on the same subscription can’t overlap.
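The conflict check behind this error can be illustrated in a few lines: two inclusive hash ranges overlap when each one starts at or before the point where the other ends. The sketch below is a simplified illustration of that rule, not the broker's code, and `RangeConflictSketch` is a hypothetical name. It shows that giving the consumers non-overlapping ranges, such as 0-32767 and 32768-65535, avoids the conflict.

```java
import java.util.List;

/**
 * Illustration of the idea behind the "Range conflict with consumer" error:
 * two inclusive hash ranges overlap when each starts at or before the other
 * one ends. This mirrors the rule, not the broker's actual code.
 */
final class RangeConflictSketch {

    static boolean overlaps(int start1, int end1, int start2, int end2) {
        return start1 <= end2 && start2 <= end1;
    }

    // True if the requested range collides with any range already held
    // (each held range is an inclusive {start, end} pair).
    static boolean conflicts(int[] requested, List<int[]> held) {
        for (int[] r : held) {
            if (overlaps(requested[0], requested[1], r[0], r[1])) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<int[]> held = List.of(new int[] {0, 65535}); // first consumer holds everything
        System.out.println(conflicts(new int[] {0, 65535}, held));     // true: rejected
        System.out.println(conflicts(new int[] {32768, 65535}, held)); // true: still overlaps
        List<int[]> half = List.of(new int[] {0, 32767});
        System.out.println(conflicts(new int[] {32768, 65535}, half)); // false: accepted
    }
}
```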

Key_Shared subscription video

Follow along with this video from our Five Minutes About Pulsar series to see Key_Shared subscriptions in action.

© 2024 DataStax