Change Data Capture with DataStax Enterprise

Capture row-level data changes in your C* tables and pass them to Apache Pulsar® with DataStax Change Data Capture (CDC). This guide walks you through installing, configuring, and using CDC with C* or DSE in a VM-based deployment.

This installation requires the following components. Use the latest available version of each artifact.

  • DSE 6.8.16 or later

  • OSS Apache Cassandra®

  • CDC Agent

    • DSE - use agent-dse4-<version>-all.jar

    • OSS C* - use agent-c4-<version>-all.jar

  • Pulsar

    • IBM Elite Support for Apache Pulsar - use agent-dse4-<version>-all.jar

  • Pulsar C* source connector (CSC)

    • Pulsar Cassandra Source NAR - use pulsar-cassandra-source-<version>.nar

Architecture overview

The CDC pipeline consists of three stages that work together to capture changes, publish them as change events to Pulsar, and deliver processed data to downstream consumers.

  • The CDC agent, which runs as a Java agent inside each C*/DSE node, publishes change events to Pulsar events topics named events-<keyspace>.<table>. When configuring the source connector, the events.topic parameter must exactly match the topic that the CDC agent publishes to. For example, if your keyspace is ks1 and your table is table1, set events.topic to persistent://public/default/events-ks1.table1.

  • The Cassandra Source Connector consumes change events from the events topic (events-<keyspace>.<table>), fetches the full row data from Cassandra for each change event, performs de-duplication to handle events from multiple replicas, and publishes processed data to the data topic as data-<keyspace>.<table>.

  • Downstream consumers consume from the data topic for further processing. Consumers include sink connectors, analytics systems, or other applications.
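The topic naming convention described above can be sketched as two small shell helpers. This is a minimal sketch, assuming the public tenant, the default namespace, and the default events- and data- prefixes used in this guide. The function names events_topic and data_topic are hypothetical and are not part of Pulsar or the CDC agent.

```shell
#!/bin/sh
# Hypothetical helpers (not part of Pulsar or the CDC agent).
# Assumes tenant "public", namespace "default", and the default topic prefixes.

# Print the events topic that the CDC agent publishes to.
# Arguments: keyspace, table
events_topic() {
  printf 'persistent://public/default/events-%s.%s\n' "$1" "$2"
}

# Print the data topic that the source connector writes to.
# Arguments: keyspace, table
data_topic() {
  printf 'persistent://public/default/data-%s.%s\n' "$1" "$2"
}

events_topic ks1 table1
data_topic ks1 table1
```

For keyspace ks1 and table table1, this prints persistent://public/default/events-ks1.table1 followed by persistent://public/default/data-ks1.table1. Deriving both names from the same two inputs helps keep the connector's events.topic and destination topic consistent.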

Installing and configuring

  1. Download the Pulsar tarball and set up a Pulsar cluster. This example uses Pulsar standalone mode, but you can also use our helpful Ansible Scripts.

    cd $PULSAR_HOME
    bin/pulsar standalone

    We recommend using the latest CDC agent version (1.0.4 or later) to support C* collection data types.

  2. Install C*/DSE.

  3. After installing C*/DSE, but before starting the C*/DSE service, set the cassandra-env.sh configuration:

    # e.g. pulsar://<pulsar_server_ip>:6650
    export CDC_PULSAR_SERVICE_URL="<pulsar_broker_service_url>"
    
    # needed when Pulsar JWT authentication is enabled
    export CDC_PULSAR_AUTH_PLUGIN_CLASS_NAME="org.apache.pulsar.client.impl.auth.AuthenticationToken"
    export CDC_PULSAR_AUTH_PARAMS="file://</path/to/token/file>"
    
    # needed when Pulsar TLS encryption is enabled
    export CDC_TLS_TRUST_CERTS_FILE_PATH="</path/to/trusted/cert/file>"
    
    # DSE CDC
    JVM_OPTS="$JVM_OPTS -javaagent:/path/to/agent-dse4-<version>-all.jar"

    For CDC agent versions after 1.0.3, the CDC agent Pulsar connection parameters are provided as system environment variables (see DSE CDC in the example above).

    For CDC agent versions before 1.0.3, the CDC agent Pulsar connection parameters are also provided as extra JVM options, as below:

    export JVM_EXTRA_OPTS="-javaagent:/path/to/agent-c4-<version>-all.jar=pulsarServiceUrl=pulsar://pulsar:6650"

  4. Set the cassandra.yaml configuration:

    cdc_enabled: true
    cdc_raw_directory: /var/lib/cassandra/cdc_raw
    commitlog_sync_period_in_ms: 2000
    cdc_total_space_in_mb: 4096

  5. Start C*/DSE and verify that your logs are similar to the C* system.log excerpt below. The message "CDC agent started" indicates that the CDC agent has started properly.

    INFO  [main] 2022-04-11 18:47:06,302  Agent.java:60 - Starting CDC agent, cdc_raw_directory=/var/lib/cassandra/cdc_raw
    INFO  [main] 2022-04-11 18:47:06,331  AgentConfig.java:526 - maxInflightMessagesPerTask=16384, sslEnabledProtocols=TLSv1.2,TLSv1.1,TLSv1, cdcWorkingDir=/usr/share/dse/data/cdc, pulsarMaxPendingMessagesAcrossPartitions=50000, pulsarMaxPendingMessages=1000, sslTruststorePath=null, cdcPollIntervalMs=60000, pulsarAuthParams=null, sslHostnameVerificationEnable=false, errorCommitLogReprocessEnabled=false, sslTruststorePassword=null, tlsTrustCertsFilePath=null, sslKeystorePath=null, sslKeystorePassword=null, sslAllowInsecureConnection=false, cdcConcurrentProcessors=-1, pulsarServiceUrl=pulsar://10.101.32.213:6650, pulsarKeyBasedBatcher=false, sslTruststoreType=JKS, pulsarBatchDelayInMs=-1, topicPrefix=events-, sslCipherSuites=null, pulsarAuthPluginClassName=null, sslProvider=null, useKeyStoreTls=false
    INFO  [main] 2022-04-11 18:47:06,433  Agent.java:92 - CDC agent started
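The configuration and startup checks in the steps above can be scripted. The following is a minimal sketch, not an official tool: the function names cdc_enabled_in_yaml and cdc_agent_started are hypothetical, and the file paths you pass in depend on your installation. The YAML check requires whitespace after the colon, because YAML does not treat cdc_enabled:true as a key-value pair.

```shell
#!/bin/sh
# Hypothetical pre-flight helpers; file locations vary by installation.

# Return 0 if the given cassandra.yaml enables CDC on an uncommented line.
# Requires whitespace after the colon, as YAML does.
cdc_enabled_in_yaml() {
  grep -Eq '^[[:space:]]*cdc_enabled:[[:space:]]+true[[:space:]]*(#.*)?$' "$1"
}

# Return 0 if the given system.log contains the agent startup message.
cdc_agent_started() {
  grep -q 'CDC agent started' "$1"
}

# Example usage (paths are placeholders):
#   cdc_enabled_in_yaml /path/to/cassandra.yaml || echo "CDC is not enabled"
#   cdc_agent_started /path/to/system.log || echo "CDC agent has not started"
```

Both functions only read the files they are given, so they are safe to run against a live node.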

Deploy Pulsar Connector

  1. Deploy the Pulsar Cassandra Source Connector (CSC) for each CDC-enabled C* table. The connector consumes from the events topic (where the CDC agent publishes) and writes to the data topic (where downstream consumers read from).

    The events.topic parameter in the source connector configuration must match exactly the topic name that the CDC agent publishes to.

    The agent creates topics using the format events-<keyspace>.<table> when using the default events- prefix. For example, for keyspace ks1 and table table1, the events topic is events-ks1.table1.

    The outputFormat source configuration controls the format of messages on the data topic.

    • key-value-avro - Default behavior. Key and value are encoded separately in AVRO format.

    • key-value-json - Key and value are encoded separately in JSON format.

    • json - Key and value are encoded together in a single JSON object. Key field is populated with a JSON string representing key fields.

      The following commands create the connector with each output format.

      Key-value-avro:

      pulsar-admin source create \
        --name <csc_connector_name> \
        --archive /path/to/pulsar-cassandra-source-<version>.nar \
        --tenant public \
        --namespace default \
        --destination-topic-name persistent://public/default/data-<keyspace>.<table> \
        --parallelism 1 \
        --source-config '{
          "events.topic": "persistent://public/default/events-<keyspace>.<table>",
          "keyspace": "<C*_keyspace>",
          "table": "<C*_table>",
          "contactPoints": "<C*_contact_point_list>",
          "port": "9042",
          "loadBalancing.localDc": "<C*_DC_name>",
          "outputFormat": "key-value-avro"
        }'

      Key-value-json:

      pulsar-admin source create \
        --name <csc_connector_name> \
        --archive /path/to/pulsar-cassandra-source-<version>.nar \
        --tenant public \
        --namespace default \
        --destination-topic-name persistent://public/default/data-<keyspace>.<table> \
        --parallelism 1 \
        --source-config '{
          "events.topic": "persistent://public/default/events-<keyspace>.<table>",
          "keyspace": "<C*_keyspace>",
          "table": "<C*_table>",
          "contactPoints": "<C*_contact_point_list>",
          "port": "9042",
          "loadBalancing.localDc": "<C*_DC_name>",
          "outputFormat": "key-value-json"
        }'

      JSON:

      pulsar-admin source create \
        --name <csc_connector_name> \
        --archive /path/to/pulsar-cassandra-source-<version>.nar \
        --tenant public \
        --namespace default \
        --destination-topic-name persistent://public/default/data-<keyspace>.<table> \
        --parallelism 1 \
        --source-config '{
          "events.topic": "persistent://public/default/events-<keyspace>.<table>",
          "keyspace": "<C*_keyspace>",
          "table": "<C*_table>",
          "contactPoints": "<C*_contact_point_list>",
          "port": "9042",
          "loadBalancing.localDc": "<C*_DC_name>",
          "outputFormat": "json"
        }'

  2. Verify the CSC connector is deployed.

    pulsar-admin source list

    The output should include the connector name, for example ["<csc_connector_name>"].

  3. Check the CSC connector log file at $PULSAR_HOME/logs/functions/public/default/<csc_connector_name> for errors.
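Because one connector is deployed per CDC-enabled table, it can help to generate the source configuration rather than edit it by hand. The following is a minimal sketch under stated assumptions: build_source_config is a hypothetical helper, not part of pulsar-admin; it assumes the public tenant, the default namespace, the default events- prefix, and port 9042; and it does not escape JSON special characters in its arguments. The contact point 10.0.0.1 and datacenter name dc1 are illustrative values only.

```shell
#!/bin/sh
# Hypothetical helper: print the --source-config JSON for one table.
# Arguments: keyspace, table, contact points, local DC, output format (optional).
# The body runs in a subshell so its variables do not leak into the caller.
build_source_config() (
  keyspace="$1"
  table="$2"
  contact_points="$3"
  local_dc="$4"
  output_format="${5:-key-value-avro}"
  cat <<EOF
{
  "events.topic": "persistent://public/default/events-${keyspace}.${table}",
  "keyspace": "${keyspace}",
  "table": "${table}",
  "contactPoints": "${contact_points}",
  "port": "9042",
  "loadBalancing.localDc": "${local_dc}",
  "outputFormat": "${output_format}"
}
EOF
)

# Print an example configuration (illustrative values).
build_source_config ks1 table1 10.0.0.1 dc1

# Usage with a running Pulsar cluster (shown as a comment only):
#   pulsar-admin source create \
#     --name <csc_connector_name> \
#     --archive /path/to/pulsar-cassandra-source-<version>.nar \
#     --tenant public \
#     --namespace default \
#     --destination-topic-name persistent://public/default/data-ks1.table1 \
#     --parallelism 1 \
#     --source-config "$(build_source_config ks1 table1 10.0.0.1 dc1)"
```

Generating events.topic from the same keyspace and table arguments as the keyspace and table fields avoids a mismatch between the connector and the topic that the CDC agent publishes to.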

Verify end-to-end operation

Now that Pulsar, C*/DSE, CDC, and the CSC connector are installed and verified to be operational, you can verify the end-to-end message flow.

  1. To verify that the CDC agent is publishing to the events topic in Pulsar, confirm that the events topic exists and has messages:

    pulsar-admin topics stats persistent://public/default/events-<keyspace>.<table>

    Optionally, view raw CDC events from the events topic:

    pulsar-client consume -s test-sub-events -p Earliest -n 10 persistent://public/default/events-<keyspace>.<table>

  2. To verify that the Cassandra Source Connector is processing events and publishing to the data topic, check the connector and data topic status:

    # Check connector status
    pulsar-admin source status --name <csc_connector_name>
    
    # Check data topic stats
    pulsar-admin topics stats persistent://public/default/data-<keyspace>.<table>

  3. Consume processed data from the data topic:

    pulsar-client consume -s mysub -st auto_consume -n 0 persistent://public/default/data-<keyspace>.<table>

    Any captured CDC events from your database table should be reflected in the command line output of the data topic consumer.
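The "has messages" check in the steps above can be scripted by reading the inbound message counter from the topic stats. This is a minimal sketch, assuming the stats JSON contains a numeric msgInCounter field and that jq is not available; the function name msg_in_counter is hypothetical, and the sample JSON below is made up for illustration rather than copied from real output.

```shell
#!/bin/sh
# Hypothetical helper: read topic stats JSON on stdin and print the first
# msgInCounter value. Assumes the field is present with an integer value.
msg_in_counter() {
  tr -d ' \n' | grep -o '"msgInCounter":[0-9]*' | head -n 1 | cut -d: -f2
}

# Illustrative input only; real stats output has many more fields.
printf '{\n  "msgRateIn" : 0.0,\n  "msgInCounter" : 42,\n  "bytesInCounter" : 100\n}\n' | msg_in_counter

# Usage with a running Pulsar cluster (shown as a comment only):
#   pulsar-admin topics stats persistent://public/default/events-<keyspace>.<table> | msg_in_counter
```

A value greater than zero on the events topic suggests that the CDC agent has published events, and a value greater than zero on the data topic suggests that the connector has processed them.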

