GPT schema translator
Systems in streaming pipelines can use different representations for schema and data types.
Schema mapping is required to align congruent types in a pipeline. For example, to send data from a CDC-enabled Apache Cassandra® table to a Pulsar topic, you must define schema mapping between the Cassandra table and the Pulsar topic. If the schema includes more complex data types, like maps and nested data structures, schema management becomes more difficult.
Schema management is a complicated, tedious, and error-prone task that requires you to understand and translate multiple sets of schema rules.
Instead, you can use the GPT schema translator to save time and reduce schema mapping toil.
This tool uses generative AI, based on the GPT-4 model, to automatically generate schema mappings between Astra Streaming (Pulsar topics) and the Astra DB sink connector (Cassandra tables).
Prerequisites
-
An Astra Streaming tenant with a namespace and topic.
-
An Astra DB database with a keyspace and table.
-
An Astra DB sink connector. The GPT schema translator is available for the Astra DB sink connector only.
JSON-to-CQL mapping example
This example uses a JSON schema for a Pulsar topic, and a CQL schema for an Astra DB table. The GPT schema translator generates a mapping between the two schemas that the Astra DB sink connector can use to write data from the Pulsar topic to the Astra DB table.
-
In the Astra Portal navigation menu, click Streaming, and then select your tenant.
-
On the Namespaces and Topics tab, locate the topic that you want to map to a table, and then click Create mapping.
-
Select your database’s keyspace, and then enter the name of the table that you want to map to.
-
Click Generate Mapping.
If this button isn’t available, either the GPT schema translator doesn’t have a schema for the topic, or schema mapping isn’t possible for the selected topic and table.
Cassandra-to-Pulsar schema mapping example
Cassandra table schema
{ "primaryKey": { "partitionKey": [ "id" ] }, "columnDefinitions": [ { "name": "id", "typeDefinition": "uuid", "static": false }, { "name": "file1", "typeDefinition": "text", "static": false }, { "name": "file2", "typeDefinition": "text", "static": false }, { "name": "file3", "typeDefinition": "text", "static": false } ] }
Pulsar JSON schema
{ "type": "record", "name": "sample.schema", "namespace": "default", "fields": [ { "name": "file1", "type": [ "null", "string" ], "default": null }, { "name": "file2", "type": [ "null", "string" ], "default": null }, { "name": "file3", "type": [ "string", "null" ], "default": "dfdf" } ] }
Generated mapping
id=key, file1=value.file1, file2=value.file2, file3=value.file3
-
Save the mapping configuration.
-
After you configure the Astra DB sink connector for the given topic and table, messages should flow from the topic to the table. Check the Astra Portal logs to confirm that data is written to your table without errors.
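The generated mapping is a comma-separated list of column=source pairs. As a rough sanity check before you save it, the following Python sketch parses a mapping string and lists any table columns that it leaves unpopulated. The helper names parse_mapping and unmapped_columns are hypothetical and aren't part of Astra Streaming or the Astra DB sink connector. The table schema is copied from the example above and trimmed to the fields the check reads.

```python
def parse_mapping(mapping: str) -> dict:
    """Split 'column=source, column=source' into a {column: source} dict."""
    pairs = {}
    for item in mapping.split(","):
        column, sep, source = item.strip().partition("=")
        if sep:  # skip empty or malformed fragments
            pairs[column.strip()] = source.strip()
    return pairs


def unmapped_columns(table_schema: dict, mapping: dict) -> list:
    """Return the table columns that the mapping does not populate."""
    return [
        col["name"]
        for col in table_schema["columnDefinitions"]
        if col["name"] not in mapping
    ]


# Table schema from the example above (hypothetical variable name).
table_schema = {
    "primaryKey": {"partitionKey": ["id"]},
    "columnDefinitions": [
        {"name": "id", "typeDefinition": "uuid"},
        {"name": "file1", "typeDefinition": "text"},
        {"name": "file2", "typeDefinition": "text"},
        {"name": "file3", "typeDefinition": "text"},
    ],
}

mapping = parse_mapping(
    "id=key, file1=value.file1, file2=value.file2, file3=value.file3"
)
print(mapping)
print(unmapped_columns(table_schema, mapping))
```

An empty list from unmapped_columns means every column, including the partition key, has a source in the mapping.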
AVRO-to-CQL mapping example
This example demonstrates how you can generate a schema mapping in real time.
-
The DataGenerator source connector generates data for a Pulsar topic with an AVRO schema.
AVRO schema example
"pulsar_topic_schema": { "person": { "type": "record", "name": "Person", "namespace": "org.apache.pulsar.io.datagenerator", "fields": [ { "name": "address", "type": [ "null", { "type": "record", "name": "Address", "namespace": "org.apache.pulsar.io.datagenerator.Person", "fields": [ { "name": "apartmentNumber", "type": [ "null", "string" ], "default": null }, { "name": "city", "type": [ "null", "string" ], "default": null }, { "name": "postalCode", "type": [ "null", "string" ], "default": null }, { "name": "street", "type": [ "null", "string" ], "default": null }, { "name": "streetNumber", "type": [ "null", "string" ], "default": null } ] } ], "default": null }, { "name": "age", "type": [ "null", "int" ], "default": null }, { "name": "company", "type": [ "null", { "type": "record", "name": "Company", "namespace": "org.apache.pulsar.io.datagenerator.Person", "fields": [ { "name": "domain", "type": [ "null", "string" ], "default": null }, { "name": "email", "type": [ "null", "string" ], "default": null }, { "name": "name", "type": [ "null", "string" ], "default": null }, { "name": "vatIdentificationNumber", "type": [ "null", "string" ], "default": null } ] } ], "default": null }, { "name": "companyEmail", "type": [ "null", "string" ], "default": null }, { "name": "dateOfBirth", "type": { "type": "long", "logicalType": "timestamp-millis" } }, { "name": "email", "type": [ "null", "string" ], "default": null }, { "name": "firstName", "type": [ "null", "string" ], "default": null }, { "name": "lastName", "type": [ "null", "string" ], "default": null }, { "name": "middleName", "type": [ "null", "string" ], "default": null }, { "name": "nationalIdentificationNumber", "type": [ "null", "string" ], "default": null }, { "name": "nationalIdentityCardNumber", "type": [ "null", "string" ], "default": null }, { "name": "passportNumber", "type": [ "null", "string" ], "default": null }, { "name": "password", "type": [ "null", "string" ], "default": null }, { "name": "sex", "type": 
[ "null", { "type": "enum", "name": "Sex", "namespace": "org.apache.pulsar.io.datagenerator.Person", "symbols": [ "MALE", "FEMALE" ] } ], "default": null }, { "name": "telephoneNumber", "type": [ "null", "string" ], "default": null }, { "name": "username", "type": [ "null", "string" ], "default": null } ] }, }
-
The Astra DB sink connector writes data to the Cassandra table with a CQL schema.
CQL schema example
"cassandra_table_schemas": { "person": { "primaryKey": { "partitionKey": [ "passportnumber" ] }, "columnDefinitions": [ { "name": "passportnumber", "typeDefinition": "text", "static": false }, { "name": "age", "typeDefinition": "varint", "static": false }, { "name": "firstname", "typeDefinition": "text", "static": false }, { "name": "lastname", "typeDefinition": "text", "static": false } ] }, }
-
In the Astra Portal, create a mapping for the tenant. When a topic schema is available to the GPT schema translator, click Generate Mapping.
The GPT schema translator generates an AVRO-to-CQL schema mapping while messages are processed.
passportnumber=value.passportNumber, age=value.age, firstname=value.firstName, lastname=value.lastName
Notice that the firstname value became firstName because the Pulsar topic AVRO schema superseded the Cassandra table schema.
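The casing difference can be illustrated without any AI. The following Python sketch pairs each table column with the topic field whose name matches it case-insensitively. It only illustrates the shape of the resulting mapping and is not how the GPT schema translator works internally. The function name match_columns_to_fields is hypothetical, and the field list is a subset of the AVRO schema above.

```python
def match_columns_to_fields(columns: list, fields: list) -> str:
    """Build a 'column=value.field' mapping using case-insensitive name matches."""
    # Index topic fields by lowercase name so "firstName" matches "firstname".
    by_lower = {name.lower(): name for name in fields}
    pairs = []
    for column in columns:
        field = by_lower.get(column.lower())
        if field is not None:  # columns without a matching field are skipped
            pairs.append(f"{column}=value.{field}")
    return ", ".join(pairs)


# Column names from the CQL schema example.
columns = ["passportnumber", "age", "firstname", "lastname"]
# A subset of the top-level field names from the AVRO schema example.
fields = ["age", "email", "firstName", "lastName", "passportNumber"]

print(match_columns_to_fields(columns, fields))
```

The column name stays on the left side of each pair, and the topic's field name, with its original casing, appears on the right.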
No schema on Pulsar topic
If you don’t declare a schema in the Pulsar topic, the schema translator can generate a default schema mapping based on the values of your Cassandra table schema, without using GPT.
When you create the mapping in the Astra Portal, you can click Generate Mapping to create a generic Pulsar topic schema based on your Cassandra table schema. If schema mapping isn’t possible for the selected table and topic, the Generate Mapping button isn’t available.
For example, assume you have the following Cassandra table schema:
{
"primaryKey": {
"partitionKey": [
"passportnumber"
]
},
"columnDefinitions": [
{
"name": "passportnumber",
"typeDefinition": "text",
"static": false
},
{
"name": "age",
"typeDefinition": "varint",
"static": false
},
{
"name": "firstname",
"typeDefinition": "text",
"static": false
},
{
"name": "lastname",
"typeDefinition": "text",
"static": false
}
]
}
The schema translator would generate the following Pulsar JSON schema mapping based on the given Cassandra table schema:
passportnumber=value.passportnumber, age=value.age, firstname=value.firstname, lastname=value.lastname
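The default mapping above follows a simple pattern: each column maps to a value field of the same name. A minimal Python sketch of that pattern, assuming the table schema is available as a dictionary, looks like the following. The function name default_mapping is hypothetical, and the sketch only reproduces the output format shown here, not the translator's actual implementation.

```python
def default_mapping(table_schema: dict) -> str:
    """Map every column to a value field with the same name."""
    return ", ".join(
        f"{col['name']}=value.{col['name']}"
        for col in table_schema["columnDefinitions"]
    )


# Cassandra table schema from the example above.
table_schema = {
    "primaryKey": {"partitionKey": ["passportnumber"]},
    "columnDefinitions": [
        {"name": "passportnumber", "typeDefinition": "text", "static": False},
        {"name": "age", "typeDefinition": "varint", "static": False},
        {"name": "firstname", "typeDefinition": "text", "static": False},
        {"name": "lastname", "typeDefinition": "text", "static": False},
    ],
}

print(default_mapping(table_schema))
```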