GPT schema translator

The GPT schema translator is currently available for the Astra DB Sink Connector, with support for additional connectors to come.

Overview

Systems within streaming pipelines typically represent schema and data types differently. This requires schemas within a pipeline to be mapped to each other, a process which is complicated, tedious, and error-prone. For example, to send data from a CDC-enabled Cassandra (C*) table to a Pulsar topic, you must define a schema mapping between the C* table and the Pulsar topic, represented below.

Cassandra table schema
{
  "primaryKey": {
    "partitionKey": [
      "id"
    ]
  },
  "columnDefinitions": [
    {
      "name": "id",
      "typeDefinition": "uuid",
      "static": false
    },
    {
      "name": "file1",
      "typeDefinition": "text",
      "static": false
    },
    {
      "name": "file2",
      "typeDefinition": "text",
      "static": false
    },
    {
      "name": "file3",
      "typeDefinition": "text",
      "static": false
    }
  ]
}

Pulsar JSON schema
{
  "type": "record",
  "name": "sample.schema",
  "namespace": "default",
  "fields": [
    {
      "name": "file1",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "file2",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "file3",
      "type": [
        "string",
        "null"
      ],
      "default": "dfdf"
    }
  ]
}

As you add more complex data types, such as maps and nested data structures, schema management becomes more difficult. It requires understanding the type rules of each system and translating between them, which is tedious work for people but well suited to the GPT-4 AI model.

The GPT schema translator uses generative AI to automatically generate schema mappings between Astra Streaming (Pulsar topics) and the Astra DB sink connector (Cassandra tables).

Prerequisites

  • An Astra Streaming cluster with a topic created. For more information, see the Getting Started documentation.

  • An Astra database with a keyspace and table created. For more information, see the Astra DB documentation.

Usage

To use the GPT schema translator, start by creating an Astra DB sink connector as you usually would. If you’re unsure how to do this, see the Astra DB Sink Connector documentation.

In this example, we’ll continue with the two schemas from above: one JSON schema for the Pulsar topic, and one CQL schema for the Astra DB table. The GPT schema translator will generate a mapping between the two schemas, which can be used by the Astra DB sink connector to write data from the Pulsar topic to the Astra DB table.

Schema mapping

To generate a schema mapping between the two schemas, click the "Generate Mapping With GPT" button.

Generated schema mapping
id=key, file1=value.file1, file2=value.file2, file3=value.file3

Great! Now, once your connector is created, messages will flow smoothly from the Pulsar topic to the Astra DB table using this mapping. Check your Astra Portal logs to see the data flowing into your table with no pesky error messages.
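
To make the mapping format concrete, here is a minimal Python sketch that reads the mapping string above and applies it to one message. It is an illustration only, not the connector's implementation: the function names `parse_mapping` and `apply_mapping`, the sample key, and the sample values are all hypothetical, and the sketch assumes each source is either `key` or `value.<field>`, as in the generated mapping.

```python
def parse_mapping(mapping: str) -> dict:
    """Split 'column=source, column=source' into {column: source}."""
    result = {}
    for item in mapping.split(","):
        if not item.strip():
            continue  # tolerate a trailing comma
        column, source = item.split("=", 1)
        result[column.strip()] = source.strip()
    return result


def apply_mapping(mapping: dict, key, value: dict) -> dict:
    """Build one table row from a message key and a message value."""
    row = {}
    for column, source in mapping.items():
        if source == "key":
            row[column] = key
        elif source.startswith("value."):
            # Missing fields become None, like a null column.
            row[column] = value.get(source[len("value."):])
        else:
            raise ValueError(f"unsupported source: {source}")
    return row


mapping = parse_mapping(
    "id=key, file1=value.file1, file2=value.file2, file3=value.file3"
)

# Hypothetical message: the key carries the id, the value carries the fields.
row = apply_mapping(
    mapping,
    key="123e4567-e89b-12d3-a456-426614174000",
    value={"file1": "a", "file2": None, "file3": "dfdf"},
)
print(row)
```

The `id` column has no counterpart in the Pulsar JSON schema, which is why the mapping takes it from the message key rather than from a value field.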

Pulsar topic with an Avro schema to Cassandra table

This example produces a mapping between a Pulsar topic with an Avro schema and a Cassandra table schema. Avro schema definitions are JSON records, so this example isn’t radically different from the first. This time, we’ll use the DataGenerator source connector to generate data for the Pulsar topic, the Astra DB sink connector to write data to the Cassandra table, and the GPT schema translator to generate a schema mapping between the two as the messages are processed.

The DataGenerator source connector will generate data for the Pulsar topic using the following schema:

DataGenerator source connector schema
"pulsar_topic_schema": {
        "person": {
            "type": "record",
            "name": "Person",
            "namespace": "org.apache.pulsar.io.datagenerator",
            "fields": [
              {
                "name": "address",
                "type": [
                  "null",
                  {
                    "type": "record",
                    "name": "Address",
                    "namespace": "org.apache.pulsar.io.datagenerator.Person",
                    "fields": [
                      {
                        "name": "apartmentNumber",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      },
                      {
                        "name": "city",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      },
                      {
                        "name": "postalCode",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      },
                      {
                        "name": "street",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      },
                      {
                        "name": "streetNumber",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      }
                    ]
                  }
                ],
                "default": null
              },
              {
                "name": "age",
                "type": [
                  "null",
                  "int"
                ],
                "default": null
              },
              {
                "name": "company",
                "type": [
                  "null",
                  {
                    "type": "record",
                    "name": "Company",
                    "namespace": "org.apache.pulsar.io.datagenerator.Person",
                    "fields": [
                      {
                        "name": "domain",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      },
                      {
                        "name": "email",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      },
                      {
                        "name": "name",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      },
                      {
                        "name": "vatIdentificationNumber",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      }
                    ]
                  }
                ],
                "default": null
              },
              {
                "name": "companyEmail",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "dateOfBirth",
                "type": {
                  "type": "long",
                  "logicalType": "timestamp-millis"
                }
              },
              {
                "name": "email",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "firstName",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "lastName",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "middleName",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "nationalIdentificationNumber",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "nationalIdentityCardNumber",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "passportNumber",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "password",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "sex",
                "type": [
                  "null",
                  {
                    "type": "enum",
                    "name": "Sex",
                    "namespace": "org.apache.pulsar.io.datagenerator.Person",
                    "symbols": [
                      "MALE",
                      "FEMALE"
                    ]
                  }
                ],
                "default": null
              },
              {
                "name": "telephoneNumber",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "username",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              }
            ]
          },
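
The schema above mixes nullable unions, nested records, an enum, and a logical type, which is what makes manual mapping tedious. As a rough illustration, the following Python sketch walks an Avro record schema and lists each leaf field with its type. The helper names `unwrap_nullable` and `leaf_paths` are hypothetical, the sample is a trimmed copy of the Person schema, and the dotted paths are only labels for this listing: this page does not state whether the connector accepts nested paths in a mapping.

```python
def unwrap_nullable(avro_type):
    """Return T for a ["null", T] union; return other types unchanged."""
    if isinstance(avro_type, list):
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) == 1:
            return non_null[0]
    return avro_type


def leaf_paths(record: dict, prefix: str = "value") -> dict:
    """Map each leaf field path of an Avro record to a type label."""
    paths = {}
    for field in record["fields"]:
        path = f"{prefix}.{field['name']}"
        field_type = unwrap_nullable(field["type"])
        if isinstance(field_type, dict) and field_type.get("type") == "record":
            # Nested record: recurse and keep the dotted path.
            paths.update(leaf_paths(field_type, path))
        elif isinstance(field_type, dict):
            # Enum or logical type: prefer the logical type name if present.
            paths[path] = field_type.get("logicalType", field_type["type"])
        else:
            paths[path] = field_type
    return paths


# Trimmed copy of the Person schema, for illustration only.
person = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "address", "default": None, "type": ["null", {
            "type": "record",
            "name": "Address",
            "fields": [
                {"name": "city", "type": ["null", "string"], "default": None},
            ],
        }]},
        {"name": "age", "type": ["null", "int"], "default": None},
        {"name": "dateOfBirth",
         "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "sex", "default": None, "type": ["null", {
            "type": "enum", "name": "Sex", "symbols": ["MALE", "FEMALE"],
        }]},
    ],
}

paths = leaf_paths(person)
for path, type_label in paths.items():
    print(path, type_label)
```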

The Cassandra table for the Astra DB sink connector has the following schema:

CQL schema
"cassandra_table_schemas": {
        "person": {
            "primaryKey": {
              "partitionKey": [
                "passportnumber"
              ]
            },
            "columnDefinitions": [
              {
                "name": "passportnumber",
                "typeDefinition": "text",
                "static": false
              },
              {
                "name": "age",
                "typeDefinition": "varint",
                "static": false
              },
              {
                "name": "firstname",
                "typeDefinition": "text",
                "static": false
              },
              {
                "name": "lastname",
                "typeDefinition": "text",
                "static": false
              }
            ]
          },

When a topic schema is available to the GPT schema translator, the button prompt will change to "Generate Mapping". Click the button to generate a mapping between the two schemas.

Schema mapping

GPT examines the schemas and generates a mapping. The mapping is displayed in the UI, and can be copied to the clipboard.

passportnumber=value.passportNumber, age=value.age, firstname=value.firstName, lastname=value.lastName

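Notice that the firstname column is mapped to value.firstName. The right-hand side of each entry uses the field name exactly as it appears in the Pulsar topic schema, which supersedes the lowercase column name from the Cassandra table schema.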
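
The naming pattern in this mapping can be reproduced with a simple case-insensitive match between column names and topic field names. The Python sketch below is a deterministic stand-in to show the shape of the result; it is not how the GPT schema translator works internally, and the function name `map_by_name` is hypothetical. Only a subset of the Person fields is listed.

```python
def map_by_name(topic_fields, columns):
    """Pair each column with the topic field whose name matches, ignoring case."""
    by_lower = {name.lower(): name for name in topic_fields}
    pairs = []
    for column in columns:
        field = by_lower.get(column.lower())
        if field is not None:
            # Left side: Cassandra column. Right side: topic field, original case.
            pairs.append(f"{column}=value.{field}")
    return ", ".join(pairs)


# A subset of the top-level fields from the Person schema.
topic_fields = ["address", "age", "company", "firstName", "lastName", "passportNumber"]
# Column names from the Cassandra table schema.
columns = ["passportnumber", "age", "firstname", "lastname"]

generated = map_by_name(topic_fields, columns)
print(generated)
```

Topic fields without a matching column, such as address and company, are simply left out of the mapping, as in the generated result above.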

No schema on Pulsar topic

Even with no schema declared on the Pulsar topic, the schema translator still provides a schema mapping. In this case, the mapping mirrors the column names of your Cassandra table schema and is produced without using GPT.
For example, starting with this schema on a Cassandra table:

{
  "primaryKey": {
    "partitionKey": [
      "passportnumber"
    ]
  },
  "columnDefinitions": [
    {
      "name": "passportnumber",
      "typeDefinition": "text",
      "static": false
    },
    {
      "name": "age",
      "typeDefinition": "varint",
      "static": false
    },
    {
      "name": "firstname",
      "typeDefinition": "text",
      "static": false
    },
    {
      "name": "lastname",
      "typeDefinition": "text",
      "static": false
    }
  ]
}

Because a schema is available from your Cassandra table, the button prompt will change to "Generate Mapping" to let you know that you can create a topic schema from the Cassandra table schema. Click this button to generate a schema for the Pulsar topic from your Cassandra table schema.

passportnumber=value.passportnumber, age=value.age, firstname=value.firstname, lastname=value.lastname
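
Because this mapping only mirrors the column names, it can be derived from the table schema alone. The following Python sketch shows one way to do that, assuming the table schema has the `columnDefinitions` layout shown above. The function name `mirror_mapping` is hypothetical and this is not the translator's actual code.

```python
def mirror_mapping(table_schema: dict) -> str:
    """Map every column to a value field with the same name."""
    names = [column["name"] for column in table_schema["columnDefinitions"]]
    return ", ".join(f"{name}=value.{name}" for name in names)


# The Cassandra table schema from this example.
table_schema = {
    "primaryKey": {"partitionKey": ["passportnumber"]},
    "columnDefinitions": [
        {"name": "passportnumber", "typeDefinition": "text", "static": False},
        {"name": "age", "typeDefinition": "varint", "static": False},
        {"name": "firstname", "typeDefinition": "text", "static": False},
        {"name": "lastname", "typeDefinition": "text", "static": False},
    ],
}

mirrored = mirror_mapping(table_schema)
print(mirrored)
```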


© 2024 DataStax | Privacy policy | Terms of use
