Cassandra table schema evolution with CDC
This article is a continuation of the CDC with Cassandra and Pulsar article. Please read that article first to understand the fundamentals of what resources are being used.
The message schema is of particular importance in completing the CDC pattern. Initially, it is set to match the Cassandra table’s schema as closely as possible, but some data types are not known in Pulsar (or more accurately, not known in Avro). To overcome this, there are adaptations performed when the Cassandra Source Connector builds the Pulsar message. Some types are not compatible and not able to be adapted. In this case, those columns of data are dropped while creating the Pulsar message.
To better understand how exactly the CDC agent constructs the event message, here is the pseudo code of how the schema is created:
org.apache.pulsar.common.schema.SchemaType.AVRO GenericRecord = all key fields in the Cassandra table org.apache.cassandra.schema TableMetadata = convert a log entry to a mutation instance Schema<KeyValue<byte, MutationValue>> keyValueSchema = Schema.KeyValue( (convert GenericRecord to byte), (set TableMetadata to SchemaType.AVRO), KeyValueEncodingType.SEPARATED );
Notice the two types used in KeyValue. The byte array is an Avro-encoded record that documents the table’s primary key(s). The MutationValue is an extended Avro record that has direction on what changed and how to get its specifics.
CDC sets the initial topic schema on the first change it detects. Once the initial topic schema has been set, a “happy path” has been established to create change data events in Pulsar.
Inevitably, table design will change. Columns are added, updated, or even removed. When these changes occur, the resources that are part of the CDC flow need to be adapted to keep this happy path of event data. This can become quite a complex set of decisions and as such, there are specific changes a CDC flow can tolerate before the flow needs to be recreated entirely.
Here is a brief summary of how the data message schema is created:
receive GenericRecord event message of type KeyValue
use a Cassandra client to query for row data
convert data as GenericRecord of type KeyValue and return
the connector interface produces a new message to the destination topic
This is the easiest of scenarios for table design change. Assuming the new column’s data type is compatible with the source connector, a new schema will replace the existing and message compatibility will be kept. Note that because the schema auto-update compatibility strategy is set to BACKWARD_TRANSITIVE, the new column must be optional, which is the default of any non-primary-key column.
An example of adding a column:
ALTER TABLE [keyspace_name.] table_name ADD my-super-awesome-column text;
Altering a table column includes renaming a column or changing a column’s type. Assuming the new column’s data type is compatible with the source connector, a new schema will replace the existing schema and message compatibility will be kept. Once a table has been created, a table’s primary key(s) can not be modified. This fits well with the CDC pattern.
While technically updating columns is possible when CDC is enabled, it is not recommended. Instead, changes to a Cassandra table should be additive only. (If you are familiar with data migrations, this concept is the same). To change the name or type of table column, add a new column. The resulting event messages will have a reference to both columns, and you can handle this migration downstream.
Note that this recommendation assumes a schema compatibility strategy of BACKWARD_TRANSITIVE. If you are using a different schema compatibility strategy, table updates will be handled differently.
Removing a table column is a simple command in CQL. The resulting CDC data messages will simply not include that data anymore.
An example of removing a column:
ALTER TABLE [keyspace_name.] table_name DROP my-super-awesome-column;