Exclusive subscriptions in Pulsar
Subscriptions in Pulsar describe which consumers are consuming data from a topic and how they want to consume that data.
An exclusive subscription describes a basic publish-subscribe pattern where a single consumer subscribes to a single topic and consumes from it.
This document explains how to use Pulsar’s exclusive subscription model to manage your topic consumption.
Prerequisites
To run this example, you’ll need:
-
A configured Astra Streaming instance with at least one streaming tenant and one topic. See the Astra Streaming quick start for instructions.
-
A local clone of the DataStax Pulsar Subscription Example repository
-
Modify the src/main/resources/application.properties file in the pulsar-subscription-example repo to connect to your Astra Streaming cluster, as below:
service_url={broker-service-url}
namespace=default
tenant_name=my-tenant
authentication_token={astra-auth-token}
topic_name=my-topic
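The tenant_name, namespace, and topic_name properties combine into the fully qualified topic name that the consumer subscribes to, in the form persistent://tenant/namespace/topic. The following is a minimal sketch of that step, assuming the values are read with java.util.Properties. The class and method names (TopicUrlSketch, parse, topicUrl) are hypothetical and are not part of the example repo, which uses its own Configuration class.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of the example repo: shows how the
// three topic-related properties form a persistent topic name.
class TopicUrlSketch {

    // Parses properties text such as the contents of application.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Cannot read properties", e);
        }
        return props;
    }

    // Joins tenant, namespace, and topic into persistent://tenant/namespace/topic.
    static String topicUrl(Properties props) {
        return "persistent://"
                + props.getProperty("tenant_name") + "/"
                + props.getProperty("namespace") + "/"
                + props.getProperty("topic_name");
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "namespace=default",
                "tenant_name=my-tenant",
                "topic_name=my-topic");
        // Prints persistent://my-tenant/default/my-topic
        System.out.println(topicUrl(parse(text)));
    }
}
```

With the sample values shown above, the consumer would subscribe to persistent://my-tenant/default/my-topic.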
Exclusive subscription example
This example uses the pulsarConsumer object in SimplePulsarConsumer.java below.
pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
    .topic("persistent://"
        + conf.getTenantName() + "/"
        + conf.getNamespace() + "/"
        + conf.getTopicName())
    .startMessageIdInclusive()
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    .subscriptionName("SimplePulsarConsumer")
    .subscribe();
Pulsar creates an exclusive subscription by default when no subscription type is declared.
-
Open the pulsar-subscription-example repo in the IDE of your choice and run SimplePulsarConsumer.java to begin consuming messages.
The confirmation message and a cursor appear to indicate the consumer is ready.
[main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
...
[pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
-
In a new terminal window, run SimplePulsarProducer.java to begin producing messages.
[main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 93573631 sent
[main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 16931522 sent
[main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 68306175 sent
-
The consumer begins consuming the produced messages.
[main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":93573631,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
[main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":16931522,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
[main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":68306175,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
-
Open a new terminal window and try to run a second instance of SimplePulsarConsumer.java while the first consumer is still connected.
[main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
...
[main] INFO com.datastax.pulsar.SimplePulsarConsumer - SimplePulsarProducer has been stopped.
Exception in thread "main" java.lang.IllegalStateException: Cannot connect to pulsar
    at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:53)
Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerBusyException: {"errorMsg":"Exclusive consumer is already connected","reqId":2964994443801550457, "remote":"<service_url>", "local":"/192.168.0.95:55777"}
The second consumer can’t subscribe to the topic because the subscription is exclusive.
In the example above, the consumer didn’t declare a subscription type, so Pulsar created an exclusive subscription by default.
To explicitly define an exclusive subscription, add .subscriptionType(SubscriptionType.Exclusive) to the consumer builder before the call to .subscribe().
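To make the exclusive behavior concrete, here is a toy in-memory model of the rule described above: one subscription, one connected consumer at a time. This is only an illustrative sketch and not how the Pulsar broker is implemented. ExclusiveSubscriptionSketch, its methods, and the ConsumerBusy exception are hypothetical names, with ConsumerBusy standing in for Pulsar's ConsumerBusyException.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not Pulsar code): each subscription name
// can be held by at most one consumer at a time.
class ExclusiveSubscriptionSketch {

    // Stand-in for Pulsar's ConsumerBusyException.
    static class ConsumerBusy extends RuntimeException {
        ConsumerBusy(String message) {
            super(message);
        }
    }

    // Maps subscription name -> name of the consumer currently holding it.
    private final Map<String, String> owners = new HashMap<>();

    // Attaches a consumer, or rejects it if the subscription is already held.
    void subscribe(String subscription, String consumer) {
        String current = owners.putIfAbsent(subscription, consumer);
        if (current != null) {
            throw new ConsumerBusy("Exclusive consumer is already connected");
        }
    }

    // Releases the subscription only if this consumer is the one holding it.
    void close(String subscription, String consumer) {
        owners.remove(subscription, consumer);
    }

    public static void main(String[] args) {
        ExclusiveSubscriptionSketch broker = new ExclusiveSubscriptionSketch();
        broker.subscribe("SimplePulsarConsumer", "consumer-0");
        try {
            // A second consumer on the same subscription is rejected.
            broker.subscribe("SimplePulsarConsumer", "consumer-1");
        } catch (ConsumerBusy e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        // Once the first consumer disconnects, another consumer can attach.
        broker.close("SimplePulsarConsumer", "consumer-0");
        broker.subscribe("SimplePulsarConsumer", "consumer-1");
        System.out.println("consumer-1 connected");
    }
}
```

The model mirrors the walkthrough: the second subscribe call fails while the first consumer holds the subscription, and succeeds after the first consumer closes.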
What’s next
For more on subscriptions, see: