Change Data Capture with DSE
Capture data changes in your DSE tables and pass them to Apache Pulsar® with DSE Change Data Capture (CDC). This guide walks you through installing, configuring, and using CDC with DSE in a VM-based deployment.
This installation requires the following components. The latest version artifacts are available from the CDC agent release page.
- DSE installation
- DSE 6.9 CDC Agent: use agent-dse4-<version>-all.jar
Prerequisites
- Running DSE cluster
Installing and configuring
- Download the Pulsar tarball and set up a Pulsar cluster. This example uses Pulsar standalone mode, but you can also use our Ansible scripts.
cd $PULSAR_HOME
bin/pulsar standalone
We recommend using the latest CDC agent version (1.0.4 or later) to support C* collection data types.
- Configure the cassandra-env.sh file as follows:
export CDC_PULSAR_SERVICE_URL="<pulsar_broker_service_url>"  # e.g. pulsar://<pulsar_server_ip>:6650
# needed when Pulsar JWT authentication is enabled
export CDC_PULSAR_AUTH_PLUGIN_CLASS_NAME="org.apache.pulsar.client.impl.auth.AuthenticationToken"
export CDC_PULSAR_AUTH_PARAMS="file://</path/to/token/file>"
# needed when Pulsar TLS encryption is enabled
export CDC_TLS_TRUST_CERTS_FILE_PATH="</path/to/trusted/cert/file>"
# DSE CDC
JVM_OPTS="$JVM_OPTS -javaagent:/home/automaton/cdc104/agent-dse4-<version>-all.jar"
- Restart DSE to apply the changes.
For CDC agent versions after 1.0.3, the CDC agent Pulsar connection parameters are provided as system environment variables (the CDC_* exports in the example above).
For CDC agent versions before 1.0.3, the CDC agent Pulsar connection parameters are instead passed as agent options on the -javaagent flag, as below:
export JVM_EXTRA_OPTS="-javaagent:/path/to/agent-dse4-<version>-all.jar=pulsarServiceUrl=pulsar://pulsar:6650"
- Set the following properties in cassandra.yaml:
cdc_enabled: true
cdc_raw_directory: /var/lib/cassandra/cdc_raw
commitlog_sync_period_in_ms: 2000
cdc_total_space_in_mb: 4096
- Restart DSE and verify that your system.log file contains entries similar to the example below. The "CDC agent started" message indicates that your CDC agent has started properly.
INFO [main] 2022-04-11 18:47:06,302 Agent.java:60 - Starting CDC agent, cdc_raw_directory=/var/lib/cassandra/cdc_raw
INFO [main] 2022-04-11 18:47:06,331 AgentConfig.java:526 - maxInflightMessagesPerTask=16384, sslEnabledProtocols=TLSv1.2,TLSv1.1,TLSv1, cdcWorkingDir=/usr/share/dse/data/cdc, pulsarMaxPendingMessagesAcrossPartitions=50000, pulsarMaxPendingMessages=1000, sslTruststorePath=null, cdcPollIntervalMs=60000, pulsarAuthParams=null, sslHostnameVerificationEnable=false, errorCommitLogReprocessEnabled=false, sslTruststorePassword=null, tlsTrustCertsFilePath=null, sslKeystorePath=null, sslKeystorePassword=null, sslAllowInsecureConnection=false, cdcConcurrentProcessors=-1, pulsarServiceUrl=pulsar://10.101.32.213:6650, pulsarKeyBasedBatcher=false, sslTruststoreType=JKS, pulsarBatchDelayInMs=-1, topicPrefix=events-, sslCipherSuites=null, pulsarAuthPluginClassName=null, sslProvider=null, useKeyStoreTls=false
INFO [main] 2022-04-11 18:47:06,433 Agent.java:92 - CDC agent started
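Before restarting DSE, it can help to sanity-check the CDC_* variables exported in cassandra-env.sh. The function below is a hypothetical preflight helper, not part of the CDC agent: it only checks that the service URL uses the pulsar:// or pulsar+ssl:// scheme, that JWT auth has both a plugin class and parameters, and that a configured trust-cert file exists. It does not contact the broker.

```shell
#!/usr/bin/env bash
# Hypothetical preflight check for the CDC agent environment variables.
# Run it in a shell where the same CDC_* variables are set.

check_cdc_env() {
  local status=0
  # The agent connects over the Pulsar binary protocol (plain or TLS).
  case "${CDC_PULSAR_SERVICE_URL:-}" in
    pulsar://*|pulsar+ssl://*) ;;
    *) echo "CDC_PULSAR_SERVICE_URL is missing or not a pulsar:// URL" >&2; status=1 ;;
  esac
  # JWT authentication needs both the plugin class and its parameters.
  if [ -n "${CDC_PULSAR_AUTH_PLUGIN_CLASS_NAME:-}" ] && [ -z "${CDC_PULSAR_AUTH_PARAMS:-}" ]; then
    echo "CDC_PULSAR_AUTH_PARAMS must be set when an auth plugin is configured" >&2
    status=1
  fi
  # If TLS is configured, the trusted cert file must exist on this node.
  if [ -n "${CDC_TLS_TRUST_CERTS_FILE_PATH:-}" ] && [ ! -f "$CDC_TLS_TRUST_CERTS_FILE_PATH" ]; then
    echo "trusted cert file not found: $CDC_TLS_TRUST_CERTS_FILE_PATH" >&2
    status=1
  fi
  return "$status"
}
```

The function returns non-zero and prints every problem it finds, rather than stopping at the first one, so a single run reports all misconfigured variables.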
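The cassandra.yaml settings above are easy to get subtly wrong: YAML needs a space after the colon, so a line like cdc_enabled:true is not parsed as a key/value pair. The sketch below is a hypothetical, line-based check (not a YAML parser) that assumes the CDC keys appear as simple top-level "key: value" lines.

```shell
#!/usr/bin/env bash
# Hypothetical check of the CDC settings in cassandra.yaml.
# Line-based sketch: assumes top-level "key: value" lines, not general YAML.

# Print the value of a top-level "key: value" line (the last one wins).
# "key:value" without a space is not a YAML mapping, so it is not matched.
yaml_value() {
  sed -n "s/^$2:[[:space:]][[:space:]]*\([^#[:space:]]*\).*/\1/p" "$1" | tail -n 1
}

check_cdc_yaml() {
  local file="$1" status=0 space
  if [ "$(yaml_value "$file" cdc_enabled)" != "true" ]; then
    echo "cdc_enabled is not set to true in $file" >&2
    status=1
  fi
  if [ -z "$(yaml_value "$file" cdc_raw_directory)" ]; then
    echo "cdc_raw_directory is not set in $file" >&2
    status=1
  fi
  # Optional here, but must be a positive integer when present.
  space="$(yaml_value "$file" cdc_total_space_in_mb)"
  case "$space" in
    "") ;;
    *[!0-9]*|0) echo "cdc_total_space_in_mb must be a positive integer" >&2; status=1 ;;
  esac
  return "$status"
}
```

For example, check_cdc_yaml /etc/dse/cassandra/cassandra.yaml (the path is an assumption; use the file your DSE node actually loads).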
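The log check in the last step can be scripted by polling system.log for the "CDC agent started" message shown in the example. This is a minimal sketch; the default log path /var/log/cassandra/system.log and the 120-second timeout are assumptions. Because it greps the whole file, a message left over from an earlier start also counts, so rotate or note the log position before restarting if that matters.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wait until system.log reports "CDC agent started".
# Usage: wait_for_cdc_agent [log_file] [timeout_seconds]

wait_for_cdc_agent() {
  local log="${1:-/var/log/cassandra/system.log}" timeout="${2:-120}" waited=0
  while [ "$waited" -lt "$timeout" ]; do
    if grep -q "CDC agent started" "$log" 2>/dev/null; then
      # Print the latest AgentConfig line so the effective settings
      # (pulsarServiceUrl, topicPrefix, ...) can be reviewed.
      grep "AgentConfig" "$log" | tail -n 1
      return 0
    fi
    sleep 1
    waited=$((waited + 1))
  done
  echo "CDC agent did not start within ${timeout}s; check $log" >&2
  return 1
}
```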
Deploy Pulsar Connector
- Deploy the Pulsar Cassandra Source Connector (CSC) for each CDC-enabled table. In the example below, the source connector is configured with parameters passed in --source-config. The outputFormat source configuration controls the format of messages on the data topic:
  - key-value-avro: Default behavior. Key and value are encoded separately in Avro format.
  - key-value-json: Key and value are encoded separately in JSON format.
  - json: Key and value are encoded together in a single JSON object. The key field is populated with a JSON string representing the key fields.
key-value-avro:
$ pulsar-admin source create \
  --name <csc_connector_name> \
  --archive /path/to/pulsar-cassandra-source-<version>.nar \
  --tenant public \
  --namespace default \
  --destination-topic-name persistent://public/default/data-<keyspace>.<table> \
  --parallelism 1 \
  --source-config '{
    "events.topic": "persistent://public/default/events-<keyspace>.<table>",
    "keyspace": "<C*_keyspace>",
    "table": "<C*_table>",
    "contactPoints": "<C*_contact_point_list>",
    "port": "9042",
    "loadBalancing.localDc": "<C*_DC_name>",
    "outputFormat": "key-value-avro"
  }'
key-value-json:
$ pulsar-admin source create \
  --name <csc_connector_name> \
  --archive /path/to/pulsar-cassandra-source-<version>.nar \
  --tenant public \
  --namespace default \
  --destination-topic-name persistent://public/default/data-<keyspace>.<table> \
  --parallelism 1 \
  --source-config '{
    "events.topic": "persistent://public/default/events-<keyspace>.<table>",
    "keyspace": "<C*_keyspace>",
    "table": "<C*_table>",
    "contactPoints": "<C*_contact_point_list>",
    "port": "9042",
    "loadBalancing.localDc": "<C*_DC_name>",
    "outputFormat": "key-value-json"
  }'
json:
$ pulsar-admin source create \
  --name <csc_connector_name> \
  --archive /path/to/pulsar-cassandra-source-<version>.nar \
  --tenant public \
  --namespace default \
  --destination-topic-name persistent://public/default/data-<keyspace>.<table> \
  --parallelism 1 \
  --source-config '{
    "events.topic": "persistent://public/default/events-<keyspace>.<table>",
    "keyspace": "<C*_keyspace>",
    "table": "<C*_table>",
    "contactPoints": "<C*_contact_point_list>",
    "port": "9042",
    "loadBalancing.localDc": "<C*_DC_name>",
    "outputFormat": "json"
  }'
- Verify that the CSC connector is deployed:
pulsar-admin source list
The output should list your connector:
["<csc_connector_name>"]
- Check the CSC connector log files in <$PULSAR_HOME>/logs/functions/public/default/<csc_connector_name> for errors.
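When many tables are CDC-enabled, building the --source-config JSON by hand for each one is error-prone. The hypothetical helpers below generate it using the topic naming from this guide: events-<keyspace>.<table> (matching topicPrefix=events- in the agent log) and data-<keyspace>.<table>. They assume the public tenant, default namespace, and port 9042 as in the examples, and do no JSON escaping, so argument values must not contain double quotes.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for deploying one CSC connector per CDC-enabled table.

# Data topic that the connector writes to (--destination-topic-name).
csc_data_topic() {
  printf 'persistent://public/default/data-%s.%s\n' "$1" "$2"
}

# Usage: csc_source_config <keyspace> <table> <contact_points> <local_dc> [outputFormat]
csc_source_config() {
  local keyspace="$1" table="$2" contact_points="$3" local_dc="$4"
  local format="${5:-key-value-avro}"   # default matches the connector's default
  case "$format" in
    key-value-avro|key-value-json|json) ;;
    *) echo "unsupported outputFormat: $format" >&2; return 1 ;;
  esac
  printf '{"events.topic": "persistent://public/default/events-%s.%s", "keyspace": "%s", "table": "%s", "contactPoints": "%s", "port": "9042", "loadBalancing.localDc": "%s", "outputFormat": "%s"}\n' \
    "$keyspace" "$table" "$keyspace" "$table" "$contact_points" "$local_dc" "$format"
}

# Example (names are placeholders):
#   pulsar-admin source create ... \
#     --destination-topic-name "$(csc_data_topic ks1 tbl1)" \
#     --source-config "$(csc_source_config ks1 tbl1 10.0.0.1 dc1 key-value-json)"
```

Quoting the command substitution keeps the JSON as a single argument, which is the same reason the examples above wrap --source-config in single quotes.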
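The log check in the last step can be reduced to a grep over the connector's log location. This is a minimal sketch that assumes the layout shown above and treats any line containing ERROR or Exception as a problem; that heuristic can flag harmless warnings, so read the matched lines before acting on them.

```shell
#!/usr/bin/env bash
# Hypothetical helper: scan a CSC connector's log directory for errors.
# Usage: csc_log_errors "$PULSAR_HOME/logs/functions/public/default/<csc_connector_name>"
# Returns 0 when no errors are found, 1 when matches are printed, 2 if the dir is missing.

csc_log_errors() {
  local dir="$1"
  if [ ! -d "$dir" ]; then
    echo "log directory not found: $dir" >&2
    return 2
  fi
  # grep succeeds when it finds a match, so a match means "errors found".
  if grep -rnE 'ERROR|Exception' "$dir"; then
    return 1
  fi
  return 0
}
```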
Verify end-to-end operation
Now that Pulsar, DSE, CDC, and the CSC connector are installed and verified to be operational, we can monitor the Pulsar data topic for the CDC-enabled C* table.
Any captured CDC events from the C* table will be reflected in the command line output of the following command:
pulsar-client consume -s mysub -st auto_consume -n 0 persistent://public/default/data-<keyspace>.<table>
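To turn the monitoring command into a one-shot check, you can subscribe to the data topic, write a test row, and wait for one message. The function below is a hypothetical smoke test, assuming cqlsh and pulsar-client are on PATH (override with CQLSH and PULSAR_CLIENT). A new Pulsar subscription starts at the latest position by default, so the script waits briefly (SMOKE_WAIT_SECS, an assumed 5 seconds) before writing; the subscription name cdc-smoke-test is also illustrative. It blocks until a message arrives.

```shell
#!/usr/bin/env bash
# Hypothetical end-to-end smoke test for one CDC-enabled table.
# Usage: cdc_smoke_test <keyspace> <table> "<CQL write statement>"

cdc_smoke_test() {
  local keyspace="$1" table="$2" cql="$3"
  local topic="persistent://public/default/data-${keyspace}.${table}"
  local client="${PULSAR_CLIENT:-pulsar-client}" cqlsh="${CQLSH:-cqlsh}"

  # -n 1: exit after one message; run in the background so we can write.
  "$client" consume -s cdc-smoke-test -st auto_consume -n 1 "$topic" &
  local consumer=$!
  # Crude wait so the subscription exists before the write happens.
  sleep "${SMOKE_WAIT_SECS:-5}"
  if ! "$cqlsh" -e "$cql"; then
    kill "$consumer" 2>/dev/null
    return 1
  fi
  # Blocks until the consumer has received the CDC event.
  wait "$consumer"
}
```

For example, cdc_smoke_test ks1 tbl1 "INSERT INTO ks1.tbl1 (id, val) VALUES (1, 'a');", where the table and columns are placeholders for one of your CDC-enabled tables.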