Geo-replication

A key feature of Apache Pulsar™ is how it handles geo-replication. While many other pub-sub messaging systems require additional processes to mirror messages between data centers, Pulsar provides multi-datacenter replication as an integrated feature.

In Pulsar’s architecture, the serving layer (brokers) is decoupled from the storage layer (bookies), which makes it straightforward to replicate messages across data centers in different regions.

Geo-replication typically provides redundancy in the event of datacenter outages, but it can also be used by any application that produces and consumes Pulsar messages across regions.

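This doc provides:

  • An overview of message replication in Pulsar.
  • An overview of asynchronous geo-replication.
  • Instructions for enabling and testing geo-replication in Astra Streaming.
  • Information about replicated subscriptions and monitoring replicated clusters.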

If you’re already familiar with Pulsar’s message replication and asynchronous geo-replication, skip ahead to Enable geo-replication in Astra Streaming.

Overview of message replication in Pulsar

In Pulsar, cross-cluster message replication can be synchronous or asynchronous, and it can run with or without a global configuration store in ZooKeeper.

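Astra Streaming only supports asynchronous geo-replication without a global configuration store. Compared with synchronous replication, this approach offers better performance and lower publish latency, because producers don’t wait for remote clusters to persist each message.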

In asynchronous geo-replication, each region has its own local Pulsar cluster. Messages published to the cluster in one region are automatically and asynchronously replicated to the remote clusters in other regions by Pulsar’s built-in geo-replication capability.

Asynchronous geo-replication overview

An asynchronously geo-replicated deployment consists of two or more independent Pulsar clusters running in different regions.

Each Pulsar cluster has its own set of brokers, bookies, and ZooKeeper nodes, and the clusters are completely isolated from one another.

When messages are produced on a Pulsar topic, they are first persisted to the local cluster, and are then replicated asynchronously to the remote clusters.

The producer doesn’t wait for confirmation from multiple Pulsar clusters. It receives a response as soon as the local cluster successfully persists the message, and the message is then replicated asynchronously to the other Pulsar clusters in the background.
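
The sketch below is a toy, in-memory model of this ordering in plain Java, with no Pulsar dependency. ToyCluster and ToyGeoReplicatedTopic are hypothetical names used only for illustration: publish() persists to the local cluster and returns immediately, while a background thread copies the message to the remote clusters. It is not how Pulsar’s replicator is implemented internally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for one regional cluster's durable storage.
class ToyCluster {
    final String name;
    final List<String> log = new CopyOnWriteArrayList<>();

    ToyCluster(String name) { this.name = name; }

    void persist(String message) { log.add(message); }
}

// Hypothetical model of a topic that is replicated asynchronously
// from a local cluster to one or more remote clusters.
class ToyGeoReplicatedTopic {
    private final ToyCluster local;
    private final List<ToyCluster> remotes;
    // A single background thread runs replication tasks in publish order.
    private final ExecutorService replicator = Executors.newSingleThreadExecutor();

    ToyGeoReplicatedTopic(ToyCluster local, List<ToyCluster> remotes) {
        this.local = local;
        this.remotes = remotes;
    }

    // Returns as soon as the local cluster has persisted the message;
    // the caller (the "producer") never waits for the remote clusters.
    void publish(String message) {
        local.persist(message);
        replicator.execute(() -> remotes.forEach(r -> r.persist(message)));
    }

    // Demo helper: waits for queued background replication, then stops the thread.
    void drain() {
        replicator.shutdown();
        try {
            replicator.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

For example, right after topic.publish("hello-from-pulsar") returns, the local cluster’s log already contains the message, while a remote cluster’s log may still be empty until the background task runs (drain() waits for it in this toy model).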

To set up asynchronous geo-replication in Astra Streaming, see Enable geo-replication in Astra Streaming.

Enable geo-replication in Astra Streaming

Asynchronous geo-replication is enabled on a per-tenant basis and managed at the namespace level.

This means you can enable asynchronous geo-replication only where it is needed, and use namespaces to control which topics (and therefore which datasets) are replicated.
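
The following plain-Java sketch models namespace-level scoping under a simplifying assumption: every topic inherits the replication clusters configured on its namespace, so the namespace you choose decides which data is replicated. NamespaceReplicationPolicy and its methods are hypothetical and are not part of the Astra Streaming or Pulsar APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model: replication is configured per namespace, and every
// topic in that namespace inherits the namespace's replication clusters.
class NamespaceReplicationPolicy {
    // Key: "tenant/namespace"; value: clusters that namespace replicates to.
    private final Map<String, Set<String>> clustersByNamespace = new HashMap<>();

    void setReplicationClusters(String tenantAndNamespace, Set<String> clusters) {
        clustersByNamespace.put(tenantAndNamespace, Set.copyOf(clusters));
    }

    // Accepts "persistent://tenant/namespace/topic" or "tenant/namespace/topic".
    Set<String> clustersForTopic(String topic) {
        String path = topic.startsWith("persistent://")
                ? topic.substring("persistent://".length())
                : topic;
        String[] parts = path.split("/");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + topic);
        }
        // No replication configured on the namespace: modeled as an empty set
        // (the topic stays in its local cluster only).
        return clustersByNamespace.getOrDefault(parts[0] + "/" + parts[1], Set.of());
    }
}
```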

To enable geo-replication in Astra Streaming, follow these steps:

  1. In your Astra Streaming tenant, select Namespaces and Topics to list your tenant’s current namespaces and topics.

  2. For the namespace you want to replicate from, select Modify namespace.

  3. Select Replication, and select Add Region.

  4. Select the Cloud Provider and Region you want to replicate to, and select Create Replication.

  5. Astra Streaming creates the new replication:

    • Your streaming tenant is replicated to the selected provider and region.

    • The new replication’s Provider, Region, and Replication Direction are listed in Namespace Replication.

    • The Replication Direction value can be Replicate to, Replicate from, or Bidirectional.
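
The three Replication Direction values can be read as a combination of two one-way flows between the current region and a remote region. The enum below is a hypothetical illustration of that reading, not Astra Streaming’s implementation: Replicate to when only the current region sends data, Replicate from when only the remote region sends data, and Bidirectional when both do.

```java
// Hypothetical enum mirroring the Replication Direction labels shown in the UI.
enum ReplicationDirection {
    REPLICATE_TO("Replicate to"),
    REPLICATE_FROM("Replicate from"),
    BIDIRECTIONAL("Bidirectional");

    final String label;

    ReplicationDirection(String label) { this.label = label; }

    // Assumed mapping from the two one-way flows between this region and a remote one.
    static ReplicationDirection of(boolean sendsToRemote, boolean receivesFromRemote) {
        if (sendsToRemote && receivesFromRemote) return BIDIRECTIONAL;
        if (sendsToRemote) return REPLICATE_TO;
        if (receivesFromRemote) return REPLICATE_FROM;
        throw new IllegalArgumentException("No replication between these regions");
    }
}
```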

To control retention, backlog, and schema policies, visit the Settings tab.

To disable replication on a namespace, select Disable Replication.

Test geo-replicated clusters

  1. Verify that your user token can access the tenant metadata.
    In the result, allowedClusters lists the clusters the tenant can be replicated to.

    Command:

    bin/pulsar-admin tenants get <tenant>

    Result:

    {
      "adminRoles" : [ "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=;68fc51728f", "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=;25599ee732", "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=", "838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=" ],
      "allowedClusters" : [ "pulsar-gcp-useast4-staging", "pulsar-aws-useast1-staging", "pulsar-gcp-useast1-staging", "pulsar-azure-westus2-staging", "pulsar-aws-useast2-staging" ]
    }
  2. Verify that pulsar-admin can list the replication clusters for your namespace.

    Command:

    bin/pulsar-admin namespaces get-clusters <tenant>/<namespace>

    Result:

    pulsar-aws-useast1-staging
    pulsar-aws-useast2-staging
  3. Create a Pulsar consumer with a subscription to the <my-tenant>/<my-namespace>/<my-topic> topic:

    bin/pulsar-client consume -s "subscription-test" <my-tenant>/<my-namespace>/<my-topic> -n 0
  4. Create a Pulsar producer to produce messages to the <my-tenant>/<my-namespace>/<my-topic> topic:

    bin/pulsar-client produce <my-tenant>/<my-namespace>/<my-topic> --messages "hello-from-pulsar" -n 10
  5. Your consumer receives and acknowledges the messages:

    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
    ----- got message -----
    key:[null], properties:[], content:hello-from-pulsar
  6. In each of your geo-replicated Astra Streaming clusters, navigate to the Namespaces and Topics tab.
    The persistent://<my-tenant>/<my-namespace>/<my-topic> topic should be visible and should show traffic in all regions.
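
Steps 1 and 2 can be cross-checked: every cluster a namespace replicates to should appear in the tenant’s allowedClusters. The helper below is a hypothetical plain-Java sketch of that check. It takes the two lists as you would copy them from the pulsar-admin output above, rather than calling any Pulsar API.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class ReplicationClusterCheck {
    // Returns the namespace's replication clusters that the tenant is not
    // allowed to use; an empty result means the two outputs are consistent.
    static Set<String> disallowedClusters(List<String> allowedClusters,
                                          List<String> namespaceClusters) {
        Set<String> missing = new LinkedHashSet<>(namespaceClusters);
        missing.removeAll(allowedClusters);
        return missing;
    }

    public static void main(String[] args) {
        // Values copied from the example output of steps 1 and 2.
        List<String> allowed = List.of(
                "pulsar-gcp-useast4-staging", "pulsar-aws-useast1-staging",
                "pulsar-gcp-useast1-staging", "pulsar-azure-westus2-staging",
                "pulsar-aws-useast2-staging");
        List<String> namespace = List.of(
                "pulsar-aws-useast1-staging", "pulsar-aws-useast2-staging");
        System.out.println(disallowedClusters(allowed, namespace)); // prints []
    }
}
```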

Replicated subscriptions

The isReplicated value controls subscription behavior during replication.

By default, subscriptions are created with isReplicated=false, so their state is not replicated to the remote clusters.

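  1. To replicate a subscription, specify replicateSubscriptionState(true) when you create it:

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Schema;

    // pulsarClient is an existing PulsarClient and topic is the topic name.
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("my-subscription")
            .replicateSubscriptionState(true) // replicate this subscription's state
            .subscribe();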
  2. Check the topic stats. The subscription’s isReplicated value is now true.

    Command:

    bin/pulsar-admin topics stats persistent://<my-tenant>/<my-namespace>/<my-topic>

    Result:

    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesInCounter" : 880,
      "msgInCounter" : 10,
      "bytesOutCounter" : 1030,
      "msgOutCounter" : 10,
      "averageMsgSize" : 0.0,
      "msgChunkPublished" : false,
      "storageSize" : 927,
      "backlogSize" : 0,
      "publishRateLimitedTimes" : 0,
      "earliestMsgPublishTimeInBacklogs" : 0,
      "offloadedStorageSize" : 0,
      "lastOffloadLedgerId" : 0,
      "lastOffloadSuccessTimeStamp" : 0,
      "lastOffloadFailureTimeStamp" : 0,
      "publishers" : [ ],
      "waitingPublishers" : 0,
      "subscriptions" : {
        "my-subscription" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "bytesOutCounter" : 1030,
          "msgOutCounter" : 10,
          "msgRateRedeliver" : 0.0,
          "messageAckRate" : 0.0,
          "chunkedMessageRate" : 0,
          "msgBacklog" : 0,
          "backlogSize" : 0,
          "earliestMsgPublishTimeInBacklog" : 0,
          "msgBacklogNoDelayed" : 0,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Exclusive",
          "msgRateExpired" : 0.0,
          "totalMsgExpired" : 0,
          "lastExpireTimestamp" : 0,
          "lastConsumedFlowTimestamp" : 1663262370972,
          "lastConsumedTimestamp" : 0,
          "lastAckedTimestamp" : 0,
          "lastMarkDeleteAdvancedTimestamp" : 1663262440379,
          "consumers" : [ ],
          "isDurable" : true,
          "isReplicated" : true,
          "allowOutOfOrderDelivery" : false,
          "consumersAfterMarkDeletePosition" : { },
          "nonContiguousDeletedMessagesRanges" : 0,
          "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
          "subscriptionProperties" : { },
          "replicated" : false,
          "durable" : true
        }
      },
      "replication" : {
        "pulsar-aws-useast2-staging" : {
          "msgRateIn" : 0.0,
          "msgThroughputIn" : 0.0,
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateExpired" : 0.0,
          "replicationBacklog" : 0,
          "connected" : true,
          "replicationDelayInSeconds" : 0,
          "inboundConnection" : "/192.168.98.62:40346",
          "inboundConnectedSince" : "2022-09-14T20:20:35.128325Z",
          "outboundConnection" : "[id: 0xd3b42242, L:/192.168.71.231:42272 - R:pulsar-aws-useast2.staging.streaming.datastax.com/3.14.0.138:6651]",
          "outboundConnectedSince" : "2022-09-14T18:37:16.060159Z"
        }
      },
      "deduplicationStatus" : "Disabled",
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "compaction" : {
        "lastCompactionRemovedEventCount" : 0,
        "lastCompactionSucceedTimestamp" : 0,
        "lastCompactionFailedTimestamp" : 0,
        "lastCompactionDurationTimeInMills" : 0
      }
    }
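
With many subscriptions on a topic, it can help to list the ones whose isReplicated value is still false, because their state is not replicated. The sketch below is hypothetical plain Java: it assumes you have already extracted each subscription’s name and isReplicated value from the "subscriptions" section of the topic stats (for example, with a JSON library of your choice) into a map.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class SubscriptionReplicationReport {
    // Input: subscription name -> isReplicated, taken from the topic stats.
    // Output: sorted names of subscriptions whose state is NOT replicated.
    static List<String> nonReplicated(Map<String, Boolean> isReplicatedBySubscription) {
        return isReplicatedBySubscription.entrySet().stream()
                .filter(e -> !Boolean.TRUE.equals(e.getValue()))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```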

Monitor replicated clusters

Astra Streaming exposes the following topic-level replication metrics, which can be viewed in the Overview tab of the Namespaces and Topics page.

Name | Type | Description
pulsar_replication_rate_in | Gauge | The total message rate of the topic replicating from remote clusters (messages/second).
pulsar_replication_rate_out | Gauge | The total message rate of the topic replicating to remote clusters (messages/second).
pulsar_replication_throughput_in | Gauge | The total throughput of the topic replicating from remote clusters (bytes/second).
pulsar_replication_throughput_out | Gauge | The total throughput of the topic replicating to remote clusters (bytes/second).
pulsar_replication_backlog | Gauge | The total backlog of the topic replicating to remote clusters (messages).
pulsar_replication_rate_expired | Gauge | The total rate of expired messages (messages/second).
pulsar_replication_connected_count | Gauge | The number of replication subscribers that are up and running to replicate to remote clusters.
pulsar_replication_delay_in_seconds | Gauge | The time, in seconds, from when a message was produced to when it is about to be replicated.
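
One way to act on these metrics is a simple alert rule that flags a replication link when it is disconnected or falling behind. The sketch below is hypothetical plain Java: ReplicationSnapshot mirrors a few fields from the replication section of the topic stats output (connected, replicationBacklog, replicationDelayInSeconds), and the thresholds are placeholder values you would tune for your workload, not Astra Streaming defaults.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical snapshot of one remote cluster's replication stats.
final class ReplicationSnapshot {
    final String remoteCluster;
    final boolean connected;
    final long replicationBacklog;        // messages not yet replicated
    final long replicationDelayInSeconds; // produce time -> about to be replicated

    ReplicationSnapshot(String remoteCluster, boolean connected,
                        long replicationBacklog, long replicationDelayInSeconds) {
        this.remoteCluster = remoteCluster;
        this.connected = connected;
        this.replicationBacklog = replicationBacklog;
        this.replicationDelayInSeconds = replicationDelayInSeconds;
    }
}

final class ReplicationHealth {
    // Placeholder thresholds; tune them for your own traffic.
    static final long MAX_BACKLOG_MESSAGES = 10_000;
    static final long MAX_DELAY_SECONDS = 60;

    // Returns human-readable problems; an empty list means the link looks healthy.
    static List<String> problems(ReplicationSnapshot s) {
        List<String> out = new ArrayList<>();
        if (!s.connected) {
            out.add(s.remoteCluster + ": replicator not connected");
        }
        if (s.replicationBacklog > MAX_BACKLOG_MESSAGES) {
            out.add(s.remoteCluster + ": backlog of " + s.replicationBacklog + " messages");
        }
        if (s.replicationDelayInSeconds > MAX_DELAY_SECONDS) {
            out.add(s.remoteCluster + ": delay of " + s.replicationDelayInSeconds + " s");
        }
        return out;
    }
}
```

With the values from the replication section of the example stats output (connected true, backlog 0, delay 0), problems() returns an empty list.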

What’s next?

For more on multi-region geo-replication, including region awareness and rack awareness, see the Pulsar docs.

© 2024 DataStax