Mapping JSON messages
The DataStax Apache Pulsar™ Connector supports mapping JSON messages with or without a schema. In this example, the key is regular JSON without a schema. The value is also JSON but contains a schema and a payload. The payload is of type Map, and the connector can access the individual fields of that map.
See the DataStax Pulsar Examples for a full example.
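As a rough illustration of the message shapes described above, the following sketch builds a schemaless JSON key and a JSON value that wraps a map payload in a schema envelope. All sample values are hypothetical, the field names are taken from the mapping used later on this page, and the layout of the `schema` descriptor is an assumption that depends on how the producer serializes records.

```python
import json

# Hypothetical sample data; field names follow the mapping used on this page.
key = {"name": "Example Corp"}

value = {
    # Assumed schema descriptor; the real layout depends on the producer.
    "schema": {"type": "map"},
    "payload": {
        "symbol": "EXMP",
        "dateTime": "2024-01-02T10:00:00Z",
        "exchange": "NASDAQ",
        "industry": "Tech",
        "value": 123.45,
    },
}


def serialize(message: dict) -> bytes:
    """Encode a message as UTF-8 JSON bytes, as a producer might before sending."""
    return json.dumps(message).encode("utf-8")


def payload_fields(raw_value: bytes) -> dict:
    """Decode a schema-plus-payload value and return only the payload map."""
    return json.loads(raw_value)["payload"]


if __name__ == "__main__":
    print(serialize(key).decode("utf-8"))
    print(sorted(payload_fields(serialize(value))))
```

The key carries only `name`, while every other field lives inside the value's payload map, which is why a mapping needs both `key.` and `value.` prefixes.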
- Set up the supported database table.
  - Create the keyspace. Ensure that the keyspace is replicated to a datacenter that is set in the DataStax Apache Pulsar Connector contactPoints parameter. For example, create the stocks_keyspace:

        cqlsh -e "CREATE KEYSPACE stocks_keyspace \
          WITH replication = {'class': 'NetworkTopologyStrategy', \
          'Cassandra': 1};"

    The datacenter name is case sensitive. Use nodetool ring to get a list of datacenters.
  - Create the table. For example, create the stocks_table:

        cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
          symbol text, \
          ts timestamp, \
          exchange text, \
          industry text, \
          name text, \
          value double, \
          PRIMARY KEY (symbol, ts));"
  - Verify that all nodes have the same schema version using nodetool describering. Replace keyspace_name:

        nodetool describering -- keyspace_name
- In the DataStax Apache Pulsar™ Connector configuration file:
  - Add the topic name to topics.
  - Define the topic-to-table map prefix.
  - Define the field-to-column map.

    Example configuration for stocks_topic to stocks_table using the minimum required settings:

        tasks.max: 1
        topics: stocks_topic
        topic:
          stocks_topic:
            stocks_keyspace:
              stocks_table:
                mapping: 'symbol=value.symbol,ts=value.dateTime,exchange=value.exchange,industry=value.industry,name=key.name,value=value.value'
- Update the configuration on a running worker or deploy the DataStax Connector for the first time.
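To make the field-to-column map from the configuration step more concrete, here is a minimal sketch of how a mapping string of the form `column=source.field` could be resolved against a message key and a value payload. This is not the connector's implementation: the helpers `parse_mapping` and `apply_mapping` are hypothetical, the sample record is invented, and the sketch assumes the value has already been unwrapped to its payload map.

```python
# Mapping string in the same column=source.field form as the configuration example.
MAPPING = (
    "symbol=value.symbol,ts=value.dateTime,exchange=value.exchange,"
    "industry=value.industry,name=key.name,value=value.value"
)


def parse_mapping(mapping: str) -> dict:
    """Split 'column=source.field,...' into {column: (source, field)}."""
    result = {}
    for entry in mapping.split(","):
        column, path = entry.strip().split("=", 1)
        source, field = path.strip().split(".", 1)
        result[column.strip()] = (source, field)
    return result


def apply_mapping(mapping: dict, key: dict, value: dict) -> dict:
    """Build a table row by reading each mapped field from the key or the value."""
    sources = {"key": key, "value": value}
    return {
        column: sources[source].get(field)
        for column, (source, field) in mapping.items()
    }


if __name__ == "__main__":
    # Hypothetical record: the key holds the name, the value payload holds the rest.
    sample_key = {"name": "Example Corp"}
    sample_value = {
        "symbol": "EXMP",
        "dateTime": "2024-01-02T10:00:00Z",
        "exchange": "NASDAQ",
        "industry": "Tech",
        "value": 123.45,
    }
    row = apply_mapping(parse_mapping(MAPPING), sample_key, sample_value)
    for column, cell in row.items():
        print(f"{column}: {cell}")
```

Each resulting dictionary key corresponds to a column of stocks_table, so a missing comma or a misspelled field name in the mapping shows up here as a parse error or a `None` cell.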