Create a Change Data Capture (CDC) connector
CDC for Astra DB incurs billed charges based on your Astra Streaming usage. For more information, see Astra Streaming pricing and CDC metering rates. |
CDC for Astra DB automatically captures changes in real time, de-duplicates the changes, and then streams the clean set of changed data into Astra Streaming where it can be processed by client applications or sent to downstream systems.
Astra Streaming processes data changes through a Pulsar topic. By design, the Change Data Capture (CDC) component has a one-to-one correspondence between a table and a single Pulsar topic.
Supported data structures
CDC for Astra DB supports the following CQL data types and corresponding AVRO or logical types:
Data type | AVRO type |
---|---|
ascii |
string |
bigint |
long |
blob |
bytes |
boolean |
boolean |
counter |
long |
date |
int |
decimal |
cql_decimal |
double |
double |
duration |
cql_duration |
float |
float |
inet |
string |
int |
int |
list |
array |
map |
map (only string-type keys are supported) |
set |
array |
smallint |
int |
text |
string |
time |
long |
timestamp |
long |
timeuuid |
string |
tinyint |
int |
uuid |
string |
varchar |
string |
varint |
cql_varint / bytes |
Apache Cassandra® static columns are supported:
-
On row-level updates, static columns are included in the message value.
-
On partition-level updates, the clustering keys are null in the message key. The message value only has static columns on
INSERT
andUPDATE
operations.
For columns using data types that are not supported, the data types are omitted from the events sent to the data topic. If a row update contains both supported and unsupported data types, the event includes only columns with supported data types.
AVRO interpretation
Keys from tables in Astra DB databases are strings, but CDC produces AVRO messages that are structures. The conversion for some AVRO structures requires additional tooling that can result in unexpected output.
The following table describes the conversion of AVRO logical types:
Name | AVRO type | Fields | Explanation |
---|---|---|---|
collections |
array |
lists, sets |
Sets and Lists are treated as AVRO type |
decimal |
record |
BIG_INT, DECIMAL_SCALE |
The Cassandra DECIMAL type is converted to a The |
duration |
record |
CQL_DURATION_MONTHS, CQL_DURATION_DAYS, CQL_DURATION_NANOSECONDS |
The Cassandra DURATION type is converted to a The |
maps |
map |
KEYS_CONVERTED_TO_STRINGS, VALUE_SCHEMA |
The Cassandra MAP type is converted to the AVRO map type, but the keys are converted to strings. For complex types, the key is represented in JSON. |
Limitations
CDC for Astra DB has the following limitations:
-
Does not manage table truncates.
-
Does not sync data available before starting the CDC agent.
-
Does not replay logged batches.
-
Does not manage time-to-live.
-
Does not support range deletes.
-
Does not allow CQL column names that match a Pulsar primitive type name, such as
INT32
. -
Does not support multi-region.
-
Does not support multi-table mutations.
Configure CDC for Astra DB
To configure CDC for Astra DB, you must create a streaming tenant, table, and CDC connector, and then connect a sink to the CDC connector. The CDC connector connects your database to CDC, and it automatically creates a namespace and topics in your streaming tenant. The integration between your CDC connector and sink enables the sink to consume messages from a topic in the tenant, and then send them to the associated service deployment.
These instructions use an Elasticsearch sink as an example. You can use other Astra Streaming sinks.
Prerequisites
To enable CDC for Astra DB Serverless, you need the following:
-
An active Astra account
-
The database’s Secure Connect Bundle (SCB)
-
An active account for your sink service, such as Elasticsearch
-
Connection details for your sink service deployment, such as an Elasticsearch endpoint, index name, and API key for an Elasticsearch Deployment
Create a streaming tenant
-
In the Astra Portal navigation menu, click Streaming.
-
Click Create Tenant.
-
Enter a name for the streaming tenant.
-
Select a cloud provider and region.
You can only use CDC for Astra DB in regions that support both Astra Streaming and Astra DB Serverless. For more information, see Astra Streaming regions and Astra DB Serverless database regions and maintenance schedules.
-
Click Create Tenant.
Do not create a namespace or topics in your tenant. The CDC connector does this automatically.
Create a table
-
In the Astra Portal navigation menu, select your database.
-
Click CQL Console.
-
Use the CQL Console to create a table with a primary key column in your database:
CREATE TABLE IF NOT EXISTS KEYSPACE_NAME.TABLE_NAME (key text PRIMARY KEY, c1 text);
Replace the following:
-
KEYSPACE_NAME
: Your database’s keyspace name. -
TABLE_NAME
: Enter a name for the new table.
-
-
Confirm table creation:
select * from KEYSPACE_NAME.TABLE_NAME;
Result:
token@cqlsh> select * from KEYSPACE_NAME.TABLE_NAME; key | c1 -----+---- (0 rows) token@cqlsh>
Connect to CDC for Astra DB
After you Create a streaming tenant and Create a table, create a CDC connector:
-
In the Astra Portal navigation menu, select your database.
-
Click the CDC tab.
-
Click Enable CDC.
-
Select a tenant, keyspace, and table.
-
Click Enable CDC to create the CDC connector.
When you enabling CDC, Astra DB creates a new astracdc
namespace in your streaming tenant that has two topics:
-
The
data-
topic consumes CDC data in Astra Streaming. -
The
log-
topic consumes schema changes, processes them, and then writes clean data to thedata-
topic. Thelog-
topic is required for CDC functionality; it is not for direct use.
Each topic has three partitions by default.
You can increase partitions for the data-
topic, if desired.
For more information, see Increase CDC data topic partitions.
Connect a sink
After you create the CDC connector, connect a sink to it.
This integration enables the sink to consume messages from the data-
topic, and then send them to the associated service deployment.
This example uses an Elasticsearch sink. You can use other Astra Streaming sinks.
-
In the Astra Portal navigation menu, select your database.
-
Click the CDC tab.
-
In the Change Data Capture list, click the name of the CDC-enabled table that you want to use.
-
Click Add Elastic Search Sink.
-
For Namespace, select astracdc.
-
For Sink Type, select Elastic Search.
-
Enter a name for the sink.
-
In the Connect Topics section, for the Input topic, select a data- topic in the astracdc namespace.
-
In the Sink-Specific Configuration section, enter your Elasticsearch URL, Index name, and API key for your Elasticsearch deployment. Do not enter a username, password, or token.
-
For Ignore Record Key, Null Value Action, and Enable Schema, DataStax recommends the following values:
-
Ignore Record Key:
false
-
Null Value Action:
DELETE
-
Enable Schema:
true
-
-
Click Create.
If sink creation succeeds, a confirmation message appears in the Astra Portal, and the new sink appears on the Sinks tab.
Test the connection
Test the CDC functionality to verify that your Elasticsearch sink receives data through the CDC connector:
-
In the Astra Portal navigation menu, select your database.
-
Click CQL Console.
-
Process some changes with CDC. For example, you can modify the table:
INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32a','bob3123'); INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32b','bob3123b');
Replace the following:
-
KEYSPACE_NAME
: Your database’s keyspace name. -
TABLE_NAME
: Your CDC-enabled table name.
-
-
Verify the changes:
select * from KEYSPACE_NAME.TABLE_NAME;
Result:
key | c1 -----+---------- 32a | bob3123 32b | bob3123b (2 rows) token@cqlsh>
-
Fetch the data from your sink service deployment. For example, send a
GET
request to an Elasticsearch deployment:curl -sS -L -X POST "ELASTICSEARCH_URL/INDEX_NAME/_search?pretty" \ -header "Authorization: ApiKey 'API_KEY'"
Replace
ELASTICSEARCH_URL
,INDEX_NAME
, andAPI_KEY
with the values from your Elasticsearch deployment that you used to connect the sink. -
Make sure the response includes your changes. This indicates that Astra Streaming successfully sent changes tracked by CDC to your sink service deployment.
{ "took": 1, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 1.0, "hits": [ { "_index": "INDEX_NAME", "_id": "khl_hI0Bh25AUvCHghQo", "_score": 1.0, "_source": { "name": "foo", "title": "bar" } }, { "_index": "INDEX_NAME", "_id": "32a", "_score": 1.0, "_source": { "c1": "bob3123" } }, { "_index": "INDEX_NAME", "_id": "32b", "_score": 1.0, "_source": { "c1": "bob3123b" } } ] } }
Increase CDC data topic partitions
When you enable CDC, Astra DB creates three data-
partitions and three log-
partitions in your tenant’s astracdc
namespace.
Optionally, you can increase the number of partitions for the data-
topic.
Increasing the number of partitions creates new partitions, but existing data remains in the original partitions.
New messages are distributed across the new partitions.
To increase the number of data-
topic partitions, do the following:
-
Before you make changes, use
pulsar-admin
to get the namespace’s existing partitions:bin/pulsar-admin topics list-partitioned-topics astracdc
The response describes the existing partitions for the
data-
andlog-
topics. The default configuration has three partitions for each topic numbered 0, 1, and 2.persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-1 persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-2 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-0 persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-0 persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-1 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-2
The
TENANT_NAME
,ID
,KEYSPACE_NAME
, andTABLE_NAME
values are the same for each partition. The actual values depend on your CDC configuration. -
From the response, get a
data-
topic string withoutpersistent://
and the partition number.For example, from
persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-1
, extract onlyTENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME
. -
Use the
update-partitioned-topic
command to increase the number of partitions for thedata-
topic:bin/pulsar-admin topics update-partitioned-topic DATA_TOPIC_STRING --partitions NUMBER
Replace the following:
-
DATA_TOPIC_STRING
: Thedata-
topic string from thelist-partitioned-topics
response in the format ofTENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME
. -
NUMBER
: The desired total number of partitions.For example,
--partitions 10
increases the total number of partitions to 10. If the topic has 3 partitions, then--partitions 10
creates 7 new partitions for a total of 10.You can only increase the number of partitions.
You can’t decrease the number of partitions due to potential data loss and message ordering issues.
-
-
Verify the increase:
bin/pulsar-admin topics list TENANT_NAME/astracdc
Replace
TENANT_NAME
with your CDC tenant name. -
Make sure the response includes the desired total number of partitions.
The following response indicates that the
data-
topic now has 10 total partitions numbered 0-9:persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-2 persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-0 persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-1 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-9 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-8 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-7 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-6 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-1 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-0 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-5 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-4 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-3 persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-2
-
Confirm that the topic was updated to have the desired number of partitions:
bin/pulsar-admin topics partitioned-stats persistent://**DATA_TOPIC_STRING**
Replace
DATA_TOPIC_STRING
with thedata-
topic string in the format ofTENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME
.Result
{ "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesInCounter" : 0, "msgInCounter" : 0, "bytesOutCounter" : 0, "msgOutCounter" : 0, "averageMsgSize" : 0.0, "msgChunkPublished" : false, "storageSize" : 0, "backlogSize" : 0, "publishRateLimitedTimes" : 0, "earliestMsgPublishTimeInBacklogs" : 0, "offloadedStorageSize" : 0, "lastOffloadLedgerId" : 0, "lastOffloadSuccessTimeStamp" : 0, "lastOffloadFailureTimeStamp" : 0, "publishers" : [ ], "waitingPublishers" : 0, "subscriptions" : { }, "replication" : { }, "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "compaction" : { "lastCompactionRemovedEventCount" : 0, "lastCompactionSucceedTimestamp" : 0, "lastCompactionFailedTimestamp" : 0, "lastCompactionDurationTimeInMills" : 0 }, "metadata" : { "partitions" : 10 }, "partitions" : { } }