Create a Change Data Capture (CDC) connector

CDC connectors are only available for Serverless (Non-Vector) deployments.

Enabling CDC for Serverless (Non-Vector) databases increases costs based on your Astra Streaming usage. For more information, see Astra Streaming pricing and CDC metering rates.

You can use the Change Data Capture (CDC) connector to:

  • Process data with client applications.

  • Send data to downstream systems.

  • Capture changes in real time, de-duplicate changes, and stream the clean set of changed data into Astra Streaming.

Astra Streaming processes data changes through a Pulsar topic. By design, CDC maintains a one-to-one correspondence between a table and a single Pulsar topic.

Supported data structures

CDC for Serverless (Non-Vector) databases supports the following data types and corresponding AVRO or logical types:

Data type | AVRO type
----------|----------------------------------------------
ascii     | string
bigint    | long
blob      | bytes
boolean   | boolean
counter   | long
date      | int
decimal   | cql_decimal
double    | double
duration  | cql_duration
float     | float
inet      | string
int       | int
list      | array
map       | map (only string-type keys are supported)
set       | array
smallint  | int
text      | string
time      | long
timestamp | long
timeuuid  | string
tinyint   | int
uuid      | string
varchar   | string
varint    | cql_varint / bytes
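
To make the mapping concrete, the following minimal sketch (in Python, using the fastavro library) shows the kind of AVRO value schema that could correspond to a table with columns key text PRIMARY KEY, c1 text, and n bigint. The record name and the nullable unions are illustrative assumptions; the exact schema that Astra Streaming registers for your table may differ.

    import fastavro

    # Hypothetical value schema for (key text PRIMARY KEY, c1 text, n bigint),
    # following the mapping above: text -> string, bigint -> long.
    value_schema = {
        "type": "record",
        "name": "table_value",
        "fields": [
            {"name": "c1", "type": ["null", "string"], "default": None},
            {"name": "n", "type": ["null", "long"], "default": None},
        ],
    }

    parsed = fastavro.parse_schema(value_schema)  # validates the schema shape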

Cassandra static columns are supported:

  • On row-level updates, static columns are included in the message value.

  • On partition-level updates, the clustering keys are null in the message key. The message value only has static columns on INSERT and UPDATE operations.

For columns using data types that are not supported, the data types are omitted from the events sent to the data topic. If a row update contains both supported and unsupported data types, the event includes only columns with supported data types.

AVRO interpretation

Serverless (Non-Vector) database keys are strings, while CDC produces AVRO messages, which are structures. Converting some AVRO structures requires additional tooling, which can produce unexpected output.

The following list describes the conversion of AVRO complex and logical types. Where the AVRO type is record, the record is a schema containing the listed fields.

AVRO complex types:

  • collections (lists, sets): Sets and lists are treated as the AVRO array type, with the items attribute containing the schema of the array's items.

  • decimal: The Cassandra DECIMAL type is converted to an AVRO record with the cql_decimal logical type and the fields BIG_INT and DECIMAL_SCALE.

  • duration: The Cassandra DURATION type is converted to an AVRO record with the cql_duration logical type and the fields CQL_DURATION_MONTHS, CQL_DURATION_DAYS, and CQL_DURATION_NANOSECONDS.

  • maps: The Cassandra MAP type is converted to the AVRO map type (fields KEYS_CONVERTED_TO_STRINGS and VALUE_SCHEMA), with the keys converted to strings. Complex-type keys are represented as JSON.
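
Because months do not convert to a fixed number of days, and because the decimal record carries an unscaled integer plus a scale, consumers typically need a small amount of glue code for these records. The following Python sketch shows one way to turn them back into native values after a consumer has decoded the AVRO message into dictionaries. The byte order and signedness of BIG_INT are assumptions based on common AVRO decimal encodings, not taken from this page.

    from decimal import Decimal

    def decode_cql_decimal(rec):
        # Assumption: BIG_INT holds the unscaled value as big-endian,
        # two's-complement bytes; DECIMAL_SCALE holds the base-10 scale.
        unscaled = int.from_bytes(rec["BIG_INT"], byteorder="big", signed=True)
        return Decimal(unscaled).scaleb(-rec["DECIMAL_SCALE"])

    def decode_cql_duration(rec):
        # Months have no fixed length in days, so keep the three
        # components separate rather than collapsing to a timedelta.
        return (rec["CQL_DURATION_MONTHS"],
                rec["CQL_DURATION_DAYS"],
                rec["CQL_DURATION_NANOSECONDS"])

    # A DECIMAL of 12.34 could arrive as {"BIG_INT": b"\x04\xd2", "DECIMAL_SCALE": 2}.
    assert decode_cql_decimal({"BIG_INT": b"\x04\xd2", "DECIMAL_SCALE": 2}) == Decimal("12.34")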

Limitations

CDC for Serverless (Non-Vector) databases has the following limitations:

  • Does not manage table truncates.

  • Does not sync data available before starting the CDC agent.

  • Does not replay logged batches.

  • Does not manage time-to-live.

  • Does not support range deletes.

  • CQL column names must not match a Pulsar primitive type name (for example, INT32).

  • Does not support multi-region.

  • Does not support multi-table mutations.

Configure CDC for a Serverless (Non-Vector) database

To configure CDC for a Serverless (Non-Vector) database, you need to do the following:

  • Create a tenant, topic, and table.

  • Create a CDC connector, which connects your Serverless (Non-Vector) database to CDC.

  • Connect an Elasticsearch sink to your CDC connector.

    This example uses an Elasticsearch sink. You can use other Astra Streaming sinks.

  • Verify that the connection successfully sends and receives messages.

Prerequisites

To enable CDC for Serverless (Non-Vector) databases, you need an active Astra account and a Serverless (Non-Vector) database.

Create a streaming tenant

  1. In the Astra Portal, go to Streaming.

  2. Click Create Tenant.

  3. Enter a name for the streaming tenant.

  4. Select a cloud provider and region.

    To use Astra Streaming CDC, the region must support both Astra Streaming and Serverless (Non-Vector) databases. For more information, see Astra Streaming Regions.

  5. Click Create Tenant.

Create a table

  1. In the Astra Portal navigation menu, select your Serverless (Non-Vector) database.

  2. Click the CQL Console tab.

  3. Use the CQL Console to create a table with a primary key column in your database:

    CREATE TABLE IF NOT EXISTS KEYSPACE_NAME.TABLE_NAME (key text PRIMARY KEY, c1 text);

    Replace the following:

    • KEYSPACE_NAME: Your database’s keyspace name.

    • TABLE_NAME: A name for the new table.

  4. Confirm table creation:

    select * from KEYSPACE_NAME.TABLE_NAME;
    Result:
    token@cqlsh> select * from KEYSPACE_NAME.TABLE_NAME;
    
     key | c1
    -----+----
    
    (0 rows)
    token@cqlsh>

Connect to CDC for Serverless (Non-Vector) databases

After you Create a streaming tenant and Create a table, create a CDC connector:

  1. In the Astra Portal navigation menu, select your database.

  2. Click the CDC tab.

  3. Click Enable CDC.

  4. Select a tenant, keyspace, and table.

  5. Click Enable CDC to create the CDC connector.

Enabling CDC creates a new astracdc namespace with two new topics, data- and log-. Change events are first written to the log- topic, where CDC processes and de-duplicates them before writing the clean data to the data- topic. The log- topic supports internal CDC functionality only and is not for direct use. Consume CDC data from the data- topic in Astra Streaming.
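
For a sense of what consuming the data- topic looks like, here is a minimal Python sketch using the pulsar-client package. The service URL, tenant, topic, and token values are placeholders: copy the broker URL, a token, and the full data- topic name from the astracdc namespace in the Astra Portal.

    import pulsar

    # Placeholders; substitute values from your Astra Streaming tenant.
    SERVICE_URL = "pulsar+ssl://HOSTNAME:6651"
    TOPIC = "persistent://TENANT_NAME/astracdc/DATA_TOPIC_NAME"
    TOKEN = "ASTRA_STREAMING_TOKEN"

    client = pulsar.Client(SERVICE_URL,
                           authentication=pulsar.AuthenticationToken(TOKEN))
    consumer = client.subscribe(TOPIC, subscription_name="cdc-demo")

    try:
        while True:
            msg = consumer.receive(timeout_millis=30000)
            print(msg.data())          # raw AVRO-encoded change event bytes
            consumer.acknowledge(msg)
    except pulsar.Timeout:
        pass                           # no new events within 30 seconds
    finally:
        client.close()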

Connect Elasticsearch sink

Connect an Elasticsearch sink to CDC that consumes messages from the data- topic and sends them to your Elasticsearch deployment:

  1. In the Astra Portal navigation menu, select your database.

  2. Click the CDC tab.

  3. In the Change Data Capture list, select the name of the CDC-enabled table that you want to use.

  4. Click Add Elastic Search Sink.

  5. For the Namespace, select astracdc.

  6. For the Sink Type, select Elastic Search.

  7. Enter a name for the sink.

  8. In the Connect Topics section, for the Input topic, select a data- topic in the astracdc namespace.

  9. In the Sink-Specific Configuration section, enter your Elasticsearch URL, Index name, and API key for your Elasticsearch deployment. Do not enter a username, password, or token.

  10. For Ignore Record Key, Null Value Action, and Enable Schema, DataStax recommends replacing the default values with the following:

    • Ignore Record Key: false

    • Null Value Action: DELETE

    • Enable Schema: true

  11. Click Create.

If sink creation succeeds, a confirmation message appears in the Astra Portal, and the new sink appears in the Sinks tab.
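
If you script sink creation instead of using the portal, the fields above map onto the sink's configuration. The following Python dictionary is a rough sketch of that mapping, assuming the standard Pulsar Elasticsearch sink config keys (elasticSearchUrl, indexName, apiKey, keyIgnore, nullValueAction, schemaEnable); verify the exact key names against your sink's documentation before use.

    # Assumed mapping of the portal fields to Elasticsearch sink config keys.
    sink_config = {
        "elasticSearchUrl": "ELASTICSEARCH_URL",  # your Elasticsearch endpoint
        "indexName": "INDEX_NAME",                # the index to write to
        "apiKey": "API_KEY",                      # Elasticsearch API key
        "keyIgnore": False,                       # Ignore Record Key: false
        "nullValueAction": "DELETE",              # Null Value Action: DELETE
        "schemaEnable": True,                     # Enable Schema: true
    }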

Test the connection

Test the CDC functionality and verify that your Elasticsearch sink receives data through the CDC connector.

  1. In the Astra Portal navigation menu, select your database.

  2. Click the CQL Console tab.

  3. Process some changes with CDC. For example, you can modify the table:

    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32a','bob3123');
    INSERT INTO KEYSPACE_NAME.TABLE_NAME (key,c1) VALUES ('32b','bob3123b');

    Replace the following:

    • KEYSPACE_NAME: Your database’s keyspace name.

    • TABLE_NAME: Your CDC-enabled table name.

  4. Verify that the changes were written successfully:

    select * from KEYSPACE_NAME.TABLE_NAME;
    Result:
     key | c1
    -----+----------
     32a |  bob3123
     32b | bob3123b
    
    (2 rows)
    token@cqlsh>
  5. Send a search request to your Elasticsearch deployment:

    curl -X POST "ELASTICSEARCH_URL/INDEX_NAME/_search?pretty" \
      -H "Authorization: ApiKey API_KEY"

    Replace ELASTICSEARCH_URL, INDEX_NAME, and API_KEY with the values for your Elasticsearch deployment.
  6. Make sure the response describes changes to the index, such as documents whose _id values match the keys you inserted. This indicates that Astra Streaming successfully sent your CDC changes to your Elasticsearch sink.

    {
      "took": 1,
      "timed_out": false,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": {
          "value": 3,
          "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
          {
            "_index": "INDEX_NAME",
            "_id": "khl_hI0Bh25AUvCHghQo",
            "_score": 1.0,
            "_source": {
              "name": "foo",
              "title": "bar"
            }
          },
          {
            "_index": "INDEX_NAME",
            "_id": "32a",
            "_score": 1.0,
            "_source": {
              "c1": "bob3123"
            }
          },
          {
            "_index": "INDEX_NAME",
            "_id": "32b",
            "_score": 1.0,
            "_source": {
              "c1": "bob3123b"
            }
          }
        ]
      }
    }
