Exclusive subscriptions in Pulsar

Subscriptions in Pulsar describe which consumers are consuming data from a topic and how they want to consume that data.

An exclusive subscription describes a basic publish-subscribe (pub-sub) pattern where a single consumer subscribes to a single topic and consumes from it.

This page explains how to use Pulsar’s exclusive subscription model to manage your topic consumption.

Prerequisites

This example requires the following:

  • Apache Maven

  • Java OpenJDK 11

  • Astra Streaming access with at least one streaming tenant and one topic

  • A local clone of the DataStax Pulsar Subscription Example repository

  • In the pulsar-subscription-example repo, navigate to src/main/resources, and then edit application.properties to connect to your Astra Streaming cluster:

    application.properties
    service_url=BROKER_SERVICE_URL
    namespace=default
    tenant_name=my-tenant
    authentication_token=ASTRA_DB_APPLICATION_TOKEN
    topic_name=my-topic

Exclusive subscription example

  1. To configure a Pulsar exclusive subscription, define a pulsarConsumer object in SimplePulsarConsumer.java, as you would for other subscription types. However, you don’t need to declare a subscriptionType. Whereas other subscription types required you to declare a specific subscriptionType, Pulsar creates an exclusive subscription by default if you don’t declare a subscriptionType.

    SimplePulsarConsumer.java
    pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
        .topic("persistent://"
                + conf.getTenantName() + "/"
                + conf.getNamespace() + "/"
                + conf.getTopicName())
        .startMessageIdInclusive()
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName("SimplePulsarConsumer")
        .subscribe();

    If you want to explicitly define an exclusive subscription, you can add .subscriptionType(SubscriptionType.Exclusive) to the consumer.

  2. In the pulsar-subscription-example project, run SimplePulsarConsumer.java to begin consuming messages.

    The confirmation message and a cursor appear to indicate the consumer is ready:

    Result
    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    ...
    [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
  3. In a new terminal window, run SimplePulsarProducer.java to begin producing messages:

    Result
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 93573631 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 16931522 sent
    [main] INFO com.datastax.pulsar.SimplePulsarProducer - Message 68306175 sent

    In the SimplePulsarConsumer terminal, the consumer begins consuming the produced messages:

    Result
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":93573631,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":16931522,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - Message received: {"show_id":68306175,"cast":"LeBron James, Anthony Davis, Kyrie Irving, Damian Lillard, Klay Thompson...","country":"United States","date_added":"July 16, 2021","description":"NBA superstar LeBron James teams up with Bugs Bunny and the rest of the Looney Tunes for this long-awaited sequel.","director":"Malcolm D. Lee","duration":"120 min","listed_in":"Animation, Adventure, Comedy","rating":"PG","release_year":2021,"title":"Space Jam: A New Legacy","type":"Movie"}
  4. In a new terminal window, try to run another instance of SimplePulsarConsumer.java.

    The second consumer can’t subscribe to the topic because the subscription is exclusive:

    Result
    [main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
    ...
    [main] INFO com.datastax.pulsar.SimplePulsarConsumer - SimplePulsarProducer has been stopped.
    Exception in thread "main" java.lang.IllegalStateException: Cannot connect to pulsar
    	at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:53)
    Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerBusyException: {"errorMsg":"Exclusive consumer is already connected","reqId":2964994443801550457, "remote":"<service_url>", "local":"/192.168.0.95:55777"}

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com