Create a Change Data Capture (CDC) connector
CDC connectors are only available for Astra DB Serverless deployments.
CDC for Astra DB automatically captures changes in real time, de-duplicates the changes, and streams the clean set of changed data into Astra Streaming where it can be processed by client applications or sent to downstream systems.
Astra Streaming processes data changes via a Pulsar topic. By design, the Change Data Capture (CDC) component is simple, with a 1:1 correspondence between the table and a single Pulsar topic.
This doc will show you how to create a CDC connector for your Astra DB deployment and send change data to an Elasticsearch sink.
Enabling CDC for Astra DB Serverless databases increases costs based on your Astra Streaming usage. See Astra Streaming pricing and CDC metering rates.
Supported data structures
The following data types and corresponding AVRO or logical types are supported for CDC for Astra DB Serverless databases:
Data type | AVRO type |
---|---|
ascii | string |
bigint | long |
blob | bytes |
boolean | boolean |
counter | long |
date | int |
decimal | cql_decimal |
double | double |
duration | cql_duration |
float | float |
inet | string |
int | int |
list | array |
map | map (only string-type keys are supported) |
set | array |
smallint | int |
text | string |
time | long |
timestamp | long |
timeuuid | string |
tinyint | int |
uuid | string |
varchar | string |
varint | cql_varint / bytes |
Cassandra static columns are supported:
- On row-level updates, static columns are included in the message value.
- On partition-level updates, the clustering keys are null in the message key. The message value only has static columns on INSERT and UPDATE operations.
Columns that use unsupported data types are omitted from the events sent to the data topic. If a row update contains both supported and unsupported data types, the event includes only the columns with supported data types.
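The omission rule can be sketched in a few lines of Python. This is only an illustration of the behavior described above, not the CDC agent's code: the mapping is copied from the supported-types table, while the function name `columns_in_event`, the sample column names, and the type name `some_unlisted_type` are hypothetical.

```python
# CQL-to-AVRO mapping copied from the supported-types table above.
SUPPORTED_TYPES = {
    "ascii": "string", "bigint": "long", "blob": "bytes", "boolean": "boolean",
    "counter": "long", "date": "int", "decimal": "cql_decimal", "double": "double",
    "duration": "cql_duration", "float": "float", "inet": "string", "int": "int",
    "list": "array", "map": "map", "set": "array", "smallint": "int",
    "text": "string", "time": "long", "timestamp": "long", "timeuuid": "string",
    "tinyint": "int", "uuid": "string", "varchar": "string",
    "varint": "cql_varint / bytes",
}


def columns_in_event(column_types):
    """Return the columns that would appear in a CDC event, with their AVRO type.

    column_types maps a column name to its CQL type, for example "set<text>".
    Columns whose base type is not in the table are dropped (hypothetical helper).
    """
    kept = {}
    for name, cql_type in column_types.items():
        base = cql_type.lower().split("<", 1)[0].strip()  # "set<text>" -> "set"
        if base in SUPPORTED_TYPES:
            kept[name] = SUPPORTED_TYPES[base]
    return kept


# "extra" uses a made-up type that is not listed in the table, so it is dropped.
print(columns_in_event({"key": "text", "tags": "set<text>", "extra": "some_unlisted_type"}))
# prints {'key': 'string', 'tags': 'array'}
```

A check like this can be run against a table definition before enabling CDC to see which columns downstream consumers should not expect.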
AVRO interpretation
Astra DB Serverless database keys are strings, while CDC produces AVRO messages which are structures. The conversion for some AVRO structures requires additional tooling that can result in unexpected output.
The table below describes the conversion of AVRO logical types. The record type is a schema containing the listed fields.
Name | AVRO type | Fields | Explanation |
---|---|---|---|
collections | array | lists, sets | Sets and Lists are treated as AVRO type array. |
decimal | record | BIG_INT, DECIMAL_SCALE | The Cassandra DECIMAL type is converted to a record with the cql_decimal logical type. |
duration | record | CQL_DURATION_MONTHS, CQL_DURATION_DAYS, CQL_DURATION_NANOSECONDS | The Cassandra DURATION type is converted to a record with the cql_duration logical type. |
maps | map | KEYS_CONVERTED_TO_STRINGS, VALUE_SCHEMA | The Cassandra MAP type is converted to the AVRO map type, but the keys are converted to strings. For complex types, the key is represented in JSON. |
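As an example of the "additional tooling" these record types need, the following Python sketch turns already-decoded decimal and duration records back into usable values. It rests on assumptions that the table does not confirm: that the record fields are named `bigint` and `scale` (decimal) and `months`, `days`, and `nanoseconds` (duration), and that the unscaled decimal value is stored as big-endian two's-complement bytes. Check the schema attached to your data topic before relying on these names.

```python
from decimal import Decimal


def decode_cql_decimal(record):
    """Rebuild a Decimal from an unscaled integer and a scale.

    Assumes record["bigint"] holds big-endian two's-complement bytes and
    record["scale"] is the number of digits after the decimal point.
    """
    unscaled = int.from_bytes(record["bigint"], byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    # Building from a (sign, digits, exponent) tuple is exact for any size.
    return Decimal((sign, digits, -record["scale"]))


def decode_cql_duration(record):
    """Return the three duration components as a (months, days, nanoseconds) tuple."""
    return (record["months"], record["days"], record["nanoseconds"])


print(decode_cql_decimal({"bigint": b"\x30\x39", "scale": 2}))  # prints 123.45
print(decode_cql_duration({"months": 1, "days": 2, "nanoseconds": 3}))  # prints (1, 2, 3)
```

The duration is left as three separate components because months and days have no fixed length in nanoseconds.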
Limitations
CDC for Astra DB Serverless databases has the following limitations:
- Does not manage table truncates.
- Does not sync data available before starting the CDC agent.
- Does not replay logged batches.
- Does not manage time-to-live.
- Does not support range deletes.
- CQL column names must not match a Pulsar primitive type name (for example, INT32).
- Does not support multi-region.
- Does not support multi-table mutations.
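The column-name limitation is easy to check before enabling CDC. The Python sketch below is a hedged illustration: the set of names is an assumption based on Pulsar's primitive schema type names and may not match the exact list that CDC checks, and the case-insensitive comparison is a deliberately conservative guess. The helper name `conflicting_columns` is hypothetical.

```python
# Assumed list of Pulsar primitive schema type names; verify against your Pulsar version.
PULSAR_PRIMITIVE_TYPES = {
    "BOOLEAN", "INT8", "INT16", "INT32", "INT64", "FLOAT", "DOUBLE",
    "BYTES", "STRING", "DATE", "TIME", "TIMESTAMP", "INSTANT",
    "LOCAL_DATE", "LOCAL_TIME", "LOCAL_DATE_TIME",
}


def conflicting_columns(column_names):
    """Return the column names that collide with an assumed Pulsar primitive type name."""
    return [name for name in column_names if name.upper() in PULSAR_PRIMITIVE_TYPES]


print(conflicting_columns(["key", "c1", "int32"]))  # prints ['int32']
```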
Prerequisites
You need the following items to complete this procedure:
- An active Astra account.
- An Astra DB Serverless database created in the Astra Portal.
- A keyspace created in the Astra Portal.
- An active Elasticsearch account.
- An Elasticsearch endpoint, index name, and API key retrieved from your Elasticsearch deployment.
Create a streaming tenant
- Log in to the Astra Portal. At the bottom of the Welcome page, select View Streaming.
- Select Create Tenant.
- Enter a name for your new streaming tenant.
- Select a provider and region.
- Select Create Tenant.
Astra Streaming CDC can only be used in a region that supports both Astra Streaming and Astra DB Serverless databases. See Astra Streaming Regions for more information.
Create a table
- Select Databases from the main navigation.
- Select the name of the active database that you would like to use.
- Select the CQL Console tab.
- Create a table with a primary key column using the following command. Edit the command to add your KEYSPACE_NAME and choose a TABLE_NAME.

      CREATE TABLE IF NOT EXISTS KEYSPACE_NAME.TABLE_NAME (key text PRIMARY KEY, c1 text);

- Confirm that your table was created:

      select * from KEYSPACE_NAME.TABLE_NAME;

  Result:

       key | c1
      -----+----
      (0 rows)
You have now created a table and confirmed that the table exists in your Astra DB Serverless database.
Connect to CDC for Astra DB Serverless databases
Complete the following steps after you have created a streaming tenant and a table.
- Select Databases from the main navigation.
- Select the name of the active database that you would like to use.
- Click the CDC tab.
- Click Enable CDC.
- Complete the fields to select a tenant, select a keyspace, and select the name of the table you created.
- Click Enable CDC.
Enabling CDC creates a new astracdc namespace with two new topics, data- and log-.
The log- topic consumes schema changes, processes them, and then writes clean data to the data- topic.
The log- topic is for CDC functionality and should not be used.
The data- topic is used to consume CDC data in Astra Streaming.
For more information, see Increase the CDC data-topic Partitions.
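Client code usually needs the full name of the data- topic. The Python sketch below builds it from its parts. The naming pattern is inferred from the sample topic listing in the partitions section of this guide and is an assumption, not a documented contract; the guide does not say what the UUID-like segment identifies, so the parameter is simply called `topic_id` here, and the helper name `cdc_topic` is hypothetical.

```python
def cdc_topic(tenant, topic_id, keyspace, table, kind="data"):
    """Build a CDC topic name following the pattern seen in the sample listing.

    kind is "data" (the topic to consume) or "log" (internal to CDC).
    """
    if kind not in ("data", "log"):
        raise ValueError("kind must be 'data' or 'log'")
    return f"persistent://{tenant}/astracdc/{kind}-{topic_id}-{keyspace}.{table}"


print(cdc_topic("ten01", "7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c", "keysp", "table1"))
# prints persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1
```

In practice, copying the topic name from the Astra Streaming UI is the safer source of truth. The helper only shows how the parts fit together.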
Connect Elasticsearch sink
Connect an Elasticsearch sink to CDC that consumes messages from the data- topic and sends them to your Elasticsearch deployment.
- Go to your database’s CDC tab.
- Under Change Data Capture, select the name of the CDC-enabled table you would like to use. You should still be in the CDC tab after selecting a name, but the header becomes CDC for TABLE_NAME with a green Active icon next to it.
- Select Add Elastic Search Sink to select your settings.
- Select the astracdc namespace.
- Select Elastic Search for the sink type.
- Enter a name for your sink.
- Under Connect Topics, select a data- topic in the astracdc namespace for the input topic.
- Complete Sink-Specific Configuration with the Elasticsearch URL, Index name, and API key found in your Elasticsearch deployment portal. Leave username, password, and token blank.
  Default values auto-populate. These values are recommended:
  - Ignore Record Key as false
  - Null Value Action as DELETE
  - Enable Schema as true
- When the fields are completed, select Create.
If creation is successful, SINK_NAME created successfully appears at the top of the screen.
You can confirm that your new sink was created in the Sinks tab.
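If you keep sink settings in a script or a review checklist, the recommended values above can be compared against what you entered. The Python sketch below is only an illustration: the dictionary keys are the field labels from the form, not the connector's internal configuration property names, and the helper name `settings_to_review` is hypothetical.

```python
# Recommended values from the steps above, keyed by the form's field labels.
RECOMMENDED = {
    "Ignore Record Key": False,
    "Null Value Action": "DELETE",
    "Enable Schema": True,
}


def settings_to_review(settings):
    """Return {field: (actual, recommended)} for every field that differs."""
    return {
        name: (settings.get(name), wanted)
        for name, wanted in RECOMMENDED.items()
        if settings.get(name) != wanted
    }


entered = {"Ignore Record Key": True, "Null Value Action": "DELETE", "Enable Schema": True}
print(settings_to_review(entered))  # prints {'Ignore Record Key': (True, False)}
```

One reason to keep the record key: in the sample search response later in this guide, the document `_id` values (32a, 32b) match the table's primary keys, which suggests the sink uses the record key as the document ID.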
Send messages
Let’s process some changes with CDC.
- Go to your database’s CQL Console tab.
- Modify the table you created.

      INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32a','bob3123');
      INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32b','bob3123b');

- Confirm the changes you’ve made:

      select * from KEYSPACE_NAME.TABLE_NAME;

  Result:

       key | c1
      -----+----------
       32a | bob3123
       32b | bob3123b
      (2 rows)
Your processed changes in the resulting table verify that the messages sent successfully.
Confirm Elasticsearch receives change data
Ensure that your new Elasticsearch sink receives data once it is connected.
- Issue a GET request to your Elasticsearch deployment to confirm Elasticsearch is receiving changes from your database via CDC.

      curl -X GET "ELASTIC_URL/INDEX_NAME/_search?pretty" -H "Authorization: ApiKey API_KEY"

- A JSON response with your changes to the index is returned, confirming that Astra Streaming is sending your CDC changes to your Elasticsearch sink.

      {
        "took": 1,
        "timed_out": false,
        "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 },
        "hits": {
          "total": { "value": 3, "relation": "eq" },
          "max_score": 1.0,
          "hits": [
            {
              "_index": "INDEX_NAME",
              "_id": "khl_hI0Bh25AUvCHghQo",
              "_score": 1.0,
              "_source": { "name": "foo", "title": "bar" }
            },
            {
              "_index": "INDEX_NAME",
              "_id": "32a",
              "_score": 1.0,
              "_source": { "c1": "bob3123" }
            },
            {
              "_index": "INDEX_NAME",
              "_id": "32b",
              "_score": 1.0,
              "_source": { "c1": "bob3123b" }
            }
          ]
        }
      }
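The same check can be scripted. The following Python sketch uses only the standard library to build the search request and to pull the documents out of a response body. The function names and the placeholder URL are hypothetical; the header format mirrors the curl command above, and the response shape is taken from the sample response.

```python
import json
import urllib.request


def build_search_request(elastic_url, index_name, api_key):
    """Build (but do not send) a GET request against the index's _search endpoint."""
    url = f"{elastic_url.rstrip('/')}/{index_name}/_search?pretty"
    return urllib.request.Request(
        url, headers={"Authorization": f"ApiKey {api_key}"}, method="GET"
    )


def documents_by_id(response_body):
    """Map each hit's _id to its _source from a search response body."""
    hits = json.loads(response_body)["hits"]["hits"]
    return {hit["_id"]: hit["_source"] for hit in hits}


# Offline demonstration with a trimmed copy of the sample response above.
sample = '{"hits": {"hits": [{"_id": "32a", "_source": {"c1": "bob3123"}}]}}'
print(documents_by_id(sample))  # prints {'32a': {'c1': 'bob3123'}}

# To query a real deployment (placeholders must be replaced first):
# with urllib.request.urlopen(build_search_request(ELASTIC_URL, INDEX_NAME, API_KEY)) as resp:
#     print(documents_by_id(resp.read()))
```

Looking up the keys you inserted (32a and 32b) in the returned dictionary is a quick way to confirm the rows arrived.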
Outcomes
At this point you have successfully:
- Created a tenant, topic, and table.
- Connected your Astra DB Serverless database to CDC.
- Connected an Elasticsearch sink to CDC and verified that messages are sent and received successfully.
Increase the CDC data-topic Partitions
After enabling CDC, 3 data and 3 log partitions are created under the astracdc namespace.
Increasing the number of partitions will create new partitions, but existing data will remain in the old partitions.
New messages will be distributed across the new partitions.
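To see why existing data stays put while new messages spread out, consider a simplified model of key-based routing in which a message's partition is a hash of its key modulo the partition count. The Python sketch below uses CRC32 as a stand-in hash; this is an assumption for illustration only and is not the hash function Pulsar uses, so the partition numbers it prints will not match a real cluster.

```python
import zlib


def partition_for(key, partitions):
    """Pick a partition index for a key (stand-in hash, not Pulsar's own)."""
    return zlib.crc32(key.encode("utf-8")) % partitions


# The same key can land on a different partition once the count changes,
# while messages already written under the old count stay where they are.
for key in ("32a", "32b"):
    print(key, partition_for(key, 3), partition_for(key, 10))
```

Under this model, changing the divisor changes where a given key is routed from then on, which is consistent with the ordering concerns mentioned for partition changes.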
- Confirm the current state of the topic before making changes.

      bin/pulsar-admin topics list-partitioned-topics ten01/astracdc

  Result:

      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
      persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
      persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
      persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
- Use the update-partitioned-topic command to change the number of partitions for a specified topic.

      bin/pulsar-admin topics update-partitioned-topic ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1 --partitions 10

  Here, we are increasing the number of partitions to 10. You can only increase the number of partitions. Decreasing is not supported due to potential data loss and message ordering issues.
- Verify the update.

      bin/pulsar-admin topics list ten01/astracdc

  Result:

      persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
      persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
      persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-9
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-8
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-7
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-6
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-5
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-4
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-3
      persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
- Check the topic to confirm it has been updated to have 10 partitions.

      bin/pulsar-admin topics partitioned-stats persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1

  Result:

      {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesInCounter" : 0,
        "msgInCounter" : 0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "averageMsgSize" : 0.0,
        "msgChunkPublished" : false,
        "storageSize" : 0,
        "backlogSize" : 0,
        "publishRateLimitedTimes" : 0,
        "earliestMsgPublishTimeInBacklogs" : 0,
        "offloadedStorageSize" : 0,
        "lastOffloadLedgerId" : 0,
        "lastOffloadSuccessTimeStamp" : 0,
        "lastOffloadFailureTimeStamp" : 0,
        "publishers" : [ ],
        "waitingPublishers" : 0,
        "subscriptions" : { },
        "replication" : { },
        "nonContiguousDeletedMessagesRanges" : 0,
        "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
        "compaction" : {
          "lastCompactionRemovedEventCount" : 0,
          "lastCompactionSucceedTimestamp" : 0,
          "lastCompactionFailedTimestamp" : 0,
          "lastCompactionDurationTimeInMills" : 0
        },
        "metadata" : {
          "partitions" : 10
        },
        "partitions" : { }
      }
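The verification steps above can be automated once you have the command output as text. The Python sketch below reads the partition count from a partitioned-stats JSON document and generates the partition topic names you would expect to see in the topic listing. The function names are hypothetical; the `-partition-N` suffix and the `metadata.partitions` field are taken from the sample output above.

```python
import json


def partition_count(stats_json):
    """Read the partition count from partitioned-stats output."""
    return json.loads(stats_json)["metadata"]["partitions"]


def expected_partitions(topic, count):
    """List the partition topic names for a partitioned topic, numbered from 0."""
    return [f"{topic}-partition-{i}" for i in range(count)]


stats = '{"metadata": {"partitions": 10}, "partitions": {}}'
topic = "persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1"
names = expected_partitions(topic, partition_count(stats))
print(len(names))  # prints 10
print(names[-1].rsplit("-", 1)[-1])  # prints 9
```

Comparing this generated list with the output of the topic listing, ignoring order, confirms that every new partition was created.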
Resources
For more on Astra Streaming, see: