CDC for Cassandra Events

The DataStax CDC for Cassandra agent pushes the primary key of each mutation on a CDC-enabled table into the Apache Pulsar events topic (also called the dirty topic). The messages in the data topic (also called the clean topic) are keyed messages in which both the key and the payload are AVRO records:

  • The message key is an AVRO record including all the primary key columns of your Cassandra table.

  • The message payload is an AVRO record including regular columns from your Cassandra table.

To support Pulsar topic compaction, the message key is encoded separately from the message payload and stored in the message metadata.

Finally, the following CQL data types are encoded as AVRO logical types:

  • Date

  • Decimal

  • Duration

  • Timestamp

  • Varint

  • Uuid, timeuuid

See AVRO Logical Types for more information on these types.
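As an illustration of what a logical type means for a consumer, the following Python sketch decodes three of the types listed above. It assumes the standard representations from the AVRO specification (`date` as a count of days since 1970-01-01, `timestamp-millis` as a count of milliseconds since the epoch, `uuid` as a string). Whether the connector uses exactly these encodings for every type is an assumption here; decimal, duration, and varint are deliberately left out because their encoding is not described in this page. The function names are hypothetical.

```python
from datetime import date, datetime, timedelta, timezone
from uuid import UUID

EPOCH_DATE = date(1970, 1, 1)
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_avro_date(days: int) -> date:
    """AVRO `date`: an int counting days since 1970-01-01."""
    return EPOCH_DATE + timedelta(days=days)


def decode_avro_timestamp_millis(millis: int) -> datetime:
    """AVRO `timestamp-millis`: a long counting milliseconds since the epoch (UTC)."""
    return EPOCH_UTC + timedelta(milliseconds=millis)


def decode_avro_uuid(text: str) -> UUID:
    """AVRO `uuid`: a string in canonical UUID form (assumed to cover uuid and timeuuid)."""
    return UUID(text)
```

Most AVRO libraries can apply these conversions automatically when the schema carries the logical type annotation, so a hand-written decoder like this is only needed when reading the raw underlying values.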

Change Event’s Key

For a given table, the change event’s key is an AVRO record that contains a field for each column in the primary key of the table at the time the event was created. Both the events and the data topics (also called the dirty and the clean topics) have the same message key, an AVRO record including the primary key columns.

INSERT Event

Let’s create a Cassandra table to illustrate what happens:

CREATE KEYSPACE ks1 WITH replication = {'class': 'NetworkTopologyStrategy', 'us-east1': '3'}  AND durable_writes = true;

CREATE TABLE ks1.tbl1 (
    key text PRIMARY KEY,
    c1 text
) WITH additional_write_policy = '99PERCENTILE'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99PERCENTILE';

Then insert a row:

INSERT INTO ks1.tbl1 (key, c1) VALUES ('3', 'bob3');

When an INSERT statement is executed, the CDC agent catches the mutation from the commitlog files and sends a message into the dirty topic. The deployed source connector then reads the full row from the Cassandra datacenter, and publishes a message into the clean topic with all regular columns encoded as an AVRO record in the message payload.

Here is a JSON representation of the AVRO Record for the message key:

{
  "key": "3"
}

And a JSON representation of the AVRO Record for the message payload:

{
  "c1": "bob3"
}

DELETE Event

When a row is deleted, the Cassandra source connector publishes a tombstone message into the clean topic, with a message key matching the Cassandra primary key and a null payload. The null payload acts as a tombstone, and downstream Pulsar connectors, such as the Elasticsearch sink, should delete the corresponding row (or document, in the case of Elasticsearch).

If you use the json output format, the Pulsar message carries an empty JSON payload instead of a null payload, because Pulsar doesn’t support nulls in message values. The downstream connectors will still delete the corresponding row.
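The INSERT and DELETE behavior described above can be sketched from the point of view of a downstream consumer. The following Python example is a minimal illustration, not the connector's or any sink's actual implementation: it takes an already-decoded key and payload (plain dictionaries) and maintains an in-memory copy of the table. The function name `apply_event`, the `json_format` flag, and the assumption that the empty JSON payload decodes to an empty object `{}` are all hypothetical.

```python
from typing import Any, Dict, Optional, Tuple

Row = Dict[str, Any]


def apply_event(
    table: Dict[Tuple, Row],
    key: Row,
    value: Optional[Row],
    json_format: bool = False,
) -> None:
    """Apply one clean-topic message to an in-memory copy of the table.

    key   -- decoded message key (primary key columns)
    value -- decoded message payload (regular columns), or None for a tombstone
    """
    # Build a hashable primary key from the key record's fields.
    pk = tuple(sorted(key.items()))

    # A null payload is a tombstone. With the json output format we assume
    # the tombstone arrives as an empty object instead of null.
    is_delete = value is None or (json_format and value == {})

    if is_delete:
        table.pop(pk, None)  # deleting an unknown row is a no-op
    else:
        table[pk] = {**key, **value}  # upsert: primary key + regular columns
```

For the example row, `apply_event(table, {"key": "3"}, {"c1": "bob3"})` stores the row, and a later `apply_event(table, {"key": "3"}, None)` removes it.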

Check Source Connector status

You can check the connector status with the following command. The connector must be running.

bin/pulsar-admin source status --name cassandra-source-ks1-table1
{
    "numInstances" : 1,
    "numRunning" : 1,
    "instances" : [ {
        "instanceId" : 0,
        "status" : {
            "running" : true,
            "error" : "",
            "numRestarts" : 0,
            "numReceivedFromSource" : 0,
            "numSystemExceptions" : 0,
            "latestSystemExceptions" : [ ],
            "numSourceExceptions" : 0,
            "latestSourceExceptions" : [ ],
            "numWritten" : 0,
            "lastReceivedTime" : 0,
            "workerId" : "c-standalone-fw-localhost-8080"
        }
    } ]
}
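If you want to check this status from a script, the JSON above can be inspected programmatically. The following Python sketch assumes the JSON has exactly the shape shown in the sample output; the function names are hypothetical, and the functions only parse a JSON string that you have already captured (for example, from the output of the command above).

```python
import json
from typing import List


def unhealthy_instances(status_json: str) -> List[int]:
    """Return the instanceId of every instance that is stopped or reports an error."""
    status = json.loads(status_json)
    bad = []
    for instance in status.get("instances", []):
        detail = instance.get("status", {})
        # An instance is unhealthy if it is not running or has a non-empty error.
        if not detail.get("running", False) or detail.get("error"):
            bad.append(instance["instanceId"])
    return bad


def is_healthy(status_json: str) -> bool:
    """True when all instances are running and none reports an error."""
    status = json.loads(status_json)
    all_running = status.get("numRunning") == status.get("numInstances")
    return all_running and not unhealthy_instances(status_json)
```

Counters such as `numSourceExceptions` and `numWritten` are not checked here; a stricter check could also flag a connector whose exception counters keep increasing.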

Troubleshooting

If you’re having issues consuming CDC events, check the source connector logs on your Pulsar function workers and the data topic schema.

Check the source connector logs

The source connector logs are stored on your Pulsar function workers. The log file name depends on the connector's name.

cat logs/functions/public/default/cassandra-source-ks1-table1/cassandra-source-ks1-table1-0.log
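The path above appears to follow the pattern tenant, namespace, connector name, then a file named after the connector and the instance ID. The Python sketch below builds the path under that assumption; the pattern is inferred from the single example shown here, and the function name and default values are hypothetical.

```python
from pathlib import PurePosixPath


def connector_log_path(
    name: str,
    instance_id: int = 0,
    tenant: str = "public",
    namespace: str = "default",
    base: str = "logs/functions",
) -> PurePosixPath:
    """Build the assumed log path: <base>/<tenant>/<namespace>/<name>/<name>-<instance>.log"""
    return PurePosixPath(base) / tenant / namespace / name / f"{name}-{instance_id}.log"
```

For example, `connector_log_path("cassandra-source-ks1-table1")` yields the path used in the `cat` command above.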

Check the data topic schema

Check the Pulsar schema to ensure the clean topic matches your CQL table:

bin/pulsar-admin schemas get "persistent://public/default/data-ks1.table1"
{
  "version": 0,
  "schemaInfo": {
    "name": "data-ks1.table1",
    "schema": {
      "key": {
        "name": "root",
        "schema": {
          "type": "record",
          "name": "root",
          "namespace": "ks1",
          "doc": "",
          "fields": [
            {
              "name": "key",
              "type": "string"
            }
          ]
        },
        "type": "AVRO",
        "properties": {}
      },
      "value": {
        "name": "root",
        "schema": {
          "type": "record",
          "name": "root",
          "namespace": "ks1",
          "doc": "",
          "fields": [
            {
              "name": "c1",
              "type": [
                "null",
                "string"
              ]
            }
          ]
        },
        "type": "AVRO",
        "properties": {}
      }
    },
    "type": "KEY_VALUE",
    "properties": {
      "key.schema.name": "root",
      "key.schema.properties": "{}",
      "key.schema.type": "AVRO",
      "kv.encoding.type": "SEPARATED",
      "value.schema.name": "root",
      "value.schema.properties": "{}",
      "value.schema.type": "AVRO"
    }
  }
}
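To compare this schema with your CQL table, it can help to extract just the column names from the key and value records. The following Python sketch assumes the JSON has the nested shape shown in the sample output above (with the key and value schemas as JSON objects); the function name is hypothetical.

```python
import json
from typing import Dict, List


def schema_columns(schema_json: str) -> Dict[str, List[str]]:
    """Extract primary key and regular column names from a KEY_VALUE schema document."""
    kv = json.loads(schema_json)["schemaInfo"]["schema"]
    return {
        # Fields of the key record: the primary key columns.
        "primary_key": [field["name"] for field in kv["key"]["schema"]["fields"]],
        # Fields of the value record: the regular columns.
        "regular": [field["name"] for field in kv["value"]["schema"]["fields"]],
    }
```

For the sample schema, the result lists `key` as the only primary key column and `c1` as the only regular column, which matches the table definition used earlier.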

What’s next?

For more on change data capture, see Change Data Capture with DataStax Enterprise.