Installing CDC for Cassandra for VM deployment
Download the DataStax Change Data Capture (CDC) Agent for Apache Cassandra®
By downloading this DataStax product, you agree to the terms of the open-source Apache-2.0 license agreement.
Perform the following steps:
- Download the change agent tar file from the DataStax downloads page.
  The following files are available in the tar file:
  Cassandra type | JAR file |
  ---|---|
  Apache Cassandra 3.x | agent-c3-<version>-all.jar |
  Apache Cassandra 4.x | agent-c4-<version>-all.jar |
  DSE 6.8.16+ | agent-dse4-<version>-all.jar |
- Extract the files from the tar with the following command:
  tar xvf cassandra-source-agents-<version>.tar
- Use the file that matches your Cassandra type and streaming platform.
Start Cassandra with the Change Agent for Cassandra
All data nodes of your Cassandra or DSE datacenter must run the change agent as a JVM agent so that mutations are sent to the events topic of your streaming platform. Start your Cassandra or DSE nodes with the change agent JAR that matches your Cassandra (3.11 or 4.0) or DSE (6.8.16+) version and your streaming platform (Luna Streaming 2.8+ or Apache Pulsar 2.8.1+).
In CDC agent versions before 1.0.3, the CDC agent Pulsar connection parameters were passed as agent options appended to the JAR file name, in the form of a comma-separated list of paramName=paramValue pairs, as shown below:
export JVM_EXTRA_OPTS="-javaagent:/path/to/agent-c4-<version>-all.jar=pulsarServiceUrl=pulsar://pulsar:6650"
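To illustrate the pre-1.0.3 format above, the following bash sketch assembles the -javaagent option from separate name=value pairs. The helper name build_agent_opts is hypothetical (it is not part of the CDC agent), the JAR path is a placeholder, and the sketch assumes that no parameter value contains a comma, since commas separate the pairs.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the CDC agent): build a -javaagent option
# in the pre-1.0.3 format: -javaagent:<jar>=name1=value1,name2=value2
# Assumes that no parameter value contains a comma.
build_agent_opts() {
  local jar="$1"
  shift
  local params="" pair
  for pair in "$@"; do
    # Join the name=value pairs with commas.
    params="${params:+$params,}$pair"
  done
  if [ -n "$params" ]; then
    printf '%s\n' "-javaagent:${jar}=${params}"
  else
    # No parameters given: pass only the JAR path.
    printf '%s\n' "-javaagent:${jar}"
  fi
}

# Example: the Pulsar service URL plus an explicit topic prefix.
JVM_EXTRA_OPTS="$(build_agent_opts "/path/to/agent-c4-<version>-all.jar" \
  "pulsarServiceUrl=pulsar://pulsar:6650" "topicPrefix=events-")"
export JVM_EXTRA_OPTS
printf '%s\n' "$JVM_EXTRA_OPTS"
# prints: -javaagent:/path/to/agent-c4-<version>-all.jar=pulsarServiceUrl=pulsar://pulsar:6650,topicPrefix=events-
```

Any parameter from Change Agent Parameters can be added as another name=value argument.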
In CDC agent versions after 1.0.3, the CDC agent Pulsar connection parameters can also be provided as environment variables in cassandra-env.sh. For example, the pulsarServiceUrl option shown above is set in cassandra-env.sh as follows:
export CDC_PULSAR_SERVICE_URL="pulsar://<pulsar_server_ip>:6650"
Then, to load the change agent on the node, append the agent JAR to JVM_OPTS in cassandra-env.sh:
JVM_OPTS="$JVM_OPTS -javaagent:/home/automaton/cdc104/agent-dse4-pulsar-1.0.5-all.jar"
The mappings between the change agent parameters and the corresponding Cassandra environment variables are listed in CDC Environment Parameter Strings.
For the full set of change agent configuration options, see Change Agent Parameters.
Cassandra.yaml
In addition to configuring the change agent, enable CDC on the Cassandra node by setting cdc_enabled to true in the cassandra.yaml file.
For Cassandra 4.x and DSE 6.8.16+, you can also configure the commit log sync period with commitlog_sync_period_in_ms. This parameter controls how often the commit logs are made available for CDC processing. The default is 10 seconds (10000 ms). To reduce the latency between a table update and the availability of the event in the data topic, decrease the period, for example to 2 seconds.
The total amount of CDC data stored is controlled by the cdc_total_space_in_mb parameter.
If the amount of unprocessed CDC data reaches this threshold, writes to CDC-enabled tables are rejected.
Make sure the change agent is installed on every node where CDC is enabled.
Without the change agent to process the CDC data, the space used grows until it reaches this limit, and then writes to CDC-enabled tables stop.
Here is an example set of configurations for the cassandra.yaml file:
cdc_enabled: true
commitlog_sync_period_in_ms: 2000
cdc_total_space_in_mb: 50000
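Before restarting a node, you may want to confirm that its cassandra.yaml contains these settings. The following bash sketch is a hypothetical pre-flight check (the function names yaml_get and check_cdc_settings are illustrative). It assumes the settings are written as flat key: value lines, as in the example above; it is not a general YAML parser.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check for the CDC settings in cassandra.yaml.
# Assumes flat "key: value" lines; this is not a general YAML parser.

# Print the value of a top-level key, ignoring trailing "# ..." comments.
yaml_get() {
  local file="$1" key="$2"
  sed -n -E "s/^${key}:[[:space:]]*([^#]*[^#[:space:]]).*$/\1/p" "$file" | head -n 1
}

# Fail unless cdc_enabled is true; otherwise report the related settings.
check_cdc_settings() {
  local file="$1"
  if [ "$(yaml_get "$file" cdc_enabled)" != "true" ]; then
    echo "cdc_enabled is not set to true in $file" >&2
    return 1
  fi
  echo "cdc_enabled: true"
  echo "commitlog_sync_period_in_ms: $(yaml_get "$file" commitlog_sync_period_in_ms)"
  echo "cdc_total_space_in_mb: $(yaml_get "$file" cdc_total_space_in_mb)"
}

# Usage against the example settings above, written to a temporary file.
tmp="$(mktemp)"
cat > "$tmp" <<'EOF'
cdc_enabled: true
commitlog_sync_period_in_ms: 2000
cdc_total_space_in_mb: 50000
EOF
check_cdc_settings "$tmp"
rm -f "$tmp"
```

On a real node, point check_cdc_settings at the node's own cassandra.yaml; a missing key prints an empty value.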
Change Agent Parameters
Name | Description | Type | Default |
---|---|---|---|
topicPrefix | The event topic name prefix. The <keyspace_name>.<table_name> is appended to this prefix to build the topic name. | string | events- |
cdcWorkingDir | The CDC working directory where the last sent offset is saved, and where the archived and errored commitlog files are copied. | string | cdc |
cdcPollIntervalMs | The poll interval in milliseconds for watching new commitlog files in the CDC raw directory. | long | 60000 |
errorCommitLogReprocessEnabled | Enable the re-processing of errored commitlog files. | boolean | false |
cdcConcurrentProcessors | The number of threads used to process commitlog files. The default value is the | integer | -1 |
maxInflightMessagesPerTask | The maximum number of in-flight messages per commitlog processing task. | integer | 16384 |
pulsarServiceUrl | The Pulsar broker service URL. | string | pulsar://localhost:6650 |
pulsarBatchDelayInMs | Pulsar batching delay in milliseconds. Pulsar batching is enabled when this value is greater than zero. | long | -1 |
pulsarKeyBasedBatcher | When true, use the Pulsar KEY_BASED BatchBuilder. | boolean | false |
pulsarMaxPendingMessages | The maximum size of the Pulsar queue holding pending messages. | integer | 1000 |
pulsarMaxPendingMessagesAcrossPartitions | The maximum number of Pulsar pending messages across partitions. | integer | 50000 |
pulsarAuthPluginClassName | The Pulsar authentication plugin class name. | string | |
pulsarAuthParams | The Pulsar authentication parameters. | string | |
sslProvider | The SSL/TLS provider to use. | string | |
sslTruststorePath | The path to the SSL/TLS truststore file. | string | |
sslTruststorePassword | The password for the SSL/TLS truststore. | string | |
sslTruststoreType | The type of the SSL/TLS truststore. | string | JKS |
sslKeystorePath | The path to the SSL/TLS keystore file. | string | |
sslKeystorePassword | The password for the SSL/TLS keystore. | string | |
sslCipherSuites | One or more cipher suites to use for negotiating the SSL/TLS connection. | string | |
sslEnabledProtocols | The enabled SSL/TLS protocols. | string | TLSv1.2,TLSv1.1,TLSv1 |
sslAllowInsecureConnection | Allows insecure connections to servers whose certificate has not been signed by an approved CA. You should always disable | boolean | false |
sslHostnameVerificationEnable | Enable the server hostname verification. | boolean | false |
Download CDC for Cassandra
IMPORTANT
By downloading this DataStax product, you agree to the terms of the open-source Apache-2.0 license agreement.
- Download the cassandra-source-connectors-<version>.tar file from the DataStax downloads page.
  The following files are available:
  Streaming platform | NAR file |
  ---|---|
  Apache Pulsar 2.8 and Luna Streaming 2.8 | pulsar-cassandra-source-<version>.nar |
- Extract the files from the tar with the following command:
  tar xvf cassandra-source-connectors-<version>.tar
- Use the version that matches your streaming platform.
Deploy CDC for Cassandra
To deploy the CDC for Cassandra NAR file in your Pulsar cluster, upload it to Luna Streaming or Pulsar using the pulsar-admin sources create command.
You need to deploy CDC for Cassandra for each CDC-enabled table.
For each CDC-enabled table, the change agent sends events to the events topic.
The topic name is determined by the topicPrefix setting in the agent (default is events-).
The <keyspace_name>.<table_name> is appended to the prefix to build the topic name.
Specify the following parameters:
- Connector name. You need one connector per CDC-enabled Cassandra table, so make sure to use a unique name.
- The previously downloaded CDC for Cassandra NAR file.
- The Pulsar tenant and namespace where the connector will run.
- The destination topic for Cassandra data (data topic).
- The number of instances (parallelism) of the connector. For high-volume tables, you might need to run multiple connector instances to prevent a growing backlog on the events topic.
- The name of the events topic the connector will read from.
- The keyspace and table associated with the events topic.
- Connection information (such as contact points and DC information) for Cassandra.
Here is an example:
pulsar-admin sources create \
--name cassandra-source-1 \
--archive /path/to/pulsar-cassandra-source-<version>.nar \
--tenant public \
--namespace default \
--destination-topic-name public/default/data-ks1.table1 \
--parallelism 1 \
--source-config '{
"events.topic": "persistent://public/default/events-ks1.table1",
"keyspace": "ks1",
"table": "table1",
"contactPoints": "localhost",
"port": 9042,
"loadBalancing.localDc": "DC1",
"auth.provider": "PLAIN",
"auth.username": "<username>",
"auth.password": "<password>"
}'
Then check that your connector is in the running state with no errors:
pulsar-admin sources status --name cassandra-source-1
Once the connector is running, it processes events from the events topic and publishes the result to the data topic.
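Because each CDC-enabled table needs its own connector, the create command is repeated per table. The following bash sketch prints (but does not run) one pulsar-admin sources create command per table and derives the events topic as topicPrefix plus <keyspace_name>.<table_name>. Assumptions: the default topicPrefix (events-), the public/default tenant and namespace, data topics named data-<keyspace>.<table> as in the example above, localhost/DC1 connection settings, and a hypothetical connector naming scheme cassandra-source-<keyspace>-<table>. Authentication settings are omitted.

```shell
#!/usr/bin/env bash
# Hypothetical dry-run helper: print one "pulsar-admin sources create" command
# per CDC-enabled table. It only prints commands; review them before running.
TOPIC_PREFIX="events-"   # change agent default topicPrefix
TENANT="public"
NAMESPACE="default"
NAR="/path/to/pulsar-cassandra-source-<version>.nar"

# Events topic name: topicPrefix + <keyspace_name>.<table_name>.
events_topic() {
  printf 'persistent://%s/%s/%s%s.%s\n' "$TENANT" "$NAMESPACE" "$TOPIC_PREFIX" "$1" "$2"
}

# Print the create command for one keyspace/table pair.
print_create_command() {
  local ks="$1" table="$2"
  cat <<EOF
pulsar-admin sources create \\
  --name cassandra-source-${ks}-${table} \\
  --archive ${NAR} \\
  --tenant ${TENANT} \\
  --namespace ${NAMESPACE} \\
  --destination-topic-name ${TENANT}/${NAMESPACE}/data-${ks}.${table} \\
  --parallelism 1 \\
  --source-config '{"events.topic": "$(events_topic "$ks" "$table")", "keyspace": "${ks}", "table": "${table}", "contactPoints": "localhost", "port": 9042, "loadBalancing.localDc": "DC1"}'
EOF
}

# One command per fully qualified table name.
for qualified in ks1.table1 ks1.table2; do
  print_create_command "${qualified%%.*}" "${qualified#*.}"
done
```

If you set a custom topicPrefix in the change agent, set TOPIC_PREFIX to the same value so the connector reads the topic the agent writes to.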
For the full set of source configuration options, see CDC for Cassandra settings. For the full set of Cassandra authentication options, see Cassandra Authentication settings. For the full set of Cassandra SSL settings, see Cassandra SSL/TLS settings. For advanced configuration of the Cassandra driver in the CDC for Cassandra, see Pass CDC for Cassandra settings directly to the DataStax Java driver.
Enabling and disabling CDC on a table
Once the change agent is installed and the connector is running, you can enable or disable CDC on a table using the following commands:
CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;
ALTER TABLE foo WITH cdc=true;
ALTER TABLE foo WITH cdc=false;
When CDC is enabled on a table, the change agent sends updates to that table to CDC for Cassandra, which processes each event and then publishes it to the data topic, where other connectors (for example, Elasticsearch) can consume it.
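When several tables need CDC, the ALTER TABLE statement above can be generated per table. The following bash sketch is a hypothetical helper (cdc_alter_statements is not a real tool) that assumes fully qualified keyspace.table names. It only prints CQL, which you can save to a file, review, and then run with cqlsh, for example cqlsh -f <file> <host>.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print ALTER TABLE statements that toggle CDC on
# several tables. The output is CQL meant to be reviewed and run with cqlsh.
cdc_alter_statements() {
  local flag="$1"   # "true" to enable CDC, "false" to disable it
  shift
  if [ "$flag" != "true" ] && [ "$flag" != "false" ]; then
    echo "usage: cdc_alter_statements true|false <keyspace.table>..." >&2
    return 1
  fi
  local t
  for t in "$@"; do
    printf 'ALTER TABLE %s WITH cdc=%s;\n' "$t" "$flag"
  done
}

cdc_alter_statements true ks1.table1 ks1.table2
# prints:
# ALTER TABLE ks1.table1 WITH cdc=true;
# ALTER TABLE ks1.table2 WITH cdc=true;
```

Remember that each table you enable here also needs its own connector.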
DataStax Cassandra Source Connector for Apache Pulsar™ settings
Name | Description | Type | Validator | Default |
---|---|---|---|---|
events.topic | The name of the events topic to listen to for Cassandra mutation events | string | | |
keyspace | Cassandra keyspace name | string | | |
table | Cassandra table name | string | | |
cloud.secureConnectBundle | The location of the cloud secure connect bundle used to connect to DataStax Astra DB | string | | "" |
compression | Compression algorithm to use when issuing requests to the database server | string | (case insensitive) [LZ4, NONE, SNAPPY] | None |
connectionPoolLocalSize | Number of connections that the driver maintains within the connection pool to each node in the local DC | int | [1,…] | 4 |
contactPoints | Initial contact points | list | | "" |
ignoreErrors | Specifies which errors the connector should ignore when processing the record. Valid values are: None (never ignore errors), All (ignore all errors), Driver (ignore driver errors only, that is, errors when writing to the database). | string | | None |
jmx | Whether to enable JMX reporting | boolean | | true |
key.converter | Converter class used to write the message key to the data topic. This setting is experimental and intended for advanced users only. | class | | null |
loadBalancing.localDc | The datacenter name (commonly dc1, dc2, etc.) local to the machine on which the connector is running | string | | "" |
maxConcurrentRequests | The maximum number of requests to send at once | int | [1,…] | 500 |
metricsHighestLatency | Used to scale internal data structures for gathering metrics. It should be higher than queryExecutionTimeout. Expressed in seconds. | int | [1,…] | 35 |
port | Port to connect to nodes | int | [1,…] | 9042 |
queryExecutionTimeout | CQL statement execution timeout, in seconds | int | [1,…] | 30 |
value.converter | Converter class used to write the message value to the data topic. This setting is experimental and intended for advanced users only. | class | | null |
batch.size | The batch size for grouping mutations before sending them to the data topic | int | | 200 |
query.backoffInMs | Retry backoff in milliseconds when there are not enough Cassandra replicas to perform the query (capped, jittered exponential backoff) | long | | 100 |
query.executors | The initial and maximum number of threads to execute concurrent Cassandra queries | int | | 10 |
query.maxBackoffInSec | Maximum backoff delay in seconds when there are not enough Cassandra replicas to perform the query | long | | 3600 |
query.maxMobileAvgLatency | Maximum moving average CQL query latency above which the number of executors is decreased | long | | 100 |
query.minMobileAvgLatency | Minimum moving average CQL query latency below which the number of executors is increased | long | | 10 |
columns | Regular expression matching the names of the Cassandra columns to replicate | string | | .* |
jmxConnectorDomain | Domain for JMX reporting | string | | com.datastax.oss.cdc |
events.subscription.name | The Pulsar events topic subscription name | string | | sub |
events.subscription.type | The Pulsar events topic subscription type (case sensitive). Use Key_Shared, the default, for a non-partitioned events topic. If your events topic is partitioned, set the subscription type to Failover. | string | [Exclusive, Shared, Failover, Key_Shared] | Key_Shared |
cache.max.digest | The maximum number of digests per mutation cache entry | long | [1,…] | 3 |
cache.max.capacity | The maximum capacity of the mutation cache | long | [1,…] | 32767 |
cache.expire.after.ms | The mutation cache entry duration in milliseconds | long | [1000,…] | 60000 |
cache.only_if_coordinator_match | Cache the mutation digest only if the coordinator node is the originator node | boolean | | true |
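The query.backoffInMs and query.maxBackoffInSec settings describe a capped, jittered exponential backoff. The connector's exact formula is not documented here, so the following bash sketch shows one common variant, often called "full jitter": the delay for retry attempt n is a random value between 0 and min(cap, base × 2^n). It uses the default values from the table (100 ms base, 3600 s cap); the function names are hypothetical and this is not the connector's code.

```shell
#!/usr/bin/env bash
# Generic "full jitter" backoff sketch; NOT the connector's actual implementation.
BASE_MS=100                 # query.backoffInMs default
MAX_MS=$((3600 * 1000))     # query.maxBackoffInSec default, converted to ms

# Upper bound for a 0-based retry attempt: min(MAX_MS, BASE_MS * 2^attempt).
backoff_ceiling_ms() {
  local attempt="$1" ceiling="$BASE_MS" i
  for ((i = 0; i < attempt; i++)); do
    ceiling=$((ceiling * 2))
    if ((ceiling >= MAX_MS)); then
      echo "$MAX_MS"
      return
    fi
  done
  echo "$ceiling"
}

# Jittered delay: a random value in [0, ceiling].
backoff_delay_ms() {
  local ceiling
  ceiling="$(backoff_ceiling_ms "$1")"
  # Combine two 15-bit $RANDOM values so ceilings above 32767 are covered.
  local r=$(( (RANDOM << 15) | RANDOM ))
  echo $(( r % (ceiling + 1) ))
}

for attempt in 0 1 2 10 30; do
  echo "attempt $attempt: ceiling $(backoff_ceiling_ms "$attempt") ms, delay $(backoff_delay_ms "$attempt") ms"
done
```

With these defaults the ceiling doubles from 100 ms and reaches the 3600 s cap at attempt 16; the jitter spreads retries from different workers so they do not all hit Cassandra at the same moment.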
Cassandra Authentication settings
Name | Description | Type | Validator | Default |
---|---|---|---|---|
auth.gssapi.keyTab | Kerberos keytab file for GSSAPI provider authentication | string | | "" |
auth.gssapi.principal | Kerberos principal for GSSAPI provider authentication | string | | "" |
auth.gssapi.service | SASL service name to use for GSSAPI provider authentication | string | | dse |
auth.password | Password for PLAIN (username/password) provider authentication | password | | [hidden] |
auth.provider | Authentication provider | string | [None, PLAIN, GSSAPI] | None |
auth.username | Username for PLAIN (username/password) provider authentication | string | | "" |
Cassandra SSL/TLS settings
Name | Description | Type | Validator | Default |
---|---|---|---|---|
ssl.cipherSuites | The cipher suites to enable | list | | "" |
ssl.hostnameValidation | Whether to validate node hostnames when using SSL | boolean | | true |
ssl.keystore.password | Keystore password | password | | [hidden] |
ssl.keystore.path | The path to the keystore file | string | | "" |
ssl.openssl.keyCertChain | The path to the certificate chain file | string | | "" |
ssl.openssl.privateKey | The path to the private key file | string | | "" |
ssl.provider | SSL/TLS provider | string | [None, JDK, OpenSSL] | None |
ssl.truststore.password | Truststore password | password | | [hidden] |
ssl.truststore.path | The path to the truststore file | string | | "" |
Pass CDC for Cassandra settings directly to the DataStax Java driver
In your CDC for Cassandra configuration file, you can pass settings directly to the DataStax Java driver by using the datastax-java-driver prefix.
For example:
datastax-java-driver.basic.request.consistency=ALL
Mapping CDC for Cassandra settings to Java driver properties
The following table identifies functionally equivalent CDC for Cassandra and DataStax Java driver settings.
If you define both in your configuration, the CDC for Cassandra setting takes precedence over the datastax-java-driver.property-name setting.
If you do not provide either in your configuration, CDC for Cassandra defaults are in effect.
For information about the Java properties, refer to the DataStax Java driver documentation.
DataStax Cassandra Source Connector for Apache Pulsar™ | Using datastax-java-driver prefix |
---|---|
There is a difference between the CDC for Cassandra contactPoints setting and the Java driver's datastax-java-driver.basic.contact-points setting.
For the CDC for Cassandra contactPoints setting, the value of the port setting is appended to every host in the list.
For datastax-java-driver.basic.contact-points, you must provide fully qualified contact points (host:port).
Passing the Java driver setting gives you more configuration flexibility, because you can specify a different port for each host. For example:
datastax-java-driver.basic.contact-points = 127.0.0.1:9042, 127.0.0.2:9042
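The port-appending behavior described above can be sketched as a small bash conversion from a connector-style contactPoints list plus port to the fully qualified form expected by datastax-java-driver.basic.contact-points. The helper name is hypothetical, and it assumes contactPoints is a comma-separated list of hosts.

```shell
#!/usr/bin/env bash
# Hypothetical helper: expand connector-style contactPoints + port into
# fully qualified host:port entries for datastax-java-driver.basic.contact-points.
# Assumes contactPoints is a comma-separated list of hosts.
to_driver_contact_points() {
  local hosts="$1" port="$2"
  local out="" host
  local IFS=','
  for host in $hosts; do
    host="${host// /}"            # drop spaces around entries
    [ -z "$host" ] && continue
    out="${out:+$out, }${host}:${port}"
  done
  printf '%s\n' "$out"
}

printf 'datastax-java-driver.basic.contact-points = %s\n' \
  "$(to_driver_contact_points "127.0.0.1,127.0.0.2" 9042)"
# prints: datastax-java-driver.basic.contact-points = 127.0.0.1:9042, 127.0.0.2:9042
```

After converting, you can edit individual entries to give each host its own port, which the contactPoints setting cannot express.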
Java driver reference
For more information, refer to the Java driver reference configuration topic.
Scaling up your configuration
If your connector is not keeping up and the backlog of messages in the events topic is growing, increase the number of connector instances using the parallelism parameter.
Pulsar ensures in-order processing using Key_Shared subscriptions.
If the volume of data in the events topic is very high, partition the events topic to distribute the load across multiple Pulsar brokers.
Do this before starting the change agent, since by default Pulsar will auto-create non-partitioned topics.
If you are using partitioned topics, set events.subscription.type to Failover to ensure in-order delivery when running multiple connector instances.
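The partitioning steps above (create the partitioned events topic before starting the change agent, then switch the connector to a Failover subscription) can be sketched as a bash dry run that only prints what to run and configure. It assumes the default topicPrefix (events-), the public/default tenant and namespace, and a hypothetical helper name; pulsar-admin topics create-partitioned-topic is the standard Pulsar command for creating a partitioned topic.

```shell
#!/usr/bin/env bash
# Hypothetical dry-run helper: print the setup for a partitioned events topic.
# Create the topic BEFORE starting the change agent, because Pulsar
# auto-creates non-partitioned topics by default.
partitioned_events_plan() {
  local ks="$1" table="$2" partitions="$3"
  # Default topicPrefix and public/default tenant/namespace are assumed.
  local topic="persistent://public/default/events-${ks}.${table}"
  printf 'pulsar-admin topics create-partitioned-topic %s --partitions %s\n' "$topic" "$partitions"
  # Matching connector source-config entries for a partitioned events topic.
  printf '"events.topic": "%s", "events.subscription.type": "Failover"\n' "$topic"
}

partitioned_events_plan ks1 table1 4
```

The number of partitions is a sizing decision; 4 is only an example value.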
To further improve throughput, you can set pulsarBatchDelayInMs in the change agent so that it batches messages before sending them to Pulsar.
To improve the performance of individual connector instances as they read data from Cassandra, you can adjust batch.size and query.executors.
Increasing these values from their defaults increases parallelism within each connector instance.
The de-duplication cache is configurable, including the cache size (cache.max.capacity), the entry retention duration (cache.expire.after.ms), and the number of MD5 digests per primary key entry (cache.max.digest).
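To make the role of cache.max.digest concrete, the following bash sketch keeps, for each primary key, the MD5 digests of the most recent mutations and skips a mutation whose digest is already cached (for example, when several replicas report the same write). It is a simplified illustration, not the connector's implementation: capacity (cache.max.capacity) and expiry (cache.expire.after.ms) are not modeled, the eviction policy is an assumption, the function name is hypothetical, and it requires bash 4+ and GNU coreutils md5sum.

```shell
#!/usr/bin/env bash
# Simplified per-primary-key digest cache in the spirit of cache.max.digest;
# NOT the connector's implementation. Capacity and expiry are not modeled.
# Requires bash 4+ (associative arrays) and md5sum (GNU coreutils).
MAX_DIGEST=3            # cache.max.digest default
declare -A DIGESTS=()   # primary key -> space-separated MD5 digests

# Return 0 if the mutation is new (and record it), 1 if it is a duplicate.
seen_or_record() {
  local pk="$1" mutation="$2" digest d
  digest="$(printf '%s' "$mutation" | md5sum | cut -d' ' -f1)"
  local existing="${DIGESTS[$pk]:-}"
  for d in $existing; do
    [ "$d" = "$digest" ] && return 1
  done
  # Append the digest; in this sketch only the newest MAX_DIGEST are kept.
  local -a list=($existing "$digest")
  if ((${#list[@]} > MAX_DIGEST)); then
    list=("${list[@]: -MAX_DIGEST}")
  fi
  DIGESTS[$pk]="${list[*]}"
  return 0
}

for m in "UPDATE a=1" "UPDATE a=1" "UPDATE a=2"; do
  if seen_or_record "ks1.table1/pk=1" "$m"; then
    echo "forward: $m"
  else
    echo "skip duplicate: $m"
  fi
done
# prints: forward: UPDATE a=1 / skip duplicate: UPDATE a=1 / forward: UPDATE a=2
```

A larger digest list per key catches duplicates that arrive further apart, at the cost of more memory per cache entry.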