Installing CDC for Cassandra for VM deployment

Download the DataStax Change Data Capture (CDC) Agent for Apache Cassandra®

By downloading this DataStax product, you agree to the terms of the open-source Apache-2.0 license agreement.

Perform the following steps:

1. Download the change agent tar file from the DataStax downloads page. The tar file contains the following files:

Cassandra type | JAR file
Apache Cassandra 3.x | agent-c3-<version>-all.jar
Apache Cassandra 4.x | agent-c4-<version>-all.jar
DSE 6.8.16+ | agent-dse4-<version>-all.jar

2. Extract the files from the tar file with the following command:

    tar xvf cassandra-source-agents-<version>.tar

3. Use the file that matches your Cassandra type and streaming platform.

Start Cassandra with the Change Agent for Cassandra

All data nodes of your Cassandra or DSE datacenter must run the change agent as a JVM agent to send mutations into the events topic of your streaming platform. Start your Cassandra or DSE nodes with the agent JAR that matches your Cassandra (3.11 or 4.0) or DSE (6.8.16) version and your streaming platform (Luna Streaming 2.8+ or Apache Pulsar 2.8.1+).

In CDC agent versions before 1.0.3, the Pulsar connection parameters are passed after the JAR file name as a comma-separated list of paramName=paramValue pairs, for example:

    export JVM_EXTRA_OPTS="-javaagent:/path/to/agent-c4-<version>-all.jar=pulsarServiceUrl=pulsar://pulsar:6650"

In CDC agent versions after 1.0.3, the Pulsar connection parameters can also be provided as environment variables in cassandra-env.sh. The equivalent of the JVM option above is:

    export CDC_PULSAR_SERVICE_URL="pulsar://<pulsar_server_ip>:6650"

To load the agent in your Cassandra deployment, add it to JVM_OPTS in cassandra-env.sh:

    JVM_OPTS="$JVM_OPTS -javaagent:/home/automaton/cdc104/agent-dse4-pulsar-1.0.5-all.jar"

The mappings between agent parameters (JVM options) and environment variables are listed in CDC Environment Parameter Strings.
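For agent versions before 1.0.3, the option string is simply the JAR path followed by `=` and the comma-separated `paramName=paramValue` pairs. The Python sketch below assembles that string from a dictionary; the helper name `build_javaagent_option` is hypothetical, and it assumes no parameter value contains a comma, since commas separate the pairs.

```python
from __future__ import annotations


def build_javaagent_option(jar_path: str, params: dict[str, str]) -> str:
    """Build -javaagent:<jar>=name1=value1,name2=value2 (hypothetical helper).

    Assumption: no value contains a comma, because commas separate parameters.
    """
    for name, value in params.items():
        if "," in value:
            raise ValueError(f"value for {name!r} must not contain a comma")
    if not params:
        # No agent parameters: only load the agent JAR.
        return f"-javaagent:{jar_path}"
    joined = ",".join(f"{name}={value}" for name, value in params.items())
    return f"-javaagent:{jar_path}={joined}"


opt = build_javaagent_option(
    "/path/to/agent-c4-<version>-all.jar",
    {"pulsarServiceUrl": "pulsar://pulsar:6650"},
)
# Reproduces the JVM_EXTRA_OPTS example for agents before 1.0.3.
print(f'export JVM_EXTRA_OPTS="{opt}"')
```

Other agent parameters from the Change Agent Parameters table would be added to the same dictionary.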
For the full set of JVM configuration options, see Change Agent Parameters.

cassandra.yaml

In addition to configuring the change agent, enable CDC on each Cassandra node by setting cdc_enabled to true in the cassandra.yaml file.

For Cassandra 4.x and DSE 6.8.16+, you can also configure the commit log sync period (commitlog_sync_period_in_ms). This parameter controls how often commit logs are made available for CDC processing. The default is 10 seconds. To reduce the latency between a table update and the corresponding event appearing in the data topic, decrease the period, for example to 2 seconds.

The cdc_total_space_in_mb parameter limits the total amount of CDC data stored on the node. If the amount of unprocessed CDC data reaches this threshold, writes to CDC-enabled tables are rejected. Make sure the change agent is installed on every node where CDC is enabled: without the agent to process the CDC data, the space used grows until it hits this limit, and writes to CDC-enabled tables then stop.

Here is an example set of configurations for the cassandra.yaml file:

    cdc_enabled: true
    commitlog_sync_period_in_ms: 2000
    cdc_total_space_in_mb: 50000

Change Agent Parameters

Table 1. Change Agent Parameters

Name | Description | Type | Default
topicPrefix | The event topic name prefix. The <keyspace_name>.<table_name> is appended to this prefix to build the topic name. | string | events-
cdcWorkingDir | The CDC working directory, where the last sent offset is saved and where archived and errored commit log files are copied. | string | cdc
cdcPollIntervalMs | The poll interval, in milliseconds, for watching new commit log files in the CDC raw directory. | long | 60000
errorCommitLogReprocessEnabled | Enables the reprocessing of errored commit log files. | boolean | false
cdcConcurrentProcessors | The number of threads used to process commit log files. The default (-1) uses the value of memtable_flush_writers. | integer | -1
maxInflightMessagesPerTask | The maximum number of in-flight messages per commit log processing task. | integer | 16384
pulsarServiceUrl | The Pulsar broker service URL. | string | pulsar://localhost:6650
pulsarBatchDelayInMs | The Pulsar batching delay, in milliseconds. Pulsar batching is enabled when this value is greater than zero. | long | -1
pulsarKeyBasedBatcher | When true, uses the Pulsar KEY_BASED BatchBuilder. | boolean | false
pulsarMaxPendingMessages | The maximum size of the Pulsar queue holding pending messages. | integer | 1000
pulsarMaxPendingMessagesAcrossPartitions | The maximum number of Pulsar pending messages across partitions. | integer | 50000
pulsarAuthPluginClassName | The Pulsar authentication plugin class name. | string |
pulsarAuthParams | The Pulsar authentication parameters. | string |
sslProvider | The SSL/TLS provider to use. | string |
sslTruststorePath | The path to the SSL/TLS truststore file. | string |
sslTruststorePassword | The password for the SSL/TLS truststore. | string |
sslTruststoreType | The type of the SSL/TLS truststore. | string | JKS
sslKeystorePath | The path to the SSL/TLS keystore file. | string |
sslKeystorePassword | The password for the SSL/TLS keystore. | string |
sslCipherSuites | One or more cipher suites to use when negotiating the SSL/TLS connection. | string |
sslEnabledProtocols | The enabled SSL/TLS protocols. | string | TLSv1.2,TLSv1.1,TLSv1
sslAllowInsecureConnection | Allows insecure connections to servers whose certificate has not been signed by an approved CA. Always disable this option in production environments. | boolean | false
sslHostnameVerificationEnable | Enables server hostname verification. | boolean | false

Download CDC for Cassandra

IMPORTANT: By downloading this DataStax product, you agree to the terms of the open-source Apache-2.0 license agreement.

Download the cassandra-source-connectors-<version>.tar file from the DataStax downloads page.
The tar file contains the following files:

Streaming platform | NAR file
Apache Pulsar 2.8 and Luna Streaming 2.8 | pulsar-cassandra-source-<version>.nar

Extract the files from the tar file with the following command:

    tar xvf cassandra-source-connectors-<version>.tar

Use the version that matches your streaming platform.

Deploy CDC for Cassandra

To deploy the CDC for Cassandra NAR file in your Pulsar cluster, upload it to Luna Streaming or Pulsar with the pulsar-admin sources create command.

You must deploy one CDC for Cassandra connector for each CDC-enabled table. For each CDC-enabled table, the change agent sends events to an events topic. The topic name is the topicPrefix setting of the agent (default events-) followed by <keyspace_name>.<table_name>.

Specify the following parameters:

- Connector name. Each CDC-enabled Cassandra table has its own connector, so make sure every connector has a unique name.
- The previously downloaded CDC for Cassandra NAR file.
- The Pulsar tenant and namespace where the connector will run.
- The destination topic for Cassandra data (the data topic).
- The number of instances (parallelism) of the connector. For high-volume tables, you might need to run multiple connector instances to prevent a growing backlog on the events topic.
- The name of the events topic the connector reads from.
- The keyspace and table associated with the events topic.
- Connection information for Cassandra, such as contact points and datacenter information.
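Because every CDC-enabled table needs its own connector and events topic, it can help to derive these values in a script. The Python sketch below applies the naming rule above (topicPrefix followed by <keyspace_name>.<table_name>) and builds a minimal --source-config dictionary. The helper names are hypothetical, and the sketch assumes the agent's topics live in public/default, which is where Pulsar resolves short topic names by default.

```python
from __future__ import annotations

import json


def events_topic(keyspace: str, table: str, topic_prefix: str = "events-",
                 tenant: str = "public", namespace: str = "default") -> str:
    """Fully qualified events topic: topicPrefix + <keyspace_name>.<table_name>."""
    return f"persistent://{tenant}/{namespace}/{topic_prefix}{keyspace}.{table}"


def source_config(keyspace: str, table: str, contact_points: str, local_dc: str,
                  port: int = 9042, topic_prefix: str = "events-") -> dict:
    """Minimal source configuration for one CDC-enabled table (hypothetical helper)."""
    return {
        "events.topic": events_topic(keyspace, table, topic_prefix),
        "keyspace": keyspace,
        "table": table,
        "contactPoints": contact_points,
        "port": port,
        "loadBalancing.localDc": local_dc,
    }


cfg = source_config("ks1", "table1", contact_points="localhost", local_dc="DC1")
# JSON text suitable for the --source-config argument of pulsar-admin.
print(json.dumps(cfg, indent=2))
```

Authentication keys such as auth.provider, auth.username and auth.password can be added to the returned dictionary in the same way.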
The following command is an example deployment:

    pulsar-admin sources create \
      --name cassandra-source-1 \
      --archive /path/to/pulsar-cassandra-source-<version>.nar \
      --tenant public \
      --namespace default \
      --destination-topic-name public/default/data-ks1.table1 \
      --parallelism 1 \
      --source-config '{
        "events.topic": "persistent://public/default/events-ks1.table1",
        "keyspace": "ks1",
        "table": "table1",
        "contactPoints": "localhost",
        "port": 9042,
        "loadBalancing.localDc": "DC1",
        "auth.provider": "PLAIN",
        "auth.username": "<username>",
        "auth.password": "<password>"
      }'

Then check that your connector is running with no errors:

    pulsar-admin sources status --name cassandra-source-1

Once the connector is running, it processes events from the events topic and publishes the results to the data topic.

For the full set of source configuration options, see DataStax Cassandra Source Connector for Apache Pulsar™ settings.
For the full set of Cassandra authentication options, see Cassandra Authentication settings.
For the full set of Cassandra SSL/TLS options, see Cassandra SSL/TLS settings.
For advanced configuration of the Cassandra driver used by CDC for Cassandra, see Pass CDC for Cassandra settings directly to the DataStax Java driver.

Enabling and disabling CDC on a table

Once the change agent is installed and the connector is running, you can enable or disable CDC on a table with the following CQL commands:

    CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;
    ALTER TABLE foo WITH cdc=true;
    ALTER TABLE foo WITH cdc=false;

When CDC is enabled on a table, the change agent sends updates to that table to the CDC for Cassandra connector, which processes each event and publishes it to the data topic, where other connectors (for example, Elasticsearch) can consume it.

DataStax Cassandra Source Connector for Apache Pulsar™ settings
Table 2. DataStax Cassandra Source Connector for Apache Pulsar™ settings

Name | Description | Type | Validator | Default
events.topic | The events topic from which the connector reads Cassandra mutation events. | string | |
keyspace | The Cassandra keyspace name. | string | |
table | The Cassandra table name. | string | |
cloud.secureConnectBundle | The location of the cloud secure connect bundle used to connect to DataStax Astra DB. | string | | ""
compression | The compression algorithm to use when issuing requests to the database server. | string (case insensitive) | [LZ4, NONE, SNAPPY] | None
connectionPoolLocalSize | The number of connections that the driver maintains within a connection pool to each node in the local datacenter. | int | [1,…] | 4
contactPoints | The initial list of contact points. | | | ""
ignoreErrors | Specifies which errors the connector ignores when processing a record. Valid values are None (never ignore errors), All (ignore all errors), and Driver (ignore only errors returned by the Cassandra driver). | string | | None
jmx | Whether to enable JMX reporting. | boolean | | true
key.converter | The converter class used to write the message key to the data topic. This setting is experimental and intended for advanced users only. | class | | null
loadBalancing.localDc | The name of the datacenter (commonly dc1, dc2, etc.) local to the machine on which the connector is running. | string | | ""
maxConcurrentRequests | The maximum number of requests to send at once. | int | [1,…] | 500
metricsHighestLatency | Used to scale internal data structures for gathering metrics. It should be higher than queryExecutionTimeout. Expressed in seconds. | int | [1,…] | 35
port | The port used to connect to nodes. | int | [1,…] | 9042
queryExecutionTimeout | The CQL statement execution timeout, in seconds. | int | [1,…] | 30
value.converter | The converter class used to write the message value to the data topic. This setting is experimental and intended for advanced users only. | class | | null
batch.size | The batch size for grouping mutations before sending them to the data topic. | int | | 200
query.backoffInMs | The retry backoff, in milliseconds, when there are not enough Cassandra replicas to perform the query (capped, exponential, jittered backoff). | long | | 100
query.executors | The initial and maximum number of threads used to execute concurrent Cassandra queries. | int | | 10
query.maxBackoffInSec | The maximum backoff delay, in seconds, when there are not enough Cassandra replicas to perform the query. | long | | 3600
query.maxMobileAvgLatency | The maximum moving-average CQL query latency above which the number of executors is decreased. | long | | 100
query.minMobileAvgLatency | The minimum moving-average CQL query latency below which the number of executors is increased. | long | | 10
columns | A regular expression matching the names of the Cassandra columns to replicate. | string | | .*
jmxConnectorDomain | The domain for JMX reporting. | string | | com.datastax.oss.cdc
events.subscription.name | The name of the subscription to the Pulsar events topic. | string | | sub
events.subscription.type | The type of the subscription to the Pulsar events topic. The default, Key_Shared (case sensitive), applies to a non-partitioned events topic; if your events topic is partitioned, set it to Failover. | string | [Exclusive, Shared, Failover, Key_Shared] | Key_Shared
cache.max.digest | The maximum number of digests per mutation cache entry. | long | [1,…] | 3
cache.max.capacity | The maximum capacity of the mutation cache. | long | [1,…] | 32767
cache.expire.after.ms | How long a mutation cache entry is retained, in milliseconds (default 60 seconds). | long | [1000,…] | 60000
cache.only_if_coordinator_match | Caches the mutation digest only if the coordinator node is the originator node. | boolean | | true
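Table 2 describes query.backoffInMs as a capped, exponential, jittered backoff bounded by query.maxBackoffInSec. The exact formula is not documented here, so the Python sketch below assumes the common "full jitter" variant: the ceiling doubles with each attempt, is capped at query.maxBackoffInSec, and the actual delay is drawn uniformly below that ceiling. The function name is hypothetical and the connector's own computation may differ.

```python
from __future__ import annotations

import random


def backoff_delay_ms(attempt: int, backoff_in_ms: int = 100,
                     max_backoff_in_sec: int = 3600,
                     rng: random.Random | None = None) -> float:
    """Delay before retry `attempt` (0-based), using capped exponential
    backoff with full jitter (a generic scheme, assumed for illustration)."""
    if attempt < 0:
        raise ValueError("attempt must be >= 0")
    rng = rng if rng is not None else random.Random()
    cap_ms = max_backoff_in_sec * 1000
    # Exponential growth (base * 2^attempt), capped at query.maxBackoffInSec.
    ceiling_ms = min(cap_ms, backoff_in_ms * 2 ** attempt)
    # Full jitter: choose uniformly between 0 and the capped ceiling.
    return rng.uniform(0, ceiling_ms)


rng = random.Random(7)
for attempt in range(5):
    print(attempt, round(backoff_delay_ms(attempt, rng=rng), 1))
```

The jitter spreads retries from several connector instances over time, so they do not all hit Cassandra again at the same moment.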
Cassandra Authentication settings

Table 3. Cassandra Authentication settings

Name | Description | Type | Validator | Default
auth.gssapi.keyTab | The Kerberos keytab file for GSSAPI provider authentication. | string | | ""
auth.gssapi.principal | The Kerberos principal for GSSAPI provider authentication. | string | | ""
auth.gssapi.service | The SASL service name to use for GSSAPI provider authentication. | string | | dse
auth.password | The password for PLAIN (username/password) provider authentication. | password | | [hidden]
auth.provider | The authentication provider. | string | [None, PLAIN, GSSAPI] | None
auth.username | The username for PLAIN (username/password) provider authentication. | string | | ""

Cassandra SSL/TLS settings

Table 4. Cassandra SSL/TLS settings

Name | Description | Type | Validator | Default
ssl.cipherSuites | The cipher suites to enable. | list | | ""
ssl.hostnameValidation | Whether to validate node hostnames when using SSL. | boolean | | true
ssl.keystore.password | The keystore password. | password | | [hidden]
ssl.keystore.path | The path to the keystore file. | string | | ""
ssl.openssl.keyCertChain | The path to the certificate chain file. | string | | ""
ssl.openssl.privateKey | The path to the private key file. | string | | ""
ssl.provider | The SSL/TLS provider. | string | [None, JDK, OpenSSL] | None
ssl.truststore.password | The truststore password. | password | | [hidden]
ssl.truststore.path | The path to the truststore file. | string | | ""

Pass CDC for Cassandra settings directly to the DataStax Java driver

In your CDC for Cassandra configuration file, you can pass settings directly to the DataStax Java driver by using the datastax-java-driver prefix. For example:

    datastax-java-driver.basic.request.consistency=ALL

Mapping CDC for Cassandra settings to Java driver properties

The following table identifies functionally equivalent CDC for Cassandra and DataStax Java driver settings. If you define both in your configuration, the CDC for Cassandra setting takes precedence over the datastax-java-driver.<property-name> setting. If you provide neither, the CDC for Cassandra default is in effect. For information about the Java driver properties, refer to the DataStax Java driver documentation.

DataStax Cassandra Source Connector for Apache Pulsar™ | Using datastax-java-driver prefix
contactPoints | datastax-java-driver.basic.contact-points
loadBalancing.localDc | datastax-java-driver.basic.load-balancing-policy.local-datacenter
cloud.secureConnectBundle | datastax-java-driver.basic.cloud.secure-connect-bundle
queryExecutionTimeout | datastax-java-driver.basic.request.timeout
connectionPoolLocalSize | datastax-java-driver.advanced.connection.pool.local.size
compression | datastax-java-driver.advanced.protocol.compression
metricsHighestLatency | datastax-java-driver.advanced.metrics.session.cql-requests.highest-latency

The CDC for Cassandra contactPoints setting differs from the Java driver datastax-java-driver.basic.contact-points setting. For contactPoints, the value of port is appended to every host in the list. For datastax-java-driver.basic.contact-points, you must provide fully qualified contact points (host:port). Passing the Java driver setting therefore gives you more flexibility, because you can specify a different port for each host. For example:

    datastax-java-driver.basic.contact-points = 127.0.0.1:9042, 127.0.0.2:9042

Java driver reference

For more information, refer to the Java driver reference configuration topic.

Scaling up your configuration

If your connector is not keeping up and the backlog in the events topic is growing, increase the number of connector instances with the parallelism parameter. With a Key_Shared subscription, Pulsar delivers all messages that share a key to the same connector instance, which preserves processing order per key.

If the volume of data in the events topic is very high, partition the events topic to distribute the load across multiple Pulsar brokers. Do this before starting the change agent, because by default Pulsar auto-creates non-partitioned topics.
If you are using partitioned topics, set events.subscription.type to Failover to ensure in-order delivery when running multiple connector instances.

To further improve throughput, you can set pulsarBatchDelayInMs in the change agent so that the agent batches messages before sending them to Pulsar.

To improve the performance of individual connector instances as they read data from Cassandra, you can adjust batch.size and query.executors. Increasing these values from their defaults increases parallelism within each connector instance.

The de-duplication cache is configurable: cache.max.capacity sets the cache size, cache.expire.after.ms sets how long entries are retained, and cache.max.digest sets the number of MD5 digests kept per primary key entry.
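The three cache settings can be pictured with a small model. The Python sketch below assumes that duplicates arise when the same mutation reaches the events topic more than once (for example, from the agents on several replicas) and models the cache as a least-recently-used map from primary key to a bounded list of MD5 digests, with entries expiring a fixed time after their last write. The class and method names are hypothetical, the expiry policy is an assumption, and the sketch does not model cache.only_if_coordinator_match.

```python
from __future__ import annotations

import hashlib
import time
from collections import OrderedDict
from typing import Callable


class MutationCache:
    """Hypothetical model of the mutation de-duplication cache.

    max_digests     ~ cache.max.digest
    max_capacity    ~ cache.max.capacity
    expire_after_ms ~ cache.expire.after.ms
    """

    def __init__(self, max_digests: int = 3, max_capacity: int = 32767,
                 expire_after_ms: int = 60000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_digests = max_digests
        self.max_capacity = max_capacity
        self.expire_after_s = expire_after_ms / 1000.0
        self.clock = clock
        # primary key -> (time of last write, list of MD5 digests), in LRU order.
        self._entries = OrderedDict()

    def is_duplicate(self, primary_key: str, mutation: bytes) -> bool:
        """Return True if this mutation was already seen for primary_key;
        otherwise record its digest and return False."""
        now = self.clock()
        digest = hashlib.md5(mutation).hexdigest()
        entry = self._entries.get(primary_key)
        if entry is not None and now - entry[0] > self.expire_after_s:
            # Entry expired: forget everything recorded for this key.
            del self._entries[primary_key]
            entry = None
        digests = entry[1] if entry is not None else []
        if digest in digests:
            self._entries.move_to_end(primary_key)  # mark as recently used
            return True
        digests.append(digest)
        if len(digests) > self.max_digests:
            digests.pop(0)  # keep only the most recent digests
        self._entries[primary_key] = (now, digests)
        self._entries.move_to_end(primary_key)
        while len(self._entries) > self.max_capacity:
            self._entries.popitem(last=False)  # evict least recently used key
        return False


cache = MutationCache()
print(cache.is_duplicate("ks1.table1:42", b"mutation-1"))  # False
print(cache.is_duplicate("ks1.table1:42", b"mutation-1"))  # True
```

In this model, raising cache.max.digest lets more distinct recent mutations per key be recognized, while cache.max.capacity and cache.expire.after.ms bound memory use.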