Key shared subscriptions in Apache Pulsar™

Subscriptions in Apache Pulsar™ describe which consumers are consuming data from a topic and how they want to consume that data.

Pulsar’s shared subscription model can increase the message processing rate, but it risks losing message ordering guarantees. Because messages are delivered round-robin, related messages can go to different consumers, so the order in which they are processed is not guaranteed.
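To make the ordering risk concrete, the following is a toy model of round-robin dispatch in plain Java. It is not Pulsar code: the class name `RoundRobinSketch` and the helper `consumersSeeingKey` are hypothetical, and the strict rotation is a simplifying assumption. It shows that messages sharing a key can end up on more than one consumer when the key is ignored.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of round-robin dispatch. Hypothetical names; not part of the Pulsar client.
class RoundRobinSketch {

    // Returns the indexes of the consumers that receive at least one message
    // with the given key when messages are handed out in strict rotation.
    static Set<Integer> consumersSeeingKey(List<String> keys, String key, int consumerCount) {
        Set<Integer> seen = new LinkedHashSet<>();
        for (int i = 0; i < keys.size(); i++) {
            int consumer = i % consumerCount; // the rotation ignores the message key
            if (keys.get(i).equals(key)) {
                seen.add(consumer);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Four messages in send order, identified only by their keys.
        List<String> keys = List.of("A", "A", "B", "A");
        // Key "A" is spread over both consumers, so its relative order is not preserved.
        System.out.println(consumersSeeingKey(keys, "A", 2)); // prints [0, 1]
    }
}
```

With two consumers, the three "A" messages are split between consumer 0 and consumer 1, which is the situation a key shared subscription is meant to avoid.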

Key shared subscriptions allow multiple consumers to subscribe to a topic, and use message keys to link messages to specific consumers. Each key, which can be an arbitrary value such as a topic name or a JSON blob, is hashed into a fixed integer value, and then the hashed values are assigned to subscribed consumers in one of two ways:

  • Auto hash: Uses consistent hashing to balance range values across available consumers without requiring manual setup of hash ranges.

  • Sticky hash: The client manually assigns consumer range values, and then all hashes within a configured range go to one consumer.
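The mapping from key to consumer can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Pulsar's implementation: `KeyHashSketch`, `slotFor`, and `consumerFor` are hypothetical names, `String.hashCode()` is used only as a stand-in hash function, and the 0-65535 slot range is the one mentioned later on this page.

```java
// Hypothetical sketch of key -> hash slot -> consumer. Not part of the Pulsar client.
class KeyHashSketch {

    // Number of hash slots (0..65535), as described on this page.
    static final int HASH_RANGE = 65536;

    // Stand-in hash: String.hashCode() folded into the slot range.
    // Assumption for illustration only; the broker's real hash function may differ.
    static int slotFor(String key) {
        return Math.floorMod(key.hashCode(), HASH_RANGE);
    }

    // rangeEnds[i] is the inclusive upper bound of the range owned by consumer i,
    // in ascending order. Returns the index of the consumer that owns the slot.
    static int consumerFor(int slot, int[] rangeEnds) {
        for (int i = 0; i < rangeEnds.length; i++) {
            if (slot <= rangeEnds[i]) {
                return i;
            }
        }
        throw new IllegalArgumentException("slot " + slot + " is not covered by any range");
    }

    public static void main(String[] args) {
        // Two consumers, each owning half of the slots.
        int[] rangeEnds = {32767, 65535};
        for (String key : new String[] {"Movie", "TV Show", "Movie"}) {
            int slot = slotFor(key);
            System.out.println(key + " -> slot " + slot + " -> consumer " + consumerFor(slot, rangeEnds));
        }
    }
}
```

Because the hash is deterministic, both "Movie" messages land in the same slot and therefore on the same consumer, regardless of which policy produced the ranges.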

This page explains how to use Pulsar’s key shared subscription model to manage your topic consumption.

Prerequisites

This example requires the following:

Prepare example project and producer

  1. Create a Maven project.

  2. Edit the pom.xml file to include the following dependencies:

    pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.datastax.pulsar</groupId>
    	<artifactId>pulsar-subscription-example</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    	<name>pulsar-subscription-example</name>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<version.pulsar>2.10.1</version.pulsar>
    		<version.slf4j>1.7.36</version.slf4j>
    		<version.logback>1.4.4</version.logback>
    		<version.maven-compiler-plugin>3.10.1</version.maven-compiler-plugin>
    		<version.java>11</version.java>
    	</properties>
    
    	<dependencies>
    
    		<dependency>
    			<groupId>org.apache.pulsar</groupId>
    			<artifactId>pulsar-client</artifactId>
    			<version>${version.pulsar}</version>
    		</dependency>
    
    		<dependency>
    			<groupId>ch.qos.logback</groupId>
    			<artifactId>logback-classic</artifactId>
    			<version>${version.logback}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>${version.slf4j}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-simple</artifactId>
    			<version>${version.slf4j}</version>
    		</dependency>
    
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>jcl-over-slf4j</artifactId>
    			<version>${version.slf4j}</version>
    		</dependency>
    	</dependencies>
    
    	<build>
    
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>${version.maven-compiler-plugin}</version>
    				<configuration>
    					<source>${version.java}</source>
    					<target>${version.java}</target>
    					<showWarnings>true</showWarnings>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>
  3. In src/main/resources, create the following application.properties file with the connection details for your Astra Streaming cluster. Create the resources subdirectory if it doesn’t already exist.

    /src/main/resources/application.properties
    # ---------------------------------------
    # Configuration of your Astra Streaming tenant
    # ---------------------------------------
    demo.wait_between_message=5000
    
    service_url=BROKER_SERVICE_URL
    namespace=default
    tenant_name=my-tenant
    authentication_token=ASTRA_APPLICATION_TOKEN
    topic_name=my-topic
  4. In src/main/java/com/datastax/pulsar, create the following Configuration.java class to load the connection details from application.properties. Create the com/datastax/pulsar subdirectories if they don’t already exist.

    Configuration.java
    package com.datastax.pulsar;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.Properties;
    
    public class Configuration {
    
        private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
    
        private static Configuration _instance;
    
        private final String serviceUrl;
        private final String tenantName;
        private final String namespace;
        private final String topicName;
        private final String authenticationToken;
    
        private int waitPeriod = 1000;
    
        public static synchronized Configuration getInstance() {
            if (null == _instance) {
                _instance = new Configuration();
            }
            return _instance;
        }
    
        private Configuration() {
            try {
                Properties properties = new Properties();
                properties.load(Thread
                        .currentThread()
                        .getContextClassLoader()
                        .getResourceAsStream("application.properties"));
    
                this.serviceUrl = properties.getProperty("service_url");
                if (null == serviceUrl) {
                    throw new IllegalArgumentException("Cannot read serviceUrl in conf file");
                }
    
                this.namespace  = properties.getProperty("namespace");
                if (null == namespace) {
                    throw new IllegalArgumentException("Cannot read namespace in conf file");
                }
    
                this.tenantName  = properties.getProperty("tenant_name");
                if (null == tenantName) {
                    throw new IllegalArgumentException("Cannot read tenant_name in conf file");
                }
    
                this.topicName  = properties.getProperty("topic_name");
                if (null == topicName) {
                    throw new IllegalArgumentException("Cannot read topic_name in conf file");
                }
    
                this.authenticationToken  = properties.getProperty("authentication_token");
                if (null == authenticationToken) {
                    throw new IllegalArgumentException("Cannot read authentication_token in conf file");
                }
    
                String newPeriod = properties.getProperty("demo.wait_between_message");
                if (null != newPeriod) {
                    waitPeriod = Integer.parseInt(newPeriod);
                }
    
                LOG.info("Configuration has been loaded successfully");
            } catch (IOException ioe) {
                throw new IllegalStateException(ioe);
            }
        }
    
        public static void main(String[] args) {
            new Configuration();
        }
    
        public String getServiceUrl() {
            return serviceUrl;
        }
    
        public String getTenantName() {
            return tenantName;
        }
    
        public String getNamespace() {
            return namespace;
        }
    
        public String getTopicName() {
            return topicName;
        }
    
        public String getAuthenticationToken() {
            return authenticationToken;
        }
    
        public int getWaitPeriod() {
            return waitPeriod;
        }
    
    }
  5. In src/main/java/com/datastax/pulsar, create the following DemoBean.java class to represent the example messages that will be produced and consumed:

    DemoBean.java
    package com.datastax.pulsar;
    
    public class DemoBean {
        int  show_id;
        String cast;
        String country;
        String date_added;
        String description;
        String director;
        String duration;
        String listed_in;
        String rating;
        int release_year;
        String title;
        String type;
    
        public DemoBean(
                int  show_id,
                String cast,
                String country,
                String date_added,
                String description,
                String director,
                String duration,
                String listed_in,
                String rating,
                int release_year,
                String title,
                String type
        ) {
            super();
            this.show_id = show_id;
            this.cast = cast;
            this.country = country;
            this.date_added = date_added;
            this.description = description;
            this.director = director;
            this.duration = duration;
            this.listed_in = listed_in;
            this.rating = rating;
            this.release_year = release_year;
            this.title = title;
            this.type = type;
        }
    
        public int getShow_id() {
            return this.show_id;
        }
    
        public void setShow_id(int show_id) {
            this.show_id = show_id;
        }
    
        public String getCast() {
            return this.cast;
        }
    
        public void setCast(String cast) {
            this.cast = cast;
        }
    
        public String getCountry() {
            return this.country;
        }
    
        public void setCountry(String country) {
            this.country = country;
        }
    
        public String getDate_added() {
            return this.date_added;
        }
    
        public void setDate_added(String date_added) {
            this.date_added = date_added;
        }
    
        public String getDescription() {
            return this.description;
        }
    
        public void setDescription(String description) {
            this.description = description;
        }
    
        public String getDirector() {
            return this.director;
        }
    
        public void setDirector(String director) {
            this.director = director;
        }
    
        public String getDuration() {
            return this.duration;
        }
    
        public void setDuration(String duration) {
            this.duration = duration;
        }
    
        public String getListed_in() {
            return this.listed_in;
        }
    
        public void setListed_in(String listed_in) {
            this.listed_in = listed_in;
        }
    
        public String getRating() {
            return this.rating;
        }
    
        public void setRating(String rating) {
            this.rating = rating;
        }
    
        public int getRelease_year() {
            return this.release_year;
        }
    
        public void setRelease_year(int release_year) {
            this.release_year = release_year;
        }
    
        public String getTitle() {
            return this.title;
        }
    
        public void setTitle(String title) {
            this.title = title;
        }
    
        public String getType() {
            return this.type;
        }
    
        public void setType(String type) {
            this.type = type;
        }
    }
  6. In src/main/java/com/datastax/pulsar, create a SimplePulsarProducer.java file with the following contents:

    SimplePulsarProducer.java
    package com.datastax.pulsar;
    
    import java.util.Random;
    
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.Schema;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class SimplePulsarProducer {
    
        private static final Logger LOG = LoggerFactory.getLogger(SimplePulsarProducer.class);
    
        public static void main(String[] args) {
    
            PulsarClient pulsarClient = null;
            Producer<DemoBean> pulsarProducer = null;
    
            try {
                Configuration conf = Configuration.getInstance();
    
                // Create client object
                pulsarClient = PulsarClient.builder()
                        .serviceUrl(conf.getServiceUrl())
                        .authentication(AuthenticationFactory.token(conf.getAuthenticationToken()))
                        .build();
    
                // Create producer on a topic
                pulsarProducer = pulsarClient
                        .newProducer(Schema.JSON(DemoBean.class))
                        .topic("persistent://"
                                + conf.getTenantName() + "/"
                                + conf.getNamespace() + "/"
                                + conf.getTopicName())
                        .create();
    
                while (true) {
                    // Creates random 8 digit ID
                    Random rnd = new Random();
                    int id = 10000000 + rnd.nextInt(90000000);
    
                    pulsarProducer.send(new DemoBean(
                            id,
                            "LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...",
                            "United States",
                            "July 16, 2021",
                            "NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.",
                            "Malcolm D. Lee",
                            "120 min",
                            "Animation, Adventure, Comedy",
                            "PG",
                            2021,
                            "Space Jam: A New Legacy",
                            "Movie"
                    ));
                    LOG.info("Message {} sent", id);
                    Thread.sleep(conf.getWaitPeriod());
                }
    
            } catch (PulsarClientException pce) {
                throw new IllegalStateException("Cannot connect to pulsar", pce);
            } catch (InterruptedException e) {
                LOG.info("Stopped request retrieved");
            } finally {
                try {
                    if (null != pulsarProducer) pulsarProducer.close();
                    if (null != pulsarClient) pulsarClient.close();
                } catch (PulsarClientException pce) {
                    LOG.error("Got Pulsar Client Exception", pce);
                }
                LOG.info("SimplePulsarProducer has been stopped.");
            }
        }
    
    }
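As a quick check of how the values in application.properties feed into the producer, the sketch below rebuilds the full topic name the same way SimplePulsarProducer does. The class name `TopicNameSketch` and the `topicName` helper are hypothetical, and the properties are loaded from an inline string instead of the classpath so that the example runs on its own.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper that mirrors the topic string built in SimplePulsarProducer.
class TopicNameSketch {

    // Joins tenant, namespace, and topic into a persistent topic name.
    static String topicName(Properties props) {
        return "persistent://"
                + props.getProperty("tenant_name") + "/"
                + props.getProperty("namespace") + "/"
                + props.getProperty("topic_name");
    }

    public static void main(String[] args) throws IOException {
        // Same keys and sample values as the application.properties example.
        Properties props = new Properties();
        props.load(new StringReader(
                "namespace=default\n"
              + "tenant_name=my-tenant\n"
              + "topic_name=my-topic\n"));
        System.out.println(topicName(props)); // prints persistent://my-tenant/default/my-topic
    }
}
```

If the printed name does not match the topic that exists in your tenant, the producer and consumer will not be talking about the same topic, so this is a cheap value to verify first.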

Create consumer with key shared subscription

To create a Pulsar key shared subscription, create a pulsarConsumer with .subscriptionType(SubscriptionType.Key_Shared) and a keySharedPolicy configuration. The keySharedPolicy defines how hashed values are assigned to subscribed consumers.

For the simplest configuration, use the autoSplitHashRange policy.

If you need to set fixed hash ranges, use the stickyHashRange policy.

Use autoSplitHashRange

To automatically assign hash ranges to consumers, use the autoSplitHashRange policy. Running multiple consumers with autoSplitHashRange balances the messaging load across all available consumers, like a shared subscription, while still sending all messages with the same key to the same consumer.
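The idea of dividing the hash range among whatever consumers are connected can be sketched as below. This is a simplification for illustration: the broker chooses the actual ranges, and its algorithm may not produce an even split. `EvenSplitSketch` and `split` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical even split of the hash range. The broker's real assignment may differ.
class EvenSplitSketch {

    static final int HASH_RANGE = 65536; // slots 0..65535

    // Returns {start, end} pairs (inclusive) that cover 0..65535 with no gaps or overlaps.
    static List<int[]> split(int consumerCount) {
        if (consumerCount < 1) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            int start = (int) ((long) HASH_RANGE * i / consumerCount);
            int end = (int) ((long) HASH_RANGE * (i + 1) / consumerCount) - 1;
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Show how the ranges shrink as more consumers join.
        for (int n = 1; n <= 3; n++) {
            StringBuilder line = new StringBuilder(n + " consumer(s):");
            for (int[] r : split(n)) {
                line.append(" [").append(r[0]).append(", ").append(r[1]).append("]");
            }
            System.out.println(line);
        }
    }
}
```

With two consumers the sketch yields 0-32767 and 32768-65535. The point is only that the ranges are recomputed as consumers come and go, which is what removes the need for manual range configuration.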

  1. In src/main/java/com/datastax/pulsar, create a SimplePulsarConsumer.java file with the following contents:

    SimplePulsarConsumer.java
    package com.datastax.pulsar;
    
    import java.util.concurrent.TimeUnit;
    
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    import org.apache.pulsar.client.api.SubscriptionType;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class SimplePulsarConsumer {
    
        private static final Logger LOG = LoggerFactory.getLogger(SimplePulsarConsumer.class);
    
        public static void main(String[] args) {
    
            PulsarClient pulsarClient = null;
            Consumer<DemoBean> pulsarConsumer = null;
    
            try {
                Configuration conf = Configuration.getInstance();
    
                // Create client object
                pulsarClient = PulsarClient.builder()
                        .serviceUrl(conf.getServiceUrl())
                        .authentication(AuthenticationFactory.token(conf.getAuthenticationToken()))
                        .build();
    
                pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
                        .topic("persistent://"
                                + conf.getTenantName() + "/"
                                + conf.getNamespace() + "/"
                                + conf.getTopicName())
                        .startMessageIdInclusive()
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscriptionName("SimplePulsarConsumer")
                        // Subscription type is set to Key_Shared
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .subscribe();
    
                while (true) {
                    Message<DemoBean> msg = pulsarConsumer.receive(conf.getWaitPeriod(), TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        LOG.info("Message received: {}", new String(msg.getData()));
                        pulsarConsumer.acknowledge(msg);
                        Thread.sleep(conf.getWaitPeriod());
                    }
                }
    
            } catch (PulsarClientException e) {
                throw new IllegalStateException("Cannot connect to pulsar", e);
            } catch (InterruptedException e) {
                LOG.info("Stopped request retrieved");
            } finally {
                try {
                    if (null != pulsarConsumer) pulsarConsumer.close();
                    if (null != pulsarClient) pulsarClient.close();
                } catch (PulsarClientException pce) {
                    LOG.error("Got Pulsar Client Exception", pce);
                }
                LOG.info("SimplePulsarProducer has been stopped.");
            }
        }
    
    }
  2. In pulsarConsumer, add .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()), and import org.apache.pulsar.client.api.KeySharedPolicy:

    SimplePulsarConsumer.java with autoSplitHashRange
    ...
    
    pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
        .topic("persistent://"
            + conf.getTenantName() + "/"
            + conf.getNamespace() + "/"
            + conf.getTopicName())
        .startMessageIdInclusive()
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName("SimplePulsarConsumer")
        // Subscription type is set to Key_Shared
        .subscriptionType(SubscriptionType.Key_Shared)
        // Define key shared policy for hash range assignment
        .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
        .subscribe();
    
    ...

Use stickyHashRange

To set a fixed hash range, use the stickyHashRange policy. This policy requires additional imports and producer configuration changes, in addition to the consumer configuration.

  1. In src/main/java/com/datastax/pulsar, create a SimplePulsarConsumer.java file with the following contents:

    SimplePulsarConsumer.java
    package com.datastax.pulsar;
    
    import java.util.concurrent.TimeUnit;
    
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class SimplePulsarConsumer {
    
        private static final Logger LOG = LoggerFactory.getLogger(SimplePulsarConsumer.class);
    
        public static void main(String[] args) {
    
            PulsarClient pulsarClient = null;
            Consumer<DemoBean> pulsarConsumer = null;
    
            try {
                Configuration conf = Configuration.getInstance();
    
                // Create client object
                pulsarClient = PulsarClient.builder()
                        .serviceUrl(conf.getServiceUrl())
                        .authentication(AuthenticationFactory.token(conf.getAuthenticationToken()))
                        .build();
    
                pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
                        .topic("persistent://"
                                + conf.getTenantName() + "/"
                                + conf.getNamespace() + "/"
                                + conf.getTopicName())
                        .startMessageIdInclusive()
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscriptionName("SimplePulsarConsumer")
                        // Subscription type is set to Key_Shared
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .subscribe();
    
                while (true) {
                    Message<DemoBean> msg = pulsarConsumer.receive(conf.getWaitPeriod(), TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        LOG.info("Message received: {}", new String(msg.getData()));
                        pulsarConsumer.acknowledge(msg);
                        Thread.sleep(conf.getWaitPeriod());
                    }
                }
    
            } catch (PulsarClientException e) {
                throw new IllegalStateException("Cannot connect to pulsar", e);
            } catch (InterruptedException e) {
                LOG.info("Stopped request retrieved");
            } finally {
                try {
                    if (null != pulsarConsumer) pulsarConsumer.close();
                    if (null != pulsarClient) pulsarClient.close();
                } catch (PulsarClientException pce) {
                    LOG.error("Got Pulsar Client Exception", pce);
                }
                LOG.info("SimplePulsarProducer has been stopped.");
            }
        }
    
    }
  2. Import the following additional classes that are required for the stickyHashRange policy:

    SimplePulsarConsumer.java
    import org.apache.pulsar.client.api.Range;
    import org.apache.pulsar.client.api.KeySharedPolicy;
    import org.apache.pulsar.client.api.SubscriptionType;
  3. In pulsarConsumer, add the sticky hash key policy: .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(int,int))).

    The following example assigns all possible hashes (0-65535) on this subscription to one consumer:

    SimplePulsarConsumer.java with stickyHashRange for one consumer
    ...
    
    pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
        .topic("persistent://"
            + conf.getTenantName() + "/"
            + conf.getNamespace() + "/"
            + conf.getTopicName())
        .startMessageIdInclusive()
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName("SimplePulsarConsumer")
        // Subscription type is set to Key_Shared
        .subscriptionType(SubscriptionType.Key_Shared)
        // Policy assigns all possible hashes to one consumer
        .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0,65535)))
        .subscribe();
    
    ...

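    To split the hash range between multiple consumers, configure each consumer instance with its own non-overlapping range. Passing several Range.of() arguments to a single ranges() call assigns all of those ranges to that one consumer. For example:

    SimplePulsarConsumer.java with stickyHashRange for two consumers
        // First consumer instance: lower half of the hash range
        .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0,32767)))
        // Second consumer instance: upper half of the hash range
        .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(32768,65535)))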
  4. Modify the producer configuration to support stickyHashRange:

    1. In SimplePulsarProducer.java, import the following classes:

      SimplePulsarProducer.java
      import org.apache.pulsar.client.api.BatcherBuilder;
      import org.apache.pulsar.client.api.HashingScheme;
    2. Configure the pulsarProducer to use the JavaStringHash hashing scheme:

      SimplePulsarProducer.java
      pulsarProducer = pulsarClient
          .newProducer(Schema.JSON(DemoBean.class))
          .topic("persistent://"
                      + conf.getTenantName() + "/"
                      + conf.getNamespace() + "/"
                      + conf.getTopicName())
          // Batch messages by key so that each batch contains a single key
          .batcherBuilder(BatcherBuilder.KEY_BASED)
          // Generate hash values for messages based on their keys
          .hashingScheme(HashingScheme.JavaStringHash)
          .create();
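The effect of key-based batching can be pictured with the plain-Java sketch below, which groups messages into one batch per key while keeping the send order inside each batch. It is a toy model with hypothetical names (`KeyBasedBatchSketch`, `batchByKey`), not the Pulsar batcher. Note that the sample producer on this page calls send() without setting a key; in the Pulsar Java client a key is normally attached per message through newMessage().key(...), so treat the keys used here ("Movie", "TV Show") as assumed values for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of grouping messages by key. Hypothetical names; not the Pulsar batcher.
class KeyBasedBatchSketch {

    // Each message is a {key, payload} pair. Messages are grouped into one batch
    // per key, and the original send order is kept inside each batch.
    static Map<String, List<String>> batchByKey(List<String[]> messages) {
        Map<String, List<String>> batches = new LinkedHashMap<>();
        for (String[] message : messages) {
            batches.computeIfAbsent(message[0], k -> new ArrayList<>()).add(message[1]);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String[]> messages = List.of(
                new String[] {"Movie", "m1"},
                new String[] {"TV Show", "t1"},
                new String[] {"Movie", "m2"});
        System.out.println(batchByKey(messages)); // prints {Movie=[m1, m2], TV Show=[t1]}
    }
}
```

Keeping each batch to a single key matters because a batch is delivered as a unit: a batch that mixed keys could not be routed to one consumer without breaking the per-key guarantee.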

Test key shared subscription

  1. Run SimplePulsarConsumer.java to begin consuming messages as the primary consumer.

    The confirmation message and a cursor appear to indicate the consumer is ready:

    Result
    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    ...
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
  2. In a new terminal window, run SimplePulsarProducer.java to begin producing messages:

    Result
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 55794190 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 41791865 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 74840732 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 57467766 sent

    In the SimplePulsarConsumer terminal, the consumer begins receiving messages:

    Result
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":55794190,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":41791865,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
  3. In a new terminal window, try to run a new instance of SimplePulsarConsumer.java.

    If you used autoSplitHashRange, then the new consumer subscribes to the topic and consumes messages. The auto-hashing policy balances hash ranges across available consumers.

    If you used sticky hashing with one Range.of() argument, then the new consumer cannot subscribe to the topic because the SimplePulsarConsumer configuration reserved the entire hash range for the first consumer. For example:

    Result when using sticky hashing limited to one consumer
    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - SimplePulsarProducer has been stopped.
    Exception in thread "main" java.lang.IllegalStateException: Cannot connect to pulsar
    	at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:59)
    Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerAssignException: {"errorMsg":"Range conflict with consumer Consumer{subscription=PersistentSubscription{topic=persistent://<tenant>/<namespace>/in, name=SimplePulsarConsumer}, consumerId=0, consumerName=5825b, address=/...}","reqId":1243883448178735299, "remote":"<service_url>", "local":"/192.168.0.95:56512"}
    	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1060)
    	at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101)
    	at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:47)

    To run multiple consumers with sticky hashing, you must modify SimplePulsarConsumer.java so that each consumer instance subscribes with a different, non-overlapping part of the hash range, as explained in Use stickyHashRange. Then, you can launch multiple instances of SimplePulsarConsumer.java to consume messages from different hash ranges.
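The "Range conflict" error above comes down to an overlap test between inclusive ranges. A minimal sketch of such a test is shown below so that you can check planned ranges before starting consumers. `RangeConflictSketch` and `overlaps` are hypothetical names, and this is not the broker's own validation code.

```java
// Hypothetical pre-flight check for sticky hash ranges. Not the broker's validation code.
class RangeConflictSketch {

    // Two inclusive ranges overlap when each one starts at or before the other's end.
    static boolean overlaps(int startA, int endA, int startB, int endB) {
        return startA <= endB && startB <= endA;
    }

    public static void main(String[] args) {
        // Two consumers that both claim the full range conflict with each other.
        System.out.println(overlaps(0, 65535, 0, 65535));     // prints true
        // Two halves of the range do not conflict.
        System.out.println(overlaps(0, 32767, 32768, 65535)); // prints false
    }
}
```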


© Copyright IBM Corporation 2026
