Create a Change Data Capture (CDC) connector

CDC for Astra DB incurs billed charges based on your Astra Streaming usage. For more information, see Astra Streaming pricing and CDC metering rates.

CDC for Astra DB automatically captures changes in real time, de-duplicates the changes, and then streams the clean set of changed data into Astra Streaming where it can be processed by client applications or sent to downstream systems.

Astra Streaming processes data changes through a Pulsar topic. By design, the Change Data Capture (CDC) component has a one-to-one correspondence between a table and a single Pulsar topic.
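
Because each table corresponds to exactly one data topic, downstream applications can subscribe to that topic like any other Pulsar topic. The following is a minimal sketch using the pulsar-client CLI, assuming a client already configured for your Astra Streaming tenant; the topic placeholders follow the naming pattern described later in this guide, and the subscription name is an arbitrary example:

  # Consume CDC events for one table from its dedicated data topic.
  # -s sets the subscription name; -n 0 keeps consuming until interrupted.
  bin/pulsar-client consume \
    "persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME" \
    -s "cdc-example" -n 0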

Supported data structures

CDC for Astra DB supports the following CQL data types and corresponding AVRO or logical types:

Data type    AVRO type
ascii        string
bigint       long
blob         bytes
boolean      boolean
counter      long
date         int
decimal      cql_decimal
double       double
duration     cql_duration
float        float
inet         string
int          int
list         array
map          map (only string-type keys are supported)
set          array
smallint     int
text         string
time         long
timestamp    long
timeuuid     string
tinyint      int
uuid         string
varchar      string
varint       cql_varint / bytes

Apache Cassandra® static columns are supported:

  • On row-level updates, static columns are included in the message value.

  • On partition-level updates, the clustering keys are null in the message key. The message value only has static columns on INSERT and UPDATE operations.

For columns using data types that are not supported, the data types are omitted from the events sent to the data topic. If a row update contains both supported and unsupported data types, the event includes only columns with supported data types.

AVRO interpretation

Keys from tables in Astra DB databases are strings, but CDC produces AVRO messages that are structures. Converting some AVRO structures requires additional tooling, which can result in unexpected output.

The following list describes the conversion of AVRO complex and logical types:

AVRO complex types

  • collections (AVRO type: array; applies to lists and sets)

    Sets and lists are treated as the AVRO array type, with the items attribute containing the schema of the array's items.

  • decimal (AVRO type: record; fields: BIG_INT, DECIMAL_SCALE)

    The Cassandra DECIMAL type is converted to a record with the cql_decimal logical type. The record AVRO type is a schema containing the listed fields.

  • duration (AVRO type: record; fields: CQL_DURATION_MONTHS, CQL_DURATION_DAYS, CQL_DURATION_NANOSECONDS)

    The Cassandra DURATION type is converted to a record with the cql_duration logical type. The record AVRO type is a schema containing the listed fields.

  • maps (AVRO type: map; fields: KEYS_CONVERTED_TO_STRINGS, VALUE_SCHEMA)

    The Cassandra MAP type is converted to the AVRO map type, but the keys are converted to strings. For complex types, the key is represented in JSON.
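
To see exactly how CDC mapped a table's columns, you can retrieve the AVRO schema registered on the table's data topic. A minimal sketch using pulsar-admin, assuming a client configured for your Astra Streaming tenant; the topic placeholders follow the naming pattern described later in this guide:

  # Print the AVRO schema that CDC registered for the table's data topic.
  bin/pulsar-admin schemas get \
    "persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME"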

Limitations

CDC for Astra DB has the following limitations:

  • Does not manage table truncates.

  • Does not sync data available before starting the CDC agent.

  • Does not replay logged batches.

  • Does not manage time-to-live.

  • Does not support range deletes.

  • Does not allow CQL column names that match a Pulsar primitive type name, such as INT32.

  • Does not support multi-region.

  • Does not support multi-table mutations.

Configure CDC for Astra DB

To configure CDC for Astra DB, you must create a streaming tenant, table, and CDC connector, and then connect a sink to the CDC connector. The CDC connector connects your database to CDC, and it automatically creates a namespace and topics in your streaming tenant. The integration between your CDC connector and sink enables the sink to consume messages from a topic in the tenant, and then send them to the associated service deployment.

These instructions use an Elasticsearch sink as an example. You can use other Astra Streaming sinks.

Prerequisites

To enable CDC for Astra DB Serverless, you need an active Astra DB Serverless database with at least one keyspace. The following sections explain how to create the remaining components: a streaming tenant, a table, the CDC connector, and a sink.

Create a streaming tenant

  1. In the Astra Portal navigation menu, click Streaming.

  2. Click Create Tenant.

  3. Enter a name for the streaming tenant.

  4. Select a cloud provider and region.

    You can only use CDC for Astra DB in regions that support both Astra Streaming and Astra DB Serverless. For more information, see Astra Streaming regions and Astra DB Serverless database regions and maintenance schedules.

  5. Click Create Tenant.

Do not create a namespace or topics in your tenant. The CDC connector does this automatically.

Create a table

  1. In the Astra Portal navigation menu, select your database.

  2. Click CQL Console.

  3. Use the CQL Console to create a table with a primary key column in your database:

    CREATE TABLE IF NOT EXISTS KEYSPACE_NAME.TABLE_NAME (key text PRIMARY KEY, c1 text);

    Replace the following:

    • KEYSPACE_NAME: Your database’s keyspace name.

    • TABLE_NAME: A name for the new table.

  4. Confirm table creation:

    select * from KEYSPACE_NAME.TABLE_NAME;
    Result:
    token@cqlsh> select * from KEYSPACE_NAME.TABLE_NAME;
    
     key | c1
    -----+----
    
    (0 rows)
    token@cqlsh>

Connect to CDC for Astra DB

After you create a streaming tenant and a table, create a CDC connector:

  1. In the Astra Portal navigation menu, select your database.

  2. Click the CDC tab.

  3. Click Enable CDC.

  4. Select a tenant, keyspace, and table.

  5. Click Enable CDC to create the CDC connector.

When you enable CDC, Astra DB creates a new astracdc namespace in your streaming tenant that contains two topics:

  • The data- topic carries the clean CDC data that client applications consume in Astra Streaming.

  • The log- topic receives schema and data changes, which CDC processes, de-duplicates, and then writes as clean data to the data- topic. The log- topic is required for CDC functionality; it is not for direct use.

Each topic has three partitions by default. You can increase partitions for the data- topic, if desired. For more information, see Increase CDC data topic partitions.
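
To confirm what enabling CDC created, you can list the partitioned topics in the astracdc namespace. A minimal sketch, assuming pulsar-admin is configured for your Astra Streaming tenant; replace TENANT_NAME with your tenant name:

  # List the partitioned data- and log- topics created in the astracdc namespace.
  bin/pulsar-admin topics list-partitioned-topics TENANT_NAME/astracdc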

Connect a sink

After you create the CDC connector, connect a sink to it. This integration enables the sink to consume messages from the data- topic, and then send them to the associated service deployment.

This example uses an Elasticsearch sink. You can use other Astra Streaming sinks.

  1. In the Astra Portal navigation menu, select your database.

  2. Click the CDC tab.

  3. In the Change Data Capture list, click the name of the CDC-enabled table that you want to use.

  4. Click Add Elastic Search Sink.

  5. For Namespace, select astracdc.

  6. For Sink Type, select Elastic Search.

  7. Enter a name for the sink.

  8. In the Connect Topics section, for the Input topic, select a data- topic in the astracdc namespace.

  9. In the Sink-Specific Configuration section, enter your Elasticsearch URL, Index name, and API key for your Elasticsearch deployment. Do not enter a username, password, or token.

  10. For Ignore Record Key, Null Value Action, and Enable Schema, DataStax recommends the following values:

    • Ignore Record Key: false

    • Null Value Action: DELETE

    • Enable Schema: true

  11. Click Create.

If sink creation succeeds, a confirmation message appears in the Astra Portal, and the new sink appears on the Sinks tab.
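
Optionally, you can verify that the new sink is running with pulsar-admin. A minimal sketch, assuming a configured client; SINK_NAME is the name you entered when creating the sink:

  # Show the runtime status of the sink, including running instances and errors.
  bin/pulsar-admin sinks status \
    --tenant TENANT_NAME \
    --namespace astracdc \
    --name SINK_NAME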

Test the connection

Test the CDC functionality to verify that your Elasticsearch sink receives data through the CDC connector:

  1. In the Astra Portal navigation menu, select your database.

  2. Click CQL Console.

  3. Process some changes with CDC. For example, you can modify the table:

    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32a','bob3123');
    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32b','bob3123b');

    Replace the following:

    • KEYSPACE_NAME: Your database’s keyspace name.

    • TABLE_NAME: Your CDC-enabled table name.

  4. Verify the changes:

    select * from KEYSPACE_NAME.TABLE_NAME;
    Result:
     key | c1
    -----+----------
     32a |  bob3123
     32b | bob3123b
    
    (2 rows)
    token@cqlsh>
  5. Fetch the data from your sink service deployment. For example, send a GET request to an Elasticsearch deployment:

    curl -sS --location -X GET "ELASTICSEARCH_URL/INDEX_NAME/_search?pretty" \
      --header "Authorization: ApiKey API_KEY"

    Replace ELASTICSEARCH_URL, INDEX_NAME, and API_KEY with the values from your Elasticsearch deployment that you used to connect the sink.

  6. Make sure the response includes your changes. This indicates that Astra Streaming successfully sent changes tracked by CDC to your sink service deployment.

    {
      "took": 1,
      "timed_out": false,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": {
          "value": 3,
          "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
          {
            "_index": "INDEX_NAME",
            "_id": "khl_hI0Bh25AUvCHghQo",
            "_score": 1.0,
            "_source": {
              "name": "foo",
              "title": "bar"
            }
          },
          {
            "_index": "INDEX_NAME",
            "_id": "32a",
            "_score": 1.0,
            "_source": {
              "c1": "bob3123"
            }
          },
          {
            "_index": "INDEX_NAME",
            "_id": "32b",
            "_score": 1.0,
            "_source": {
              "c1": "bob3123b"
            }
          }
        ]
      }
    }

Increase CDC data topic partitions

When you enable CDC, Astra DB creates three data- partitions and three log- partitions in your tenant’s astracdc namespace.

Optionally, you can increase the number of partitions for the data- topic. Increasing the number of partitions creates new partitions, but existing data remains in the original partitions. New messages are distributed across all partitions, including the new ones.

To increase the number of data- topic partitions, do the following:

  1. Before you make changes, use pulsar-admin to get the namespace’s existing partitions:

    bin/pulsar-admin topics list-partitioned-topics TENANT_NAME/astracdc

    The response describes the existing partitions for the data- and log- topics. The default configuration has three partitions for each topic numbered 0, 1, and 2.

    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-1
    persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-2
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-0
    persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-0
    persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-1
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-2

    The TENANT_NAME, ID, KEYSPACE_NAME, and TABLE_NAME values are the same for each partition. The actual values depend on your CDC configuration.

  2. From the response, copy a data- topic string, omitting the persistent:// prefix and the partition number suffix.

    For example, from persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-1, extract only TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME.

  3. Use the update-partitioned-topic command to increase the number of partitions for the data- topic:

    bin/pulsar-admin topics update-partitioned-topic DATA_TOPIC_STRING --partitions NUMBER

    Replace the following:

    • DATA_TOPIC_STRING: The data- topic string from the list-partitioned-topics response in the format of TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME.

    • NUMBER: The desired total number of partitions.

      For example, --partitions 10 increases the total number of partitions to 10. If the topic has 3 partitions, then --partitions 10 creates 7 new partitions for a total of 10.

      You can only increase the number of partitions; you can't decrease them, because doing so could cause data loss and message ordering issues.

  4. Verify the increase:

    bin/pulsar-admin topics list TENANT_NAME/astracdc

    Replace TENANT_NAME with your CDC tenant name.

  5. Make sure the response includes the desired total number of partitions.

    The following response indicates that the data- topic now has 10 total partitions numbered 0-9:

    persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-2
    persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-0
    persistent://TENANT_NAME/astracdc/log-ID-KEYSPACE_NAME.TABLE_NAME-partition-1
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-9
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-8
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-7
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-6
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-1
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-0
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-5
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-4
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-3
    persistent://TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME-partition-2
  6. Confirm that the topic was updated to have the desired number of partitions:

    bin/pulsar-admin topics partitioned-stats persistent://DATA_TOPIC_STRING

    Replace DATA_TOPIC_STRING with the data- topic string in the format of TENANT_NAME/astracdc/data-ID-KEYSPACE_NAME.TABLE_NAME.

    Result:
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesInCounter" : 0,
      "msgInCounter" : 0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "averageMsgSize" : 0.0,
      "msgChunkPublished" : false,
      "storageSize" : 0,
      "backlogSize" : 0,
      "publishRateLimitedTimes" : 0,
      "earliestMsgPublishTimeInBacklogs" : 0,
      "offloadedStorageSize" : 0,
      "lastOffloadLedgerId" : 0,
      "lastOffloadSuccessTimeStamp" : 0,
      "lastOffloadFailureTimeStamp" : 0,
      "publishers" : [ ],
      "waitingPublishers" : 0,
      "subscriptions" : { },
      "replication" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "compaction" : {
        "lastCompactionRemovedEventCount" : 0,
        "lastCompactionSucceedTimestamp" : 0,
        "lastCompactionFailedTimestamp" : 0,
        "lastCompactionDurationTimeInMills" : 0
      },
      "metadata" : {
        "partitions" : 10
      },
      "partitions" : { }
    }
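
If you have jq installed, you can extract just the partition count from the partitioned-stats output. This is an optional convenience, not part of the procedure:

  # Print only the partition count from the partitioned-stats JSON.
  bin/pulsar-admin topics partitioned-stats persistent://DATA_TOPIC_STRING | jq '.metadata.partitions'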
