Create a Change Data Capture (CDC) connector

CDC connectors are only available for Astra DB Serverless deployments.

CDC for Astra DB automatically captures changes in real time, de-duplicates the changes, and streams the clean set of changed data into Astra Streaming where it can be processed by client applications or sent to downstream systems.

Astra Streaming processes data changes via a Pulsar topic. By design, the Change Data Capture (CDC) component is simple, with a 1:1 correspondence between the table and a single Pulsar topic.

This guide shows how to create a CDC connector for your Astra DB Serverless database and send change data to an Elasticsearch sink.

Enabling CDC for Astra DB Serverless databases increases costs based on your Astra Streaming usage. See Astra Streaming pricing and CDC metering rates.

Supported data structures

The following data types and corresponding AVRO or logical types are supported for CDC for Astra DB Serverless databases:

Data type | AVRO type
----------+----------------------------------------------
ascii     | string
bigint    | long
blob      | bytes
boolean   | boolean
counter   | long
date      | int
decimal   | cql_decimal
double    | double
duration  | cql_duration
float     | float
inet      | string
int       | int
list      | array
map       | map (only string-type keys are supported)
set       | array
smallint  | int
text      | string
time      | long
timestamp | long
timeuuid  | string
tinyint   | int
uuid      | string
varchar   | string
varint    | cql_varint / bytes

Cassandra static columns are supported:

  • On row-level updates, static columns are included in the message value.

  • On partition-level updates, the clustering keys are null in the message key. The message value only has static columns on INSERT and UPDATE operations.

Columns that use unsupported data types are omitted from the events sent to the data topic. If a row update contains columns with both supported and unsupported data types, the event includes only the columns with supported data types.
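The omission rule can be sketched in Python as a quick way to predict which columns of a table would appear in CDC events. This is only an illustration, not the connector's implementation: the mapping is transcribed from the table of supported types above, `split_columns` is a hypothetical helper, and any type not in that table (for example a tuple) is treated as unsupported. Wrapped types such as `frozen<...>` and the string-key restriction on maps are not handled.

```python
# CQL type -> AVRO type, transcribed from the table of supported types above.
SUPPORTED_TYPES = {
    "ascii": "string", "bigint": "long", "blob": "bytes", "boolean": "boolean",
    "counter": "long", "date": "int", "decimal": "cql_decimal",
    "double": "double", "duration": "cql_duration", "float": "float",
    "inet": "string", "int": "int", "list": "array", "map": "map",
    "set": "array", "smallint": "int", "text": "string", "time": "long",
    "timestamp": "long", "timeuuid": "string", "tinyint": "int",
    "uuid": "string", "varchar": "string", "varint": "cql_varint / bytes",
}


def split_columns(columns):
    """Split {column_name: cql_type} into (kept, omitted).

    kept maps each supported column to its AVRO type; omitted lists the
    columns that would be left out of the event. Hypothetical helper.
    """
    kept, omitted = {}, []
    for name, cql_type in columns.items():
        # "set<text>" -> "set"; only the outer type is checked in this sketch.
        base = cql_type.split("<", 1)[0].strip().lower()
        if base in SUPPORTED_TYPES:
            kept[name] = SUPPORTED_TYPES[base]
        else:
            omitted.append(name)
    return kept, omitted


kept, omitted = split_columns(
    {"key": "text", "c1": "text", "tags": "set<text>", "pair": "tuple<int, text>"}
)
print(kept)
print(omitted)
```

Here the `pair` column ends up in `omitted` because its type is not in the supported list, while the other three columns are kept with their AVRO types.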

AVRO interpretation

Astra DB Serverless database keys are strings, while CDC produces AVRO messages which are structures. The conversion for some AVRO structures requires additional tooling that can result in unexpected output.

The table below describes the conversion of AVRO logical types. The record type is a schema containing the listed fields.

AVRO complex types
Name        | AVRO type | Fields | Explanation
------------+-----------+--------+------------
collections | array     | lists, sets | Sets and Lists are treated as AVRO type array, with the attribute items containing the schema of the array’s items.
decimal     | record    | BIG_INT, DECIMAL_SCALE | The Cassandra DECIMAL type is converted to a record with the cql_decimal logical type.
duration    | record    | CQL_DURATION_MONTHS, CQL_DURATION_DAYS, CQL_DURATION_NANOSECONDS | The Cassandra DURATION type is converted to a record with the cql_duration logical type.
maps        | map       | KEYS_CONVERTED_TO_STRINGS, VALUE_SCHEMA | The Cassandra MAP type is converted to the AVRO map type, but the keys are converted to strings. For complex types, the key is represented in JSON.
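Because cql_decimal and cql_duration arrive as records rather than native values, a consumer typically has to reassemble them. The following Python sketch shows one way to do that once a message has been decoded into a dictionary. The field names `bigint`, `scale`, `months`, `days`, and `nanoseconds` are assumptions made for illustration (the table lists the fields only by their symbolic names), as is the assumption that the unscaled decimal value is a big-endian two's-complement byte string. Check the schema attached to your data topic before relying on them.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class CqlDuration:
    """Plain holder for the three duration components."""
    months: int
    days: int
    nanoseconds: int


def decode_cql_decimal(record):
    """Rebuild a Decimal from a cql_decimal record.

    Assumes record["bigint"] holds the unscaled value as big-endian
    two's-complement bytes and record["scale"] holds the decimal scale.
    """
    unscaled = int.from_bytes(record["bigint"], byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    # Building from a tuple avoids rounding to the context precision.
    return Decimal((sign, digits, -record["scale"]))


def decode_cql_duration(record):
    """Rebuild a CqlDuration from a cql_duration record (assumed field names)."""
    return CqlDuration(record["months"], record["days"], record["nanoseconds"])


print(decode_cql_decimal({"bigint": b"\x30\x39", "scale": 2}))
print(decode_cql_duration({"months": 1, "days": 2, "nanoseconds": 3}))
```

The duration is kept as three separate components because months and days have no fixed length in nanoseconds.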

Limitations

CDC for Astra DB Serverless databases has the following limitations:

  • Does not manage table truncates.

  • Does not sync data available before starting the CDC agent.

  • Does not replay logged batches.

  • Does not manage time-to-live.

  • Does not support range deletes.

  • CQL column names must not match a Pulsar primitive type name (for example, INT32).

  • Does not support multi-region.

  • Does not support multi-table mutations.

Prerequisites

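To complete this procedure, you need an active Astra DB Serverless database with a keyspace, and an Elasticsearch deployment with its URL, index name, and API key. You also need a streaming tenant and a table, which you create in the following steps.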

Create a streaming tenant

  1. Log into the Astra Portal. At the bottom of the Welcome page, select View Streaming.

  2. Select Create Tenant.

  3. Enter a name for your new streaming tenant.

  4. Select a provider and region.

  5. Select Create Tenant.

    Astra Streaming CDC can only be used in a region that supports both Astra Streaming and Astra DB Serverless databases. See Astra Streaming Regions for more information.

Create a table

  1. Select Databases from the main navigation.

  2. Select the name of the active database that you would like to use.

  3. Select the CQL Console tab.

  4. Create a table with a primary key column using the following command. Edit the command to add your KEYSPACE_NAME and choose a TABLE_NAME.

    CREATE TABLE IF NOT EXISTS KEYSPACE_NAME.TABLE_NAME (key text PRIMARY KEY, c1 text);

  5. Confirm that your table was created:

    select * from KEYSPACE_NAME.TABLE_NAME;

    Result:

     key | c1
    -----+----
    
    (0 rows)

You have now created a table and confirmed that the table exists in your Astra DB Serverless database.

Connect to CDC for Astra DB Serverless databases

Complete the following steps after you have created a streaming tenant and a table.

  1. Select Databases from the main navigation.

  2. Select the name of the active database that you would like to use.

  3. Click the CDC tab.

  4. Click Enable CDC.

  5. Complete the fields to select a tenant, select a keyspace, and select the name of the table you created.

  6. Click Enable CDC.

Enabling CDC creates a new astracdc namespace with two new topics, data- and log-. The log- topic consumes schema changes, processes them, and then writes clean data to the data- topic. The log- topic is reserved for internal CDC processing and should not be used by your applications. Use the data- topic to consume CDC data in Astra Streaming.

For more information, see Increase the CDC data-topic Partitions.
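If you need the full names of the two topics in scripts, they can be assembled from the tenant, keyspace, and table. The sketch below is an assumption-laden illustration: the naming pattern is inferred from the example `pulsar-admin` output later on this page, the UUID-like segment is assumed to be the database ID, and `cdc_topic_names` is a hypothetical helper. Confirm the actual names in the Astra Portal.

```python
def cdc_topic_names(tenant, database_id, keyspace, table):
    """Return the assumed data- and log- topic names for one CDC-enabled table."""
    base = f"persistent://{tenant}/astracdc"
    suffix = f"{database_id}-{keyspace}.{table}"
    return {
        "data": f"{base}/data-{suffix}",  # consume CDC events from this topic
        "log": f"{base}/log-{suffix}",    # internal to CDC; do not use
    }


names = cdc_topic_names(
    "ten01", "7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c", "keysp", "table1"
)
print(names["data"])
```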

Connect Elasticsearch sink

Connect an Elasticsearch sink to CDC that consumes messages from the data- topic and sends them to your Elasticsearch deployment.

  1. Go to your database’s CDC tab.

  2. Under Change Data Capture, select the name of the CDC-enabled table you would like to use. You should still be in the CDC tab after selecting a name, but the header becomes CDC for TABLE_NAME with a green Active icon next to it.

  3. Select Add Elastic Search Sink to select your settings.

  4. Select the astracdc namespace.

  5. Select Elastic Search for the sink type.

  6. Enter a name for your sink.

  7. Under Connect Topics, select a data- topic in the astracdc namespace for the input topic.

  8. Complete Sink-Specific Configuration with the Elasticsearch URL, Index name, and API key found in your Elasticsearch deployment portal. Leave username, password, and token blank.

    Default values auto-populate. These values are recommended:

    • Ignore Record Key as false

    • Null Value Action as DELETE

    • Enable Schema as true

  9. When the fields are completed, select Create.

If creation is successful, SINK_NAME created successfully appears at the top of the screen. You can confirm that your new sink was created in the Sinks tab.
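For reference, the recommended settings from step 8 can be written down as a configuration dictionary. This is a sketch only: the key names are assumed to match the configuration properties of the Pulsar Elasticsearch sink connector, and `build_sink_config` is a hypothetical helper, not part of any Astra tooling. With the record key not ignored, the message key is expected to become the Elasticsearch document ID, and with the null value action set to DELETE, a message with a null value is expected to delete the matching document.

```python
def build_sink_config(elastic_url, index_name, api_key):
    """Assemble the sink-specific settings recommended in step 8.

    Key names are assumptions based on the Pulsar Elasticsearch sink
    connector; verify them against the connector reference.
    """
    if not elastic_url.startswith(("http://", "https://")):
        raise ValueError("elastic_url must start with http:// or https://")
    return {
        "elasticSearchUrl": elastic_url,
        "indexName": index_name,
        "apiKey": api_key,
        # Left blank because authentication uses the API key.
        "username": "",
        "password": "",
        "token": "",
        "keyIgnore": False,           # Ignore Record Key: false
        "nullValueAction": "DELETE",  # Null Value Action: DELETE
        "schemaEnable": True,         # Enable Schema: true
    }


config = build_sink_config("https://es.example.com:9243", "INDEX_NAME", "API_KEY")
print(config["nullValueAction"])
```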

Send messages

Let’s process some changes with CDC.

  1. Go to your database’s CQL Console tab.

  2. Modify the table you created.

    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32a','bob3123');
    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32b','bob3123b');

  3. Confirm the changes you’ve made:

    select * from KEYSPACE_NAME.TABLE_NAME;

    Result:

     key | c1
    -----+----------
     32a |  bob3123
     32b | bob3123b
    
    (2 rows)

The new rows in the resulting table confirm that your changes were written to the database and are available for CDC to capture.

Confirm Elasticsearch receives change data

Ensure that your new Elasticsearch sink receives data once it is connected.

  1. Issue a GET request to your Elasticsearch deployment to confirm that Elasticsearch is receiving changes from your database via CDC.

    curl -X GET "ELASTIC_URL/INDEX_NAME/_search?pretty" \
      -H "Authorization: ApiKey API_KEY"

  2. A JSON response with your changes to the index is returned, confirming that Astra Streaming is sending your CDC changes to your Elasticsearch sink.

    {
      "took": 1,
      "timed_out": false,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": {
          "value": 3,
          "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
          {
            "_index": "INDEX_NAME",
            "_id": "khl_hI0Bh25AUvCHghQo",
            "_score": 1.0,
            "_source": {
              "name": "foo",
              "title": "bar"
            }
          },
          {
            "_index": "INDEX_NAME",
            "_id": "32a",
            "_score": 1.0,
            "_source": {
              "c1": "bob3123"
            }
          },
          {
            "_index": "INDEX_NAME",
            "_id": "32b",
            "_score": 1.0,
            "_source": {
              "c1": "bob3123b"
            }
          }
        ]
      }
    }
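The same check can be scripted. The Python sketch below builds the search request with the standard library and reduces a response like the one above to a mapping of document ID to source, which makes it easy to assert that the rows `32a` and `32b` arrived. `build_search_request` and `documents_by_id` are hypothetical helper names, and the URL, index name, and API key are placeholders.

```python
import json
import urllib.request


def build_search_request(elastic_url, index_name, api_key):
    """Build (but do not send) a GET request against the index's _search endpoint."""
    url = f"{elastic_url.rstrip('/')}/{index_name}/_search?pretty"
    return urllib.request.Request(
        url, headers={"Authorization": f"ApiKey {api_key}"}, method="GET"
    )


def documents_by_id(response_text):
    """Map each hit's _id to its _source from a search response body."""
    body = json.loads(response_text)
    return {hit["_id"]: hit["_source"] for hit in body["hits"]["hits"]}


sample = """
{"hits": {"hits": [
  {"_index": "INDEX_NAME", "_id": "32a", "_source": {"c1": "bob3123"}},
  {"_index": "INDEX_NAME", "_id": "32b", "_source": {"c1": "bob3123b"}}
]}}
"""
docs = documents_by_id(sample)
print(docs["32a"]["c1"])
```

To run it against a live deployment, pass the request to `urllib.request.urlopen`, decode the response body, and hand the text to `documents_by_id`.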

Outcomes

At this point you have successfully:

  • Created a tenant, topic, and table.

  • Connected your Astra DB Serverless database to CDC.

  • Connected an Elasticsearch sink to CDC and verified that messages are sent and received successfully.

Increase the CDC data-topic Partitions

After enabling CDC, 3 data and 3 log partitions are created under the astracdc namespace. Increasing the number of partitions will create new partitions, but existing data will remain in the old partitions. New messages will be distributed across the new partitions.

  1. Confirm the current state of the topic before making changes.

    bin/pulsar-admin topics list ten01/astracdc

    Result:

    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
    persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
    persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
    persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2

  2. Use the update-partitioned-topic command to change the number of partitions for a specified topic.

    bin/pulsar-admin topics update-partitioned-topic ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1 --partitions 10

    Here, we are increasing the number of partitions to 10. You can only increase the number of partitions. Decreasing is not supported due to potential data loss and message ordering issues.

  3. Verify the update.

    bin/pulsar-admin topics list ten01/astracdc

    Result:

    persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
    persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
    persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-9
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-8
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-7
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-6
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-5
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-4
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-3
    persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2

  4. Check the topic to confirm it has been updated to have 10 partitions.

    bin/pulsar-admin topics partitioned-stats persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1

    Result:

    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesInCounter" : 0,
      "msgInCounter" : 0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "averageMsgSize" : 0.0,
      "msgChunkPublished" : false,
      "storageSize" : 0,
      "backlogSize" : 0,
      "publishRateLimitedTimes" : 0,
      "earliestMsgPublishTimeInBacklogs" : 0,
      "offloadedStorageSize" : 0,
      "lastOffloadLedgerId" : 0,
      "lastOffloadSuccessTimeStamp" : 0,
      "lastOffloadFailureTimeStamp" : 0,
      "publishers" : [ ],
      "waitingPublishers" : 0,
      "subscriptions" : { },
      "replication" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "compaction" : {
        "lastCompactionRemovedEventCount" : 0,
        "lastCompactionSucceedTimestamp" : 0,
        "lastCompactionFailedTimestamp" : 0,
        "lastCompactionDurationTimeInMills" : 0
      },
      "metadata" : {
        "partitions" : 10
      },
      "partitions" : { }
    }
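When a namespace holds many tables, counting partitions by eye in the topic listing is error-prone. As a rough aid, the following Python sketch groups the lines printed by the topic listing command by base topic and counts the partitions of each. `partitions_per_topic` is a hypothetical helper, and it assumes each partition is printed on its own line with a `-partition-N` suffix, as in the listings above.

```python
import re
from collections import Counter

# Matches the trailing "-partition-<number>" of a partition's topic name.
_PARTITION_SUFFIX = re.compile(r"-partition-\d+$")


def partitions_per_topic(listing):
    """Count partitions per base topic from topic listing text output."""
    counts = Counter()
    for line in listing.splitlines():
        name = line.strip()
        if _PARTITION_SUFFIX.search(name):
            counts[_PARTITION_SUFFIX.sub("", name)] += 1
    return dict(counts)


listing = """
persistent://ten01/astracdc/log-x-keysp.table1-partition-0
persistent://ten01/astracdc/log-x-keysp.table1-partition-1
persistent://ten01/astracdc/data-x-keysp.table1-partition-0
persistent://ten01/astracdc/data-x-keysp.table1-partition-1
persistent://ten01/astracdc/data-x-keysp.table1-partition-2
"""
for topic, count in partitions_per_topic(listing).items():
    print(topic, count)
```

After the update in step 2, the data- topic should report 10 partitions while the log- topic still reports 3.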


© 2024 DataStax
