Map to User Defined Types (UDTs)
The DataStax Apache Pulsar™ connector can process JSON, Avro, and complex types directly into user-defined type columns.
The Pulsar messages' embedded field names and the UDT’s field names must match.
UDT mapping example
In the following example, the key is a basic string and the value is regular JSON whose field names correspond to the fields of the UDT.
- Create a keyspace named stocks_keyspace.

- Create a UDT named stocks_type with the following definition:

      CREATE TYPE stocks_keyspace.stocks_type (
        symbol text,
        ts timestamp,
        exchange text,
        industry text,
        value double
      );

- Create a table named stocks_udt_table that has a column for the primitive string field name and a column that uses the stocks_type UDT:

      CREATE TABLE stocks_keyspace.stocks_udt_table (
        name text PRIMARY KEY,
        stocks FROZEN<stocks_type>
      );

- In the connector configuration, define the mapping as follows:

      topic:
        stocks_topic:
          stocks_keyspace:
            stocks_udt_table:
              mapping: 'name=key,stocks=value'
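
As a rough illustration of the field-name requirement, the following Python sketch checks a JSON value against the field names of stocks_type. The sample key and value are hypothetical, and unmatched_fields is a helper written for this example, not part of the connector. It only shows the kind of mismatch to look for before publishing messages.

```python
import json

# Field names of stocks_keyspace.stocks_type, copied from the CREATE TYPE statement.
STOCKS_TYPE_FIELDS = {"symbol", "ts", "exchange", "industry", "value"}


def unmatched_fields(value_json: str, udt_fields: set[str]) -> set[str]:
    """Return the JSON field names that have no UDT field of the same name."""
    record = json.loads(value_json)
    return set(record) - udt_fields


# Hypothetical Pulsar message: a string key and a JSON value.
key = "APPLE"
value = json.dumps({
    "symbol": "APPL",
    "ts": "2018-11-26T19:26:27.483Z",
    "exchange": "NASDAQ",
    "industry": "TECH",
    "value": 208.0,
})

# An empty result means every JSON field has a matching UDT field.
print(unmatched_fields(value, STOCKS_TYPE_FIELDS))
```

A non-empty result, such as a field named price instead of value, points to a name that has to be changed in either the message or the UDT.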
Map complex UDTs
The connector supports mapping UDTs that contain the collection types map, set, and list.
- Using the same keyspace from the previous example, create a UDT named stocks_complex_type with the following definition:

      CREATE TYPE stocks_keyspace.stocks_complex_type (
        symbol text,
        ts timestamp,
        exchange text,
        industry text,
        value double,
        trailing_five_days_value frozen<list<double>>,
        similar_symbols frozen<set<text>>,
        stats frozen<map<text, double>>
      );

  For this example, the UDT stocks_complex_type has the following fields:

  - trailing_five_days_value frozen<list<double>>: Contains a list of the value of the stock in the last five days, for example [10.0, 11.0, 12.0, 23.0, 10.0]. Values in a list don’t need to be unique.
  - similar_symbols frozen<set<text>>: Contains a set of symbols in the same industry, or performing similarly, for example ["M", "GM", "F"]. Values in a set must be unique.
  - stats frozen<map<text, double>>: Contains a map of key-value pairs, for example {"open": 11.0, "high": 20.0, "low": 9.0}.

- Create a table named stocks_complex_udt_table that has a column for the primitive string field name and a column that uses the stocks_complex_type UDT:

      CREATE TABLE stocks_keyspace.stocks_complex_udt_table (
        name text PRIMARY KEY,
        stocks_complex FROZEN<stocks_complex_type>
      );

- In the connector configuration, define the mapping as follows:

      topic:
        stocks_topic:
          stocks_keyspace:
            stocks_complex_udt_table:
              mapping: 'name=key,stocks_complex=value'
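
The three collection fields behave much like their closest Python equivalents, which can help when deciding what a message value should contain. The sketch below is only an analogy that uses Python's built-in types and the example values from this section. It does not show how the connector converts JSON internally.

```python
# Example values taken from the field descriptions above.
trailing_five_days_value = [10.0, 11.0, 12.0, 23.0, 10.0]  # list: order kept, duplicates allowed
similar_symbols = {"M", "GM", "F"}                          # set: each value appears once
stats = {"open": 11.0, "high": 20.0, "low": 9.0}            # map: key-value pairs

# A list keeps the repeated 10.0, so all five days are present.
print(len(trailing_five_days_value))  # 5

# Adding a symbol that is already in the set does not change it.
similar_symbols.add("GM")
print(len(similar_symbols))  # 3

# A map is looked up by key.
print(stats["high"])  # 20.0
```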
Selectively update maps and UDTs
If your mapping includes maps or UDTs, you can use CQL queries in your connector configuration to selectively update maps and UDTs based on the existence of values in the Pulsar fields.
When a new record arrives, the connector sets only the non-null fields in the message.
It won’t override UDT fields that have a null value or aren’t present in the Pulsar record.
This configuration requires that nullToUnset is true.
This configuration helps minimize tombstones in the database.
It uses the CQL UPDATE command to only update the fields present in the Pulsar record.
You must use the udtColNotFrozen keyword in the CQL query, and the type definition cannot be frozen.
- Create the UDT and ensure that it isn’t frozen. For example:

      CREATE TYPE IF NOT EXISTS myudt (udtmem1 int, udtmem2 text);

- In your connector configuration, set the nullToUnset parameter to true:

      topic:
        topic_name:
          keyspace_name:
            table_name:
              nullToUnset: true

- Ensure all UDT fields and bound variables you plan to use in your query are set in the mapping:

      topic:
        topic_name:
          keyspace_name:
            table_name:
              nullToUnset: true
              mapping: bigintcol=key, udtcol1=value.udtmem1, udtcol2=value.udtmem2

- Once you have the UDT and mapping defined, add the query parameter with your UPDATE command using the udtColNotFrozen keyword:

      topic:
        topic_name:
          keyspace_name:
            table_name:
              nullToUnset: true
              mapping: bigintcol=key, udtcol1=value.udtmem1, udtcol2=value.udtmem2
              query: 'UPDATE ks.table set udtColNotFrozen.udtmem1=:udtcol1, udtColNotFrozen.udtmem2=:udtcol2 where bigintCol=:bigintcol'
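
The effect of this configuration can be modeled in a few lines of Python. The function below is a simplified, hypothetical model of the behavior described in this section. The name apply_record and its arguments are assumptions made for illustration and are not connector APIs. With nullToUnset enabled, null or missing fields leave the stored UDT members untouched. Writing nulls instead would clear them, which creates tombstones in the database.

```python
from typing import Any, Optional


def apply_record(stored: dict[str, Any], record: dict[str, Optional[Any]],
                 null_to_unset: bool = True) -> dict[str, Any]:
    """Model how one incoming record changes the stored UDT members."""
    updated = dict(stored)
    for member in stored:
        incoming = record.get(member)  # None when the field is null or absent
        if incoming is not None:
            updated[member] = incoming  # non-null field: overwrite the stored value
        elif not null_to_unset:
            updated[member] = None      # null is written: the stored value is cleared
        # Otherwise the field is treated as unset and the stored value is kept.
    return updated


stored_udt = {"udtmem1": 42, "udtmem2": "old text"}
record = {"udtmem1": 47, "udtmem2": None}  # hypothetical Pulsar value with a null field

print(apply_record(stored_udt, record))
# {'udtmem1': 47, 'udtmem2': 'old text'}
print(apply_record(stored_udt, record, null_to_unset=False))
# {'udtmem1': 47, 'udtmem2': None}
```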