Map to user-defined types (UDTs)

The DataStax Apache Pulsar™ connector can map JSON, Avro, and complex types directly to user-defined type (UDT) columns.

The field names embedded in the Pulsar messages must match the field names of the UDT.
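Because fields are matched by name, it can help to compare a sample message with the UDT before you deploy the connector. The following Python sketch is a hypothetical pre-flight check, not part of the connector: `check_field_names` and the `STOCKS_TYPE_FIELDS` set (the field names of the stocks_type UDT from the example below) are assumptions for illustration. It reports UDT fields missing from the message and message fields with no matching UDT field.

```python
import json

# Hypothetical constant: field names of the stocks_type UDT used on this page.
# Copy the names from your own CREATE TYPE statement.
STOCKS_TYPE_FIELDS = {"symbol", "ts", "exchange", "industry", "value"}


def check_field_names(udt_fields, message_value):
    """Hypothetical helper: compare a JSON message value with a UDT's field names.

    Returns a tuple (missing, unexpected):
      missing    -- UDT fields that do not appear in the message
      unexpected -- message fields that have no matching UDT field
    This check compares names exactly, so "Symbol" does not match "symbol".
    """
    record = json.loads(message_value)
    message_fields = set(record)
    missing = sorted(udt_fields - message_fields)
    unexpected = sorted(message_fields - udt_fields)
    return missing, unexpected


if __name__ == "__main__":
    good = ('{"symbol":"GM","value":38,"exchange":"NYSE",'
            '"industry":"AUTO","ts":"2018-11-26T19:26:27.483"}')
    bad = '{"Symbol":"GM","value":38,"exchange":"NYSE","ts":"2018-11-26T19:26:27.483"}'
    print(check_field_names(STOCKS_TYPE_FIELDS, good))  # ([], [])
    print(check_field_names(STOCKS_TYPE_FIELDS, bad))   # (['industry', 'symbol'], ['Symbol'])
```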

UDT mapping example

In the following example, each message key is a plain string and each message value is a JSON object:

Key: APPLE
Value:
{
  "symbol":"APPL",
  "value":208,
  "exchange":"NASDAQ",
  "industry":"TECH",
  "ts":"2018-11-26T19:26:27.483"
}

Key: EXXON MOBIL
Value:
{
  "symbol":"M",
  "value":80,
  "exchange":"NYSE",
  "industry":"ENERGY",
  "ts":"2018-11-26T19:26:27.483"
}

Key: GENERAL MOTORS
Value:
{
  "symbol":"GM",
  "value":38,
  "exchange":"NYSE",
  "industry":"AUTO",
  "ts":"2018-11-26T19:26:27.483"
}

Key: AT&T
Value:
{
  "symbol":"AT&T",
  "value":33,
  "exchange":"NYSE",
  "industry":"TELECOM",
  "ts":"2018-11-26T19:26:27.483"
}

Key: FORD MOTORS
Value:
{
  "symbol":"F",
  "value":10,
  "exchange":"NYSE",
  "industry":"AUTO",
  "ts":"2018-11-26T19:26:27.483"
}

  1. Create a keyspace named stocks_keyspace.

  2. Create a UDT named stocks_type with the following definition:

    CREATE TYPE stocks_keyspace.stocks_type (
        symbol text,
        ts timestamp,
        exchange text,
        industry text,
        value double);

  3. Create a table named stocks_udt_table with a text primary key column, name, that stores the message key, and a column, stocks, that uses the stocks_type UDT:

    CREATE TABLE stocks_keyspace.stocks_udt_table (
        name text primary key,
        stocks FROZEN<stocks_type>);

  4. In the connector configuration, map the message key to name and the message value to stocks:

    topic:
      stocks_topic:
        stocks_keyspace:
          stocks_udt_table:
            mapping: 'name=key,stocks=value'
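Each entry in the mapping string pairs a table column with part of the Pulsar record: key is the message key, value is the whole message value, and value.<field> (used later on this page) selects one field of the value. The following Python sketch models that resolution so you can see which data lands in which column. It is a simplified, hypothetical model: `parse_mapping`, `resolve`, and `build_row` are not connector APIs, and the sketch handles only the plain column=path form shown on this page.

```python
import json


def parse_mapping(mapping):
    """Hypothetical helper: parse 'col1=path1, col2=path2' into {column: path}.

    Handles only the simple column=path form used on this page.
    """
    result = {}
    for pair in mapping.split(","):
        column, path = (part.strip() for part in pair.split("=", 1))
        result[column] = path
    return result


def resolve(path, key, value):
    """Hypothetical helper: look up 'key', 'value', or 'value.<field>' in a record."""
    if path == "key":
        return key
    if path == "value":
        return value
    if path.startswith("value."):
        # .get() returns None when the field is absent from the message
        return value.get(path[len("value."):])
    raise ValueError(f"unsupported mapping path: {path}")


def build_row(mapping, key, value):
    """Hypothetical helper: return {column: data} for one Pulsar record."""
    return {col: resolve(path, key, value) for col, path in parse_mapping(mapping).items()}


if __name__ == "__main__":
    value = json.loads('{"symbol":"F","value":10,"exchange":"NYSE",'
                       '"industry":"AUTO","ts":"2018-11-26T19:26:27.483"}')
    row = build_row("name=key,stocks=value", "FORD MOTORS", value)
    print(row["name"])              # FORD MOTORS
    print(row["stocks"]["symbol"])  # F
```

In this model the whole JSON object becomes the value of the stocks column, which is why its field names have to line up with the fields of stocks_type.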

Map complex UDTs

The connector also supports UDTs that contain collection fields of type map, set, and list.

  1. Using the same keyspace from the previous example, create a UDT named stocks_complex_type with the following definition:

    CREATE TYPE stocks_keyspace.stocks_complex_type (
        symbol text,
        ts timestamp,
        exchange text,
        industry text,
        value double,
        trailing_five_days_value frozen<list<double>>,
        similar_symbols frozen<set<text>>,
        stats frozen<map<text, double>>);

    The collection fields of stocks_complex_type hold the following data:

    • trailing_five_days_value frozen<list<double>>: The stock’s value on each of the last five days, for example [10.0, 11.0, 12.0, 23.0, 10.0]. Values in a list don’t need to be unique.

    • similar_symbols frozen<set<text>>: Symbols of stocks in the same industry or with similar performance, for example ["M", "GM", "F"]. Values in a set must be unique.

    • stats frozen<map<text, double>>: Named statistics for the stock, for example {"open": 11.0, "high": 20.0, "low": 9.0}.

  2. Create a table named stocks_complex_udt_table with a text primary key column, name, that stores the message key, and a column, stocks_complex, that uses the stocks_complex_type UDT:

    CREATE TABLE stocks_keyspace.stocks_complex_udt_table (
        name text primary key,
        stocks_complex FROZEN<stocks_complex_type>);

  3. In the connector configuration, map the message key to name and the message value to stocks_complex:

    topic:
      stocks_topic:
        stocks_keyspace:
          stocks_complex_udt_table:
            mapping: 'name=key,stocks_complex=value'
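JSON has arrays and objects but no set type, so a message for stocks_complex_type carries similar_symbols as a JSON array, and keeping its values unique is up to the producer. The following Python sketch is a hypothetical producer-side check; `validate_complex_value` and the `SAMPLE` record are assumptions for illustration, not connector code. It confirms that the list field holds numbers, the set field has no duplicates, and the map field has text keys with numeric values.

```python
import json
from numbers import Real

# Hypothetical sample message value for stocks_complex_type.
SAMPLE = """{
  "symbol": "F", "value": 10.0, "exchange": "NYSE", "industry": "AUTO",
  "ts": "2018-11-26T19:26:27.483",
  "trailing_five_days_value": [10.0, 11.0, 12.0, 23.0, 10.0],
  "similar_symbols": ["M", "GM", "F"],
  "stats": {"open": 11.0, "high": 20.0, "low": 9.0}
}"""


def _is_number(x):
    # bool is a subclass of int in Python, so exclude it explicitly
    return isinstance(x, Real) and not isinstance(x, bool)


def validate_complex_value(record):
    """Hypothetical helper: list problems in the collection fields (empty if none)."""
    problems = []
    days = record.get("trailing_five_days_value", [])
    if not all(_is_number(v) for v in days):
        problems.append("trailing_five_days_value: every element must be a number")
    # Duplicates are allowed in a list, so only the set field is checked for them.
    symbols = record.get("similar_symbols", [])
    if len(symbols) != len(set(symbols)):
        problems.append("similar_symbols: set elements must be unique")
    stats = record.get("stats", {})
    if not all(isinstance(k, str) and _is_number(v) for k, v in stats.items()):
        problems.append("stats: keys must be text and values must be numbers")
    return problems


if __name__ == "__main__":
    record = json.loads(SAMPLE)
    print(validate_complex_value(record))  # []
    record["similar_symbols"].append("GM")
    print(validate_complex_value(record))  # ['similar_symbols: set elements must be unique']
```

Note that the repeated 10.0 in trailing_five_days_value is fine, because list values don’t need to be unique.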

Selectively update maps and UDTs

If your mapping includes maps or UDTs, you can add a CQL query to your connector configuration that updates only the map entries or UDT fields that have values in the Pulsar record.

With this approach, when a new record arrives, the connector sets only the fields that are non-null in the message. It doesn’t overwrite UDT fields that are null or absent in the Pulsar record. This behavior requires nullToUnset to be true.

Because null fields are left unset instead of being written as nulls, this approach helps minimize tombstones in the database. The query is a CQL UPDATE statement that sets individual UDT fields, which CQL allows only on a UDT column that isn’t frozen. In the following example, that column is udtColNotFrozen, and the table must declare it without FROZEN.
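The effect of combining nullToUnset with a field-level UPDATE can be modeled in a few lines. The following Python sketch is a simplified, hypothetical model of the behavior described above, not the connector’s implementation: `apply_record` stands in for the UPDATE, Python `None` stands in for a null field, and a field that is missing from the record is treated the same way as a null one.

```python
def apply_record(stored, incoming):
    """Hypothetical model of a field-level UPDATE of a non-frozen UDT
    with nullToUnset=true.

    stored   -- the UDT value currently in the table, as {field: value}
    incoming -- the fields of the new Pulsar record; None means null
    Fields that are null or absent in `incoming` are left unset, so the
    stored values survive and no null (tombstone) is written for them.
    """
    updated = dict(stored)
    for field, value in incoming.items():
        if value is not None:
            updated[field] = value
    return updated


def unset_fields(udt_fields, incoming):
    """Hypothetical helper: return the UDT fields this record leaves unset."""
    return sorted(f for f in udt_fields if incoming.get(f) is None)


if __name__ == "__main__":
    stored = {"udtmem1": 1, "udtmem2": "first"}
    # udtmem1 is null in the new record, so only udtmem2 changes.
    print(apply_record(stored, {"udtmem1": None, "udtmem2": "second"}))
    # {'udtmem1': 1, 'udtmem2': 'second'}
    print(unset_fields(["udtmem1", "udtmem2"], {"udtmem2": "second"}))
    # ['udtmem1']
```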

  1. Create the UDT. The table column that uses it must be declared without FROZEN. For example:

    CREATE TYPE IF NOT EXISTS myudt (udtmem1 int, udtmem2 text);

  2. In your connector configuration, set the nullToUnset parameter to true:

    topic:
      topic_name:
        keyspace_name:
          table_name:
            nullToUnset: true

  3. Make sure the mapping defines every bound variable that your query uses, including the variables for the UDT fields:

    topic:
      topic_name:
        keyspace_name:
          table_name:
            nullToUnset: true
            mapping: bigintcol=key, udtcol1=value.udtmem1, udtcol2=value.udtmem2

  4. After you define the UDT and mapping, add the query parameter with an UPDATE statement that sets the individual fields of the non-frozen UDT column (udtColNotFrozen in this example):

    topic:
      topic_name:
        keyspace_name:
          table_name:
            nullToUnset: true
            mapping: bigintcol=key, udtcol1=value.udtmem1, udtcol2=value.udtmem2
            query: 'UPDATE ks.table set udtColNotFrozen.udtmem1=:udtcol1, udtColNotFrozen.udtmem2=:udtcol2 where bigintCol=:bigintcol'
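In the example above, each named bind marker in the query (such as :udtcol1) has a mapping column of the same name, which is what step 3 asks for. The following Python sketch is a hypothetical pre-deployment check, not part of the connector: `mapping_columns` and `unmapped_markers` are illustrative names, and the regular expression is a simplification that would also match a colon inside a string literal. It reports any bind marker that the mapping doesn’t define.

```python
import re

# Named bind markers look like :udtcol1. This simple pattern does not skip
# colons inside string literals.
_MARKER = re.compile(r":(\w+)")


def mapping_columns(mapping):
    """Hypothetical helper: return the column names on the left of each '='."""
    return {pair.split("=", 1)[0].strip() for pair in mapping.split(",")}


def unmapped_markers(query, mapping):
    """Hypothetical helper: return bind-marker names in `query` missing from `mapping`."""
    markers = set(_MARKER.findall(query))
    return sorted(markers - mapping_columns(mapping))


if __name__ == "__main__":
    mapping = "bigintcol=key, udtcol1=value.udtmem1, udtcol2=value.udtmem2"
    query = ("UPDATE ks.table set udtColNotFrozen.udtmem1=:udtcol1, "
             "udtColNotFrozen.udtmem2=:udtcol2 where bigintCol=:bigintcol")
    print(unmapped_markers(query, mapping))  # []
    print(unmapped_markers(query, "bigintcol=key, udtcol1=value.udtmem1"))  # ['udtcol2']
```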

© 2025 DataStax, an IBM Company

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.
