CDC Backfill CLI

When CDC is enabled on a table, the data topic doesn’t contain any data from before CDC was enabled. The backfill CLI solves this problem by exporting the table’s primary key to a Comma Separated Values (CSV) file, storing the CSV file on disk, and sending the primary key from the CSV file to the event topic. The Cassandra Source Connector reads the primary key from the event topic and populates the data topic with historical data. The backfill CLI is powered by the DataStax Bulk Loader, a battle-tested data loader tool. This means the CLI takes full advantage of optimizations done in DSBulk when exporting data from table to disk.

Developers can also use the backfill CLI to trigger change events for downstream applications without having to insert new data.

Installation

The CDC backfill CLI is distributed both as a JAR file and as a Pulsar-admin extension NAR file. The Pulsar-admin extension is packaged with the DataStax Luna Streaming distribution in the /cliextensions folder, so you don’t need to build from source unless you want to make changes to the code.

Both artifacts are built with Gradle. To build the CLI, run the following commands:

  • Gradle

  • Result

git clone git@github.com:datastax/cdc-apache-cassandra.git
cd cdc-apache-cassandra
./gradlew backfill-cli:assemble
BUILD SUCCESSFUL in 37s
17 actionable tasks: 15 executed, 2 up-to-date

Gradle generates two main artifacts:

  • An uber JAR file containing the CLI and all its dependencies: backfill-cli/build/libs/backfill-cli-2.2.9-all.jar

  • A NAR archive that wraps the CLI as a Pulsar-admin Extension: backfill-cli/build/libs/pulsar-cassandra-admin-2.2.9-nar.nar

Once the artifacts are generated, you can run the backfill CLI tool as either a standalone Java application or as a Pulsar-admin extension.

  • Java standalone

  • Pulsar-admin extension

java -jar backfill-cli/build/libs/backfill-cli-2.2.9-all.jar --data-dir target/export --export-host 127.0.0.1:9042 \
 --export-username cassandra --export-password cassandra --keyspace ks1 --table table1

The Pulsar-admin extension is packaged with the DataStax Luna Streaming distribution in the /cliextensions folder, so you don’t need to build from source unless you want to make changes to the code.

  1. Move the generated NAR archive to the /cliextensions folder of your Pulsar installation (e.g. /pulsar/cliextensions).

  2. Modify the client.conf file of your Pulsar installation to include: customCommandFactories=cassandra-cdc.

  3. Run the following command (this assumes the default installation of DSE Cassandra):

    ./bin/pulsar-admin cassandra-cdc backfill --data-dir target/export --export-host 127.0.0.1:9042 \
     --export-username cassandra --export-password cassandra --keyspace ks1 --table table1

Test

This test quickly confirms your CDC backfill is working correctly.

