Geo-replication
A key feature of Apache Pulsar™ is how it handles geo-replication. While other pub-sub messaging systems require additional processes to mirror messages between data centers, Pulsar has multi-datacenter replication as an integrated feature.
Pulsar’s serving layer (brokers) and storage layer (bookies) are decoupled in Pulsar architecture, allowing seamless message replication across data centers in different regions.
Geo-replication typically provides redundancy in the event of datacenter outages, but can also be used for any application where Pulsar messages are produced and consumed across regions.
This doc provides:
If you’re already familiar with Pulsar’s messaging replication and asynchronous geo-replication, skip ahead to enabling geo-replication in Astra Streaming.
Overview of message replication in Pulsar
In Pulsar, cross-cluster message replication can be implemented with synchronous or asynchronous message replication, and with or without a global configuration store in ZooKeeper.
Astra Streaming only supports asynchronous geo-replication without a global configuration store. This approach offers better performance and lower latency.
In asynchronous geo-replication, each region has its own local Pulsar cluster. Messages published in a cluster of one region are automatically replicated asynchronously to remote clusters in other regions. This is achieved through Pulsar’s built-in geo-replication capability.
Asynchronous geo-replication overview
Astra Streaming only supports asynchronous geo-replication without a global configuration store.
An asynchronous geo-replication cluster consists of two or more independent Pulsar clusters running in different regions.
Each Pulsar cluster contains its own set of brokers, bookies, and ZooKeeper nodes that are completely isolated from one another.
When messages are produced on a Pulsar topic, they are first persisted to the local cluster, and are then replicated asynchronously to the remote clusters.
The message producer doesn’t wait for confirmation from multiple Pulsar clusters. The producer receives a response immediately after the nearest cluster successfully persists the data. The data is then asynchronously replicated to the other Pulsar clusters in the background.
To set up asynchronous geo-replication in Astra Streaming, see Enable geo-replication in Astra Streaming.
Enable geo-replication in Astra Streaming
Asynchronous geo-replication is enabled on a per-tenant basis and managed at the namespace level.
This means you can enable asynchronous geo-replication on topics where it is needed, while controlling which datasets are replicated by namespace.
To enable geo-replication in Astra Streaming, follow these steps:
-
In your Astra Streaming tenant, select Namespaces and Topics to list your tenant’s current namespaces and topics.
-
Select Modify namespace in the namespace you want to replicate from.
-
Select Replication, and select Add Region.
-
Select the Cloud Provider and Region you want to replicate to, and select Create Replication.
-
The new replication is created:
-
A new streaming tenant is replicated in the selected region and provider.
-
The new replication’s Provider, Region, and Replication Direction are listed in *Namespace Replication.
-
The Replication direction value can be Replicate to, Replicate from, or Bidirectional.
-
To control retention, backlog, and schema policies, visit the Settings tab.
To disable replication on a namespace, select Disable Replication.
Test georeplicated clusters
-
Verify your user token can access tenant metadata.
allowedClusters
lists the clusters the tenant can be replicated to.-
Command
-
Result
bin/pulsar-admin tenants get <tenant>
{ "adminRoles" : [ "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=;68fc51728f", "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=;25599ee732", "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=", "838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=" ], "allowedClusters" : [ "pulsar-gcp-useast4-staging", "pulsar-aws-useast1-staging", "pulsar-gcp-useast1-staging", "pulsar-azure-westus2-staging", "pulsar-aws-useast2-staging" ] }
-
-
Verify your
pulsar-admin
can view the replicated clusters for your namespace.-
Command
-
Result
bin/pulsar-admin namespaces get-clusters <tenant>/<namespace>
pulsar-aws-useast1-staging pulsar-aws-useast2-staging
-
-
Create a Pulsar consumer with a subscription to the
<my-tenant>/<my-namespace>/<my-topic>
topic:bin/pulsar-client consume -s "subscription-test" <my-tenant>/<my-namespace>/<my-topic> -n 0
-
Create a Pulsar producer to produce messages to the
<my-tenant>/<my-namespace>/<my-topic>
topic:bin/pulsar-client produce <my-tenant>/<my-namespace>/<my-topic> --messages "hello-from-pulsar" -n 10
-
Your consumer will acknowledge receipt of the messages:
----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar
-
Navigate to the Namespaces and Topics tab in your geo-replicated Astra Streaming clusters.
persistent://<my-tenant>/<my-namespace>/<my-topic>
should be visible and showing traffic across all regions.
Replicated subscriptions
The isReplicated
value controls subscription behavior during replication.
Subscriptions are created with isReplicated=false
by default, and will not replicate when the cluster is replicated.
-
Specify
replicateSubscriptionState(true)
at subscription creation to replicate the subscription when the cluster is replicated.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("my-subscription") .replicateSubscriptionState(true) .subscribe();
-
Check topic stats.
isReplicated
is nowtrue
for this subscription.-
Command
-
Result
bin/pulsar-admin topics stats persistent://<my-tenant>/<my-namespace>/<my-topic>
{ "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesInCounter" : 880, "msgInCounter" : 10, "bytesOutCounter" : 1030, "msgOutCounter" : 10, "averageMsgSize" : 0.0, "msgChunkPublished" : false, "storageSize" : 927, "backlogSize" : 0, "publishRateLimitedTimes" : 0, "earliestMsgPublishTimeInBacklogs" : 0, "offloadedStorageSize" : 0, "lastOffloadLedgerId" : 0, "lastOffloadSuccessTimeStamp" : 0, "lastOffloadFailureTimeStamp" : 0, "publishers" : [ ], "waitingPublishers" : 0, "subscriptions" : { "my-subscription" : { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesOutCounter" : 1030, "msgOutCounter" : 10, "msgRateRedeliver" : 0.0, "messageAckRate" : 0.0, "chunkedMessageRate" : 0, "msgBacklog" : 0, "backlogSize" : 0, "earliestMsgPublishTimeInBacklog" : 0, "msgBacklogNoDelayed" : 0, "blockedSubscriptionOnUnackedMsgs" : false, "msgDelayed" : 0, "unackedMessages" : 0, "type" : "Exclusive", "msgRateExpired" : 0.0, "totalMsgExpired" : 0, "lastExpireTimestamp" : 0, "lastConsumedFlowTimestamp" : 1663262370972, "lastConsumedTimestamp" : 0, "lastAckedTimestamp" : 0, "lastMarkDeleteAdvancedTimestamp" : 1663262440379, "consumers" : [ ], "isDurable" : true, "isReplicated" : true, "allowOutOfOrderDelivery" : false, "consumersAfterMarkDeletePosition" : { }, "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "subscriptionProperties" : { }, "replicated" : false, "durable" : true } }, "replication" : { "pulsar-aws-useast2-staging" : { "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateExpired" : 0.0, "replicationBacklog" : 0, "connected" : true, "replicationDelayInSeconds" : 0, "inboundConnection" : "/192.168.98.62:40346", "inboundConnectedSince" : "2022-09-14T20:20:35.128325Z", "outboundConnection" : "[id: 0xd3b42242, L:/192.168.71.231:42272 - R:pulsar-aws-useast2.staging.streaming.datastax.com/3.14.0.138:6651]", "outboundConnectedSince" : "2022-09-14T18:37:16.060159Z" } }, "deduplicationStatus" : "Disabled", "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "compaction" : { "lastCompactionRemovedEventCount" : 0, "lastCompactionSucceedTimestamp" : 0, "lastCompactionFailedTimestamp" : 0, "lastCompactionDurationTimeInMills" : 0 } }
-
Monitor replicated clusters
Astra Streaming exposes the following topic-level replication metrics, which can be viewed in the Overview tab of the Namespaces and Topics page.
Name | Type | Description |
---|---|---|
pulsar_replication_rate_in |
Gauge |
The total message rate of the topic replicating from remote cluster (messages/second). |
pulsar_replication_rate_out |
Gauge |
The total message rate of the topic replicating to remote cluster (messages/second). |
pulsar_replication_throughput_in |
Gauge |
The total throughput of the topic replicating from remote cluster (bytes/second). |
pulsar_replication_throughput_out |
Gauge |
The total throughput of the topic replicating to remote cluster (bytes/second). |
pulsar_replication_backlog |
Gauge |
The total backlog of the topic replicating to remote cluster (messages). |
pulsar_replication_rate_expired |
Gauge |
Total rate of messages expired (messages/second) |
pulsar_replication_connected_count |
Gauge |
The count of replication subscribers up and running to replicate to remote clusters. |
pulsar_replication_delay_in_seconds |
Gauge |
Time in seconds from the time a message was produced to the time when it is about to be replicated. |
What’s next?
For more on multiregion georeplication, including region awareness and rack awareness, see the Pulsar docs.