CDC for Astra DB
CDC for Astra DB automatically captures changes in real time, de-duplicates the changes, and streams the clean set of changed data into Astra Streaming where it can be processed by client applications or sent to downstream systems.
Astra Streaming processes data changes via a Pulsar topic. By design, the Change Data Capture (CDC) component is simple, with a 1:1 correspondence between the table and a single Pulsar topic.
This doc will show you how to create a CDC connector for your Astra DB deployment and send change data to an Elasticsearch sink.
Enabling CDC for Astra DB incurs billed charges based on your Astra Streaming usage. For more information, see Astra Streaming pricing and CDC for Astra DB pricing.
Supported data structures
The following data types (with the associated AVRO type or logical-type) are supported for CDC for Astra DB:
- ascii (string)
- bigint (long)
- blob (bytes)
- boolean (boolean)
- counter (long)
- date (int)
- decimal (cql_decimal)
- double (double)
- duration (cql_duration)
- float (float)
- inet (string)
- int (int)
- list (array)
- map (map, only string-type keys are supported)
- set (array)
- smallint (int)
- text (string)
- time (long)
- timestamp (long)
- timeuuid (string)
- tinyint (int)
- uuid (string)
- varchar (string)
- varint (cql_varint / bytes)
Cassandra static columns are supported:
- On row-level updates, static columns are included in the message value.
- On partition-level updates, the clustering keys are null in the message key. The message value only has static columns on INSERT/UPDATE operations.
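The two static-column rules above can be pictured with a toy model. This is only a sketch of the message shape as described in this section, not the connector's actual code; the function name `build_event` and the field names `pk`, `ck`, `c1`, and `s1` are hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


def build_event(
    partition_key: str,
    clustering_key: Optional[str],
    regular: Dict[str, Any],
    static: Dict[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return a (message key, message value) pair for an INSERT/UPDATE."""
    # The message key always carries the partition key; the clustering key
    # is None (null) when the update targets the whole partition.
    key = {"pk": partition_key, "ck": clustering_key}
    if clustering_key is None:
        # Partition-level update: the value only has static columns.
        value = dict(static)
    else:
        # Row-level update: static columns are included alongside the
        # regular columns.
        value = {**regular, **static}
    return key, value


row_key, row_value = build_event("p1", "r1", {"c1": "x"}, {"s1": "shared"})
part_key, part_value = build_event("p1", None, {}, {"s1": "shared"})
```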
Columns that use unsupported data types are omitted from the events sent to the data topic. If a row update contains both supported and unsupported data types, the event includes only the columns with supported data types.
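To check a table definition against the list above before enabling CDC, a small lookup can help. The sketch below copies the type mapping from this section; the function name `supported_columns` and the sample schema are hypothetical, and the check looks only at the outer type name, so it does not verify that map keys are strings.

```python
from typing import Dict

# CQL type -> AVRO type or logical type, as listed in this section.
CQL_TO_AVRO: Dict[str, str] = {
    "ascii": "string", "bigint": "long", "blob": "bytes",
    "boolean": "boolean", "counter": "long", "date": "int",
    "decimal": "cql_decimal", "double": "double",
    "duration": "cql_duration", "float": "float", "inet": "string",
    "int": "int", "list": "array", "map": "map", "set": "array",
    "smallint": "int", "text": "string", "time": "long",
    "timestamp": "long", "timeuuid": "string", "tinyint": "int",
    "uuid": "string", "varchar": "string", "varint": "cql_varint",
}


def supported_columns(schema: Dict[str, str]) -> Dict[str, str]:
    """Map each column with a listed CQL type to its AVRO type.

    Columns whose type is not in the list are left out, mirroring how
    they are omitted from CDC events.
    """
    result = {}
    for column, cql_type in schema.items():
        # "list<text>" -> "list"; only the outer type name is checked.
        base = cql_type.split("<", 1)[0].strip().lower()
        if base in CQL_TO_AVRO:
            result[column] = CQL_TO_AVRO[base]
    return result


# Hypothetical table: "pos" uses tuple, which is not in the list above.
example = supported_columns(
    {"key": "text", "price": "decimal", "tags": "list<text>", "pos": "tuple<int, int>"}
)
```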
AVRO interpretation
Astra DB keys are strings, while CDC produces AVRO messages which are structures. The conversion for some AVRO structures requires additional tooling that can result in unexpected output.
The table below describes the conversion of AVRO logical types. The record type is a schema containing the listed fields.
Name | AVRO type | Fields | Explanation |
---|---|---|---|
collections | array | lists, sets | Sets and lists are treated as the AVRO array type. |
decimal | record | BIG_INT, DECIMAL_SCALE | The Cassandra DECIMAL type is converted to a record with the cql_decimal logical type. |
duration | record | CQL_DURATION_MONTHS, CQL_DURATION_DAYS, CQL_DURATION_NANOSECONDS | The Cassandra DURATION type is converted to a record with the cql_duration logical type. |
maps | map | | The Cassandra MAP type is converted to the AVRO map type, but the keys are converted to strings. |
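A consumer usually has to turn the decimal record back into a number itself. The sketch below shows one way to do that in Python. It assumes the BIG_INT field holds the unscaled value as big-endian two's-complement bytes (the layout produced by Java's `BigInteger.toByteArray()`) and that DECIMAL_SCALE is the number of digits after the decimal point; neither detail is stated in this guide, so verify both against real messages. The function names are hypothetical.

```python
from decimal import Decimal
from typing import Any, Dict


def decode_cql_decimal(unscaled: bytes, scale: int) -> Decimal:
    """Rebuild a Decimal from the two fields of the decimal record."""
    # Assumption: big-endian, signed (two's-complement) unscaled integer.
    n = int.from_bytes(unscaled, byteorder="big", signed=True)
    sign = 1 if n < 0 else 0
    digits = tuple(int(d) for d in str(abs(n)))
    # Building from a tuple keeps the value exact, independent of the
    # current decimal context precision.
    return Decimal((sign, digits, -scale))


def stringify_map_keys(cql_map: Dict[Any, Any]) -> Dict[str, Any]:
    """Mimic the MAP conversion: values are kept, keys become strings."""
    return {str(k): v for k, v in cql_map.items()}


price = decode_cql_decimal(b"\x04\xd2", 2)   # 0x04d2 is 1234
negative = decode_cql_decimal(b"\xff", 1)    # 0xff is -1 when signed
```

With these inputs, `price` is `Decimal("12.34")` and `negative` is `Decimal("-0.1")`.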
Limitations
CDC for Astra DB has the following limitations:
- Does not manage table truncates.
- Does not sync data available before starting the CDC agent.
- Does not replay logged batches.
- Does not manage time-to-live.
- Does not support range deletes.
- CQL column names must not match a Pulsar primitive type name (for example, INT32).
- Does not support multi-region.
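The column-name restriction is easy to check before enabling CDC. The following sketch flags column names that collide with a Pulsar primitive type name. Only INT32 comes from this guide; the rest of the set is an assumption based on Pulsar's schema type names and may be incomplete, and the case-insensitive comparison is also an assumption. The function name `conflicting_columns` is hypothetical.

```python
from typing import Iterable, List

# Assumption: a non-exhaustive set of Pulsar primitive schema type names.
# Only INT32 is confirmed by this guide.
PULSAR_PRIMITIVE_NAMES = frozenset({
    "BOOLEAN", "INT8", "INT16", "INT32", "INT64", "FLOAT", "DOUBLE",
    "BYTES", "STRING", "DATE", "TIME", "TIMESTAMP",
})


def conflicting_columns(columns: Iterable[str]) -> List[str]:
    """Return the column names that match a primitive type name."""
    # Assumption: the match is case-insensitive.
    return [c for c in columns if c.upper() in PULSAR_PRIMITIVE_NAMES]


conflicts = conflicting_columns(["key", "int32", "c1", "Timestamp"])
```

Here `conflicts` is `["int32", "Timestamp"]`, so those two columns would need to be renamed under the assumptions above.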
Creating a tenant and a topic
- In astra.datastax.com, select Create a Streaming Tenant.
- Enter the name for your new streaming tenant and select a provider.
- Select Create Tenant.
Use the default persistent and non-partitioned topic.
Astra Streaming CDC can only be used in a region that supports both Astra Streaming and Astra DB. See Regions for more information.
Creating a table
- In your database, create a table with a primary key column:
CREATE TABLE IF NOT EXISTS <keyspacename>.tbl1 (key text PRIMARY KEY, c1 text);
- Confirm you created your table. The examples in this guide use ks1 as the keyspace name.
CQLSH:
select * from ks1.tbl1;
Result:
token@cqlsh> select * from ks1.tbl1;

 key | c1
-----+----

(0 rows)
token@cqlsh>
Connecting to CDC for Astra DB
- In the Astra Portal, go to Databases, and then select your database.
- Click the CDC tab.
- Click Enable CDC.
- Complete the fields to connect CDC.
- Select Enable CDC. Once created, your CDC connector will appear.
- Enabling CDC creates a new astracdc namespace with two new topics, data- and log-. The log- topic consumes schema changes, processes them, and then writes clean data to the data- topic. The log- topic is for CDC functionality and should not be used. The data- topic can be used to consume CDC data in Astra Streaming.
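When scripting against these topics, it can help to split a topic name into its parts. The sketch below assumes the naming pattern `<data|log>-<UUID>-<keyspace>.<table>`, inferred from the example topic name shown in the sink section of this guide, and builds a full topic URL in Pulsar's standard `persistent://tenant/namespace/topic` form. The function names and the tenant name `my-tenant` are hypothetical.

```python
import re
from typing import NamedTuple

# Assumed pattern: prefix, a UUID, then keyspace.table.
_TOPIC_RE = re.compile(
    r"^(data|log)-"
    r"([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-"
    r"([^.]+)\.(.+)$"
)


class CdcTopic(NamedTuple):
    kind: str       # "data" or "log"
    uuid: str       # identifier embedded in the topic name
    keyspace: str
    table: str


def parse_cdc_topic(name: str) -> CdcTopic:
    """Split a CDC topic name into its parts, or raise ValueError."""
    match = _TOPIC_RE.match(name)
    if match is None:
        raise ValueError(f"not a CDC topic name: {name!r}")
    return CdcTopic(*match.groups())


def full_topic_url(tenant: str, name: str) -> str:
    """Build the full Pulsar topic URL inside the astracdc namespace."""
    return f"persistent://{tenant}/astracdc/{name}"


topic = parse_cdc_topic("data-64b406e3-28ec-4eaf-a802-69ade0415b58-ks1.tbl1")
url = full_topic_url("my-tenant", "data-64b406e3-28ec-4eaf-a802-69ade0415b58-ks1.tbl1")
```

A check such as `topic.kind == "data"` is a simple guard against subscribing to the log- topic by mistake.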
Connecting Elasticsearch sink
After creating your CDC connector, connect an Elasticsearch sink to it. DataStax recommends using the default Astra Streaming settings.
- Select the CDC-enabled table from the database CDC tab and click Add Elastic Search Sink to enforce the default settings.
- Select the corresponding data topic for the chosen table. The topic name will look something like this: data-64b406e3-28ec-4eaf-a802-69ade0415b58-ks1.tbl1.
- Use your Elasticsearch deployment to complete the fields. To find your Elasticsearch URL, navigate to your deployment within the Elastic Common Schema (ECS). Copy the Elasticsearch endpoint to the Elastic Search URL field.
- Complete the remaining fields. Most values will auto-populate. These values are recommended:
  - Ignore Record Key as false
  - Null Value Action as DELETE
  - Enable Schema as true
- When the fields are completed, select Create.
If creation is successful, "<sink-name> created successfully" appears at the top of the screen. You can confirm your new sink was created in the Sinks tab.
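The recommended settings are easier to reason about with a small model of what the sink does to the index. The sketch below is an interpretation of the setting names, not the connector's implementation: it assumes that with Ignore Record Key set to false the message key becomes the document `_id`, and that with Null Value Action set to DELETE a message with a null value removes that document. It also assumes a row delete arrives as a message with a null value. The function name `apply_event` is hypothetical.

```python
from typing import Any, Dict, Optional

Index = Dict[str, Dict[str, Any]]  # document _id -> document _source


def apply_event(index: Index, key: str, value: Optional[Dict[str, Any]]) -> None:
    """Apply one CDC message to an in-memory stand-in for the index."""
    if value is None:
        # Null Value Action = DELETE: drop the document for this key.
        index.pop(key, None)
    else:
        # Ignore Record Key = false: the key is used as the document id,
        # so a later change to the same row overwrites the same document.
        index[key] = value


index: Index = {}
apply_event(index, "32a", {"c1": "bob3123"})
apply_event(index, "32b", {"c1": "bob3123b"})
apply_event(index, "32a", {"c1": "updated"})   # same row changed again
apply_event(index, "32b", None)                # row deleted
```

After these four messages the model holds a single document, `32a`, with the updated value.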
Sending messages
Let’s process some changes with CDC.
- Go to the CQL console.
- Modify the table you created.
INSERT INTO <keyspacename>.tbl1 (key,c1) VALUES ('32a','bob3123');
INSERT INTO <keyspacename>.tbl1 (key,c1) VALUES ('32b','bob3123b');
- Confirm the changes you’ve made:
token@cqlsh> select * from ks1.tbl1;

 key | c1
-----+----------
 32a |  bob3123
 32b | bob3123b

(2 rows)
Confirming ECS is receiving data
To confirm ECS is receiving your CDC changes, issue a curl GET request to your ECS deployment.
- Get your index name from your ECS sink tab.
- Issue your curl GET request with your Elastic username, password, and index name:
curl -u <username>:<password> \
  -XGET "https://asdev.es.westus2.azure.elastic-cloud.com:9243/<index_name>/_search?pretty" \
  -H 'Content-Type: application/json'
If you’re using a trial account, the username is elastic.
You will receive a JSON response with your changes to the index, which confirms Astra Streaming is sending your CDC changes to your ECS sink.
{
  "_index" : "index.tbl1",
  "_type" : "_doc",
  "_id" : "32a",
  "_score" : 1.0,
  "_source" : {
    "c1" : "bob3123"
  }
},
{
  "_index" : "index.tbl1",
  "_type" : "_doc",
  "_id" : "32b",
  "_score" : 1.0,
  "_source" : {
    "c1" : "bob3123b"
  }
}
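The same check can be scripted. The sketch below builds the URL and Basic authentication header that the curl command sends, and pulls the row keys and values out of a search response. It assumes the two objects above are entries of the standard Elasticsearch `hits.hits` array; the endpoint `https://example.invalid:9243`, the password, and the function names are placeholders, and nothing is sent over the network.

```python
import base64
import json
from typing import Any, Dict, Tuple
from urllib.parse import quote


def build_search_request(
    endpoint: str, index_name: str, username: str, password: str
) -> Tuple[str, Dict[str, str]]:
    """Return the URL and headers equivalent to the curl command."""
    url = f"{endpoint.rstrip('/')}/{quote(index_name, safe='')}/_search?pretty"
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    headers = {
        "Authorization": f"Basic {token}",
        "Content-Type": "application/json",
    }
    return url, headers


def extract_rows(response_body: str) -> Dict[str, Dict[str, Any]]:
    """Map each document _id to its _source from a search response."""
    hits = json.loads(response_body)["hits"]["hits"]
    return {hit["_id"]: hit["_source"] for hit in hits}


url, headers = build_search_request(
    "https://example.invalid:9243/", "index.tbl1", "elastic", "secret"
)

# A trimmed response using the two documents shown in this section.
sample = json.dumps({"hits": {"hits": [
    {"_index": "index.tbl1", "_id": "32a", "_source": {"c1": "bob3123"}},
    {"_index": "index.tbl1", "_id": "32b", "_source": {"c1": "bob3123b"}},
]}})
rows = extract_rows(sample)
```

To send the request, pass `url` and `headers` to `urllib.request.Request` and open it with `urllib.request.urlopen`, then hand the decoded body to `extract_rows`.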