Geo-replication
A key feature of Apache Pulsar™ is how it handles geo-replication. While other pub-sub messaging systems require additional processes to mirror messages between data centers, Pulsar has multi-datacenter replication as an integrated feature.
Pulsar’s serving layer (brokers) and storage layer (bookies) are decoupled in Pulsar architecture, allowing seamless message replication across data centers in different regions.
Geo-replication typically provides redundancy in the event of datacenter outages, but can also be used for any application where Pulsar messages are produced and consumed across regions.
Message replication in Pulsar
In Pulsar, cross-cluster message replication can be implemented with synchronous or asynchronous message replication, and with or without a global configuration store in ZooKeeper.
Astra Streaming supports only asynchronous geo-replication without a global configuration store. This approach offers better performance and lower latency.
In asynchronous geo-replication, each region has its own local Pulsar cluster. Each Pulsar cluster contains its own set of brokers, bookies, and ZooKeeper nodes that are completely isolated from one another.
When messages are produced on a Pulsar topic, they are first persisted to the local cluster, and are then replicated asynchronously to the remote clusters in other regions. This is achieved through Pulsar’s built-in geo-replication capability.
The message producer doesn’t wait for confirmation from multiple Pulsar clusters. The producer receives a response immediately after the nearest cluster successfully persists the data. The data is then asynchronously replicated to the other Pulsar clusters in the background.
Enable geo-replication in Astra Streaming
Asynchronous geo-replication is enabled on a per-tenant basis and managed at the namespace level.
This means you can enable asynchronous geo-replication on topics where it is needed, while controlling which datasets are replicated by namespace.
To enable geo-replication in Astra Streaming, follow these steps:
-
In your Astra Streaming tenant, select Namespaces and Topics to list your tenant’s current namespaces and topics.
-
Select Modify namespace in the namespace you want to replicate from.
-
Select Replication, and select Add Region.
-
Select the Cloud Provider and Region you want to replicate to, and select Create Replication.
-
The new replication is created:
-
A new streaming tenant is replicated in the selected region and provider.
-
The new replication’s Provider, Region, and Replication Direction are listed in *Namespace Replication.
-
The Replication direction value can be Replicate to, Replicate from, or Bidirectional.
-
To control retention, backlog, and schema policies, visit the Settings tab.
To disable replication on a namespace, select Disable Replication.
Test georeplicated clusters
-
Verify that your user token can access tenant metadata:
bin/pulsar-admin tenants get TENANT_NAME
In the result, the
allowedClusters
are the clusters where you can replicate the tenant:Result{ "adminRoles" : [ "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=;68fc51728f", "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=;25599ee732", "client;838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=", "838cbf7f-0331-46da-b559-ef1c2209b875;Z2VvcmVwbGljYXRlZC10ZW5hbnQ=" ], "allowedClusters" : [ "pulsar-gcp-useast4-staging", "pulsar-aws-useast1-staging", "pulsar-gcp-useast1-staging", "pulsar-azure-westus2-staging", "pulsar-aws-useast2-staging" ] }
-
Verify your
pulsar-admin
can read replicated clusters for your namespace:bin/pulsar-admin namespaces get-clusters TENANT_NAME/NAMESPACE_NAME
Result
pulsar-aws-useast1-staging pulsar-aws-useast2-staging
-
Create a Pulsar consumer with a subscription to a specified topic:
bin/pulsar-client consume -s "subscription-test" TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME -n 0
-
Create a Pulsar producer to produce messages from a specific topic:
bin/pulsar-client produce TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME --messages "hello-from-pulsar" -n 10
The consumer acknowledges the messages:
Result----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar ----- got message ----- key:[null], properties:[], content:hello-from-pulsar
-
In the Astra Portal, go to your tenant’s Namespaces and Topics tab.
-
Make sure
persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME
is visible and shows traffic across all regions.
Replicated subscriptions
The isReplicated
value controls subscription behavior during replication.
Subscriptions are created with isReplicated=false
by default, which means they do not replicate when the cluster is replicated.
-
Specify
replicateSubscriptionState(true)
at subscription creation to replicate the subscription when the cluster is replicated:Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("my-subscription") .replicateSubscriptionState(true) .subscribe();
-
Check topic stats:
bin/pulsar-admin topics stats persistent://TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME
In the configuration,
isReplicated
is nowtrue
for this subscription.config.json
{ "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesInCounter" : 880, "msgInCounter" : 10, "bytesOutCounter" : 1030, "msgOutCounter" : 10, "averageMsgSize" : 0.0, "msgChunkPublished" : false, "storageSize" : 927, "backlogSize" : 0, "publishRateLimitedTimes" : 0, "earliestMsgPublishTimeInBacklogs" : 0, "offloadedStorageSize" : 0, "lastOffloadLedgerId" : 0, "lastOffloadSuccessTimeStamp" : 0, "lastOffloadFailureTimeStamp" : 0, "publishers" : [ ], "waitingPublishers" : 0, "subscriptions" : { "my-subscription" : { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesOutCounter" : 1030, "msgOutCounter" : 10, "msgRateRedeliver" : 0.0, "messageAckRate" : 0.0, "chunkedMessageRate" : 0, "msgBacklog" : 0, "backlogSize" : 0, "earliestMsgPublishTimeInBacklog" : 0, "msgBacklogNoDelayed" : 0, "blockedSubscriptionOnUnackedMsgs" : false, "msgDelayed" : 0, "unackedMessages" : 0, "type" : "Exclusive", "msgRateExpired" : 0.0, "totalMsgExpired" : 0, "lastExpireTimestamp" : 0, "lastConsumedFlowTimestamp" : 1663262370972, "lastConsumedTimestamp" : 0, "lastAckedTimestamp" : 0, "lastMarkDeleteAdvancedTimestamp" : 1663262440379, "consumers" : [ ], "isDurable" : true, "isReplicated" : true, "allowOutOfOrderDelivery" : false, "consumersAfterMarkDeletePosition" : { }, "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "subscriptionProperties" : { }, "replicated" : false, "durable" : true } }, "replication" : { "pulsar-aws-useast2-staging" : { "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "msgRateExpired" : 0.0, "replicationBacklog" : 0, "connected" : true, "replicationDelayInSeconds" : 0, "inboundConnection" : "/192.168.98.62:40346", "inboundConnectedSince" : "2022-09-14T20:20:35.128325Z", "outboundConnection" : "[id: 0xd3b42242, L:/192.168.71.231:42272 - R:pulsar-aws-useast2.streaming.datastax.com/3.14.0.138:6651]", "outboundConnectedSince" : "2022-09-14T18:37:16.060159Z" } }, "deduplicationStatus" : "Disabled", "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "compaction" : { "lastCompactionRemovedEventCount" : 0, "lastCompactionSucceedTimestamp" : 0, "lastCompactionFailedTimestamp" : 0, "lastCompactionDurationTimeInMills" : 0 } }
Monitor replicated clusters
Astra Streaming exposes the following topic-level replication metrics, which can be viewed in the Overview section of Namespaces and Topics in the Astra Portal:
Name | Type | Description |
---|---|---|
|
Gauge |
The total message rate of the topic replicating from remote cluster (messages per second). |
|
Gauge |
The total message rate of the topic replicating to remote cluster (messages per second). |
|
Gauge |
The total throughput of the topic replicating from remote cluster (bytes per second). |
|
Gauge |
The total throughput of the topic replicating to remote cluster (bytes per second). |
|
Gauge |
The total backlog of the topic replicating to remote cluster (messages). |
|
Gauge |
Total rate of messages expired (messages per second) |
|
Gauge |
The count of replication subscribers up and running to replicate to remote clusters. |
|
Gauge |
Time in seconds from the time a message was produced to the time when it is about to be replicated. |
See also
For more on multiregion georeplication, including region awareness and rack awareness, see the Pulsar documentation.