Change Data Capture with DataStax Enterprise
You can capture data changes in your tables and pass them to Apache Pulsar® with DataStax Change Data Capture (CDC). This guide explains how to install, configure, and use CDC with Apache Cassandra® or DataStax Enterprise (DSE) in a VM-based deployment.
Prerequisites
This installation requires the following:
- DSE 6.8.16 or later, or OSS Apache Cassandra®
- DataStax Change Agent for Apache Cassandra®
  - DSE: use `agent-dse4-<version>-all.jar`
  - OSS Cassandra: use `agent-c4-<version>-all.jar`
- Pulsar
  - IBM Elite Support for Apache Pulsar: use `agent-dse4-<version>-all.jar`
- Pulsar Cassandra source connector (DataStax Cassandra Source Connector for Apache Pulsar™)
  - Pulsar Cassandra Source NAR: use `pulsar-cassandra-source-<version>.nar`
The latest versions of the Change Agent for Cassandra and the Cassandra Source Connector (CSC) for Pulsar are available from the DataStax CDC for Cassandra repository.
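The agent JAR you need depends on the database flavor listed above. As a minimal sketch, the hypothetical `agent_jar` function below (not part of the DataStax tooling) prints the JAR name for a flavor and agent version, and warns when the version is older than 1.0.4, the version DataStax recommends for Cassandra collection data type support. It assumes GNU `sort -V` is available for the version comparison.

```bash
#!/usr/bin/env bash
# Hypothetical helper (not shipped with CDC): print the Change Agent for
# Cassandra JAR name for a database flavor and agent version.
# Usage: agent_jar dse|cassandra VERSION
agent_jar() {
  local flavor="$1" version="$2" prefix
  case "$flavor" in
    dse)       prefix="agent-dse4" ;;
    cassandra) prefix="agent-c4" ;;
    *) echo "unknown flavor: $flavor (expected dse or cassandra)" >&2; return 1 ;;
  esac
  # Warn on stderr if VERSION sorts before 1.0.4 (assumes GNU sort -V).
  if [ "$(printf '%s\n' "1.0.4" "$version" | sort -V | head -n 1)" != "1.0.4" ]; then
    echo "warning: agent $version is older than 1.0.4; collection types may not be supported" >&2
  fi
  printf '%s-%s-all.jar\n' "$prefix" "$version"
}
```

For example, `agent_jar cassandra 1.0.4` prints `agent-c4-1.0.4-all.jar`.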
Architecture overview
The CDC pipeline consists of three stages that capture changes in Cassandra/DSE, process them, and deliver them to downstream consumers through Pulsar:
- The Change Agent for Cassandra runs as a Java agent on each Cassandra/DSE node, processes the commit log segments in the CDC raw directory, and publishes change events to Pulsar. It creates events topics using the format `events-<keyspace>.<table>`. When configuring the source connector, the `events.topic` parameter must match exactly the topic the Change Agent for Cassandra publishes to. For example, if your keyspace is `ks1` and your table is `table1`, the `events.topic` parameter is `persistent://public/default/events-ks1.table1`.
- The Cassandra Source Connector consumes change events from the events topic (`events-<keyspace>.<table>`), fetches the full row data from Cassandra for each change event, de-duplicates events that arrive from multiple replicas, and publishes the processed data to the data topic, `data-<keyspace>.<table>`.
- Downstream consumers, such as sink connectors, analytics systems, or other applications, read from the data topic for further processing.
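The topic naming convention above can be expressed as two small shell functions. This is a sketch under stated assumptions: `events_topic` and `data_topic` are hypothetical names, and both functions assume the `public` tenant, the `default` namespace, and the agent's default `events-` prefix used throughout this guide.

```bash
#!/usr/bin/env bash
# Hypothetical helpers (not part of the CDC tooling). Both assume the
# "public" tenant, "default" namespace, and the default "events-" prefix.

# events_topic KEYSPACE TABLE: topic the Change Agent for Cassandra publishes to
events_topic() {
  [ $# -eq 2 ] || { echo "usage: events_topic KEYSPACE TABLE" >&2; return 1; }
  printf 'persistent://public/default/events-%s.%s\n' "$1" "$2"
}

# data_topic KEYSPACE TABLE: topic the Cassandra Source Connector writes to
data_topic() {
  [ $# -eq 2 ] || { echo "usage: data_topic KEYSPACE TABLE" >&2; return 1; }
  printf 'persistent://public/default/data-%s.%s\n' "$1" "$2"
}
```

With keyspace `ks1` and table `table1`, `events_topic ks1 table1` prints the same `events.topic` value as the example above, so the connector configuration and the agent stay in agreement.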
Installing and configuring
- Download the Pulsar tarball and set up a Pulsar cluster. This example uses Pulsar standalone mode, but you can also use the DataStax Ansible scripts.
  ```bash
  cd $PULSAR_HOME
  bin/pulsar standalone
  ```
  DataStax recommends using the Change Agent for Cassandra version 1.0.4 or later to support Cassandra collection data types.
- Install Cassandra/DSE.
- After installing Cassandra/DSE, but before starting the Cassandra/DSE service, set the following in `cassandra-env.sh`:
  ```bash
  # Pulsar broker service URL, e.g. pulsar://<pulsar_server_ip>:6650
  export CDC_PULSAR_SERVICE_URL="<pulsar_broker_service_url>"

  # Needed when Pulsar JWT authentication is enabled
  export CDC_PULSAR_AUTH_PLUGIN_CLASS_NAME="org.apache.pulsar.client.impl.auth.AuthenticationToken"
  export CDC_PULSAR_AUTH_PARAMS="file://</path/to/token/file>"

  # Needed when Pulsar TLS encryption is enabled
  export CDC_TLS_TRUST_CERTS_FILE_PATH="</path/to/trusted/cert/file>"

  # DSE CDC
  JVM_OPTS="$JVM_OPTS -javaagent:/home/automaton/cdc104/agent-dse4-<version>-all.jar"
  ```
  For Change Agent for Cassandra versions after 1.0.3, the Pulsar connection parameters are provided as system environment variables (the `export CDC_*` lines in the example above).

  For Change Agent for Cassandra versions before 1.0.3, the Pulsar connection parameters are passed as options to the `-javaagent` argument:
  ```bash
  export JVM_EXTRA_OPTS="-javaagent:/path/to/agent-c4-<version>-all.jar=pulsarServiceUrl=pulsar://pulsar:6650"
  ```
- Set the following in `cassandra.yaml`:
  ```yaml
  cdc_enabled: true
  cdc_raw_directory: /var/lib/cassandra/cdc_raw
  commitlog_sync_period_in_ms: 2000
  cdc_total_space_in_mb: 4096
  ```
- Start Cassandra/DSE and verify that your Cassandra `system.log` file contains entries similar to the following. The `CDC agent started` message indicates that the Change Agent for Cassandra has started properly.
  ```
  INFO [main] 2022-04-11 18:47:06,302 Agent.java:60 - Starting CDC agent, cdc_raw_directory=/var/lib/cassandra/cdc_raw
  INFO [main] 2022-04-11 18:47:06,331 AgentConfig.java:526 - maxInflightMessagesPerTask=16384, sslEnabledProtocols=TLSv1.2,TLSv1.1,TLSv1, cdcWorkingDir=/usr/share/dse/data/cdc, pulsarMaxPendingMessagesAcrossPartitions=50000, pulsarMaxPendingMessages=1000, sslTruststorePath=null, cdcPollIntervalMs=60000, pulsarAuthParams=null, sslHostnameVerificationEnable=false, errorCommitLogReprocessEnabled=false, sslTruststorePassword=null, tlsTrustCertsFilePath=null, sslKeystorePath=null, sslKeystorePassword=null, sslAllowInsecureConnection=false, cdcConcurrentProcessors=-1, pulsarServiceUrl=pulsar://10.101.32.213:6650, pulsarKeyBasedBatcher=false, sslTruststoreType=JKS, pulsarBatchDelayInMs=-1, topicPrefix=events-, sslCipherSuites=null, pulsarAuthPluginClassName=null, sslProvider=null, useKeyStoreTls=false
  INFO [main] 2022-04-11 18:47:06,433 Agent.java:92 - CDC agent started
  ```
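Before starting the node, you can sanity-check the `cassandra-env.sh` and `cassandra.yaml` settings from the steps above. The following sketch defines hypothetical helpers, `check_cdc_env`, `yaml_value`, and `check_cdc_yaml`, which are not part of the Change Agent for Cassandra. `check_cdc_env` assumes the environment-variable style of configuration (agent versions after 1.0.3) and accepts `pulsar://` or `pulsar+ssl://` service URLs. `yaml_value` reads only simple top-level `key: value` lines; that is enough to catch a missing space such as `cdc_enabled:true`, but it is not a YAML parser.

```bash
#!/usr/bin/env bash
# Hypothetical pre-start checks; not part of the Change Agent for Cassandra.

# check_cdc_env: verify the Pulsar connection variables exported in cassandra-env.sh.
check_cdc_env() {
  local url="${CDC_PULSAR_SERVICE_URL:-}"
  case "$url" in
    pulsar://*|pulsar+ssl://*) ;;
    "") echo "CDC_PULSAR_SERVICE_URL is not set" >&2; return 1 ;;
    *)  echo "unexpected Pulsar service URL: $url" >&2; return 1 ;;
  esac
  # With JWT authentication, the plugin class needs its auth parameters too.
  if [ -n "${CDC_PULSAR_AUTH_PLUGIN_CLASS_NAME:-}" ] && [ -z "${CDC_PULSAR_AUTH_PARAMS:-}" ]; then
    echo "CDC_PULSAR_AUTH_PARAMS must be set when an auth plugin is configured" >&2
    return 1
  fi
}

# yaml_value KEY FILE: print the value of the last simple top-level
# "key: value" line for KEY, ignoring trailing comments.
yaml_value() {
  sed -n -E "s/^$1:[[:space:]]+([^#]*[^#[:space:]])[[:space:]]*(#.*)?\$/\1/p" "$2" | tail -n 1
}

# check_cdc_yaml FILE: require "cdc_enabled: true" and a cdc_raw_directory.
check_cdc_yaml() {
  if [ "$(yaml_value cdc_enabled "$1")" != "true" ]; then
    echo "cdc_enabled is not set to true in $1" >&2
    return 1
  fi
  if [ -z "$(yaml_value cdc_raw_directory "$1")" ]; then
    echo "cdc_raw_directory is not set in $1" >&2
    return 1
  fi
}
```

For example, after sourcing `cassandra-env.sh` in the same shell, run `check_cdc_env && check_cdc_yaml /path/to/cassandra.yaml` before starting the service.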
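To check the start-up messages without paging through the whole log, the following sketch defines a hypothetical `check_agent_log` function. It succeeds only if the log contains the `CDC agent started` message, and it prints the `pulsarServiceUrl` value from the agent's configuration line so you can confirm which broker the agent connects to.

```bash
#!/usr/bin/env bash
# Hypothetical helper: checks a Cassandra/DSE system.log for the
# Change Agent for Cassandra start-up messages.
# Usage: check_agent_log PATH_TO_SYSTEM_LOG
check_agent_log() {
  local log="$1"
  if ! grep -q 'CDC agent started' "$log"; then
    echo "CDC agent start message not found in $log" >&2
    return 1
  fi
  # Print the Pulsar URL from the most recent configuration line
  # (the last one, in case the node was restarted).
  grep -o 'pulsarServiceUrl=[^,]*' "$log" | tail -n 1 | cut -d= -f2-
}
```

The log location varies by installation; `/var/log/cassandra/system.log` is a common default for package installs, but treat that path as an assumption and point the function at your own `system.log`.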
Deploy Pulsar Connector
- Deploy the Pulsar Cassandra Source Connector (CSC) for each CDC-enabled Cassandra table. The connector consumes from the events topic (where the Change Agent for Cassandra publishes) and writes to the data topic (where downstream consumers read from).

  The `events.topic` parameter in the source connector configuration must match exactly the topic name that the Change Agent for Cassandra publishes to. With the default `events-` prefix, the agent creates topics using the format `events-<keyspace>.<table>`. For example, for keyspace `ks1` and table `table1`, the events topic is `events-ks1.table1`.

  The `outputFormat` source configuration controls the format of messages on the data topic:
  - `key-value-avro`: Default. Key and value are encoded separately in Avro format.
  - `key-value-json`: Key and value are encoded separately in JSON format.
  - `json`: Key and value are encoded together in a single JSON object. The key field is populated with a JSON string representing the key fields.

  The following command uses `key-value-avro`. For `key-value-json` or `json` output, the command is identical except for the `outputFormat` value.
  ```bash
  pulsar-admin source create \
    --name <csc_connector_name> \
    --archive /path/to/pulsar-cassandra-source-<version>.nar \
    --tenant public \
    --namespace default \
    --destination-topic-name persistent://public/default/data-<keyspace>.<table> \
    --parallelism 1 \
    --source-config '{
      "events.topic": "persistent://public/default/events-<keyspace>.<table>",
      "keyspace": "<Cassandra_keyspace>",
      "table": "<Cassandra_table>",
      "contactPoints": "<Cassandra_contact_point_list>",
      "port": "9042",
      "loadBalancing.localDc": "<Cassandra_DC_name>",
      "outputFormat": "key-value-avro"
    }'
  ```
- Verify that the CSC connector is deployed:
  ```bash
  pulsar-admin source list
  ```
  The output should include your connector name:
  ```
  ["<csc_connector_name>"]
  ```
- Check the CSC connector log file at `$PULSAR_HOME/logs/functions/public/default/<csc_connector_name>` for errors.
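Because you deploy one connector per CDC-enabled table, generating the `--source-config` JSON can be less error-prone than editing it by hand. The following is a minimal sketch of a hypothetical `csc_source_config` function that fills in the same fields as the `pulsar-admin source create` command in this section, using this guide's naming convention (tenant `public`, namespace `default`, default `events-` prefix) and port `9042`. It rejects any `outputFormat` other than the three listed above. Values are inserted verbatim, so they must not contain double quotes.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the --source-config JSON for one CDC-enabled table.
# Assumes the public/default namespace and the default "events-" topic prefix.
# Usage: csc_source_config KEYSPACE TABLE CONTACT_POINTS LOCAL_DC [OUTPUT_FORMAT]
csc_source_config() {
  local ks="$1" table="$2" contact_points="$3" dc="$4" fmt="${5:-key-value-avro}"
  case "$fmt" in
    key-value-avro|key-value-json|json) ;;
    *) echo "unsupported outputFormat: $fmt" >&2; return 1 ;;
  esac
  printf '{"events.topic": "persistent://public/default/events-%s.%s", "keyspace": "%s", "table": "%s", "contactPoints": "%s", "port": "9042", "loadBalancing.localDc": "%s", "outputFormat": "%s"}\n' \
    "$ks" "$table" "$ks" "$table" "$contact_points" "$dc" "$fmt"
}
```

For example, pass `--source-config "$(csc_source_config ks1 table1 <Cassandra_contact_point_list> <Cassandra_DC_name> json)"` together with the other `pulsar-admin source create` flags. Keeping the events topic name derived from the keyspace and table avoids mismatches with the topic the agent publishes to.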
Verify end-to-end operation
Now that Pulsar, Cassandra/DSE, CDC, and the CSC connector are installed and verified to be operational, you can verify the end-to-end message flow.
- To verify that the Change Agent for Cassandra is publishing to the events topic, confirm that the events topic exists and has messages:
  ```bash
  pulsar-admin topics stats persistent://public/default/events-<keyspace>.<table>
  ```
  Optionally, view raw CDC events from the events topic:
  ```bash
  pulsar-client consume -s test-sub-events -p Earliest -n 10 persistent://public/default/events-<keyspace>.<table>
  ```
- To verify that the Cassandra Source Connector is processing events and publishing to the data topic, check the connector and data topic status:
  ```bash
  # Check connector status
  pulsar-admin source status --name <csc_connector_name>

  # Check data topic stats
  pulsar-admin topics stats persistent://public/default/data-<keyspace>.<table>
  ```
- Consume processed data from the data topic:
  ```bash
  pulsar-client consume -s mysub -st auto_consume -n 0 persistent://public/default/data-<keyspace>.<table>
  ```
  With `-n 0`, the consumer keeps receiving messages until you stop it. Any changes captured from your CDC-enabled table should appear in the consumer's command-line output.
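The topic checks in the first two steps can be scripted. This sketch is hypothetical: `msg_in_counter` and `check_cdc_flow` are illustrative names, and `msg_in_counter` extracts the `msgInCounter` field from `pulsar-admin topics stats` JSON with `grep` rather than a JSON parser, assuming the topic-level counter is the first occurrence in the output. It also assumes non-partitioned topics in the `public/default` namespace.

```bash
#!/usr/bin/env bash
# Hypothetical helper: read `pulsar-admin topics stats` JSON on stdin and print
# the first msgInCounter value (assumed to be the topic-level counter).
msg_in_counter() {
  grep -o '"msgInCounter"[[:space:]]*:[[:space:]]*[0-9]*' | head -n 1 | grep -o '[0-9]*$'
}

# Hypothetical end-to-end check: succeed when both the events topic and the
# data topic for KEYSPACE.TABLE have received at least one message.
# Usage: check_cdc_flow KEYSPACE TABLE
check_cdc_flow() {
  local ks="$1" table="$2" topic count
  for topic in "events-$ks.$table" "data-$ks.$table"; do
    count="$(pulsar-admin topics stats "persistent://public/default/$topic" | msg_in_counter)"
    if [ "${count:-0}" -eq 0 ]; then
      echo "no messages published to $topic yet" >&2
      return 1
    fi
    echo "$topic: $count messages"
  done
}
```

If the events topic has messages but the data topic does not, the problem is most likely in the connector, so check its status and log file first.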