Exclusive subscriptions in Pulsar
Subscriptions in Pulsar describe which consumers are consuming data from a topic and how they want to consume that data.
An exclusive subscription describes a basic publish-subscribe (pub-sub) pattern where a single consumer subscribes to a single topic and consumes from it.
This page explains how to use Pulsar’s exclusive subscription model to manage your topic consumption.
Prerequisites
This example requires the following:
- Astra Streaming access with at least one streaming tenant and one topic
- A local clone of the DataStax Pulsar Subscription Example repository
- In the pulsar-subscription-example repo, navigate to src/main/resources, and then edit application.properties to connect to your Astra Streaming cluster:

  application.properties
    service_url=BROKER_SERVICE_URL
    namespace=default
    tenant_name=my-tenant
    authentication_token=ASTRA_DB_APPLICATION_TOKEN
    topic_name=my-topic
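The consumer shown later on this page builds its topic URL from the tenant, namespace, and topic values in application.properties. The following is a minimal sketch of that composition using only the Java standard library. The class name TopicUrlSketch and its helper methods are hypothetical and are not part of the example repository; only the property keys and the persistent://tenant/namespace/topic layout come from this page.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the example repository.
class TopicUrlSketch {

    // Builds persistent://<tenant_name>/<namespace>/<topic_name> from the loaded properties.
    static String topicUrl(Properties props) {
        return "persistent://"
                + require(props, "tenant_name") + "/"
                + require(props, "namespace") + "/"
                + require(props, "topic_name");
    }

    // Fails fast when a key is absent or blank, so a typo in the file is caught early.
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        // Inline sample mirroring the placeholder values used on this page.
        String sample = String.join("\n",
                "namespace=default",
                "tenant_name=my-tenant",
                "topic_name=my-topic");
        Properties props = new Properties();
        props.load(new StringReader(sample));
        System.out.println(topicUrl(props)); // prints persistent://my-tenant/default/my-topic
    }
}
```

Checking the three values before connecting is a design choice in this sketch: a missing key then fails with a clear message instead of producing a malformed topic URL.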
Exclusive subscription example
- To configure a Pulsar exclusive subscription, define a pulsarConsumer object in SimplePulsarConsumer.java, as you would for other subscription types. You don’t need to declare a subscriptionType: other subscription types require you to declare one, but Pulsar creates an exclusive subscription by default when none is declared.

  SimplePulsarConsumer.java
    pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
        .topic("persistent://"
            + conf.getTenantName() + "/"
            + conf.getNamespace() + "/"
            + conf.getTopicName())
        .startMessageIdInclusive()
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName("SimplePulsarConsumer")
        .subscribe();

  If you want to explicitly define an exclusive subscription, you can add .subscriptionType(SubscriptionType.Exclusive) to the consumer.
- In the pulsar-subscription-example project, run SimplePulsarConsumer.java to begin consuming messages.

  The confirmation message and a cursor appear to indicate the consumer is ready:

  Result
    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    ...
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
- In a new terminal window, run SimplePulsarProducer.java to begin producing messages:

  Result
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 93573631 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 16931522 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 68306175 sent

  In the SimplePulsarConsumer terminal, the consumer begins consuming the produced messages:

  Result
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":93573631,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":16931522,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":68306175,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
- In a new terminal window, try to run another instance of SimplePulsarConsumer.java.

  The second consumer can’t subscribe to the topic because the subscription is exclusive:

  Result
    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    ...
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - SimplePulsarProducer has been stopped.
    Exception in thread "main" java.lang.IllegalStateException: Cannot connect to pulsar
        at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:53)
    Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerBusyException: {"errorMsg":"Exclusive consumer is already connected","reqId":2964994443801550457, "remote":"<service_url>", "local":"/192.168.0.95:55777"}
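
The rejection in the last step can be pictured with a small in-memory model. This is only a toy sketch of the "one consumer at a time" rule, written with the Java standard library; it is not Pulsar's broker or client code, and the class ExclusiveSubscriptionModel and its methods are hypothetical names chosen for illustration. In the real client, the rejection surfaces as the ConsumerBusyException shown in the log above.

```java
import java.util.Optional;

// Toy model only: mimics the exclusive rule, not Pulsar's actual implementation.
class ExclusiveSubscriptionModel {
    private String activeConsumer; // null while no consumer is attached

    // Attaches a consumer, or rejects it if another consumer already holds the subscription.
    synchronized void attach(String consumerName) {
        if (activeConsumer != null) {
            throw new IllegalStateException("Exclusive consumer is already connected");
        }
        activeConsumer = consumerName;
    }

    // Releases the subscription, but only if the caller is the current holder.
    synchronized void detach(String consumerName) {
        if (consumerName.equals(activeConsumer)) {
            activeConsumer = null;
        }
    }

    // Returns the current holder, if any.
    synchronized Optional<String> active() {
        return Optional.ofNullable(activeConsumer);
    }

    public static void main(String[] args) {
        ExclusiveSubscriptionModel subscription = new ExclusiveSubscriptionModel();
        subscription.attach("consumer-1");
        try {
            subscription.attach("consumer-2"); // second consumer is refused
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        subscription.detach("consumer-1");     // first consumer disconnects
        subscription.attach("consumer-2");     // now the second consumer can attach
        System.out.println("Active: " + subscription.active().orElse("none"));
    }
}
```

In this model, the second consumer succeeds only after the first one detaches, which mirrors the behavior you would see if you stopped the first SimplePulsarConsumer instance before starting another.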