Key shared subscriptions in Pulsar

Subscriptions in Pulsar describe which consumers are consuming data from a topic and how they want to consume that data.

Pulsar’s shared subscription model can increase the message processing rate, but it risks losing message ordering guarantees. With round-robin delivery, the broker hands each message to the next available consumer, so related messages can end up on different consumers and be processed out of order.

Key shared subscriptions allow multiple consumers to subscribe to a topic, and use message keys as additional metadata that links messages to specific consumers. Each key is hashed, which converts arbitrary values such as a topic name or a JSON blob into a fixed integer value, and the hashed values are then assigned to subscribed consumers in one of two ways:

  • Auto hash: Uses consistent hashing to balance range values across available consumers without requiring manual setup of hash ranges.

  • Sticky hash: The client manually assigns consumer range values, and then all hashes within a configured range go to one consumer.
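
To make the idea of hash ranges concrete, here is a minimal sketch in plain Java with no Pulsar dependency. It assumes a hash space of 65,536 slots (0-65535, the range used later on this page) and uses String.hashCode() as a stand-in hash, so the hash function and the balancing strategy that Pulsar actually applies may differ. The names HashRangeSketch, slotFor, evenSplit, and ownerOf are hypothetical and are not part of the Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

public class HashRangeSketch {
    // Size of the hash space assumed for this sketch: slots 0..65535.
    static final int SLOTS = 65536;

    // Map an arbitrary key to a slot. String.hashCode() is a stand-in hash,
    // not necessarily the function Pulsar applies.
    public static int slotFor(String key) {
        return (key.hashCode() & Integer.MAX_VALUE) % SLOTS;
    }

    // Simplified stand-in for automatic balancing: divide the slot space
    // evenly among the consumers. Each int[] is an inclusive {start, end} range.
    public static List<int[]> evenSplit(int consumers) {
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < consumers; i++) {
            // Spread the remainder over the first ranges so every slot is covered.
            int size = SLOTS / consumers + (i < SLOTS % consumers ? 1 : 0);
            ranges.add(new int[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    // Return the index of the consumer whose range contains the slot, or -1.
    public static int ownerOf(int slot, List<int[]> ranges) {
        for (int i = 0; i < ranges.size(); i++) {
            int[] r = ranges.get(i);
            if (slot >= r[0] && slot <= r[1]) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<int[]> ranges = evenSplit(3);
        // The same key always maps to the same slot, and so to the same consumer.
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            int slot = slotFor(key);
            System.out.println(key + " -> slot " + slot
                + " -> consumer " + ownerOf(slot, ranges));
        }
    }
}
```

The point of the sketch is that the key, not the arrival order, decides which consumer receives a message. In the sticky case, the ranges would be chosen by hand instead of being computed by evenSplit.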

This page explains how to use Pulsar’s key shared subscription model to manage your topic consumption.

Prerequisites

This example requires the following:

  • Apache Maven

  • Java OpenJDK 11

  • Astra Streaming access with at least one streaming tenant and one topic

  • A local clone of the DataStax Pulsar Subscription Example repository

  • In the pulsar-subscription-example repo, navigate to src/main/resources, and then edit application.properties to connect to your Astra Streaming cluster:

    application.properties
    service_url=BROKER_SERVICE_URL
    namespace=default
    tenant_name=my-tenant
    authentication_token=ASTRA_DB_APPLICATION_TOKEN
    topic_name=my-topic
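
The consumer and producer code on this page combines three of these properties into a full topic name of the form persistent://tenant/namespace/topic. The following is a minimal sketch of that step that uses only java.util.Properties and the placeholder values shown above. The class name TopicNameSketch and the method fullTopicName are hypothetical, and the example repository's own Configuration class may load the file differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class TopicNameSketch {
    // Build "persistent://<tenant>/<namespace>/<topic>" from loaded properties.
    public static String fullTopicName(Properties props) {
        return "persistent://"
            + props.getProperty("tenant_name") + "/"
            + props.getProperty("namespace") + "/"
            + props.getProperty("topic_name");
    }

    public static void main(String[] args) throws IOException {
        // Inline copy of the placeholder values for illustration only;
        // a real application would read application.properties from the classpath.
        String text = "namespace=default\ntenant_name=my-tenant\ntopic_name=my-topic\n";
        Properties props = new Properties();
        props.load(new StringReader(text));
        System.out.println(fullTopicName(props)); // persistent://my-tenant/default/my-topic
    }
}
```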

Key shared subscription example

  1. To try out a Pulsar key shared subscription, add .subscriptionType(SubscriptionType.Key_Shared) to the pulsarConsumer in SimplePulsarConsumer.java:

    SimplePulsarConsumer.java
    pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
        .topic("persistent://"
            + conf.getTenantName() + "/"
            + conf.getNamespace() + "/"
            + conf.getTopicName())
        .startMessageIdInclusive()
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName("SimplePulsarConsumer")
        .subscriptionType(SubscriptionType.Key_Shared)
        .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
        .subscribe();

    The keySharedPolicy defines how hashed values are assigned to subscribed consumers.

    The above example uses autoSplitHashRange, which is an auto-hashing policy. Running multiple consumers with auto-hashing balances the message load across all available consumers, as a shared subscription does, while still routing messages with the same key to the same consumer.

    If you want to set a fixed hash range, use KeySharedPolicy.stickyHashRange(), as demonstrated in the following steps.

  2. To use a sticky hashed key shared subscription, import the following classes into SimplePulsarConsumer.java:

    SimplePulsarConsumer.java
    import org.apache.pulsar.client.api.Range;
    import org.apache.pulsar.client.api.KeySharedPolicy;
    import org.apache.pulsar.client.api.SubscriptionType;
  3. Import the following classes into SimplePulsarProducer.java:

    SimplePulsarProducer.java
    import org.apache.pulsar.client.api.BatcherBuilder;
    import org.apache.pulsar.client.api.HashingScheme;
  4. In SimplePulsarProducer.java, modify the pulsarProducer to use the JavaStringHash hashing scheme:

    SimplePulsarProducer.java
    pulsarProducer = pulsarClient
        .newProducer(Schema.JSON(DemoBean.class))
        .topic("persistent://"
            + conf.getTenantName() + "/"
            + conf.getNamespace() + "/"
            + conf.getTopicName())
        .batcherBuilder(BatcherBuilder.KEY_BASED)
        .hashingScheme(HashingScheme.JavaStringHash)
        .create();
  5. In SimplePulsarConsumer.java, modify the pulsarConsumer to use sticky hashing. This example assigns every possible hash value (0-65535) in this subscription to a single consumer.

    SimplePulsarConsumer.java
    .subscriptionType(SubscriptionType.Key_Shared)
    .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0,65535)))
  6. In the pulsar-subscription-example project, run SimplePulsarConsumer.java to begin consuming messages.

    The confirmation message and a cursor appear to indicate the consumer is ready:

    Result
    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    ...
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
  7. In a new terminal window, run SimplePulsarProducer.java to begin producing messages:

    Result
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 55794190 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 41791865 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 74840732 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 57467766 sent

    In the SimplePulsarConsumer terminal, the consumer begins receiving messages:

    Result
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":55794190,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":41791865,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
  8. In a new terminal window, try to run a new instance of SimplePulsarConsumer.java.

    The new consumer can’t subscribe to the topic because the SimplePulsarConsumer configuration reserved the entire hash range for the first consumer:

    Result
    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - SimplePulsarProducer has been stopped.
    Exception in thread "main" java.lang.IllegalStateException: Cannot connect to pulsar
    	at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:59)
    Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerAssignException: {"errorMsg":"Range conflict with consumer Consumer{subscription=PersistentSubscription{topic=persistent://<tenant>/<namespace>/in, name=SimplePulsarConsumer}, consumerId=0, consumerName=5825b, address=/...}","reqId":1243883448178735299, "remote":"<service_url>", "local":"/192.168.0.95:56512"}
    	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1060)
    	at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101)
    	at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:47)
  9. To run multiple consumers, either modify the sticky hash configuration in SimplePulsarConsumer.java so that each consumer claims its own non-overlapping part of the hash range, or switch back to auto-hashing. Then, you can launch multiple instances of SimplePulsarConsumer.java to consume messages from different hash ranges.
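
One way to plan the split described in the last step is to give each consumer instance its own slice of 0-65535 and to check for overlaps before subscribing, because the broker rejects a consumer whose range conflicts with an existing one. The following sketch is plain Java with no Pulsar dependency. StickyRangePlanner, rangeFor, and overlaps are hypothetical names, and the even split is only one possible scheme.

```java
public class StickyRangePlanner {
    // Upper bound of the hash range used on this page.
    static final int MAX_SLOT = 65535;

    // Inclusive {start, end} range for consumer `index` out of `total` consumers.
    public static int[] rangeFor(int index, int total) {
        if (total <= 0 || total > MAX_SLOT + 1 || index < 0 || index >= total) {
            throw new IllegalArgumentException(
                "total must be in [1, 65536] and index in [0, total)");
        }
        long slots = MAX_SLOT + 1L;
        int start = (int) (slots * index / total);
        int end = (int) (slots * (index + 1) / total) - 1;
        return new int[] {start, end};
    }

    // True if two inclusive ranges share at least one slot.
    public static boolean overlaps(int[] a, int[] b) {
        return a[0] <= b[1] && b[0] <= a[1];
    }

    public static void main(String[] args) {
        int[] first = rangeFor(0, 2);
        int[] second = rangeFor(1, 2);
        System.out.println("consumer 0: " + first[0] + "-" + first[1]);
        System.out.println("consumer 1: " + second[0] + "-" + second[1]);
        System.out.println("overlap: " + overlaps(first, second));
    }
}
```

With two instances, the first consumer would subscribe with Range.of(0, 32767) and the second with Range.of(32768, 65535) in place of the Range.of(0,65535) call from step 5.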

© 2024 DataStax | Privacy policy | Terms of use
