CDC Backfill CLI
When CDC is enabled on a table, the data topic doesn’t contain any data from before CDC was enabled. The backfill CLI solves this problem by exporting the table’s primary key to a Comma Separated Values (CSV) file, storing the CSV file on disk, and sending the primary key from the CSV file to the event topic. The Cassandra Source Connector reads the primary key from the event topic and populates the data topic with historical data. The backfill CLI is powered by the DataStax Bulk Loader, a battle-tested data loader tool. This means the CLI takes full advantage of optimizations done in DSBulk when exporting data from table to disk.
Developers can also use the backfill CLI to trigger change events for downstream applications without having to insert new data.
Installation
The CDC backfill CLI is distributed both as a JAR file and as a Pulsar-admin extension NAR file. The Pulsar-admin extension is packaged with the DataStax Luna Streaming distribution in the /cliextensions folder, so you don’t need to build from source unless you want to make changes to the code.
Both artifacts are built with Gradle. To build the CLI, run the following commands:
-
Gradle
-
Result
git clone git@github.com:datastax/cdc-apache-cassandra.git
cd cdc-apache-cassandra
./gradlew backfill-cli:assemble
BUILD SUCCESSFUL in 37s
17 actionable tasks: 15 executed, 2 up-to-date
Gradle generates two main artifacts:
-
An uber JAR file containing the CLI and all its dependencies: backfill-cli/build/libs/backfill-cli-2.3.0-all.jar
-
A NAR archive that wraps the CLI as a Pulsar-admin Extension: backfill-cli/build/libs/pulsar-cassandra-admin-2.3.0-nar.nar
Once the artifacts are generated, you can run the backfill CLI tool as either a standalone Java application or as a Pulsar-admin extension.
-
Java standalone
-
Pulsar-admin extension
java -jar backfill-cli/build/libs/backfill-cli-2.3.0-all.jar --data-dir target/export --export-host 127.0.0.1:9042 \
--export-username cassandra --export-password cassandra --keyspace ks1 --table table1
The Pulsar-admin extension is packaged with the DataStax Luna Streaming distribution in the /cliextensions folder, so you don’t need to build from source unless you want to make changes to the code.
-
Move the generated NAR archive to the /cliextensions folder of your Pulsar installation (e.g. /pulsar/cliextensions).
-
Modify the client.conf file of your Pulsar installation to include:
customCommandFactories=cassandra-cdc
. -
Run the following command (this assumes the default installation of DSE Cassandra):
./bin/pulsar-admin cassandra-cdc backfill --data-dir target/export --export-host 127.0.0.1:9042 \ --export-username cassandra --export-password cassandra --keyspace ks1 --table table1
Test
This test quickly confirms your CDC backfill is working correctly.
Prerequisites:
-
A running DSE Cassandra cluster
-
A running Pulsar cluster (standalone is fine)
-
Backfill CLI built with Gradle (see Installation)
-
Start DSE Cassandra from the installation directory.
./bin/dse cassandra -f
-
Confirm that a Cassandra node is running:
./bin/dse-6.8.33 nodetool status Datacenter: Cassandra ===================== Status=Up/Down |/ State=Normal/Leaving/Joining/Moving/Stopped -- Address Load Owns (effective) Host ID Token Rack UN 127.0.0.1 169.82 KiB 100.0% d4a8181f-b248-431f-a218-47651a30ef4d -5094113265891965089 rack1
-
Create a CDC-enabled table and add values to the table with CQL shell:
./bin/dse-6.8.33 cqlsh Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 6.8.0 | DSE 6.8.33 | CQL spec 3.4.5 | DSE protocol v2] cqlsh> create CREATE KEYSPACE ks1 WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}; cqlsh> CREATE KEYSPACE ks1 WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}; AlreadyExists: Keyspace 'ks1' already exists cqlsh> CREATE TABLE ks1.table1 (id text primary key, val text) with cdc=true; cqlsh> INSERT INTO ks1.table1 (id,val) VALUES ('1', 'val1'); cqlsh> INSERT INTO ks1.table1 (id,val) VALUES ('2', 'val2'); cqlsh> INSERT INTO ks1.table1 (id,val) VALUES ('3', 'val3');
-
Start Pulsar standalone.
./bin/pulsar standalone
-
Create a Cassandra source connector in Pulsar:
lunastreaming-2.10.3.1 ./bin/pulsar-admin sources create \ --name csc --classname com.datastax.oss.pulsar.source.CassandraSource \ --archive connectors/pulsar-cassandra-source-2.2.2.nar \ --tenant public \ --namespace default \ --destination-topic-name "persistent://public/default/data-ks1.table1" \ --parallelism 1 \ --source-config '{ "events.topic": "persistent://public/default/events-ks1.table1", "keyspace": "ks1", "table": "table1", "contactPoints": "localhost", "port": "9042", "loadBalancing.localDc": "Cassandra" }' Created successfully
-
Create a consumer subscription on the CDC data topic:
./bin/pulsar-client consume -s subsc -n 0 -st auto_consume -p Earliest persistent://public/default/data-ks1.table1
-
Now everything is set up: you have a Cassandra table with pre-existing data, a Pulsar topic with a CDC connector, and a Pulsar consumer subscription.
The moment of truth! Run the backfill CLI to hydrate the data topic with the existing data in the Cassandra table:
./bin/pulsar-admin cassandra-cdc backfill --data-dir target/export --export-host 127.0.0.1:9042 --export-username cassandra --export-password cassandra --keyspace ks1 --table table1
-
You will get a lot of output in the console, but the last line should look like this:
2023-04-14T11:38:53,421-0400 [main] INFO com.datastax.oss.cdc.backfill.importer.PulsarImporter - Pulsar Importer Summary: Import status=STATUS_OK, Read mutations from disk=3, Sent mutations=3, Failed mutations=0
Success! Your data topic is now populated with the existing data from the Cassandra table.
-
Check your Pulsar subscription as well to ensure Pulsar received the change events.
Pulsar consumer subscription output:
----- got message ----- 2023-04-14T11:47:48,652-0400 [main] INFO org.apache.pulsar.client.impl.schema.AutoConsumeSchema - Configure topic schema \x00\x00\x00\x00\x00\x00\x00\x00 for topic persistent://public/default/data-ks1.table1 : {"key":{"name":"table1","schema":{"type":"record","name":"table1","namespace":"ks1","doc":"Table ks1.table1","fields":[{"name":"id","type":"string"}]},"type":"AVRO","timestamp":0,"properties":{}},"value":{"name":"table1","schema":{"type":"record","name":"table1","namespace":"ks1","doc":"Table ks1.table1","fields":[{"name":"val","type":["null","string"]}]},"type":"AVRO","timestamp":0,"properties":{}}} 2023-04-14T11:47:48,654-0400 [main] INFO org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : { "name": "table1", "schema": { "type": "record", "name": "table1", "namespace": "ks1", "doc": "Table ks1.table1", "fields": [ { "name": "id", "type": "string" } ] }, "type": "AVRO", "timestamp": 0, "properties": {} } 2023-04-14T11:47:48,674-0400 [main] INFO org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : { "name": "table1", "schema": { "type": "record", "name": "table1", "namespace": "ks1", "doc": "Table ks1.table1", "fields": [ { "name": "val", "type": [ "null", "string" ] } ] }, "type": "AVRO", "timestamp": 0, "properties": {} } key:[AjI=], properties:[writetime=1681487266389000], content:{key={id=2}, value={val=val2}} ----- got message ----- key:[AjM=], properties:[writetime=1681487267244000], content:{key={id=3}, value={val=val3}} ----- got message ----- key:[AjE=], properties:[writetime=1681487267246000], content:{key={id=1}, value={val=val1}} 2023-04-14T11:48:18,905-0400 [pulsar-timer-6-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/data-ks1.table1] [subsc] [5759a] Prefetched messages: 0 --- Consume throughput received: 0.05 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.05 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
-
Now that you’ve confirmed the backfill CLI is working, run it manually when you want to backfill data on a CDC-enabled table, or when you need to create events for existing data or upstream applications.
Parameters reference
When running the backfill CLI as a Pulsar-admin extension, all --pulsar-*
parameters are loaded from the client.conf
file.
The --dsbulk-log-dir
is only available when running the backfill CLI as a standalone Java application.
The --export-dsbulk-option
parameter passes extra parameters to DSBulk.
The relevant DSBulk settings configure the CSV connector and can be found here.
Shortened option names are not supported.
In both the Pulsar-admin extension and the standalone Java application, C* related configurations are exclusively passed as command line arguments.
Parameter | Description |
---|---|
--data-dir=PATH |
The directory where data will be exported to and imported from. The default is a 'data' subdirectory in the current working directory. The data directory will be created if it does not exist. Tables will be exported in subdirectories of the data directory specified here; there will be one subdirectory per keyspace inside the data directory, then one subdirectory per table inside each keyspace directory. |
--help, -h |
Displays this help message |
--dsbulk-log-dir=PATH, -l |
The directory where DSBulk should store its logs. The default is a 'logs' subdirectory in the current working directory. This subdirectory will be created if it does not exist. Each DSBulk operation will create a subdirectory inside the log directory specified here. This command is not available in the Pulsar-admin extension. |
--export-bundle=PATH |
The path to a secure connect bundle to connect to the Cassandra cluster, if that cluster is a DataStax Astra cluster. Options --export-host and --export-bundle are mutually exclusive. |
--export-consistency=CONSISTENCY |
The consistency level to use when exporting data. The default is LOCAL_QUORUM. |
--export-max-concurrent-files=NUM|AUTO |
The maximum number of concurrent files to write to. Must be a positive number or the special value AUTO. The default is AUTO. |
--export-max-concurrent-queries=NUM|AUTO |
The maximum number of concurrent queries to execute. Must be a positive number or the special value AUTO. The default is AUTO. |
--export-splits=NUM|NC |
The maximum number of token range queries to generate. Use the NC syntax to specify a multiple of the number of available cores, e.g. 8C = 8 times the number of available cores. The default is 8C. This is an advanced setting; you should rarely need to modify the default value. |
--export-dsbulk-option=OPT=VALUE |
An extra DSBulk option to use when exporting. Any valid DSBulk option can be specified here, and it will be passed as-is to the DSBulk process. DSBulk options, including driver options, must be passed as '--long.option.name=<value>'. Short options are not supported. For more DSBulk options, see here. |
--export-host=HOST[:PORT] |
The host name or IP and, optionally, the port of a node from the Cassandra cluster. If the port is not specified, it will default to 9042. This option can be specified multiple times. Options --export-host and --export-bundle are mutually exclusive. |
--export-password |
The password to use to authenticate against the origin cluster. Options --export-username and --export-password must be provided together, or not at all. Omit the parameter value to be prompted for the password interactively. |
--export-protocol-version=VERSION |
The protocol version to use to connect to the Cassandra cluster, e.g. 'V4'. If not specified, the driver will negotiate the highest version supported by both the client and the server. |
--export-username=STRING |
The username to use to authenticate against the origin cluster. Options --export-username and --export-password must be provided together, or not at all. |
--keyspace=<keyspace>, -k |
The name of the keyspace where the table to be exported exists |
--max-rows-per-second=PATH |
The maximum number of rows per second to read from the Cassandra table. Setting this option to any negative value or zero will disable it. The default is -1. |
--table=<table>, -t |
The name of the table to export data from for cdc back filling |
--version, -v |
Displays version info. |
Pulsar connectivity parameters
Pulsar connectivity parameters are auto-populated from the client.conf
file available to the CLI when used as a Pulsar-admin extension.
These parameters should be passed as command line arguments in the standalone Java application.
Parameter | Description |
---|---|
--events-topic-prefix=<topicPrefix> |
The event topic name prefix. The |
--pulsar-auth-params=<pulsarAuthParams> |
The Pulsar authentication parameters. |
--pulsar-auth-plugin-class-name=<pulsarAuthPluginClassName> |
The Pulsar authentication plugin class name. |
--pulsar-url=<pulsarServiceUrl> |
The Pulsar broker service URL. |
--pulsar-ssl-provider=<sslProvider> |
The SSL/TLS provider to use. |
--pulsar-ssl-truststore-path=<sslTruststorePath> |
The path to the SSL/TLS truststore file. |
--pulsar-ssl-truststore-password=<sslTruststorePassword> |
The password for the SSL/TLS truststore. |
--pulsar-ssl-truststore-type=<sslTruststoreType> |
The type of the SSL/TLS truststore. |
--pulsar-ssl-keystore-path=<sslKeystorePath> |
The path to the SSL/TLS keystore file. |
--pulsar-ssl-keystore-password=<sslKeystorePassword> |
The password for the SSL/TLS keystore. |
--pulsar-ssl-cipher-suites=<sslCipherSuites> |
Defines one or more cipher suites to use for negotiating the SSL/TLS connection. |
--pulsar-ssl-enabled-protocols=<sslEnabledProtocols> |
Enabled SSL/TLS protocols |
--pulsar-ssl-allow-insecure-connections |
Allows insecure connections to servers whose certificate has not been signed by an approved CA. You should always disable |
--pulsar-ssl-enable-hostname-verification |
Enable the server hostname verification. |
--pulsar-ssl-tls-trust-certs-path=<tlsTrustCertsFilePath> |
The path to the trusted TLS certificate file. |
--pulsar-ssl-use-key-store-tls |
If TLS is enabled, specifies whether to use KeyStore type as TLS configuration parameter. |
What’s next?
-
For more on using CDC with Apache Pulsar, including schema management and consumption patterns, see our Streaming learning page.