Geo-replication

A key feature of Apache Pulsar™ is how it handles geo-replication. While other pub-sub messaging systems require additional processes to mirror messages between data centers, Pulsar has multi-datacenter replication as an integrated feature.

In Pulsar’s architecture, the serving layer (brokers) and the storage layer (bookies) are decoupled, which allows messages to be replicated seamlessly across data centers in different regions.

Geo-replication typically provides redundancy in the event of data center outages, but it can also be used for any application where Pulsar messages are produced and consumed across regions.

Message replication in Pulsar

In Pulsar, cross-cluster message replication can be synchronous or asynchronous, and it can run with or without a global configuration store in ZooKeeper.

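Astra Streaming supports only asynchronous geo-replication without a global configuration store. Compared with synchronous replication, this approach offers better performance and lower latency, because producers do not wait for remote clusters to persist each message.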

In asynchronous geo-replication, each region has its own local Pulsar cluster. Each cluster has its own set of brokers, bookies, and ZooKeeper nodes, and the clusters are completely isolated from one another.

When messages are produced on a Pulsar topic, they are first persisted to the local cluster, and are then replicated asynchronously to the remote clusters in other regions. This is achieved through Pulsar’s built-in geo-replication capability.

The message producer doesn’t wait for confirmation from multiple Pulsar clusters. The producer receives a response immediately after the nearest cluster successfully persists the data. The data is then asynchronously replicated to the other Pulsar clusters in the background.
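The acknowledgment flow described above can be modeled in a few lines of Java. This is a hypothetical, in-memory sketch, not the Pulsar client API: the class `AsyncGeoReplicationSketch`, its methods, and the cluster names are assumptions made for illustration. `publish` returns as soon as the message is in the local log, and a background executor copies it to each remote log afterward.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of asynchronous geo-replication (not the Pulsar API).
class AsyncGeoReplicationSketch {
    // Messages persisted in the local (nearest) cluster.
    private final List<String> localLog = Collections.synchronizedList(new ArrayList<>());
    // One log per remote cluster, keyed by a hypothetical cluster name.
    private final Map<String, List<String>> remoteLogs = new LinkedHashMap<>();
    // Background worker that replicates after the producer has been acknowledged.
    private final ExecutorService replicator = Executors.newSingleThreadExecutor();

    AsyncGeoReplicationSketch(List<String> remoteClusters) {
        for (String cluster : remoteClusters) {
            remoteLogs.put(cluster, Collections.synchronizedList(new ArrayList<>()));
        }
    }

    // Persists locally, schedules replication, and returns the local position right away.
    long publish(String message) {
        long position;
        synchronized (localLog) {
            localLog.add(message);
            position = localLog.size() - 1;
        }
        // Replication runs later on the background thread; publish does not wait for it.
        replicator.submit(() -> remoteLogs.values().forEach(log -> log.add(message)));
        return position;
    }

    // Waits for pending replication tasks (demonstration only; no publishes afterward).
    void drain() {
        replicator.shutdown();
        try {
            replicator.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> remoteLog(String cluster) {
        return remoteLogs.get(cluster);
    }

    public static void main(String[] args) {
        AsyncGeoReplicationSketch topic =
                new AsyncGeoReplicationSketch(List.of("remote-a", "remote-b"));
        long position = topic.publish("hello-from-pulsar");
        System.out.println("acknowledged at local position " + position);
        topic.drain();
        System.out.println("remote-a now has " + topic.remoteLog("remote-a"));
    }
}
```

Because `publish` never blocks on the executor, producer latency depends only on the local write. The remote copies trail behind by however long the background task takes, which is why a remote cluster can briefly lag the local one.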

Enable geo-replication in Astra Streaming

Asynchronous geo-replication is enabled on a per-tenant basis and managed at the namespace level.

This means you control which data is replicated by namespace: enable replication on the namespaces whose topics need it, and leave other namespaces local.
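Because replication is managed per namespace, a topic's replication clusters follow from the namespace in its name. The Java sketch below models that lookup with an in-memory map. It is a hypothetical illustration, not the Pulsar admin API; the class and method names and the tenant and namespace names are assumptions. An empty result stands for a namespace without replication, whose topics stay in the local cluster.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical model: replication clusters are configured per namespace and
// inherited by every topic in that namespace. Not the Pulsar admin API.
class NamespaceReplicationSketch {
    // Extracts "tenant/namespace" from "persistent://tenant/namespace/topic"
    // or from "tenant/namespace/topic". Other short forms are not handled here.
    static String namespaceOf(String topic) {
        int scheme = topic.indexOf("://");
        String path = scheme >= 0 ? topic.substring(scheme + 3) : topic;
        String[] parts = path.split("/");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + topic);
        }
        return parts[0] + "/" + parts[1];
    }

    // An empty set means the namespace is not replicated (local cluster only).
    static Set<String> replicationClustersFor(String topic, Map<String, Set<String>> byNamespace) {
        return byNamespace.getOrDefault(namespaceOf(topic), Set.of());
    }

    public static void main(String[] args) {
        Map<String, Set<String>> byNamespace = Map.of(
                "my-tenant/replicated-ns",
                Set.of("pulsar-aws-useast1-staging", "pulsar-aws-useast2-staging"));
        System.out.println(replicationClustersFor("persistent://my-tenant/replicated-ns/orders", byNamespace));
        System.out.println(replicationClustersFor("my-tenant/local-ns/metrics", byNamespace)); // []
    }
}
```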

To enable geo-replication in Astra Streaming, follow these steps:

  1. In your Astra Streaming tenant, select Namespaces and Topics to list your tenant’s current namespaces and topics.

  2. For the namespace you want to replicate from, select Modify namespace.

  3. Select Replication, and select Add Region.

  4. Select the Cloud Provider and Region you want to replicate to, and select Create Replication.

  5. Astra Streaming creates the new replication:

    • Your streaming tenant is replicated to the selected provider and region.

    • The new replication’s Provider, Region, and Replication Direction are listed in Namespace Replication.

    • The Replication Direction value can be Replicate to, Replicate from, or Bidirectional.

To control retention, backlog, and schema policies, visit the Settings tab.

To disable replication on a namespace, select Disable Replication.

Test geo-replicated clusters

  1. Verify that your user token can access tenant metadata:

    bin/pulsar-admin tenants get TENANT_NAME

    In the result, allowedClusters lists the clusters that the tenant can be replicated to:

    Result
    {
      "adminRoles" : [ "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=;68fc51728f", "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=;25599ee732", "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=", "838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=" ],
      "allowedClusters" : [ "pulsar-gcp-useast4-staging", "pulsar-aws-useast1-staging", "pulsar-gcp-useast1-staging", "pulsar-azure-westus2-staging", "pulsar-aws-useast2-staging" ]
    }
  2. Verify that pulsar-admin can read the replication clusters for your namespace:

    bin/pulsar-admin namespaces get-clusters TENANT_NAME/NAMESPACE_NAME
    Result
    pulsar-aws-useast1-staging
    pulsar-aws-useast2-staging
  3. Create a Pulsar consumer with a subscription to a specified topic:

    bin/pulsar-client consume -s "subscription-test" TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME -n 0
  4. In a second terminal, create a Pulsar producer that sends 10 messages to the same topic:

    bin/pulsar-client produce TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME --messages "hello-from-pulsar" -n 10

    The consumer receives and acknowledges all 10 messages:

    Result
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
  5. In the Astra Portal, go to your tenant’s Namespaces and Topics tab.

  6. Make sure persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME is visible and shows traffic across all regions.
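Steps 1 and 2 compare two lists: the tenant's allowedClusters and the namespace's replication clusters. Pulsar expects each replication cluster of a namespace to be one of the tenant's allowed clusters, so a quick cross-check can catch a misconfigured namespace. The Java sketch below is a hypothetical helper, not part of pulsar-admin: it assumes you paste the `get-clusters` output (one cluster name per line) into a string, and the class and method names are illustrative. The cluster names come from the sample output in steps 1 and 2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper: compares `namespaces get-clusters` output with a tenant's allowedClusters.
class ReplicationClusterCheck {
    // `get-clusters` prints one cluster name per line; blank lines are skipped.
    static List<String> parseClusters(String getClustersOutput) {
        List<String> clusters = new ArrayList<>();
        for (String line : getClustersOutput.split("\\R")) {
            String name = line.trim();
            if (!name.isEmpty()) {
                clusters.add(name);
            }
        }
        return clusters;
    }

    // Returns the namespace clusters that are missing from the tenant's allowedClusters.
    static List<String> notAllowed(List<String> namespaceClusters, Set<String> allowedClusters) {
        List<String> missing = new ArrayList<>();
        for (String cluster : namespaceClusters) {
            if (!allowedClusters.contains(cluster)) {
                missing.add(cluster);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Set<String> allowed = Set.of(
                "pulsar-gcp-useast4-staging", "pulsar-aws-useast1-staging",
                "pulsar-gcp-useast1-staging", "pulsar-azure-westus2-staging",
                "pulsar-aws-useast2-staging");
        String output = "pulsar-aws-useast1-staging\npulsar-aws-useast2-staging\n";
        List<String> clusters = parseClusters(output);
        System.out.println(clusters + " not allowed: " + notAllowed(clusters, allowed));
    }
}
```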

Replicated subscriptions

The isReplicated flag shows whether a subscription’s state, meaning the position up to which its messages have been acknowledged, is replicated to the other clusters.

Subscriptions are created with isReplicated=false by default, so their state is not replicated even when the topic’s messages are.

  1. To replicate a subscription’s state, specify replicateSubscriptionState(true) when you create the subscription:

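    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Schema;

    // pulsarClient is an existing PulsarClient; topic is the full topic name.
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("my-subscription")
            .replicateSubscriptionState(true) // replicate this subscription's state
            .subscribe();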
  2. Check topic stats:

    bin/pulsar-admin topics stats persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME

    In the output, isReplicated is now true for this subscription.

    Result
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesInCounter" : 880,
      "msgInCounter" : 10,
      "bytesOutCounter" : 1030,
      "msgOutCounter" : 10,
      "averageMsgSize" : 0.0,
      "msgChunkPublished" : false,
      "storageSize" : 927,
      "backlogSize" : 0,
      "publishRateLimitedTimes" : 0,
      "earliestMsgPublishTimeInBacklogs" : 0,
      "offloadedStorageSize" : 0,
      "lastOffloadLedgerId" : 0,
      "lastOffloadSuccessTimeStamp" : 0,
      "lastOffloadFailureTimeStamp" : 0,
      "publishers" : [ ],
      "waitingPublishers" : 0,
      "subscriptions" : {
        "my-subscription" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "bytesOutCounter" : 1030,
          "msgOutCounter" : 10,
          "msgRateRedeliver" : 0.0,
          "messageAckRate" : 0.0,
          "chunkedMessageRate" : 0,
          "msgBacklog" : 0,
          "backlogSize" : 0,
          "earliestMsgPublishTimeInBacklog" : 0,
          "msgBacklogNoDelayed" : 0,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Exclusive",
          "msgRateExpired" : 0.0,
          "totalMsgExpired" : 0,
          "lastExpireTimestamp" : 0,
          "lastConsumedFlowTimestamp" : 1663262370972,
          "lastConsumedTimestamp" : 0,
          "lastAckedTimestamp" : 0,
          "lastMarkDeleteAdvancedTimestamp" : 1663262440379,
          "consumers" : [ ],
          "isDurable" : true,
          "isReplicated" : true,
          "allowOutOfOrderDelivery" : false,
          "consumersAfterMarkDeletePosition" : { },
          "nonContiguousDeletedMessagesRanges" : 0,
          "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
          "subscriptionProperties" : { },
          "replicated" : false,
          "durable" : true
        }
      },
      "replication" : {
        "pulsar-aws-useast2-staging" : {
          "msgRateIn" : 0.0,
          "msgThroughputIn" : 0.0,
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateExpired" : 0.0,
          "replicationBacklog" : 0,
          "connected" : true,
          "replicationDelayInSeconds" : 0,
          "inboundConnection" : "/192.168.98.62:40346",
          "inboundConnectedSince" : "2022-09-14T20:20:35.128325Z",
          "outboundConnection" : "[id: 0xd3b42242, L:/192.168.71.231:42272 - R:pulsar-aws-useast2.streaming.datastax.com/3.14.0.138:6651]",
          "outboundConnectedSince" : "2022-09-14T18:37:16.060159Z"
        }
      },
      "deduplicationStatus" : "Disabled",
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "compaction" : {
        "lastCompactionRemovedEventCount" : 0,
        "lastCompactionSucceedTimestamp" : 0,
        "lastCompactionFailedTimestamp" : 0,
        "lastCompactionDurationTimeInMills" : 0
      }
    }

Monitor replicated clusters

Astra Streaming exposes the following topic-level replication metrics, which can be viewed in the Overview section of Namespaces and Topics in the Astra Portal:

  • pulsar_replication_rate_in (gauge): The total message rate of the topic replicating from the remote cluster (messages per second).

  • pulsar_replication_rate_out (gauge): The total message rate of the topic replicating to the remote cluster (messages per second).

  • pulsar_replication_throughput_in (gauge): The total throughput of the topic replicating from the remote cluster (bytes per second).

  • pulsar_replication_throughput_out (gauge): The total throughput of the topic replicating to the remote cluster (bytes per second).

  • pulsar_replication_backlog (gauge): The total backlog of the topic replicating to the remote cluster (messages).

  • pulsar_replication_rate_expired (gauge): The total rate of expired messages (messages per second).

  • pulsar_replication_connected_count (gauge): The number of replication subscribers up and running to replicate to remote clusters.

  • pulsar_replication_delay_in_seconds (gauge): The time, in seconds, from when a message was produced to when it is about to be replicated.
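These gauges can feed a simple alert. The Java sketch below is a hypothetical check, not an Astra Streaming feature: it flags a topic when pulsar_replication_connected_count is below 1, or when pulsar_replication_backlog or pulsar_replication_delay_in_seconds exceeds a threshold. The class name and the thresholds (10,000 messages and 60 seconds) are illustrative assumptions; choose values that match your own recovery objectives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical alert check over replication gauges for one topic and remote cluster.
// Threshold values are illustrative assumptions, not Astra Streaming defaults.
class ReplicationHealthCheck {
    static final long MAX_BACKLOG_MESSAGES = 10_000;
    static final long MAX_DELAY_SECONDS = 60;

    // Arguments correspond to pulsar_replication_connected_count,
    // pulsar_replication_backlog, and pulsar_replication_delay_in_seconds.
    static List<String> problems(double connectedCount, double backlogMessages, double delaySeconds) {
        List<String> found = new ArrayList<>();
        if (connectedCount < 1) {
            found.add("no replicator connected");
        }
        if (backlogMessages > MAX_BACKLOG_MESSAGES) {
            found.add("replication backlog above " + MAX_BACKLOG_MESSAGES + " messages");
        }
        if (delaySeconds > MAX_DELAY_SECONDS) {
            found.add("replication delay above " + MAX_DELAY_SECONDS + " s");
        }
        return found;
    }

    public static void main(String[] args) {
        // Healthy values, similar to the sample topic stats: connected, no backlog, no delay.
        System.out.println(problems(1, 0, 0)); // []
        // Disconnected replicator with a large backlog and a long delay: three problems.
        System.out.println(problems(0, 25_000, 120));
    }
}
```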

See also

For more on multi-region geo-replication, including region awareness and rack awareness, see the Pulsar documentation.


© 2024 DataStax

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com