Map messages that contain JSON
When the data format for the key or value is JSON, the DataStax Apache Pulsar™ connector mapping can include individual fields in the JSON structure.
JSON records in Pulsar can have a schema associated with them, but this isn’t required.
Mapping requirements
When mapping fields to columns, the following requirements apply:
- The data in the Pulsar field must be compatible with the mapped column’s data type. For example, a string field cannot map to a number column.
- A Pulsar field mapped to a table’s primary key (PK) column must always contain data because primary key columns don’t allow nulls.
- All primary key columns must have a mapping because primary key columns don’t allow nulls.
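The two primary key requirements can be checked before a mapping is deployed. The following Python sketch is illustrative only and is not part of the connector: `parse_mapping`, `lookup`, and `check_requirements` are hypothetical helpers, the record is made-up sample data, and the code assumes the comma-separated `column=field` mapping syntax used in the examples on this page. It does not check data type compatibility.

```python
def parse_mapping(mapping: str) -> dict[str, str]:
    """Split 'col=field, col2=field2' into {column: field_path}."""
    columns = {}
    for item in mapping.split(","):
        if not item.strip():
            continue
        column, path = item.split("=", 1)
        columns[column.strip()] = path.strip()
    return columns


def lookup(record: dict, path: str):
    """Follow a dotted path such as 'value.symbol'; return None if absent."""
    current = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def check_requirements(mapping: str, primary_key: list[str], record: dict) -> list[str]:
    """Return a list of problems; an empty list means both PK rules hold."""
    columns = parse_mapping(mapping)
    problems = []
    for pk in primary_key:
        if pk not in columns:
            # Every primary key column needs a mapping.
            problems.append(f"primary key column '{pk}' has no mapping")
        elif lookup(record, columns[pk]) is None:
            # A field mapped to a primary key column must contain data.
            problems.append(
                f"field '{columns[pk]}' for primary key column '{pk}' is null or missing"
            )
    return problems


# Hypothetical record: a text key and a JSON value with a null 'ts' field.
record = {"key": "APPLE", "value": {"symbol": "APPL", "ts": None}}
print(check_requirements("symbol=value.symbol, name=key", ["symbol", "ts"], record))
print(check_requirements("symbol=value.symbol, ts=value.ts", ["symbol", "ts"], record))
```

The first call reports that `ts` has no mapping, and the second reports that the field mapped to `ts` is null.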
Map messages that contain basic and JSON fields
When the data format for the message key or value is JSON, the DataStax Apache Pulsar™ connector mapping can include individual fields in the JSON structure.
In the following example, the key is a text field and the value is JSON.
The key is mapped to the `name` column, and each JSON field is mapped to a separate column in the table.
| key | value |
|---|---|
- Create a keyspace in your database.

  This example creates a keyspace named `stocks_keyspace`:

      cqlsh -e "CREATE KEYSPACE stocks_keyspace \
        WITH replication = {'class': 'NetworkTopologyStrategy', \
        'Cassandra': 1};"

  For DSE and Cassandra clusters, make sure that the keyspace is replicated to a datacenter set in the connector’s `contactPoints` parameter. DSE datacenter names are case sensitive. Use `nodetool ring` to get a list of datacenters.
- Create a table.

  This example creates a table named `stocks_table` with six columns:

      cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
        symbol text, \
        ts timestamp, \
        exchange text, \
        industry text, \
        name text, \
        value double, \
        PRIMARY KEY (symbol, ts));"

- For DSE, verify that all nodes have the same schema version using `nodetool describering`.

  For example, this command verifies the schema for `stocks_keyspace`:

      nodetool describering -- stocks_keyspace

- In the DataStax connector configuration file, add the Pulsar topic name to `topics`, and define the field-to-column mapping.

  The following example maps `stocks_topic` to `stocks_table` using the minimum required settings:

      tasks.max: 1
      topics: stocks_topic
      topic:
        stocks_topic:
          stocks_keyspace:
            stocks_table:
              mapping: 'symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value'

- Use `pulsar-admin sinks update` to apply the new configuration.
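To make the mapping above concrete, the following Python sketch shows how each `column=field` pair could resolve against one message with a text key and a JSON value. It only illustrates the mapping semantics and is not the connector's implementation: `resolve_row` is a hypothetical helper, the message contents are made up, and the code assumes every path is either `key` or `value.<field>`.

```python
import json

MAPPING = (
    "symbol=value.symbol, ts=value.ts, exchange=value.exchange, "
    "industry=value.industry, name=key, value=value.value"
)


def resolve_row(mapping: str, key: str, value_json: str) -> dict:
    """Build a {column: data} row from a text key and a JSON value."""
    value = json.loads(value_json)
    row = {}
    for item in mapping.split(","):
        column, path = (part.strip() for part in item.split("=", 1))
        if path == "key":
            # The whole text key goes into the column.
            row[column] = key
        else:
            # 'value.symbol' -> 'symbol': read one field of the JSON value.
            field = path.split(".", 1)[1]
            row[column] = value.get(field)
    return row


# Hypothetical message, made up for illustration.
key = "APPLE"
value_json = (
    '{"symbol": "APPL", "ts": "2018-11-26T19:26:27.483", '
    '"exchange": "NASDAQ", "industry": "TECH", "value": 208.0}'
)

row = resolve_row(MAPPING, key, value_json)
for column, data in row.items():
    print(f"{column}: {data}")
```

Note that the column named `value` is filled from the JSON field `value.value`, while the column `name` is filled from the message key.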
Map JSON-only messages
The DataStax Apache Pulsar connector supports mapping JSON messages with or without a schema.
In this example, the key is regular JSON without a schema, and the value is JSON that contains a schema and payload.
The payload type is `Map`, and the connector can access the individual fields of the map.
| key | value |
|---|---|
- Create a keyspace in your database.

  This example creates a keyspace named `stocks_keyspace`:

      cqlsh -e "CREATE KEYSPACE stocks_keyspace \
        WITH replication = {'class': 'NetworkTopologyStrategy', \
        'Cassandra': 1};"

  For DSE and Cassandra clusters, make sure that the keyspace is replicated to a datacenter set in the connector’s `contactPoints` parameter. DSE datacenter names are case sensitive. Use `nodetool ring` to get a list of datacenters.
- Create a table.

  This example creates a table named `stocks_table` with six columns:

      cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
        symbol text, \
        ts timestamp, \
        exchange text, \
        industry text, \
        name text, \
        value double, \
        PRIMARY KEY (symbol, ts));"

- For DSE, verify that all nodes have the same schema version using `nodetool describering`.

  For example, this command verifies the schema for `stocks_keyspace`:

      nodetool describering -- stocks_keyspace

- In the DataStax connector configuration file, add the Pulsar topic name to `topics`, and then define the field-to-column mapping.

  The following example maps `stocks_topic` to `stocks_table` using the minimum required settings:

      tasks.max: 1
      topics: stocks_topic
      topic:
        stocks_topic:
          stocks_keyspace:
            stocks_table:
              mapping: 'symbol=value.symbol, ts=value.dateTime, exchange=value.exchange, industry=value.industry, name=key.name, value=value.value'

- Use `pulsar-admin sinks update` to apply the new configuration.
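In this mapping, both the key and the value are addressed by field (`key.name`, `value.dateTime`), and a column name does not have to match the field name (`ts` is filled from `dateTime`). The Python sketch below illustrates that lookup under stated assumptions: `resolve_row` is a hypothetical helper, the key and value contents are made up, and the value is assumed to be already reduced to its payload map. It is not the connector's implementation.

```python
import json
from functools import reduce

MAPPING = (
    "symbol=value.symbol, ts=value.dateTime, exchange=value.exchange, "
    "industry=value.industry, name=key.name, value=value.value"
)


def resolve_row(mapping: str, key_json: str, value_json: str) -> dict:
    """Build a {column: data} row when both key and value are JSON."""
    message = {"key": json.loads(key_json), "value": json.loads(value_json)}
    row = {}
    for item in mapping.split(","):
        column, path = (part.strip() for part in item.split("=", 1))
        # Walk the dotted path, e.g. 'key.name' -> message['key']['name'].
        row[column] = reduce(lambda node, name: node[name], path.split("."), message)
    return row


# Hypothetical key and value payload, made up for illustration.
key_json = '{"name": "APPLE"}'
value_json = (
    '{"symbol": "APPL", "dateTime": "2018-11-26T19:26:27.483", '
    '"exchange": "NASDAQ", "industry": "TECH", "value": 208.0}'
)

row = resolve_row(MAPPING, key_json, value_json)
for column, data in row.items():
    print(f"{column}: {data}")
```

A missing field raises `KeyError` in this sketch, which mirrors the requirement that fields mapped to primary key columns must always be present.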