Shared subscriptions in Apache Pulsar™

Subscriptions in Apache Pulsar™ describe which consumers are consuming data from a topic and how they want to consume that data.

A shared subscription allows multiple consumers to consume messages from a single topic in a round-robin fashion. Adding consumers to a shared subscription can increase your Pulsar deployment’s rate of message consumption. However, shared subscriptions don’t guarantee message ordering, and they don’t support cumulative acknowledgment.

This page explains how you can use Pulsar’s shared subscription model to manage your topic consumption.

Prerequisites

This example requires the following:

Prepare example project and producer

  1. Create a Maven project.

  2. Edit the pom.xml file to include the following dependencies:

    pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.datastax.pulsar</groupId>
    	<artifactId>pulsar-subscription-example</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    	<name>pulsar-subscription-example</name>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<version.pulsar>2.10.1</version.pulsar>
    		<version.slf4j>1.7.36</version.slf4j>
    		<version.logback>1.4.4</version.logback>
    		<version.maven-compiler-plugin>3.10.1</version.maven-compiler-plugin>
    		<version.java>11</version.java>
    	</properties>
    
    	<dependencies>
    
    		<dependency>
    			<groupId>org.apache.pulsar</groupId>
    			<artifactId>pulsar-client</artifactId>
    			<version>${version.pulsar}</version>
    		</dependency>
    
    		<dependency>
    			<groupId>ch.qos.logback</groupId>
    			<artifactId>logback-classic</artifactId>
    			<version>${version.logback}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>${version.slf4j}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-simple</artifactId>
    			<version>${version.slf4j}</version>
    		</dependency>
    
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>jcl-over-slf4j</artifactId>
    			<version>${version.slf4j}</version>
    		</dependency>
    	</dependencies>
    
    	<build>
    
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>${version.maven-compiler-plugin}</version>
    				<configuration>
    					<source>${version.java}</source>
    					<target>${version.java}</target>
    					<showWarnings>true</showWarnings>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>
  3. In src/main/resources, create the following application.properties file with the connection details for your Astra Streaming cluster. Create the resources subdirectory if it doesn’t already exist.

    /src/main/resources/application.properties
    # ---------------------------------------
    # Configuration of your Astra Streaming tenant
    # ---------------------------------------
    demo.wait_between_message=5000
    
    service_url=BROKER_SERVICE_URL
    namespace=default
    tenant_name=my-tenant
    authentication_token=ASTRA_APPLICATION_TOKEN
    topic_name=my-topic
  4. In src/main/java/com/datastax/pulsar, create the following Configuration.java class to load the connection details from application.properties. Create the com/datastax/pulsar subdirectories if they don’t already exist.

    Configuration.java
    package com.datastax.pulsar;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.Properties;
    
    public class Configuration {
    
        private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
    
        private static Configuration _instance;
    
        private final String serviceUrl;
        private final String tenantName;
        private final String namespace;
        private final String topicName;
        private final String authenticationToken;
    
        private int waitPeriod = 1000;
    
        public static synchronized Configuration getInstance() {
            if (null == _instance) {
                _instance = new Configuration();
            }
            return _instance;
        }
    
        private Configuration() {
            try {
                Properties properties = new Properties();
                properties.load(Thread
                        .currentThread()
                        .getContextClassLoader()
                        .getResourceAsStream("application.properties"));
    
                this.serviceUrl = properties.getProperty("service_url");
                if (null == serviceUrl) {
                    throw new IllegalArgumentException("Cannot read serviceUrl in conf file");
                }
    
                this.namespace  = properties.getProperty("namespace");
                if (null == namespace) {
                    throw new IllegalArgumentException("Cannot read namespace in conf file");
                }
    
                this.tenantName  = properties.getProperty("tenant_name");
                if (null == tenantName) {
                    throw new IllegalArgumentException("Cannot read tenant_name in conf file");
                }
    
                this.topicName  = properties.getProperty("topic_name");
                if (null == topicName) {
                    throw new IllegalArgumentException("Cannot read topic_name in conf file");
                }
    
                this.authenticationToken  = properties.getProperty("authentication_token");
                if (null == authenticationToken) {
                    throw new IllegalArgumentException("Cannot read authentication_token in conf file");
                }
    
                String newPeriod = properties.getProperty("demo.wait_between_message");
                if (null != newPeriod) {
                    waitPeriod = Integer.parseInt(newPeriod);
                }
    
                LOG.info("Configuration has been loaded successfully");
            } catch (IOException ioe) {
                throw new IllegalStateException(ioe);
            }
        }
    
        public static void main(String[] args) {
            new Configuration();
        }
    
        public String getServiceUrl() {
            return serviceUrl;
        }
    
        public String getTenantName() {
            return tenantName;
        }
    
        public String getNamespace() {
            return namespace;
        }
    
        public String getTopicName() {
            return topicName;
        }
    
        public String getAuthenticationToken() {
            return authenticationToken;
        }
    
        public int getWaitPeriod() {
            return waitPeriod;
        }
    
    }
  5. In src/main/java/com/datastax/pulsar, create the following DemoBean.java class to represent the example messages that will be produced and consumed:

    DemoBean.java
    package com.datastax.pulsar;
    
    public class DemoBean {
        int  show_id;
        String cast;
        String country;
        String date_added;
        String description;
        String director;
        String duration;
        String listed_in;
        String rating;
        int release_year;
        String title;
        String type;
    
        public DemoBean(
                int  show_id,
                String cast,
                String country,
                String date_added,
                String description,
                String director,
                String duration,
                String listed_in,
                String rating,
                int release_year,
                String title,
                String type
        ) {
            super();
            this.show_id = show_id;
            this.cast = cast;
            this.country = country;
            this.date_added = date_added;
            this.description = description;
            this.director = director;
            this.duration = duration;
            this.listed_in = listed_in;
            this.rating = rating;
            this.release_year = release_year;
            this.title = title;
            this.type = type;
        }
    
        public int getShow_id() {
            return this.show_id;
        }
    
        public void setShow_id(int show_id) {
            this.show_id = show_id;
        }
    
        public String getCast() {
            return this.cast;
        }
    
        public void setCast(String cast) {
            this.cast = cast;
        }
    
        public String getCountry() {
            return this.country;
        }
    
        public void setCountry(String country) {
            this.country = country;
        }
    
        public String getDate_added() {
            return this.date_added;
        }
    
        public void setDate_added(String date_added) {
            this.date_added = date_added;
        }
    
        public String getDescription() {
            return this.description;
        }
    
        public void setDescription(String description) {
            this.description = description;
        }
    
        public String getDirector() {
            return this.director;
        }
    
        public void setDirector(String director) {
            this.director = director;
        }
    
        public String getDuration() {
            return this.duration;
        }
    
        public void setDuration(String duration) {
            this.duration = duration;
        }
    
        public String getListed_in() {
            return this.listed_in;
        }
    
        public void setListed_in(String listed_in) {
            this.listed_in = listed_in;
        }
    
        public String getRating() {
            return this.rating;
        }
    
        public void setRating(String rating) {
            this.rating = rating;
        }
    
        public int getRelease_year() {
            return this.release_year;
        }
    
        public void setRelease_year(int release_year) {
            this.release_year = release_year;
        }
    
        public String getTitle() {
            return this.title;
        }
    
        public void setTitle(String title) {
            this.title = title;
        }
    
        public String getType() {
            return this.type;
        }
    
        public void setType(String type) {
            this.type = type;
        }
    }
  6. In src/main/java/com/datastax/pulsar, create a SimplePulsarProducer.java file with the following contents:

    SimplePulsarProducer.java
    package com.datastax.pulsar;
    
    import java.util.Random;
    
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.Schema;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class SimplePulsarProducer {
    
        private static final Logger LOG = LoggerFactory.getLogger(SimplePulsarProducer.class);
    
        public static void main(String[] args) {
    
            PulsarClient pulsarClient = null;
            Producer<DemoBean> pulsarProducer = null;
    
            try {
                Configuration conf = Configuration.getInstance();
    
                // Create client object
                pulsarClient = PulsarClient.builder()
                        .serviceUrl(conf.getServiceUrl())
                        .authentication(AuthenticationFactory.token(conf.getAuthenticationToken()))
                        .build();
    
                // Create producer on a topic
                pulsarProducer = pulsarClient
                        .newProducer(Schema.JSON(DemoBean.class))
                        .topic("persistent://"
                                + conf.getTenantName() + "/"
                                + conf.getNamespace() + "/"
                                + conf.getTopicName())
                        .create();
    
                // Reuse one Random instance for all generated IDs
                Random rnd = new Random();
                while (true) {
                    // Create a random 8-digit ID
                    int id = 10000000 + rnd.nextInt(90000000);
    
                    pulsarProducer.send(new DemoBean(
                            id,
                            "LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...",
                            "United States",
                            "July 16, 2021",
                            "NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.",
                            "Malcolm D. Lee",
                            "120 min",
                            "Animation, Adventure, Comedy",
                            "PG",
                            2021,
                            "Space Jam: A New Legacy",
                            "Movie"
                    ));
                    LOG.info("Message {} sent", id);
                    Thread.sleep(conf.getWaitPeriod());
                }
    
            } catch (PulsarClientException pce) {
                throw new IllegalStateException("Cannot connect to pulsar", pce);
            } catch (InterruptedException e) {
                LOG.info("Stop request received");
            } finally {
                try {
                    if (null != pulsarProducer) pulsarProducer.close();
                    if (null != pulsarClient) pulsarClient.close();
                } catch (PulsarClientException pce) {
                    LOG.error("Got Pulsar Client Exception", pce);
                }
                LOG.info("SimplePulsarProducer has been stopped.");
            }
        }
    
    }
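
The producer builds the full topic name by concatenating the tenant, namespace, and topic values from application.properties. As a minimal sketch, that logic could be pulled into a small helper so the producer and consumer cannot drift apart. The TopicNames class and its persistent method are hypothetical names introduced here for illustration; they are not part of the Pulsar client API.

```java
// Hypothetical helper for illustration; not part of the Pulsar client API.
final class TopicNames {

    private TopicNames() {
    }

    /** Builds persistent://tenant/namespace/topic and rejects null or blank parts. */
    static String persistent(String tenant, String namespace, String topic) {
        for (String part : new String[] {tenant, namespace, topic}) {
            if (part == null || part.isBlank()) {
                throw new IllegalArgumentException(
                        "tenant, namespace, and topic must be non-blank");
            }
        }
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    public static void main(String[] args) {
        // Values mirror the sample application.properties file.
        System.out.println(persistent("my-tenant", "default", "my-topic"));
        // prints: persistent://my-tenant/default/my-topic
    }
}
```

With a helper like this, the .topic(...) call in either class could take TopicNames.persistent(conf.getTenantName(), conf.getNamespace(), conf.getTopicName()) instead of repeating the string concatenation.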

Create consumer with shared subscription

To create a Pulsar shared subscription, create a pulsarConsumer with .subscriptionType(SubscriptionType.Shared).

  1. In src/main/java/com/datastax/pulsar, create a SimplePulsarConsumer.java file with the following contents:

    SimplePulsarConsumer.java
    package com.datastax.pulsar;
    
    import java.util.concurrent.TimeUnit;
    
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    import org.apache.pulsar.client.api.SubscriptionType;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class SimplePulsarConsumer {
    
        private static final Logger LOG = LoggerFactory.getLogger(SimplePulsarConsumer.class);
    
        public static void main(String[] args) {
    
            PulsarClient pulsarClient = null;
            Consumer<DemoBean> pulsarConsumer = null;
    
            try {
                Configuration conf = Configuration.getInstance();
    
                // Create client object
                pulsarClient = PulsarClient.builder()
                        .serviceUrl(conf.getServiceUrl())
                        .authentication(AuthenticationFactory.token(conf.getAuthenticationToken()))
                        .build();
    
                pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
                        .topic("persistent://"
                                + conf.getTenantName() + "/"
                                + conf.getNamespace() + "/"
                                + conf.getTopicName())
                        .startMessageIdInclusive()
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscriptionName("SimplePulsarConsumer")
                        // Subscription type is set to Shared
                        .subscriptionType(SubscriptionType.Shared)
                        .subscribe();
    
                while (true) {
                    Message<DemoBean> msg = pulsarConsumer.receive(conf.getWaitPeriod(), TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        LOG.info("Message received: {}", new String(msg.getData()));
                        pulsarConsumer.acknowledge(msg);
                        Thread.sleep(conf.getWaitPeriod());
                    }
                }
    
            } catch (PulsarClientException e) {
                throw new IllegalStateException("Cannot connect to pulsar", e);
            } catch (InterruptedException e) {
                LOG.info("Stop request received");
            } finally {
                try {
                    if (null != pulsarConsumer) pulsarConsumer.close();
                    if (null != pulsarClient) pulsarClient.close();
                } catch (PulsarClientException pce) {
                    LOG.error("Got Pulsar Client Exception", pce);
                }
                LOG.info("SimplePulsarConsumer has been stopped.");
            }
        }
    
    }
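
The consumer acknowledges each message individually with pulsarConsumer.acknowledge(msg). The following toy model, which uses plain Java and no Pulsar classes, suggests why individual acknowledgment matters on a shared subscription: a cumulative acknowledgment covers every message up to a given position, including messages that were delivered to other consumers. The SharedAckSketch class and the small integer message IDs are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model for illustration; not Pulsar client or broker code.
final class SharedAckSketch {

    private SharedAckSketch() {
    }

    /**
     * Given the message IDs delivered to one consumer (ascending, starting
     * from a stream numbered 1, 2, 3, ...), returns the IDs that a cumulative
     * acknowledgment of that consumer's last message would also cover even
     * though they were delivered to some other consumer.
     */
    static List<Integer> coveredButNotOwned(List<Integer> owned) {
        if (owned.isEmpty()) {
            throw new IllegalArgumentException("owned must not be empty");
        }
        int last = owned.get(owned.size() - 1);
        List<Integer> result = new ArrayList<>();
        for (int id = 1; id <= last; id++) {
            if (!owned.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // With two consumers sharing a subscription, one of them might
        // receive messages 1, 3, and 5 while the other receives 2 and 4.
        List<Integer> ownedByFirstConsumer = List.of(1, 3, 5);
        System.out.println(coveredButNotOwned(ownedByFirstConsumer));
        // prints: [2, 4]
    }
}
```

In this model, acknowledging "everything up to message 5" would also mark messages 2 and 4 as done before the other consumer has processed them, which is why the example consumer acknowledges one message at a time.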

Test shared subscription

  1. Run SimplePulsarConsumer.java to begin consuming messages as the first consumer.

    The confirmation message and a cursor appear to indicate the consumer is ready:

    Result
    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    ...
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
  2. In a new terminal window, run SimplePulsarProducer.java to begin producing messages:

    Result
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 59819331 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 70129519 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 31365142 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 48206643 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 51277375 sent

    In the SimplePulsarConsumer terminal, the consumer begins receiving messages:

    Result
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":59819331,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":31365142,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":51277375,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
  3. In a new terminal window, run another instance of SimplePulsarConsumer.java.

    The new consumer subscribes to the topic and consumes messages:

    Result
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":70129519,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":48206643,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}

Because this test uses a shared subscription, you can attach multiple consumers to it. If you run this test with an exclusive subscription, you cannot attach more than one consumer to the subscription.
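
The difference between the two subscription types can be sketched with a toy model of the attachment rule. This is plain Java with hypothetical names (SubscriptionSketch, attach, consumerCount), not Pulsar broker code. In the real Java client, a rejected second consumer on an exclusive subscription typically surfaces as PulsarClientException.ConsumerBusyException rather than a boolean.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of consumer attachment rules; not Pulsar broker code.
final class SubscriptionSketch {

    enum Type { EXCLUSIVE, SHARED }

    private final Type type;
    private final List<String> consumers = new ArrayList<>();

    SubscriptionSketch(Type type) {
        this.type = type;
    }

    /** Returns true if the consumer was attached, false if it was rejected. */
    boolean attach(String consumerName) {
        if (type == Type.EXCLUSIVE && !consumers.isEmpty()) {
            // An exclusive subscription allows only one consumer at a time.
            return false;
        }
        consumers.add(consumerName);
        return true;
    }

    int consumerCount() {
        return consumers.size();
    }

    public static void main(String[] args) {
        SubscriptionSketch shared = new SubscriptionSketch(Type.SHARED);
        System.out.println(shared.attach("consumer-1"));    // prints: true
        System.out.println(shared.attach("consumer-2"));    // prints: true

        SubscriptionSketch exclusive = new SubscriptionSketch(Type.EXCLUSIVE);
        System.out.println(exclusive.attach("consumer-1")); // prints: true
        System.out.println(exclusive.attach("consumer-2")); // prints: false
    }
}
```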

To continue testing the shared subscription configuration, you can continue running new instances of SimplePulsarConsumer.java in new terminal windows. All the consumers subscribe to the topic and consume messages in a round-robin fashion.
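
The distribution seen in the sample output can be reproduced with a simplified round-robin model. This sketch uses plain Java and the message IDs from the producer output above; the RoundRobinSketch class is a hypothetical name. It assumes strict rotation across consumers that are all connected and ready, whereas a real broker also takes each consumer's availability into account, so actual distribution may be less even.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of round-robin dispatch; not Pulsar broker code.
final class RoundRobinSketch {

    private RoundRobinSketch() {
    }

    /** Assigns each message ID to a consumer in strict rotation. */
    static List<List<Integer>> dispatch(List<Integer> messageIds, int consumerCount) {
        if (consumerCount < 1) {
            throw new IllegalArgumentException("consumerCount must be at least 1");
        }
        List<List<Integer>> perConsumer = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messageIds.size(); i++) {
            // Message i goes to consumer (i mod consumerCount).
            perConsumer.get(i % consumerCount).add(messageIds.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        // IDs in the order the sample producer reported sending them.
        List<Integer> sent = List.of(59819331, 70129519, 31365142, 48206643, 51277375);
        List<List<Integer>> result = dispatch(sent, 2);
        System.out.println("consumer 0: " + result.get(0));
        // prints: consumer 0: [59819331, 31365142, 51277375]
        System.out.println("consumer 1: " + result.get(1));
        // prints: consumer 1: [70129519, 48206643]
    }
}
```

Each consumer sees only a subset of the stream, which is also why a shared subscription cannot preserve the producer's message order across consumers.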

