GPT schema translator

Systems in streaming pipelines can use different representations for schema and data types.

Schema mapping is required to align congruent types in a pipeline. For example, to send data from a CDC-enabled Apache Cassandra® table to a Pulsar topic, you must define schema mapping between the Cassandra table and the Pulsar topic. If the schema includes more complex data types, like maps and nested data structures, schema management becomes more difficult.

Schema management is a complicated, tedious, and error-prone task that requires you to understand and translate multiple sets of schema rules.

Instead, you can use the GPT schema translator to save time and reduce schema mapping toil. This tool uses generative AI, based on the GPT-4 model, to automatically generate schema mappings between Astra Streaming (Pulsar topics) and the Astra DB sink connector (Cassandra tables).

Prerequisites

  • An Astra Streaming tenant with a namespace and topic.

  • An Astra DB database with a keyspace and table.

  • An Astra DB sink connector. The GPT schema translator is available for the Astra DB sink connector only.

JSON-to-CQL mapping example

This example uses a JSON schema for a Pulsar topic, and a CQL schema for an Astra DB table. The GPT schema translator generates a mapping between the two schemas that the Astra DB sink connector can use to write data from the Pulsar topic to the Astra DB table.

  1. In the Astra Portal navigation menu, click Streaming, and then select your tenant.

  2. On the Namespaces and Topics tab, locate the topic that you want to map to a table, and then click Create mapping.

  3. Select your database’s keyspace, and then enter the name of the table that you want to map to.

  4. Click Generate Mapping.

    If this button isn’t available, either the GPT schema translator doesn’t have a schema for the topic, or schema mapping isn’t available for the selected topic and table.

    Cassandra-to-Pulsar schema mapping example

    The following blocks show, in order, the Cassandra table schema, the Pulsar JSON schema, and the generated mapping:
    {
      "primaryKey": {
        "partitionKey": [
          "id"
        ]
      },
      "columnDefinitions": [
        {
          "name": "id",
          "typeDefinition": "uuid",
          "static": false
        },
        {
          "name": "file1",
          "typeDefinition": "text",
          "static": false
        },
        {
          "name": "file2",
          "typeDefinition": "text",
          "static": false
        },
        {
          "name": "file3",
          "typeDefinition": "text",
          "static": false
        }
      ]
    }
    {
      "type": "record",
      "name": "sample.schema",
      "namespace": "default",
      "fields": [
        {
          "name": "file1",
          "type": [
            "null",
            "string"
          ],
          "default": null
        },
        {
          "name": "file2",
          "type": [
            "null",
            "string"
          ],
          "default": null
        },
        {
          "name": "file3",
          "type": [
            "string",
            "null"
          ],
          "default": "dfdf"
        }
      ]
    }
    id=key, file1=value.file1, file2=value.file2, file3=value.file3
  5. Save the mapping configuration.

  6. Configure your Astra DB sink connector for the given topic and table. Messages should then flow from the topic to the table without error. To confirm, check the logs in the Astra Portal.
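To make the generated mapping string easier to read, the following Python sketch parses it and resolves it against one message. It is an illustration only, not the connector's implementation: the function names `parse_mapping` and `apply_mapping`, the sample key, and the sample values are assumptions made for this example. It assumes that `key` refers to the whole message key and that `value.<field>` refers to a field of the message value, as the mapping string above suggests.

```python
def parse_mapping(mapping: str) -> dict[str, str]:
    """Split 'column=source, column=source' into {column: source}."""
    pairs = (item.split("=", 1) for item in mapping.split(",") if item.strip())
    return {column.strip(): source.strip() for column, source in pairs}


def apply_mapping(mapping: dict[str, str], key, value: dict) -> dict:
    """Resolve each source ('key' or 'value.<field>') against one message."""
    row = {}
    for column, source in mapping.items():
        if source == "key":
            # The whole message key becomes the column value.
            row[column] = key
        elif source.startswith("value."):
            # Look up the named field in the message value.
            row[column] = value.get(source[len("value."):])
        else:
            raise ValueError(f"unsupported source: {source}")
    return row


# Hypothetical message, used only to exercise the sketch.
mapping = parse_mapping(
    "id=key, file1=value.file1, file2=value.file2, file3=value.file3"
)
row = apply_mapping(
    mapping,
    key="123e4567-e89b-12d3-a456-426614174000",
    value={"file1": "a", "file2": "b", "file3": "c"},
)
print(row)
```

Each entry on the left of `=` is a Cassandra column, and each entry on the right says where the sink connector reads that column's value from in the Pulsar message.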

AVRO-to-CQL mapping example

This example demonstrates how you can generate a schema mapping in real time.

  1. The DataGenerator source connector generates data for a Pulsar topic with an AVRO schema.

    AVRO schema example
    "pulsar_topic_schema": {
            "person": {
                "type": "record",
                "name": "Person",
                "namespace": "org.apache.pulsar.io.datagenerator",
                "fields": [
                  {
                    "name": "address",
                    "type": [
                      "null",
                      {
                        "type": "record",
                        "name": "Address",
                        "namespace": "org.apache.pulsar.io.datagenerator.Person",
                        "fields": [
                          {
                            "name": "apartmentNumber",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          },
                          {
                            "name": "city",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          },
                          {
                            "name": "postalCode",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          },
                          {
                            "name": "street",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          },
                          {
                            "name": "streetNumber",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          }
                        ]
                      }
                    ],
                    "default": null
                  },
                  {
                    "name": "age",
                    "type": [
                      "null",
                      "int"
                    ],
                    "default": null
                  },
                  {
                    "name": "company",
                    "type": [
                      "null",
                      {
                        "type": "record",
                        "name": "Company",
                        "namespace": "org.apache.pulsar.io.datagenerator.Person",
                        "fields": [
                          {
                            "name": "domain",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          },
                          {
                            "name": "email",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          },
                          {
                            "name": "name",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          },
                          {
                            "name": "vatIdentificationNumber",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          }
                        ]
                      }
                    ],
                    "default": null
                  },
                  {
                    "name": "companyEmail",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "dateOfBirth",
                    "type": {
                      "type": "long",
                      "logicalType": "timestamp-millis"
                    }
                  },
                  {
                    "name": "email",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "firstName",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "lastName",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "middleName",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "nationalIdentificationNumber",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "nationalIdentityCardNumber",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "passportNumber",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "password",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "sex",
                    "type": [
                      "null",
                      {
                        "type": "enum",
                        "name": "Sex",
                        "namespace": "org.apache.pulsar.io.datagenerator.Person",
                        "symbols": [
                          "MALE",
                          "FEMALE"
                        ]
                      }
                    ],
                    "default": null
                  },
                  {
                    "name": "telephoneNumber",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  },
                  {
                    "name": "username",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  }
                ]
            }
    }
  2. The Astra DB sink connector writes data to the Cassandra table with a CQL schema.

    CQL schema example
    "cassandra_table_schemas": {
            "person": {
                "primaryKey": {
                  "partitionKey": [
                    "passportnumber"
                  ]
                },
                "columnDefinitions": [
                  {
                    "name": "passportnumber",
                    "typeDefinition": "text",
                    "static": false
                  },
                  {
                    "name": "age",
                    "typeDefinition": "varint",
                    "static": false
                  },
                  {
                    "name": "firstname",
                    "typeDefinition": "text",
                    "static": false
                  },
                  {
                    "name": "lastname",
                    "typeDefinition": "text",
                    "static": false
                  }
                ]
            }
    }
  3. In the Astra Portal, create a mapping for the tenant. When a topic schema is available to the GPT schema translator, click Generate Mapping.

    Schema mapping

    The GPT schema translator generates an AVRO-to-CQL schema mapping while messages are processed.

    passportnumber=value.passportNumber, age=value.age, firstname=value.firstName, lastname=value.lastName

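    Notice that the firstname column maps to value.firstName. The right side of each pair uses the field name exactly as the Pulsar topic's AVRO schema declares it, so the AVRO casing takes precedence over the lowercase Cassandra column name.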
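The relationship between the two schemas in this example can be sketched with a simple name-matching heuristic. This is not how the GPT schema translator works internally, which the documentation doesn't describe. The `suggest_mapping` function is hypothetical, and it only pairs each Cassandra column with an AVRO field whose name matches when case is ignored. The schemas below are trimmed to the fields relevant to this example.

```python
def suggest_mapping(avro_schema: dict, table_schema: dict) -> str:
    """Pair each table column with the AVRO field of the same name, ignoring case."""
    fields_by_lower = {f["name"].lower(): f["name"] for f in avro_schema["fields"]}
    parts = []
    for column in table_schema["columnDefinitions"]:
        name = column["name"]
        field = fields_by_lower.get(name.lower())
        if field is not None:
            # Keep the column name as-is and the AVRO field name in its own casing.
            parts.append(f"{name}=value.{field}")
    return ", ".join(parts)


# Trimmed copies of the schemas shown above.
avro_schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "age", "type": ["null", "int"], "default": None},
        {"name": "firstName", "type": ["null", "string"], "default": None},
        {"name": "lastName", "type": ["null", "string"], "default": None},
        {"name": "passportNumber", "type": ["null", "string"], "default": None},
        {"name": "username", "type": ["null", "string"], "default": None},
    ],
}
table_schema = {
    "primaryKey": {"partitionKey": ["passportnumber"]},
    "columnDefinitions": [
        {"name": "passportnumber", "typeDefinition": "text", "static": False},
        {"name": "age", "typeDefinition": "varint", "static": False},
        {"name": "firstname", "typeDefinition": "text", "static": False},
        {"name": "lastname", "typeDefinition": "text", "static": False},
    ],
}

print(suggest_mapping(avro_schema, table_schema))
# passportnumber=value.passportNumber, age=value.age, firstname=value.firstName, lastname=value.lastName
```

AVRO fields with no matching column, such as username, are left out of the mapping.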

No schema on Pulsar topic

If you don’t declare a schema in the Pulsar topic, the schema translator can generate a default schema mapping based on the values of your Cassandra table schema, without using GPT.

When you create the mapping in the Astra Portal, you can click Generate Mapping to create a generic Pulsar topic schema based on your Cassandra table schema. If schema mapping isn’t possible for the selected table and topic, the Generate Mapping button isn’t available.

For example, assume you have the following Cassandra table schema:

{
  "primaryKey": {
    "partitionKey": [
      "passportnumber"
    ]
  },
  "columnDefinitions": [
    {
      "name": "passportnumber",
      "typeDefinition": "text",
      "static": false
    },
    {
      "name": "age",
      "typeDefinition": "varint",
      "static": false
    },
    {
      "name": "firstname",
      "typeDefinition": "text",
      "static": false
    },
    {
      "name": "lastname",
      "typeDefinition": "text",
      "static": false
    }
  ]
}

The schema translator would generate the following Pulsar JSON schema mapping based on the given Cassandra table schema:

passportnumber=value.passportnumber, age=value.age, firstname=value.firstname, lastname=value.lastname
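The default mapping follows a regular pattern: every column maps to a value field of the same name. A minimal Python sketch of that pattern, assuming the table schema is available as a dictionary in the shape shown above, could look like the following. The `default_mapping` function name is hypothetical and is not part of any Astra API.

```python
def default_mapping(table_schema: dict) -> str:
    """Map every column to a value field with the same name."""
    return ", ".join(
        f"{column['name']}=value.{column['name']}"
        for column in table_schema["columnDefinitions"]
    )


# The Cassandra table schema from the example above.
table_schema = {
    "primaryKey": {"partitionKey": ["passportnumber"]},
    "columnDefinitions": [
        {"name": "passportnumber", "typeDefinition": "text", "static": False},
        {"name": "age", "typeDefinition": "varint", "static": False},
        {"name": "firstname", "typeDefinition": "text", "static": False},
        {"name": "lastname", "typeDefinition": "text", "static": False},
    ],
}

print(default_mapping(table_schema))
# passportnumber=value.passportnumber, age=value.age, firstname=value.firstname, lastname=value.lastname
```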


© 2024 DataStax
