Create a Change Data Capture (CDC) connector
CDC for Astra DB automatically captures changes in real time, de-duplicates the changes, and streams the clean set of changed data into Astra Streaming where it can be processed by client applications or sent to downstream systems.
Astra Streaming processes data changes through a Pulsar topic. By design, the CDC component is simple, with a 1:1 correspondence between each table and a single Pulsar data topic.
This guide explains how to connect your Astra DB database to CDC and send change data to an Elasticsearch sink.
Supported data structures
The following data types and corresponding AVRO or logical types are supported for CDC for Astra DB Serverless databases:
CQL data type | AVRO type |
---|---|
ascii | string |
bigint | long |
blob | bytes |
boolean | boolean |
counter | long |
date | int |
decimal | cql_decimal |
double | double |
duration | cql_duration |
float | float |
inet | string |
int | int |
list | array |
map | map (only string-type keys are supported) |
set | array |
smallint | int |
text | string |
time | long |
timestamp | long |
timeuuid | string |
tinyint | int |
uuid | string |
varchar | string |
varint | cql_varint / bytes |
Cassandra static columns are supported:
-
On row-level updates, static columns are included in the message value.
-
On partition-level updates, the clustering keys are null in the message key. The message value only has static columns on INSERT and UPDATE operations.
Columns with unsupported data types are omitted from the events sent to the data topic. If a row update contains columns with both supported and unsupported data types, the event includes only the columns with supported data types.
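The type table and the filtering rule above can be restated as a short Python sketch: a lookup of supported CQL types and a helper that keeps only the columns whose types appear in it. CQL_TO_AVRO, base_type, and filter_supported_columns are hypothetical names used for illustration, not part of any Astra or Pulsar API. Reducing parameterized types such as list<text> or frozen<list<text>> to their outer collection name is a simplifying assumption, and the string-key restriction on maps is not checked.

```python
from typing import Any, Dict

# CQL data type -> AVRO (or logical) type, copied from the table above.
CQL_TO_AVRO: Dict[str, str] = {
    "ascii": "string", "bigint": "long", "blob": "bytes", "boolean": "boolean",
    "counter": "long", "date": "int", "decimal": "cql_decimal", "double": "double",
    "duration": "cql_duration", "float": "float", "inet": "string", "int": "int",
    "list": "array", "map": "map", "set": "array", "smallint": "int",
    "text": "string", "time": "long", "timestamp": "long", "timeuuid": "string",
    "tinyint": "int", "uuid": "string", "varchar": "string",
    "varint": "cql_varint / bytes",
}


def base_type(cql_type: str) -> str:
    """Reduce 'frozen<list<text>>' or 'list<text>' to 'list' (simplifying assumption)."""
    t = cql_type.strip().lower()
    if t.startswith("frozen<") and t.endswith(">"):
        t = t[len("frozen<"):-1]
    return t.split("<", 1)[0].strip()


def filter_supported_columns(row: Dict[str, Any], column_types: Dict[str, str]) -> Dict[str, Any]:
    """Hypothetical helper: keep only columns whose CQL type appears in the table.

    Columns with an unknown type are dropped; map key restrictions are not checked.
    """
    return {
        name: value
        for name, value in row.items()
        if base_type(column_types.get(name, "")) in CQL_TO_AVRO
    }


if __name__ == "__main__":
    column_types = {"key": "text", "c1": "text", "pair": "tuple<int, int>"}
    row = {"key": "32a", "c1": "bob3123", "pair": (1, 2)}
    print(filter_supported_columns(row, column_types))  # {'key': '32a', 'c1': 'bob3123'}
```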
AVRO interpretation
Astra DB Serverless database keys are strings, while CDC produces AVRO messages which are structures. The conversion for some AVRO structures requires additional tooling that can result in unexpected output.
The table below describes the conversion of AVRO logical types. The record type is a schema containing the listed fields.
Name | AVRO type | Fields | Explanation |
---|---|---|---|
collections | array | lists, sets | Sets and lists are treated as the AVRO array type. |
decimal | record | BIG_INT, DECIMAL_SCALE | The Cassandra DECIMAL type is converted to a record with the cql_decimal logical type. |
duration | record | CQL_DURATION_MONTHS, CQL_DURATION_DAYS, CQL_DURATION_NANOSECONDS | The Cassandra DURATION type is converted to a record with the cql_duration logical type. |
maps | map | KEYS_CONVERTED_TO_STRINGS, VALUE_SCHEMA | The Cassandra MAP type is converted to the AVRO map type, but the keys are converted to strings. For complex types, the key is represented in JSON. |
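The decimal and maps rows can be illustrated with a small Python sketch. split_decimal recovers the unscaled integer and scale that the BIG_INT and DECIMAL_SCALE fields describe, join_decimal reverses it, and stringify_map_keys converts map keys to strings, writing tuple keys as JSON. These helper names are hypothetical, and the exact byte encoding and JSON formatting that CDC uses are assumptions not covered here, so real messages may differ in detail.

```python
import json
from decimal import Decimal
from typing import Any, Dict, Tuple


def split_decimal(value: Decimal) -> Tuple[int, int]:
    """Return (unscaled, scale) such that value == unscaled * 10**-scale."""
    sign, digits, exponent = value.as_tuple()
    if not isinstance(exponent, int):
        raise ValueError("NaN and infinity have no unscaled/scale form")
    unscaled = int("".join(map(str, digits)))
    return (-unscaled if sign else unscaled), -exponent


def join_decimal(unscaled: int, scale: int) -> Decimal:
    """Exact inverse of split_decimal (no context rounding)."""
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    return Decimal((sign, digits, -scale))


def stringify_map_keys(m: Dict[Any, Any]) -> Dict[str, Any]:
    """Convert keys to strings; tuple keys (a stand-in for complex types) become JSON."""
    out: Dict[str, Any] = {}
    for key, value in m.items():
        if isinstance(key, str):
            out[key] = value
        elif isinstance(key, tuple):
            out[json.dumps(list(key))] = value
        else:
            out[str(key)] = value
    return out


if __name__ == "__main__":
    print(split_decimal(Decimal("123.45")))          # (12345, 2)
    print(stringify_map_keys({7: "a", (1, 2): "b"}))  # {'7': 'a', '[1, 2]': 'b'}
```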
Limitations
CDC for Astra DB Serverless databases has the following limitations:
-
Does not manage table truncates.
-
Does not sync data available before starting the CDC agent.
-
Does not replay logged batches.
-
Does not manage time-to-live.
-
Does not support range deletes.
-
CQL column names must not match a Pulsar primitive type name (for example, INT32).
-
Does not support multi-region.
-
Does not support multi-table mutations.
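Because a CQL column name must not match a Pulsar primitive type name, it can help to check a table's column names before enabling CDC. The Python sketch below performs that check. The set of names is an assumption based on the primitive schema types in Apache Pulsar's SchemaType enum, and the comparison is case-insensitive to be conservative; confirm both against the Pulsar version your tenant runs. conflicting_columns is a hypothetical helper.

```python
from typing import Iterable, List

# Assumption: primitive schema type names taken from Apache Pulsar's SchemaType enum.
PULSAR_PRIMITIVE_TYPES = frozenset({
    "BOOLEAN", "INT8", "INT16", "INT32", "INT64", "FLOAT", "DOUBLE",
    "BYTES", "STRING", "DATE", "TIME", "TIMESTAMP",
    "INSTANT", "LOCAL_DATE", "LOCAL_TIME", "LOCAL_DATE_TIME",
})


def conflicting_columns(column_names: Iterable[str]) -> List[str]:
    """Return column names that match a Pulsar primitive type name (case-insensitive)."""
    return [name for name in column_names if name.upper() in PULSAR_PRIMITIVE_TYPES]


if __name__ == "__main__":
    print(conflicting_columns(["key", "c1", "int32", "Timestamp"]))  # ['int32', 'Timestamp']
```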
Prerequisites
You need the following items to complete this procedure:
-
An active Astra DB account.
-
An Astra DB Serverless database created in the Astra Portal.
-
A keyspace created in the Astra Portal.
-
An active Elasticsearch account.
-
An Elasticsearch endpoint, index name, and API key retrieved from your Elasticsearch deployment.
Create a streaming tenant
-
In the Astra Portal navigation menu, click Streaming.
-
Select Create Tenant.
-
Enter a name for your new streaming tenant.
-
Select a provider and region.
Astra Streaming CDC can only be used in a region that supports both Astra Streaming and Astra DB Serverless databases. See Astra Streaming regions for more information.
-
Select Create Tenant.
Create a table
-
Select Databases from the main navigation.
-
Select the name of the active database that you would like to use.
-
Select the CQL Console tab.
-
Create a table with a primary key column using the following command. Replace KEYSPACE_NAME with the name of your keyspace and TABLE_NAME with a name for the new table:
CREATE TABLE IF NOT EXISTS KEYSPACE_NAME.TABLE_NAME (key text PRIMARY KEY, c1 text);
-
Confirm that your table was created:
select * from KEYSPACE_NAME.TABLE_NAME;
Result
 key | c1
-----+----

(0 rows)
Connect to CDC for Astra DB Serverless databases
Complete the following steps after you have created a streaming tenant and a table.
-
In the Astra Portal, go to your database.
-
Click the CDC tab.
-
Click Enable CDC.
-
Select a tenant, keyspace, and table.
-
Click Enable CDC.
Enabling CDC creates a new astracdc namespace with two new topics, whose names begin with data- and log-.
The log- topic consumes schema changes, processes them, and then writes clean data to the data- topic.
The log- topic is required for CDC to function and should not be used directly.
The data- topic is used to consume CDC data in Astra Streaming.
To change the number of partitions of the data- topic, see Increase the CDC data-topic partitions.
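The example topic names in Increase the CDC data-topic partitions follow the pattern persistent://TENANT/astracdc/data-DATABASE_ID-KEYSPACE.TABLE (with log- for the log topic), plus a -partition-N suffix for each partition. The Python sketch below builds those names from their parts. The pattern is inferred from that example output rather than from a documented contract, so treat it as an assumption and confirm the real names in your tenant's topic list. cdc_topic and partition_name are hypothetical helpers.

```python
def cdc_topic(tenant: str, database_id: str, keyspace: str, table: str, kind: str = "data") -> str:
    """Build a CDC topic name (pattern inferred from example output; an assumption)."""
    if kind not in ("data", "log"):
        raise ValueError("kind must be 'data' or 'log'")
    return f"persistent://{tenant}/astracdc/{kind}-{database_id}-{keyspace}.{table}"


def partition_name(topic: str, index: int) -> str:
    """Name of one partition of a partitioned Pulsar topic (-partition-N suffix)."""
    return f"{topic}-partition-{index}"


if __name__ == "__main__":
    topic = cdc_topic("ten01", "7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c", "keysp", "table1")
    print(partition_name(topic, 0))
```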
Connect Elasticsearch sink
Connect an Elasticsearch sink to CDC that consumes messages from the data- topic and sends them to your Elasticsearch deployment.
-
Go to your database’s CDC tab.
-
Under Change Data Capture, select the name of the CDC-enabled table you want to use. You stay on the CDC tab, but the header changes to CDC for TABLE_NAME with a green Active icon next to it.
-
Select Add Elastic Search Sink to configure the sink.
-
Select the astracdc namespace.
-
Select Elastic Search for the sink type.
-
Enter a name for your sink.
-
Under Connect Topics, select a data- topic in the astracdc namespace as the input topic.
-
Complete Sink-Specific Configuration with the Elasticsearch URL, Index name, and API key found in your Elasticsearch deployment portal. Leave username, password, and token blank.
Default values are populated automatically. The following values are recommended:
Ignore Record Key: false
Null Value Action: DELETE
Enable Schema: true
-
Click Create.
-
Confirm that your new sink was created on the Sinks tab.
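To see why these settings are recommended, the Python sketch below models how a sink could apply CDC messages to an index. With Ignore Record Key set to false, the record key becomes the document ID, so repeated changes to one row update one document. With Null Value Action set to DELETE, a message with a null value (typically what CDC emits for a deleted row) removes that document. This is a simplified in-memory model written for illustration, not the connector's implementation; the dictionary standing in for the index and the apply_cdc_message function are hypothetical.

```python
from typing import Dict, Optional

# Stand-in for an Elasticsearch index: document ID -> document source.
Index = Dict[str, dict]


def apply_cdc_message(index: Index, key: str, value: Optional[dict],
                      ignore_record_key: bool = False,
                      null_value_action: str = "DELETE") -> None:
    """Apply one CDC message to the in-memory index (simplified model)."""
    if value is None:
        if null_value_action == "DELETE" and not ignore_record_key:
            index.pop(key, None)  # null value: delete the document for this key
        # Other actions (for example IGNORE) leave the index unchanged in this model.
        return
    if ignore_record_key:
        # No stable ID: every message becomes a new document with a generated ID.
        index[f"auto-{len(index)}"] = value
    else:
        index[key] = value  # upsert keyed by the row's primary key


if __name__ == "__main__":
    idx: Index = {}
    apply_cdc_message(idx, "32a", {"c1": "bob3123"})
    apply_cdc_message(idx, "32b", {"c1": "bob3123b"})
    apply_cdc_message(idx, "32a", {"c1": "updated"})
    apply_cdc_message(idx, "32b", None)
    print(idx)  # {'32a': {'c1': 'updated'}}
```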
Send messages
Process some changes with CDC:
-
Go to your database’s CQL Console tab.
-
Modify the table you created:
INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32a','bob3123');
INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32b','bob3123b');
-
Confirm the changes you made:
select * from KEYSPACE_NAME.TABLE_NAME;
Result
 key | c1
-----+----------
 32a | bob3123
 32b | bob3123b

(2 rows)
The rows in the result confirm that your changes were written to the table. CDC captures these changes and publishes them to the data- topic.
Confirm Elasticsearch receives change data
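Send a GET request to the _search endpoint of your Elasticsearch index to confirm that Elasticsearch is receiving changes from your database through CDC. Replace ELASTIC_URL, INDEX_NAME, and API_KEY with the values from your Elasticsearch deployment:
curl -sS --location -X GET "ELASTIC_URL/INDEX_NAME/_search?pretty" \
  --header "Authorization: ApiKey API_KEY"
Make sure the response includes your changes to the index. Because Ignore Record Key is set to false, each document's _id is the row's primary key (32a and 32b in this example). Any documents that were already in the index, such as the first hit below, also appear in the results: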
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "INDEX_NAME",
        "_id": "khl_hI0Bh25AUvCHghQo",
        "_score": 1.0,
        "_source": {
          "name": "foo",
          "title": "bar"
        }
      },
      {
        "_index": "INDEX_NAME",
        "_id": "32a",
        "_score": 1.0,
        "_source": {
          "c1": "bob3123"
        }
      },
      {
        "_index": "INDEX_NAME",
        "_id": "32b",
        "_score": 1.0,
        "_source": {
          "c1": "bob3123b"
        }
      }
    ]
  }
}
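If you prefer to script this check, the following Python sketch builds the same GET request with the standard library and maps each hit's _id to its _source. ELASTIC_URL, INDEX_NAME, and API_KEY are placeholders as in the curl command (the URL must include its scheme, such as https://), and build_search_request and docs_by_id are hypothetical helper names. Actually sending the request, shown commented out, requires network access to your deployment.

```python
import json
import urllib.request
from typing import Dict


def build_search_request(elastic_url: str, index_name: str, api_key: str) -> urllib.request.Request:
    """Build GET <elastic_url>/<index_name>/_search with API key authentication."""
    url = f"{elastic_url.rstrip('/')}/{index_name}/_search"
    return urllib.request.Request(url, method="GET", headers={"Authorization": f"ApiKey {api_key}"})


def docs_by_id(response: dict) -> Dict[str, dict]:
    """Map each search hit's _id to its _source document."""
    return {hit["_id"]: hit["_source"] for hit in response["hits"]["hits"]}


if __name__ == "__main__":
    # Offline check against a trimmed copy of the example response.
    sample = json.loads('{"hits": {"hits": [{"_id": "32a", "_source": {"c1": "bob3123"}}]}}')
    print(docs_by_id(sample))  # {'32a': {'c1': 'bob3123'}}

    # Sending the real request needs network access and your own values:
    # request = build_search_request("https://ELASTIC_URL", "INDEX_NAME", "API_KEY")
    # with urllib.request.urlopen(request) as resp:
    #     docs = docs_by_id(json.load(resp))
    #     assert docs["32a"] == {"c1": "bob3123"}
```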
Increase the CDC data-topic partitions
After you enable CDC, the data- and log- topics are each created with 3 partitions in the astracdc namespace.
Increasing the number of partitions creates new partitions, but existing data remains in the old partitions.
New messages are distributed across all partitions, including the new ones.
-
Confirm the current state of the topic before making changes:
bin/pulsar-admin topics list ten01/astracdc
Result
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
-
Use the update-partitioned-topic command to change the number of partitions for the data- topic. Replace ten01 and the topic name with your tenant and data- topic names:
bin/pulsar-admin topics update-partitioned-topic persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1 --partitions 10
This example increases the number of partitions to 10. You can only increase the number of partitions. Decreasing is not supported due to potential data loss and message ordering issues.
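The ordering caveat comes from key-based routing: a keyed message is typically sent to the partition chosen by hashing its key modulo the partition count, so changing the count can move a key to a different partition while its earlier messages stay where they were. The Python sketch below illustrates the effect using zlib.crc32 as a stand-in hash. Pulsar uses its own configurable hashing scheme, so the specific partition numbers produced here will not match a real deployment; partition_for_key and moved_keys are hypothetical helpers.

```python
import zlib
from typing import Dict, Iterable, Tuple


def partition_for_key(key: str, num_partitions: int) -> int:
    """Stand-in router: hash the key and take it modulo the partition count."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def moved_keys(keys: Iterable[str], old: int, new: int) -> Dict[str, Tuple[int, int]]:
    """Keys whose partition changes when the partition count goes from old to new."""
    result: Dict[str, Tuple[int, int]] = {}
    for key in keys:
        before, after = partition_for_key(key, old), partition_for_key(key, new)
        if before != after:
            result[key] = (before, after)
    return result


if __name__ == "__main__":
    keys = [f"key-{i}" for i in range(20)]
    # Each entry shows (partition with 3 partitions, partition with 10 partitions).
    print(moved_keys(keys, 3, 10))
```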
-
Verify the update:
bin/pulsar-admin topics list ten01/astracdc
Result
persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-9
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-8
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-7
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-6
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-5
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-4
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-3
persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
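Counting partitions by eye is error-prone when a namespace holds many topics. The Python sketch below groups the names returned by pulsar-admin topics list by their base topic, stripping the -partition-N suffix, and counts the partitions of each. It assumes the output is whitespace-separated topic names, as in the result above; non-partitioned topics are skipped, and count_partitions is a hypothetical helper name.

```python
import re
from collections import Counter
from typing import Dict

# Greedy base match so only the final "-partition-N" suffix is stripped.
_PARTITION_SUFFIX = re.compile(r"^(?P<base>.+)-partition-(?P<index>\d+)$")


def count_partitions(topic_list_output: str) -> Dict[str, int]:
    """Count partitions per base topic in `pulsar-admin topics list` output."""
    counts: Counter = Counter()
    for name in topic_list_output.split():
        match = _PARTITION_SUFFIX.match(name)
        if match:  # names without a partition suffix are ignored
            counts[match.group("base")] += 1
    return dict(counts)


if __name__ == "__main__":
    base = "persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1"
    output = " ".join(f"{base}-partition-{i}" for i in range(10))
    print(count_partitions(output)[base])  # 10
```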
-
Check the topic's partitioned stats to confirm that it now has 10 partitions. The partition count appears in the metadata.partitions field of the output:
bin/pulsar-admin topics partitioned-stats persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1
Result
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 0,
  "msgOutCounter" : 0,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 0,
  "backlogSize" : 0,
  "publishRateLimitedTimes" : 0,
  "earliestMsgPublishTimeInBacklogs" : 0,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "publishers" : [ ],
  "waitingPublishers" : 0,
  "subscriptions" : { },
  "replication" : { },
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "metadata" : {
    "partitions" : 10
  },
  "partitions" : { }
}