Cassandra Data Migrator

Use Cassandra Data Migrator to migrate and validate tables between origin and target Cassandra clusters, with available logging and reconciliation support.

Cassandra Data Migrator prerequisites

Read the prerequisites below before using the Cassandra Data Migrator.

  • Install or switch to Java 11. The Spark binaries are compiled with this version of Java.

  • Select a single VM to run this job and install Spark 3.5.1 there. No cluster is necessary.

  • Optionally, install Maven 3.9.x if you want to build the JAR for local development.

Run the following commands to install Apache Spark:

wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3-scala2.13.tgz

tar -xvzf spark-3.5.1-bin-hadoop3-scala2.13.tgz
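After extracting the archive, a quick sanity check along these lines confirms the Java and Spark versions before you run any jobs. This is a minimal sketch; the export path assumes you extracted the tarball into the current directory.

# Confirm that Java 11 is the active JVM (assumes java is already on your PATH).
java -version

# Make the Spark binaries available in this shell; adjust the path to your extraction location.
export SPARK_HOME="$PWD/spark-3.5.1-bin-hadoop3-scala2.13"
export PATH="$SPARK_HOME/bin:$PATH"

# Verify the Spark installation.
spark-submit --version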

Install Cassandra Data Migrator as a Container

Get the latest image that includes all dependencies from DockerHub.

All migration tools (cassandra-data-migrator, dsbulk, and cqlsh) are available in the /assets/ folder of the container.
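A minimal sketch of pulling and entering the container follows. It assumes the image is published as datastax/cassandra-data-migrator; confirm the exact image name and tag on DockerHub before use.

# Pull the latest image (image name assumed; verify on DockerHub).
docker pull datastax/cassandra-data-migrator:latest

# Start an interactive shell in the container; the migration tools are under /assets/.
docker run --rm -it datastax/cassandra-data-migrator:latest bash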

Install Cassandra Data Migrator as a JAR file

Download the latest JAR file from the latest release page of the Cassandra Data Migrator GitHub repo.

Version 4.x of Cassandra Data Migrator is not backward-compatible with *.properties files created in previous versions, and package names have changed. If you're starting a new migration, use the latest released version if possible.

Build Cassandra Data Migrator JAR for local development (optional)

Optionally, you can build the Cassandra Data Migrator JAR for local development. You’ll need Maven 3.9.x.

Example:

cd ~/github
git clone git@github.com:datastax/cassandra-data-migrator.git
cd cassandra-data-migrator
mvn clean package

The fat JAR file, cassandra-data-migrator-x.y.z.jar, should now be present in the target folder.
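As a quick check that the build succeeded, you can list the build output:

# Verify that the Maven build produced the fat JAR.
ls target/cassandra-data-migrator-*.jar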

Use Cassandra Data Migrator

  1. Configure the cdm*.properties file, provided in the Cassandra Data Migrator GitHub repo, for your environment. The file can have any name; it does not need to be cdm.properties or cdm-detailed.properties. In both versions, the spark-submit job processes only the parameters that aren't commented out; other parameters use their defaults or are ignored. See the descriptions and defaults in each file, and the minimal sketch after these steps. For more information, see the following:

    • The simplified sample properties configuration, cdm.properties. This file contains only those parameters that are commonly configured.

    • The complete sample properties configuration, cdm-detailed.properties, for the full set of configurable settings.

  2. Place the properties file that you chose and customized where it can be accessed when you run the job with spark-submit.

  3. Run the job using the spark-submit command:

./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-x.y.z.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
  • The command redirects log output to a file, logfile_name_*.txt, instead of printing it to the console.

  • Update the memory options, driver and executor memory, based on your use case.
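The following is a minimal sketch of a cdm.properties file, referenced from step 1 above. The property names come from the reference tables later on this page; the hosts, credentials, and keyspace/table values are placeholders that you must replace for your environment.

spark.cdm.connect.origin.host        origin-host-or-ip
spark.cdm.connect.origin.port        9042
spark.cdm.connect.origin.username    cassandra
spark.cdm.connect.origin.password    cassandra

spark.cdm.connect.target.host        target-host-or-ip
spark.cdm.connect.target.port        9042
spark.cdm.connect.target.username    cassandra
spark.cdm.connect.target.password    cassandra

spark.cdm.schema.origin.keyspaceTable    <keyspacename>.<tablename>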

Run Cassandra Data Migrator in validation mode

To run your migration job with Cassandra Data Migrator in data validation mode, use the class option --class com.datastax.cdm.job.DiffData. Example:

./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-x.y.z.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt

The Cassandra Data Migrator validation job reports differences as ERROR entries in the log file. Example:

23/04/06 08:43:06 ERROR DiffJobSession: Mismatch row found for key: [key3] Mismatch: Target Index: 1 Origin: valueC Target: value999)
23/04/06 08:43:06 ERROR DiffJobSession: Corrected mismatch row in target: [key3]
23/04/06 08:43:06 ERROR DiffJobSession: Missing target row found for key: [key2]
23/04/06 08:43:06 ERROR DiffJobSession: Inserted missing row in target: [key2]

To get the list of missing or mismatched records, grep for all ERROR entries in the log files. Differences noted in the log file are listed by primary-key values.
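For example, a simple grep surfaces all of the recorded differences; the log file name matches whatever you used in the output redirection:

# List every validation difference recorded in the log files.
grep ERROR logfile_name_*.txt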

You can also run the Cassandra Data Migrator validation job in an AutoCorrect mode, which can:

  • Add any missing records from the origin to target cluster.

  • Update any mismatched records between the origin and target clusters; this action makes the target cluster the same as the origin cluster.

To enable or disable this feature, use one or both of the following settings in your *.properties configuration file.

spark.cdm.autocorrect.missing                     false|true
spark.cdm.autocorrect.mismatch                    false|true

The Cassandra Data Migrator validation job never deletes records from the target cluster. The job only adds or updates data on the target cluster.
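As an alternative to editing the properties file, the other examples on this page pass spark.cdm.* settings with --conf, so an equivalent AutoCorrect validation run might look like the following sketch (same JAR and properties file as above):

./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.autocorrect.missing=true \
--conf spark.cdm.autocorrect.mismatch=true \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-x.y.z.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt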

Migrate or validate specific partition ranges

You can also use Cassandra Data Migrator to migrate or validate specific partition ranges. Create a partition file named ./<keyspacename>.<tablename>_partitions.csv in the current folder, and use the following format for the partition ranges. Example:

-507900353496146534,-107285462027022883
-506781526266485690,1506166634797362039
2637884402540451982,4638499294009575633
798869613692279889,8699484505161403540

Each line in the CSV represents a partition-range (min,max).

Alternatively, you can pass the partition file with a command-line parameter. Example:

./spark-submit --properties-file cdm.properties \
 --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
 --conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
 --class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-x.y.z.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt

This mode is especially useful for processing a subset of partition ranges that failed during a previous run.

When partition ranges fail during a run, the migration and validation jobs automatically generate a file named ./<keyspacename>.<tablename>_partitions.csv in the format shown above. The file contains the failed partition ranges; no file is created if there were no failed partitions. You can use this CSV as input to process the failed partitions in a subsequent run.
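For example, a follow-up run that processes only the previously failed ranges might look like the following sketch, which feeds the autogenerated CSV back in through the spark.cdm.tokenrange.partitionFile.input parameter shown above:

./spark-submit --properties-file cdm.properties \
 --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
 --conf spark.cdm.tokenrange.partitionFile.input="./<keyspacename>.<tablename>_partitions.csv" \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
 --class com.datastax.cdm.job.Migrate cassandra-data-migrator-x.y.z.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt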

Perform large-field guardrail violation checks

Use Cassandra Data Migrator to identify large fields in a table that may break your cluster guardrails. For example, Astra DB has a 10 MB limit for a single large field. Specify --class com.datastax.cdm.job.GuardrailCheck on the command. Example:

./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.feature.guardrail.colSizeInKB=10000 \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt

Cassandra Data Migrator references

Common connection parameters for Origin and Target

  • spark.cdm.connect.origin.host (default: localhost): Hostname/IP address of the origin cluster. May be a comma-separated list, and can follow the <hostname>:<port> convention.

  • spark.cdm.connect.origin.port (default: 9042): Port number to use if not specified on spark.cdm.connect.origin.host.

  • spark.cdm.connect.origin.scb (default: not set): Secure Connect Bundle, used to connect to an Astra DB database. Example: file:///aaa/bbb/scb-enterprise.zip.

  • spark.cdm.connect.origin.username (default: cassandra): Username (or client_id value) used to authenticate.

  • spark.cdm.connect.origin.password (default: cassandra): Password (or client_secret value) used to authenticate.

  • spark.cdm.connect.target.host (default: localhost): Hostname/IP address of the target cluster. May be a comma-separated list, and can follow the <hostname>:<port> convention.

  • spark.cdm.connect.target.port (default: 9042): Port number to use if not specified on spark.cdm.connect.target.host.

  • spark.cdm.connect.target.scb (default: not set): Secure Connect Bundle, used to connect to an Astra DB database. Example if set: file:///aaa/bbb/my-scb.zip.

  • spark.cdm.connect.target.username (default: cassandra): Username (or client_id value) used to authenticate.

  • spark.cdm.connect.target.password (default: cassandra): Password (or client_secret value) used to authenticate.
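For example, a hedged properties sketch connecting from a self-managed origin cluster to an Astra DB target might look like the following; every value is a placeholder to replace for your environment.

spark.cdm.connect.origin.host        10.0.0.11,10.0.0.12
spark.cdm.connect.origin.port        9042
spark.cdm.connect.origin.username    cassandra
spark.cdm.connect.origin.password    cassandra

spark.cdm.connect.target.scb         file:///path/to/secure-connect-bundle.zip
spark.cdm.connect.target.username    <client_id>
spark.cdm.connect.target.password    <client_secret>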

Origin schema parameters

  • spark.cdm.schema.origin.keyspaceTable (required): The <keyspace>.<table_name> of the table to be migrated. The table must exist in the origin cluster.

  • spark.cdm.schema.origin.column.ttl.automatic (default: true): Default is true unless spark.cdm.schema.origin.column.ttl.names is specified. When true, the Time To Live (TTL) of the target record is determined by finding the maximum TTL of all origin columns that can have TTL set. This excludes partition key, clustering key, collection/UDT/tuple, and frozen columns. When false, and spark.cdm.schema.origin.column.ttl.names is not set, the target table's configuration determines the TTL of the target record.

  • spark.cdm.schema.origin.column.ttl.names (default: empty): Default is empty, meaning the names are determined automatically if spark.cdm.schema.origin.column.ttl.automatic is set. Otherwise, specify a subset of eligible columns that are used to calculate the TTL of the target record.

  • spark.cdm.schema.origin.column.writetime.automatic (default: true): Default is true unless spark.cdm.schema.origin.column.writetime.names is specified. When true, the WRITETIME of the target record is determined by finding the maximum WRITETIME of all origin columns that can have WRITETIME set. This excludes partition key, clustering key, collection/UDT/tuple, and frozen columns. When false, and spark.cdm.schema.origin.column.writetime.names is not set, the target table's configuration determines the target record's WRITETIME. The spark.cdm.transform.custom.writetime property, if set, overrides spark.cdm.schema.origin.column.writetime.names.

  • spark.cdm.schema.origin.column.writetime.names (default: empty): Default is empty, meaning the names are determined automatically if spark.cdm.schema.origin.column.writetime.automatic is set. Otherwise, specify a subset of eligible columns that are used to calculate the WRITETIME of the target record. Example: data_col1,data_col2,…

  • spark.cdm.schema.origin.column.names.to.target (default: empty): If column names differ between the origin and target clusters, this comma-separated list maps them using the format <origin_column_name>:<target_column_name>. Only renamed columns need to be listed.

For optimization reasons, Cassandra Data Migrator does not migrate TTL and writetime at the field level. Instead, Cassandra Data Migrator finds the field with the highest TTL and the field with the highest writetime within an origin table row, and uses those values on the entire target table row.
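For example, to compute TTL and WRITETIME from specific columns and to map a renamed column, a sketch with placeholder column names could look like this:

spark.cdm.schema.origin.column.ttl.names          data_col1,data_col2
spark.cdm.schema.origin.column.writetime.names    data_col1,data_col2
spark.cdm.schema.origin.column.names.to.target    old_col_name:new_col_name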

Target schema parameters

  • spark.cdm.schema.target.keyspaceTable (default: equal to spark.cdm.schema.origin.keyspaceTable): Commented out. The <keyspace>.<table_name> of the table to be migrated into the target. The table must exist in the target cluster.

Auto-correction parameters

Auto-correction parameters allow Cassandra Data Migrator to correct data differences found between the origin and target clusters when you run the DiffData program. Typically, these parameters are left disabled for "what if" migration testing, which generates a list of data discrepancies. You can then investigate the reasons for those discrepancies and, if necessary, enable the parameters below.

For information about invoking DiffData in a Cassandra Data Migrator command, see Run Cassandra Data Migrator in validation mode.

  • spark.cdm.autocorrect.missing (default: false): When true, data that is missing in the target cluster but found in the origin cluster is re-migrated to the target cluster.

  • spark.cdm.autocorrect.mismatch (default: false): When true, data that differs between the origin and target clusters is reconciled. The TIMESTAMP of records may have an effect: if the WRITETIME of the origin record, determined with .writetime.names, is earlier than the WRITETIME of the target record, the change does not appear in the target cluster. This comparative state can be particularly challenging to troubleshoot if individual columns or cells have been modified in the target cluster.

  • spark.cdm.autocorrect.missing.counter (default: false): Commented out. By default, counter tables are not copied when missing, unless explicitly set.

  • spark.tokenrange.partitionFile (default: ./<keyspace>.<tablename>_partitions.csv): Commented out. This CSV file is used as both input and output, when applicable. If the file exists, only the partition ranges in this file are migrated or validated. Similarly, if exceptions occur while migrating or validating, partition ranges with exceptions are logged to this file.

Performance and operations parameters

Performance and operations parameters that can affect migration throughput, error handling, and similar concerns.

  • spark.cdm.perfops.numParts (default: 10000): In standard operation, the full token range of -2^63 to 2^63-1 is divided into a number of parts, which are processed in parallel. Aim for each part to comprise approximately 1-10 GB of data to migrate. During initial testing, you may want this to be a small number, such as 1.

  • spark.cdm.perfops.batchSize (default: 5): The number of records put into an UNLOGGED batch when writing to the target cluster. Cassandra Data Migrator tends to work on one partition at a time, so if your partitions are large, this number can be increased. If this batch size often results in more than one partition per batch, reduce the value; ideally fewer than 1% of batches should contain more than one partition.

  • spark.cdm.perfops.ratelimit.origin (default: 20000): Concurrent number of operations across all parallel threads against the origin cluster. Adjust this value up or down depending on the amount of data and the processing capacity of the origin cluster.

  • spark.cdm.perfops.ratelimit.target (default: 40000): Concurrent number of operations across all parallel threads against the target cluster. Adjust this value up or down depending on the amount of data and the processing capacity of the target cluster.

  • spark.cdm.perfops.consistency.read (default: LOCAL_QUORUM): Commented out. Read consistency used on the origin cluster, and on the target cluster when records are read for comparison purposes. Valid values: ANY, ONE, TWO, THREE, QUORUM, LOCAL_ONE, EACH_QUORUM, LOCAL_QUORUM, SERIAL, LOCAL_SERIAL, ALL.

  • spark.cdm.perfops.consistency.write (default: LOCAL_QUORUM): Commented out. Write consistency used on the target cluster. Valid values: ANY, ONE, TWO, THREE, QUORUM, LOCAL_ONE, EACH_QUORUM, LOCAL_QUORUM, SERIAL, LOCAL_SERIAL, ALL.

  • spark.cdm.perfops.printStatsAfter (default: 100000): Commented out. Number of rows processed between progress log entries.

  • spark.cdm.perfops.fetchSizeInRows (default: 1000): Commented out. Affects the frequency of reads from the origin cluster and the frequency of flushes to the target cluster.

  • spark.cdm.perfops.errorLimit (default: 0): Commented out. Controls how many errors a thread may encounter during MigrateData and DiffData operations before failing. Recommendation: set this parameter to a non-zero value only when not doing a mutation-type operation, such as when running DiffData without .autocorrect.
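A hedged tuning sketch for a modest origin cluster might look like the following; the numbers are illustrative starting points, not recommendations for every environment.

spark.cdm.perfops.numParts            5000
spark.cdm.perfops.batchSize           5
spark.cdm.perfops.ratelimit.origin    10000
spark.cdm.perfops.ratelimit.target    20000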

Transformation parameters

Parameters to perform schema transformations between the origin and target clusters.

By default, these parameters are commented out.

  • spark.cdm.transform.missing.key.ts.replace.value (default: 1685577600000): Timestamp value in milliseconds. Partition and clustering columns cannot have null values. If such columns are added as part of a schema transformation between the origin and target clusters, the origin side may be null, in which case the Migrate data operation fails. This parameter supplies a crude constant value to use in its place, and is separate from the constant values feature.

  • spark.cdm.transform.custom.writetime (default: 0, disabled): Timestamp value in microseconds to use as the WRITETIME for the target record. This is useful when the WRITETIME of the record in the origin cluster cannot be determined, for example when the only non-key columns are collections. This parameter supplies a crude constant value to use in its place and overrides spark.cdm.schema.origin.column.writetime.names.

  • spark.cdm.transform.custom.writetime.incrementBy (default: 0): This is useful when you have a list that is not frozen and you are updating it using the autocorrect feature. Lists are not idempotent, and subsequent UPSERTs add duplicates to the list.

  • spark.cdm.transform.codecs (default: empty): A comma-separated list of additional codecs to enable:

    • INT_STRING: int stored in a string.

    • DOUBLE_STRING: double stored in a string.

    • BIGINT_STRING: bigint stored in a string.

    • DECIMAL_STRING: decimal stored in a string.

    • TIMESTAMP_STRING_MILLIS: timestamp stored in a string, as Epoch milliseconds.

    • TIMESTAMP_STRING_FORMAT: timestamp stored in a string with a custom format.

    Where there are multiple type pair options, such as TIMESTAMP_STRING_*, only one can be configured at a time with the spark.cdm.transform.codecs parameter.

  • spark.cdm.transform.codecs.timestamp.string.format (default: yyyyMMddHHmmss): Configuration for the CQL_TIMESTAMP_TO_STRING_FORMAT codec. The format must be valid for DateTimeFormatter.ofPattern(formatString).

  • spark.cdm.transform.codecs.timestamp.string.zone (default: UTC): Must be in ZoneRulesProvider.getAvailableZoneIds().
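For example, to read bigint values stored as strings and timestamps stored as formatted strings, a sketch could look like this; the format and zone values are illustrative.

spark.cdm.transform.codecs                            BIGINT_STRING,TIMESTAMP_STRING_FORMAT
spark.cdm.transform.codecs.timestamp.string.format    yyyyMMddHHmmss
spark.cdm.transform.codecs.timestamp.string.zone      UTC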

Cassandra filter parameters

Cassandra filters are applied on the coordinator node. Depending on the filter, the coordinator node may need to do a lot more work than is normal, notably because Cassandra Data Migrator specifies ALLOW FILTERING.

By default, these parameters are commented out.

  • spark.cdm.filter.cassandra.partition.min (default: -9223372036854775808): Default is 0 when using RandomPartitioner, and -9223372036854775808 (-2^63) otherwise. Lower bound of the partition range (inclusive).

  • spark.cdm.filter.cassandra.partition.max (default: 9223372036854775807): Default is 2^127-1 when using RandomPartitioner, and 9223372036854775807 (2^63-1) otherwise. Upper bound of the partition range (inclusive).

  • spark.cdm.filter.cassandra.whereCondition: CQL added to the WHERE clause of SELECT statements from the origin cluster.
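A sketch of a token-range filter combined with a WHERE-clause condition follows. The partition bounds, the column name, and the condition value are placeholders, and the exact form of the condition that gets appended to the WHERE clause should be confirmed against cdm-detailed.properties.

spark.cdm.filter.cassandra.partition.min     -9223372036854775808
spark.cdm.filter.cassandra.partition.max     9223372036854775807
spark.cdm.filter.cassandra.whereCondition    cluster_id = 'cluster1'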

Java filter parameters

Java filters are applied on the client node: data must be pulled from the origin cluster and then filtered. This option may have a lower impact on the production cluster than Cassandra filters, but it puts more load on the Cassandra Data Migrator processing node and pulls more data from Cassandra. Cassandra filters put load on the Cassandra nodes because Cassandra Data Migrator specifies ALLOW FILTERING, which can cause the coordinator node to perform significantly more work.

By default, these parameters are commented out.

  • spark.cdm.filter.java.token.percent (default: 100): Percentage, between 1 and 100, of the tokens in each split that are migrated. Use this property to do a wide and random sampling of the data. The percentage is applied to each split. Invalid percentages are treated as 100.

  • spark.cdm.filter.java.writetime.min (default: 0): The lowest (inclusive) writetime value to be migrated. Using the spark.cdm.filter.java.writetime.min and spark.cdm.filter.java.writetime.max thresholds, Cassandra Data Migrator can filter records based on their writetimes. The maximum writetime of the columns configured at spark.cdm.schema.origin.column.writetime.names is compared to the .min and .max thresholds, which must be in microseconds since the epoch. If spark.cdm.schema.origin.column.writetime.names is not specified, or the thresholds are null or otherwise invalid, the filter is ignored. Note that spark.cdm.perfops.batchSize is ignored when this filter is in place; a value of 1 is used instead.

  • spark.cdm.filter.java.writetime.max (default: 9223372036854775807): The highest (inclusive) writetime value to be migrated. The maximum timestamp of the columns specified by spark.cdm.schema.origin.column.writetime.names is compared to this threshold. If that property is not specified, or is for some reason null, the filter is ignored.

  • spark.cdm.filter.java.column.name: Filter rows based on matching a configured value. Specify the column name against which spark.cdm.filter.java.column.value is compared. Must be on the column list specified at spark.cdm.schema.origin.column.names. The column value is converted to a string, trimmed of whitespace on both ends, and compared.

  • spark.cdm.filter.java.column.value: String value to use as the comparison. Whitespace on the ends of spark.cdm.filter.java.column.value is trimmed.
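For example, to sample 10 percent of tokens and keep only rows whose writetime falls in a given microsecond-epoch window and whose status column equals a given value, a sketch could look like the following; the column name, value, and thresholds are placeholders.

spark.cdm.filter.java.token.percent     10
spark.cdm.filter.java.writetime.min     1672531200000000
spark.cdm.filter.java.writetime.max     1704067199999999
spark.cdm.filter.java.column.name       status
spark.cdm.filter.java.column.value      active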

Constant column feature parameters

The constant columns feature allows you to add constant columns to the target table. If used, the spark.cdm.feature.constantColumns.names, spark.cdm.feature.constantColumns.types, and spark.cdm.feature.constantColumns.values lists must all be the same length.

By default, these parameters are commented out.

  • spark.cdm.feature.constantColumns.names: A comma-separated list of column names, such as const1,const2.

  • spark.cdm.feature.constantColumns.types: A comma-separated list of column types.

  • spark.cdm.feature.constantColumns.values: A comma-separated list of hard-coded values. Provide each value as you would on the CQLSH command line. Examples: 'abcd' for a string, 1234 for an int, and so on.

  • spark.cdm.feature.constantColumns.splitRegex (default: ,): Defaults to a comma, but can be any regex character that works with String.split(regex). This option is needed because some data values contain commas, such as lists, maps, and sets.
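Putting the three lists together, a sketch that adds a constant text column and a constant int column might look like the following. The names and values are placeholders, and the type notation expected by .types (shown here as CQL type names) should be confirmed against cdm-detailed.properties.

# Constant columns sketch; verify the expected .types notation in cdm-detailed.properties.
spark.cdm.feature.constantColumns.names      const1,const2
spark.cdm.feature.constantColumns.types      text,int
spark.cdm.feature.constantColumns.values     'abcd',1234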

Explode map feature parameters

The explode map feature allows you to convert an origin table map into multiple target table records.

By default, these parameters are commented out.

  • spark.cdm.feature.explodeMap.origin.name: The name of the map column, such as my_map. Must be defined on spark.cdm.schema.origin.column.names, and the corresponding type on spark.cdm.schema.origin.column.types must be a map.

  • spark.cdm.feature.explodeMap.origin.name.key: The name of the column on the target table that holds the map key, such as my_map_key. This key must be present in the target primary key, spark.cdm.schema.target.column.id.names.

  • spark.cdm.feature.explodeMap.origin.value: The name of the column on the target table that holds the map value, such as my_map_value.
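A sketch using the example column names from the list above (my_map, my_map_key, my_map_value) follows; these names are examples only and must match your schema.

spark.cdm.feature.explodeMap.origin.name         my_map
spark.cdm.feature.explodeMap.origin.name.key     my_map_key
spark.cdm.feature.explodeMap.origin.value        my_map_value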

Guardrail feature parameter

The guardrail feature manages records that exceed guardrail checks. The guardrail job generates a report; other jobs skip records that exceed the guardrail limit.

By default, these parameters are commented out.

  • spark.cdm.feature.guardrail.colSizeInKB (default: 0): The default of 0 means the guardrail check is not done. If set, table records with one or more fields that exceed the column size in kB are flagged. Note that this is kB (base 10), not KiB (base 2).

TLS (SSL) connection parameters

These are TLS (SSL) connection parameters, if configured, for the origin and target clusters. Note that a secure connect bundle (SCB) embeds these details.

By default, these parameters are commented out.

  • spark.cdm.connect.origin.tls.enabled (default: false): If TLS is used, set to true.

  • spark.cdm.connect.origin.tls.trustStore.path: Path to the Java truststore file.

  • spark.cdm.connect.origin.tls.trustStore.password: Password needed to open the truststore.

  • spark.cdm.connect.origin.tls.trustStore.type (default: JKS)

  • spark.cdm.connect.origin.tls.keyStore.path: Path to the Java keystore file.

  • spark.cdm.connect.origin.tls.keyStore.password: Password needed to open the keystore.

  • spark.cdm.connect.origin.tls.enabledAlgorithms (default: TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA)

  • spark.cdm.connect.target.tls.enabled (default: false): If TLS is used, set to true.

  • spark.cdm.connect.target.tls.trustStore.path: Path to the Java truststore file.

  • spark.cdm.connect.target.tls.trustStore.password: Password needed to open the truststore.

  • spark.cdm.connect.target.tls.trustStore.type (default: JKS)

  • spark.cdm.connect.target.tls.keyStore.path: Path to the Java keystore file.

  • spark.cdm.connect.target.tls.keyStore.password: Password needed to open the keystore.

  • spark.cdm.connect.target.tls.enabledAlgorithms (default: TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA)
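For a cluster secured with TLS, a hedged origin-side sketch could look like the following; the paths and passwords are placeholders, and the same pattern applies to the spark.cdm.connect.target.tls.* parameters.

spark.cdm.connect.origin.tls.enabled                 true
spark.cdm.connect.origin.tls.trustStore.path         /path/to/truststore.jks
spark.cdm.connect.origin.tls.trustStore.password     <truststore-password>
spark.cdm.connect.origin.tls.trustStore.type         JKS
spark.cdm.connect.origin.tls.keyStore.path           /path/to/keystore.jks
spark.cdm.connect.origin.tls.keyStore.password       <keystore-password>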
