Installing CDC for Cassandra for VM deployment

Download the DataStax Change Data Capture (CDC) Agent for Apache Cassandra®

By downloading this DataStax product, you agree to the terms of the open-source Apache-2.0 license agreement.

Perform the following steps:

  1. Download the change agent tar file from the DataStax downloads page.
    The following files are available in the tar file:

    | Cassandra type       | JAR file                     |
    |----------------------|------------------------------|
    | Apache Cassandra 3.x | agent-c3-<version>-all.jar   |
    | Apache Cassandra 4.x | agent-c4-<version>-all.jar   |
    | DSE 6.8.16+          | agent-dse4-<version>-all.jar |

  2. Extract the files from the tar with the following command:

    tar xvf cassandra-source-agents-<version>.tar
  3. Use the file that matches your Cassandra type and streaming platform.

Start Cassandra with the Change Agent for Cassandra

Every data node in your Cassandra or DSE datacenter must run the change agent as a JVM agent so that mutations are sent to the events topic of your streaming platform. Start each Cassandra or DSE node with the agent JAR file that matches your Cassandra (3.11 or 4.0) or DSE (6.8.16+) version and your streaming platform (Luna Streaming 2.8+ or Apache Pulsar 2.8.1+).

In CDC agent versions before 1.0.3, the Pulsar connection parameters were passed as extra JVM options after the JAR file name, as a comma-separated list of paramName=paramValue pairs:

export JVM_EXTRA_OPTS="-javaagent:/path/to/agent-c4-<version>-all.jar=pulsarServiceUrl=pulsar://pulsar:6650"
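
The comma-separated paramName=paramValue form can be assembled programmatically, for example when templating the option for many nodes. The following Python sketch is illustrative only: the helper name build_agent_option is hypothetical, the parameter names are taken from the Change Agent Parameters table, and no escaping is attempted for values that themselves contain commas.

```python
def build_agent_option(jar_path, params):
    """Return a -javaagent option with paramName=paramValue pairs joined by commas."""
    if not params:
        return f"-javaagent:{jar_path}"
    pairs = ",".join(f"{name}={value}" for name, value in params.items())
    return f"-javaagent:{jar_path}={pairs}"


option = build_agent_option(
    "/path/to/agent-c4-<version>-all.jar",
    {"pulsarServiceUrl": "pulsar://pulsar:6650", "topicPrefix": "events-"},
)
# Print the line in the same form as the export statement shown above.
print(f'export JVM_EXTRA_OPTS="{option}"')
```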

In CDC agent versions after 1.0.3, the Pulsar connection parameters can also be provided as environment variables set in cassandra-env.sh. For example, the pulsarServiceUrl option shown above becomes the following line in cassandra-env.sh:

export CDC_PULSAR_SERVICE_URL="pulsar://<pulsar_server_ip>:6650"

Then, to load the change agent on the node, append the -javaagent option to JVM_OPTS:

JVM_OPTS="$JVM_OPTS -javaagent:/path/to/agent-dse4-<version>-all.jar"

The CDC parameter mappings between JVM and C* environment variables are provided in CDC Environment Parameter Strings.
For the full set of JVM configuration options, see Table Change Agent Parameters.

Cassandra.yaml

In addition to configuring the change agent, enable CDC on the Cassandra node by setting cdc_enabled to true in the cassandra.yaml file.

For Cassandra 4.x and DSE 6.8.16+, you can configure the commit log sync period with commitlog_sync_period_in_ms. This parameter controls how often the commit logs are made available for CDC processing. The default is 10 seconds. To reduce the latency between a table update and the event becoming available in the data topic, decrease the period, for example to 2 seconds.

The total amount of CDC data stored is controlled by the cdc_total_space_in_mb parameter. If the amount of unprocessed CDC data reaches this threshold, writes to CDC-enabled tables are rejected. Make sure the change agent is installed on the node whenever CDC is enabled. Without the change agent to process the CDC data, the space used grows until it hits this limit, and then writes to CDC-enabled tables stop.

Here is an example set of configurations for the cassandra.yaml file:

cdc_enabled: true
commitlog_sync_period_in_ms: 2000
cdc_total_space_in_mb: 50000
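
As a quick sanity check before starting a node, the settings above can be read back and inspected. This Python sketch is an assumption-laden illustration: read_flat_settings is a hypothetical helper that only understands flat `key: value` lines (it is not a full YAML parser), and the fallback values used are the defaults described in this section.

```python
def read_flat_settings(text):
    """Parse flat `key: value` lines; this is not a full YAML parser."""
    settings = {}
    for raw_line in text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop comments and padding
        if ":" in line:
            key, value = line.split(":", 1)
            settings[key.strip()] = value.strip()
    return settings


example = """cdc_enabled: true
commitlog_sync_period_in_ms: 2000
cdc_total_space_in_mb: 50000
"""

settings = read_flat_settings(example)
# CDC is treated as off unless cdc_enabled is explicitly set to true.
cdc_enabled = settings.get("cdc_enabled", "false").lower() == "true"
# Fall back to the 10-second default sync period when the key is absent.
sync_seconds = int(settings.get("commitlog_sync_period_in_ms", "10000")) / 1000
print(f"cdc_enabled={cdc_enabled}, commit log sync period={sync_seconds}s")
```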

Change Agent Parameters

Table 1. Table Change Agent Parameters

| Name | Description | Type | Default |
|------|-------------|------|---------|
| topicPrefix | The event topic name prefix. The <keyspace_name>.<table_name> is appended to that prefix to build the topic name. | string | events- |
| cdcWorkingDir | The CDC working directory where the last sent offset is saved, and where the archived and errored commitlog files are copied. | string | cdc |
| cdcPollIntervalMs | The poll interval in milliseconds for watching new commitlog files in the CDC raw directory. | long | 60000 |
| errorCommitLogReprocessEnabled | Enable the re-processing of errored commitlog files. | boolean | false |
| cdcConcurrentProcessors | The number of threads used to process commitlog files. With the default of -1, the value of memtable_flush_writers is used. | integer | -1 |
| maxInflightMessagesPerTask | The maximum number of in-flight messages per commitlog processing task. | integer | 16384 |
| pulsarServiceUrl | The Pulsar broker service URL. | string | pulsar://localhost:6650 |
| pulsarBatchDelayInMs | Pulsar batching delay in milliseconds. Pulsar batching is enabled when this value is greater than zero. | long | -1 |
| pulsarKeyBasedBatcher | When true, use the Pulsar KEY_BASED BatchBuilder. | boolean | false |
| pulsarMaxPendingMessages | The maximum size of the Pulsar queue holding pending messages. | integer | 1000 |
| pulsarMaxPendingMessagesAcrossPartitions | The Pulsar maximum number of pending messages across partitions. | integer | 50000 |
| pulsarAuthPluginClassName | The Pulsar authentication plugin class name. | string | |
| pulsarAuthParams | The Pulsar authentication parameters. | string | |
| sslProvider | The SSL/TLS provider to use. | string | |
| sslTruststorePath | The path to the SSL/TLS truststore file. | string | |
| sslTruststorePassword | The password for the SSL/TLS truststore. | string | |
| sslTruststoreType | The type of the SSL/TLS truststore. | string | JKS |
| sslKeystorePath | The path to the SSL/TLS keystore file. | string | |
| sslKeystorePassword | The password for the SSL/TLS keystore. | string | |
| sslCipherSuites | Defines one or more cipher suites to use for negotiating the SSL/TLS connection. | string | |
| sslEnabledProtocols | Enabled SSL/TLS protocols. | string | TLSv1.2,TLSv1.1,TLSv1 |
| sslAllowInsecureConnection | Allows insecure connections to servers whose certificate has not been signed by an approved CA. You should always disable sslAllowInsecureConnection in production environments. | boolean | false |
| sslHostnameVerificationEnable | Enable the server hostname verification. | boolean | false |

Download CDC for Cassandra

IMPORTANT

By downloading this DataStax product, you agree to the terms of the open-source Apache-2.0 license agreement.

  1. Download the cassandra-source-connectors-<version>.tar file from the DataStax downloads page.
    The following files are available:

    | Streaming platform                       | NAR file                              |
    |------------------------------------------|---------------------------------------|
    | Apache Pulsar 2.8 and Luna Streaming 2.8 | pulsar-cassandra-source-<version>.nar |

  2. Extract the files from the tar with the following command:

    tar xvf cassandra-source-connectors-<version>.tar
  3. Use the version that matches your streaming platform.

Deploy CDC for Cassandra

To deploy the CDC for Cassandra NAR file in your Pulsar cluster, upload it to Luna Streaming or Pulsar using the pulsar-admin sources create command. You need to deploy CDC for Cassandra for each CDC-enabled table.

For each CDC-enabled table, the change agent will send events to the events topic. The topic name is determined by the topicPrefix setting in the agent (default is events-). The <keyspace_name>.<table_name> is appended to the prefix to build the topic name.
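
The naming rule above can be written out in a few lines. In this Python sketch, the function names are hypothetical, and the public tenant and default namespace are assumptions used only for illustration; the persistent://tenant/namespace/topic form is the standard Pulsar topic URL layout.

```python
def events_topic_name(keyspace, table, topic_prefix="events-"):
    """Append <keyspace_name>.<table_name> to the agent's topicPrefix."""
    return f"{topic_prefix}{keyspace}.{table}"


def full_topic_url(topic, tenant="public", namespace="default"):
    """Qualify a topic name with its Pulsar tenant and namespace."""
    return f"persistent://{tenant}/{namespace}/{topic}"


topic = events_topic_name("ks1", "table1")
print(topic)
print(full_topic_url(topic))
```

If you change topicPrefix in the agent, pass the same prefix when computing the topic name so that the connector reads from the topic the agent writes to.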

You have to specify the following parameters:

  • Connector name. Each CDC-enabled Cassandra table has its own connector, so make sure each name is unique.

  • Previously downloaded CDC for Cassandra NAR file.

  • Pulsar tenant and namespace where the connector will run.

  • Destination topic for Cassandra data (data topic).

  • Number of instances (parallelism) of the connector. For high-volume tables, you might need to run multiple connector instances to prevent a growing backlog on the events topic.

  • Name of the events topic the connector will read from.

  • Keyspace and table associated with the events topic.

  • Connection information (such as contact points and DC information) for Cassandra.

Here is an example:

pulsar-admin source create \
--name cassandra-source-1 \
--archive /path/to/pulsar-cassandra-source-<version>.nar \
--tenant public \
--namespace default \
--destination-topic-name public/default/data-ks1.table1 \
--parallelism 1 \
--source-config '{
    "events.topic": "persistent://public/default/events-cdc-ks1.table1",
    "keyspace": "ks1",
    "table": "table1",
    "contactPoints": "localhost",
    "port": 9042,
    "loadBalancing.localDc": "DC1",
    "auth.provider": "PLAIN",
    "auth.username": "<username>",
    "auth.password": "<password>"
}'
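
Because one connector is needed per CDC-enabled table, it can help to generate the --source-config JSON rather than edit it by hand. The Python sketch below is a hypothetical helper, not part of the product: it reuses only the configuration keys shown in the example above, omits the auth.* keys for brevity, and uses the standard json and shlex modules to produce a shell-safe argument.

```python
import json
import shlex


def source_config(keyspace, table, events_topic, contact_points, local_dc, port=9042):
    """Build the connector settings for one CDC-enabled table."""
    return {
        "events.topic": events_topic,
        "keyspace": keyspace,
        "table": table,
        "contactPoints": contact_points,
        "port": port,
        "loadBalancing.localDc": local_dc,
    }


config = source_config(
    keyspace="ks1",
    table="table1",
    events_topic="persistent://public/default/events-cdc-ks1.table1",
    contact_points="localhost",
    local_dc="DC1",
)
# shlex.quote wraps the JSON so it can be pasted after --source-config.
print("--source-config " + shlex.quote(json.dumps(config)))
```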

Then check that your connector is in the running state with no errors:

pulsar-admin source status --name cassandra-source-1

Once the connector is running, it will process events from the events topic and publish the result to the data topic.

For the full set of source configuration options, see CDC for Cassandra settings. For the full set of Cassandra authentication options, see Cassandra Authentication settings. For the full set of Cassandra SSL settings, see Cassandra SSL/TLS settings. For advanced configuration of the Cassandra driver in the CDC for Cassandra, see Pass CDC for Cassandra settings directly to the DataStax Java driver.

Enabling and disabling CDC on a table

Once the change agent is installed and the connector is running, you can enable or disable CDC on a table using the following commands:

CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;
ALTER TABLE foo WITH cdc=true;
ALTER TABLE foo WITH cdc=false;

When CDC is enabled on a table, the change agent sends updates to that table to CDC for Cassandra, which processes each event further and then sends it to the data topic, where it can be processed by other connectors (for example, Elasticsearch).

DataStax Cassandra Source Connector for Apache Pulsar™ settings

Table 2. Table DataStax Cassandra Source Connector for Apache Pulsar™ settings
| Name | Description | Type | Validator | Default |
|------|-------------|------|-----------|---------|
| events.topic | The name of the topic to listen to for Cassandra mutation events. | string | | |
| keyspace | Cassandra keyspace name. | string | | |
| table | Cassandra table name. | string | | |
| cloud.secureConnectBundle | The location of the cloud secure bundle used to connect to DataStax Astra DB. | string | | "" |
| compression | Compression algorithm to use when issuing requests to the database server. | string | (case insensitive) [LZ4, NONE, SNAPPY] | None |
| connectionPoolLocalSize | Number of connections that the driver maintains within a connection pool to each node in the local DC. | int | [1,…] | 4 |
| contactPoints | Initial contact points. | list | | "" |
| ignoreErrors | Specifies which errors the connector should ignore when processing the record. Valid values are: None (never ignore errors), All (ignore all errors), Driver (ignore driver errors only, i.e. errors when writing to the database). | string | | None |
| jmx | Whether to enable JMX reporting. | boolean | | true |
| key.converter | Converter class used to write the message key to the data topic. This setting is experimental and intended for advanced users only. | class | | null |
| loadBalancing.localDc | The datacenter name (commonly dc1, dc2, etc.) local to the machine on which the connector is running. | string | | "" |
| maxConcurrentRequests | The maximum number of requests to send at once. | int | [1,…] | 500 |
| metricsHighestLatency | Used to scale internal data structures for gathering metrics. It should be higher than queryExecutionTimeout. Expressed in seconds. | int | [1,…] | 35 |
| port | Port to connect to nodes. | int | [1,…] | 9042 |
| queryExecutionTimeout | CQL statement execution timeout, in seconds. | int | [1,…] | 30 |
| value.converter | Converter class used to write the message value to the data topic. This setting is experimental and intended for advanced users only. | class | | null |
| batch.size | The batch size for grouping mutations before sending them to the data topic. | int | | 200 |
| query.backoffInMs | Retry backoff in milliseconds when there are not enough Cassandra replicas to perform the query (capped exponential jittered backoff). | long | | 100 |
| query.executors | The initial and maximum number of threads to execute concurrent Cassandra queries. | int | | 10 |
| query.maxBackoffInSec | Maximum backoff delay in seconds when there are not enough Cassandra replicas to perform the query. | long | | 3600 |
| query.maxMobileAvgLatency | Maximum mobile average CQL query latency beyond which the number of executors is decreased. | long | | 100 |
| query.minMobileAvgLatency | Minimum mobile average CQL query latency beyond which the number of executors is increased. | long | | 10 |
| columns | Regular expression of the Cassandra replicated column names. | string | | .* |
| jmxConnectorDomain | Domain for JMX reporting. | string | | com.datastax.oss.cdc |
| events.subscription.name | The Pulsar events topic subscription name. | string | | sub |
| events.subscription.type | The Pulsar events topic subscription type (case sensitive). The default, Key_Shared, is intended for a non-partitioned events topic. If your events topic is partitioned, set the subscription type to Failover. | string | [Exclusive, Shared, Failover, Key_Shared] | Key_Shared |
| cache.max.digest | The maximum number of digests per mutation cache entry. | long | [1,…] | 3 |
| cache.max.capacity | The maximum capacity of the mutation cache. | long | [1,…] | 32767 |
| cache.expire.after.ms | The mutation cache entry duration in milliseconds (the default is 60 seconds). | long | [1000,…] | 60000 |
| cache.only_if_coordinator_match | Cache the mutation digest only if the coordinator node is the originator node. | boolean | | true |
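
The query.backoffInMs description mentions a capped exponential jittered backoff. This document does not give the connector's exact formula, so the Python sketch below shows one common variant ("full jitter") purely as an illustration of the idea. The function name and the choice of variant are assumptions; only the two default values come from the settings table.

```python
import random


def backoff_delay_ms(attempt, base_ms=100, max_backoff_s=3600, rng=random):
    """Full-jitter variant: a uniform delay in [0, min(cap, base * 2**attempt)]."""
    cap_ms = max_backoff_s * 1000
    ceiling_ms = min(cap_ms, base_ms * (2 ** attempt))
    return rng.uniform(0, ceiling_ms)


rng = random.Random(42)  # seeded only so repeated runs print the same values
for attempt in range(5):
    print(attempt, round(backoff_delay_ms(attempt, rng=rng)), "ms")
```

The ceiling doubles with each retry until it reaches the cap, and the random draw spreads retries from different connector instances over time.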

Cassandra Authentication settings

Table 3. Table Cassandra Authentication settings
| Name | Description | Type | Validator | Default |
|------|-------------|------|-----------|---------|
| auth.gssapi.keyTab | Kerberos keytab file for GSSAPI provider authentication | string | | "" |
| auth.gssapi.principal | Kerberos principal for GSSAPI provider authentication | string | | "" |
| auth.gssapi.service | SASL service name to use for GSSAPI provider authentication | string | | dse |
| auth.password | Password for PLAIN (username/password) provider authentication | password | | [hidden] |
| auth.provider | Authentication provider | string | [None, PLAIN, GSSAPI] | None |
| auth.username | Username for PLAIN (username/password) provider authentication | string | | "" |

Cassandra SSL/TLS settings

Table 4. Table Cassandra SSL/TLS settings
| Name | Description | Type | Validator | Default |
|------|-------------|------|-----------|---------|
| ssl.cipherSuites | The cipher suites to enable | list | | "" |
| ssl.hostnameValidation | Whether or not to validate node hostnames when using SSL | boolean | | true |
| ssl.keystore.password | Keystore password | password | | [hidden] |
| ssl.keystore.path | The path to the keystore file | string | | "" |
| ssl.openssl.keyCertChain | The path to the certificate chain file | string | | "" |
| ssl.openssl.privateKey | The path to the private key file | string | | "" |
| ssl.provider | SSL/TLS provider | string | [None, JDK, OpenSSL] | None |
| ssl.truststore.password | Truststore password | password | | [hidden] |
| ssl.truststore.path | The path to the truststore file | string | | "" |

Pass CDC for Cassandra settings directly to the DataStax Java driver

In your CDC for Cassandra configuration file, you can directly pass settings to the DataStax Java driver by using the datastax-java-driver prefix. For example:

datastax-java-driver.basic.request.consistency=ALL

Mapping CDC for Cassandra settings to Java driver properties

The following table identifies functionally equivalent CDC for Cassandra and DataStax Java driver settings.

If you define both in your configuration, the CDC for Cassandra setting takes precedence over the datastax-java-driver.property-name setting. If you do not provide either in your configuration, the CDC for Cassandra defaults are in effect.

For information about the Java properties, refer to the DataStax Java driver documentation.

| DataStax Cassandra Source Connector for Apache Pulsar™ | Using datastax-java-driver prefix |
|---------------------------------------------------------|-----------------------------------|
| contactPoints | datastax-java-driver.basic.contact-points |
| loadBalancing.localDc | datastax-java-driver.basic.load-balancing-policy.local-datacenter |
| cloud.secureConnectBundle | datastax-java-driver.basic.cloud.secure-connect-bundle |
| queryExecutionTimeout | datastax-java-driver.basic.request.timeout |
| connectionPoolLocalSize | datastax-java-driver.advanced.connection.pool.local.size |
| compression | datastax-java-driver.advanced.protocol.compression |
| metricsHighestLatency | datastax-java-driver.advanced.metrics.session.cql-requests.highest-latency |

There is a difference between the CDC for Cassandra’s contactPoints setting and the Java driver’s datastax-java-driver.basic.contact-points. For CDC for Cassandra’s contactPoints, the value of the port is appended to every host provided by this setting. For datastax-java-driver.basic.contact-points, you must provide the fully qualified contact points (host:port).

Passing the Java driver setting gives you more configuration flexibility, because you can specify a different port for each host. For example:

datastax-java-driver.basic.contact-points = 127.0.0.1:9042, 127.0.0.2:9042
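
The difference between the two settings can be illustrated by converting one form into the other. This Python sketch mirrors the behavior described above, where the port value is appended to every host. The function name is hypothetical, and it assumes contactPoints is supplied as a comma-separated string.

```python
def to_driver_contact_points(contact_points, port=9042):
    """Append the port to every host, as described for contactPoints."""
    hosts = [host.strip() for host in contact_points.split(",") if host.strip()]
    return ", ".join(f"{host}:{port}" for host in hosts)


value = to_driver_contact_points("127.0.0.1, 127.0.0.2")
print(f"datastax-java-driver.basic.contact-points = {value}")
```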

Java driver reference

For more information, refer to the Java driver reference configuration topic.

Scaling up your configuration

If your connector is not keeping up and the backlog of messages in the events topic is growing, increase the number of connector instances using the parallelism parameter. Pulsar ensures in-order processing using Key_Shared subscriptions.

If the volume of data in the events topic is very high, partition the events topic to distribute the load across multiple Pulsar brokers. Do this before starting the change agent, since by default Pulsar will auto-create non-partitioned topics. If you are using partitioned topics, change events.subscription.type to Failover to ensure in-order delivery when running multiple connector instances.
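
The subscription-type rule above is easy to get wrong when tables are deployed in bulk, so it can be encoded once. In this minimal Python sketch, the function name is hypothetical, and the returned dictionary is meant to be merged into the connector's source configuration.

```python
def subscription_settings(events_topic_partitioned):
    """Pick events.subscription.type from how the events topic was created."""
    # Failover for partitioned topics, Key_Shared (the default) otherwise.
    sub_type = "Failover" if events_topic_partitioned else "Key_Shared"
    return {"events.subscription.type": sub_type}


print(subscription_settings(events_topic_partitioned=True))
print(subscription_settings(events_topic_partitioned=False))
```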

To further improve the throughput, you can adjust the pulsarBatchDelayInMs in the change agent to batch messages in the change agent before sending them to Pulsar.

To improve performance on individual connector instances as they read data from Cassandra, you can adjust the batch.size and the query.executors. Increasing these values from their defaults will increase parallelism within the connector instances.

The de-duplication cache is configurable: set the cache size with cache.max.capacity, the entry retention duration with cache.expire.after.ms, and the number of MD5 digests per primary key entry with cache.max.digest.