Prerequisites:

  • A running DSE Cassandra cluster

  • A running Pulsar cluster (standalone is fine)

  • Backfill CLI built with Gradle (see Installation)

    1. Start DSE Cassandra from the installation directory.

      ./bin/dse cassandra -f
    2. Confirm that a Cassandra node is running:

      ./bin/dse-6.8.33 nodetool status
      Datacenter: Cassandra
      =====================
      Status=Up/Down
      |/ State=Normal/Leaving/Joining/Moving/Stopped
      --  Address    Load       Owns (effective)  Host ID                               Token                                    Rack
      UN  127.0.0.1  169.82 KiB  100.0%            d4a8181f-b248-431f-a218-47651a30ef4d  -5094113265891965089                     rack1
    3. Create a CDC-enabled table and add values to the table with CQL shell:

      ./bin/dse-6.8.33 cqlsh
      Connected to Test Cluster at 127.0.0.1:9042.
      [cqlsh 6.8.0 | DSE 6.8.33 | CQL spec 3.4.5 | DSE protocol v2]
      cqlsh> create CREATE KEYSPACE  ks1 WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};
      cqlsh> CREATE KEYSPACE  ks1 WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};
      AlreadyExists: Keyspace 'ks1' already exists
      cqlsh> CREATE TABLE ks1.table1 (id text primary key, val text) with cdc=true;
      cqlsh> INSERT INTO ks1.table1 (id,val) VALUES ('1', 'val1');
      cqlsh> INSERT INTO ks1.table1 (id,val) VALUES ('2', 'val2');
      cqlsh> INSERT INTO ks1.table1 (id,val) VALUES ('3', 'val3');
    4. Start Pulsar standalone.

      ./bin/pulsar standalone
    5. Create a Cassandra source connector in Pulsar:

      lunastreaming-2.10.3.1 ./bin/pulsar-admin sources create \
        --name csc --classname com.datastax.oss.pulsar.source.CassandraSource \
        --archive connectors/pulsar-cassandra-source-2.2.2.nar \
        --tenant public \
        --namespace default \
        --destination-topic-name "persistent://public/default/data-ks1.table1" \
        --parallelism 1 \
        --source-config '{
              "events.topic": "persistent://public/default/events-ks1.table1",
              "keyspace": "ks1",
              "table": "table1",
              "contactPoints": "localhost",
              "port": "9042",
          "loadBalancing.localDc": "Cassandra"
        }'
      
      Created successfully
    6. Create a consumer subscription on the CDC data topic:

      ./bin/pulsar-client consume -s subsc -n 0 -st auto_consume -p Earliest persistent://public/default/data-ks1.table1
    7. Now everything is set up: you have a Cassandra table with pre-existing data, a Pulsar topic with a CDC connector, and a Pulsar consumer subscription.

      The moment of truth! Run the backfill CLI to hydrate the data topic with the existing data in the Cassandra table:

      ./bin/pulsar-admin cassandra-cdc backfill --data-dir target/export --export-host 127.0.0.1:9042
       --export-username cassandra --export-password cassandra --keyspace ks1 --table table1
    8. You will get a lot of output in the console, but the last line should look like this:

      2023-04-14T11:38:53,421-0400 [main] INFO  com.datastax.oss.cdc.backfill.importer.PulsarImporter - Pulsar Importer Summary: Import status=STATUS_OK, Read mutations from disk=3, Sent mutations=3, Failed mutations=0

      Success! Your data topic is now populated with the existing data from the Cassandra table.

    9. Check your Pulsar subscription as well to ensure Pulsar received the change events.

      Pulsar consumer subscription output:
      ----- got message -----
      2023-04-14T11:47:48,652-0400 [main] INFO  org.apache.pulsar.client.impl.schema.AutoConsumeSchema - Configure topic schema \x00\x00\x00\x00\x00\x00\x00\x00 for topic persistent://public/default/data-ks1.table1 : {"key":{"name":"table1","schema":{"type":"record","name":"table1","namespace":"ks1","doc":"Table ks1.table1","fields":[{"name":"id","type":"string"}]},"type":"AVRO","timestamp":0,"properties":{}},"value":{"name":"table1","schema":{"type":"record","name":"table1","namespace":"ks1","doc":"Table ks1.table1","fields":[{"name":"val","type":["null","string"]}]},"type":"AVRO","timestamp":0,"properties":{}}}
      2023-04-14T11:47:48,654-0400 [main] INFO  org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : {
        "name": "table1",
        "schema": {
          "type": "record",
          "name": "table1",
          "namespace": "ks1",
          "doc": "Table ks1.table1",
          "fields": [
            {
              "name": "id",
              "type": "string"
            }
          ]
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {}
      }
      2023-04-14T11:47:48,674-0400 [main] INFO  org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : {
        "name": "table1",
        "schema": {
          "type": "record",
          "name": "table1",
          "namespace": "ks1",
          "doc": "Table ks1.table1",
          "fields": [
            {
              "name": "val",
              "type": [
                "null",
                "string"
              ]
            }
          ]
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {}
      }
      key:[AjI=], properties:[writetime=1681487266389000], content:{key={id=2}, value={val=val2}}
      ----- got message -----
      key:[AjM=], properties:[writetime=1681487267244000], content:{key={id=3}, value={val=val3}}
      ----- got message -----
      key:[AjE=], properties:[writetime=1681487267246000], content:{key={id=1}, value={val=val1}}
      2023-04-14T11:48:18,905-0400 [pulsar-timer-6-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/data-ks1.table1] [subsc] [5759a] Prefetched messages: 0 --- Consume throughput received: 0.05 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.05 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

Now that you’ve confirmed the backfill CLI is working, run it manually when you want to backfill data on a CDC-enabled table, or when you need to create events for existing data or upstream applications.

Parameters reference

When running the backfill CLI as a Pulsar-admin extension, all --pulsar-* parameters are loaded from the client.conf file.

The --dsbulk-log-dir is only available when running the backfill CLI as a standalone Java application.

The --export-dsbulk-option parameter passes extra parameters to DSBulk. The relevant DSBulk settings configure the CSV connector and can be found here. Shortened option names are not supported.

In both the Pulsar-admin extension and the standalone Java application, C* related configurations are exclusively passed as command line arguments.

CDC backfill CLI parameters
Parameter Description

--data-dir=PATH

The directory where data will be exported to and imported from. The default is a 'data' subdirectory in the current working directory. The data directory will be created if it does not exist. Tables will be exported in subdirectories of the data directory specified here; there will be one subdirectory per keyspace inside the data directory, then one subdirectory per table inside each keyspace directory.

--help, -h

Displays this help message

--dsbulk-log-dir=PATH, -l

The directory where DSBulk should store its logs. The default is a 'logs' subdirectory in the current working directory. This subdirectory will be created if it does not exist. Each DSBulk operation will create a subdirectory inside the log directory specified here. This command is not available in the Pulsar-admin extension.

--export-bundle=PATH

The path to a secure connect bundle to connect to the Cassandra cluster, if that cluster is a DataStax Astra cluster. Options --export-host and --export-bundle are mutually exclusive.

--export-consistency=CONSISTENCY

The consistency level to use when exporting data. The default is LOCAL_QUORUM.

--export-max-concurrent-files=NUM|AUTO

The maximum number of concurrent files to write to. Must be a positive number or the special value AUTO. The default is AUTO.

--export-max-concurrent-queries=NUM|AUTO

The maximum number of concurrent queries to execute. Must be a positive number or the special value AUTO. The default is AUTO.

--export-splits=NUM|NC

The maximum number of token range queries to generate. Use the NC syntax to specify a multiple of the number of available cores, e.g. 8C = 8 times the number of available cores. The default is 8C. This is an advanced setting; you should rarely need to modify the default value.

--export-dsbulk-option=OPT=VALUE

An extra DSBulk option to use when exporting. Any valid DSBulk option can be specified here, and it will be passed as-is to the DSBulk process. DSBulk options, including driver options, must be passed as '--long.option.name=<value>'. Short options are not supported. For more DSBulk options, see here.

--export-host=HOST[:PORT]

The host name or IP and, optionally, the port of a node from the Cassandra cluster. If the port is not specified, it will default to 9042. This option can be specified multiple times. Options --export-host and --export-bundle are mutually exclusive.

--export-password

The password to use to authenticate against the origin cluster. Options --export-username and --export-password must be provided together, or not at all. Omit the parameter value to be prompted for the password interactively.

--export-protocol-version=VERSION

The protocol version to use to connect to the Cassandra cluster, e.g. 'V4'. If not specified, the driver will negotiate the highest version supported by both the client and the server.

--export-username=STRING

The username to use to authenticate against the origin cluster. Options --export-username and --export-password must be provided together, or not at all.

--keyspace=<keyspace>, -k

The name of the keyspace where the table to be exported exists

--max-rows-per-second=PATH

The maximum number of rows per second to read from the Cassandra table. Setting this option to any negative value or zero will disable it. The default is -1.

--table=<table>, -t

The name of the table to export data from for cdc back filling

--version, -v

Displays version info.

Pulsar connectivity parameters

Pulsar connectivity parameters are auto-populated from the client.conf file available to the CLI when used as a Pulsar-admin extension. These parameters should be passed as command line arguments in the standalone Java application.

Pulsar connectivity parameters
Parameter Description

--events-topic-prefix=<topicPrefix>

The event topic name prefix. The <keyspace_name>.<table_name> is appended to that prefix to build the topic name.

--pulsar-auth-params=<pulsarAuthParams>

The Pulsar authentication parameters.

--pulsar-auth-plugin-class-name=<pulsarAuthPluginClassName>

The Pulsar authentication plugin class name.

--pulsar-url=<pulsarServiceUrl>

The Pulsar broker service URL.

--pulsar-ssl-provider=<sslProvider>

The SSL/TLS provider to use.

--pulsar-ssl-truststore-path=<sslTruststorePath>

The path to the SSL/TLS truststore file.

--pulsar-ssl-truststore-password=<sslTruststorePassword>

The password for the SSL/TLS truststore.

--pulsar-ssl-truststore-type=<sslTruststoreType>

The type of the SSL/TLS truststore.

--pulsar-ssl-keystore-path=<sslKeystorePath>

The path to the SSL/TLS keystore file.

--pulsar-ssl-keystore-password=<sslKeystorePassword>

The password for the SSL/TLS keystore.

--pulsar-ssl-cipher-suites=<sslCipherSuites>

Defines one or more cipher suites to use for negotiating the SSL/TLS connection.

--pulsar-ssl-enabled-protocols=<sslEnabledProtocols>

Enabled SSL/TLS protocols

--pulsar-ssl-allow-insecure-connections

Allows insecure connections to servers whose certificate has not been signed by an approved CA. You should always disable sslAllowInsecureConnection in production environments.

--pulsar-ssl-enable-hostname-verification

Enable the server hostname verification.

--pulsar-ssl-tls-trust-certs-path=<tlsTrustCertsFilePath>

The path to the trusted TLS certificate file.

--pulsar-ssl-use-key-store-tls

If TLS is enabled, specifies whether to use KeyStore type as TLS configuration parameter.

What’s next?

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com