Mapping JSON messages
The DataStax Apache Kafka™ Connector supports mapping JSON messages with or without a schema. In this example, the key is plain JSON without a schema. The value is also JSON, but it is wrapped in an envelope that contains a schema and a payload. The payload type is map, and the connector can access the individual fields of that map.
key | value |
{"name":"APPLE"} | {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:26:27.483"}} |
{"name":"EXXON MOBIL"} | {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"M","value":80,"exchange":"NYSE","industry":"ENERGY","ts":"2018-11-26T19:26:27.483"}} |
{"name":"GENERAL MOTORS"} | {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"GM","value":38,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}} |
{"name":"AT&T"} | {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"AT&T","value":33,"exchange":"NYSE","industry":"TELECOM","ts":"2018-11-26T19:26:27.483"}} |
{"name":"FORD MOTOR"} | {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"F","value":10,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}} |
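To make the message layout concrete, the following Python sketch shows one way to unwrap a schema-and-payload envelope and to resolve references of the form key.<field> and value.<field> against a record. It is an illustration only, not the connector's implementation: the function names unwrap and resolve_mapping are hypothetical, the schema is abbreviated, and the rule that an envelope contains exactly the schema and payload entries is an assumption made for this sketch.

```python
import json


def unwrap(raw):
    """Parse a JSON message and return its data.

    A message shaped like {"schema": ..., "payload": ...} is treated as an
    envelope and only the payload is returned; anything else is returned
    as parsed. (Assumed rule for this sketch.)
    """
    doc = json.loads(raw)
    if isinstance(doc, dict) and set(doc) == {"schema", "payload"}:
        return doc["payload"]
    return doc


def resolve_mapping(mapping, key, value):
    """Turn 'column=key.field, column=value.field' into a {column: data} row."""
    sources = {"key": key, "value": value}
    row = {}
    for pair in mapping.split(","):
        column, ref = (part.strip() for part in pair.split("=", 1))
        source, field = ref.split(".", 1)
        row[column] = sources[source].get(field)
    return row


# Sample record based on the GENERAL MOTORS row (schema abbreviated).
raw_key = '{"name":"GENERAL MOTORS"}'
raw_value = json.dumps({
    "schema": {"type": "map", "optional": False, "name": "stocksdata"},
    "payload": {"symbol": "GM", "value": 38, "exchange": "NYSE",
                "industry": "AUTO", "ts": "2018-11-26T19:26:27.483"},
})

row = resolve_mapping(
    "name=key.name, symbol=value.symbol, value=value.value",
    unwrap(raw_key),
    unwrap(raw_value),
)
print(row)  # {'name': 'GENERAL MOTORS', 'symbol': 'GM', 'value': 38}
```

The key passes through unchanged because it has no envelope, while the value is reduced to its payload before the fields are looked up.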
Procedure
- Verify that the correct converter is set in key.converter and value.converter in the connect-distributed.properties file.
- Set up the supported database table.
- In the DataStax Apache Kafka Connector configuration file:
  - Add the topic name to topics.
  - Define the topic-to-table map prefix.
  - Define the field-to-column map.
  Example configurations for stocks_topic to stocks_table using the minimum required settings:
  - JSON for distributed mode:
    {
      "name": "stocks-sink",
      "config": {
        "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
        "tasks.max": "1",
        "topics": "stocks_topic",
        "topic.stocks_topic.stocks_keyspace.stocks_table.mapping": "symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key.name, value=value.value"
      }
    }
  - Properties file for standalone mode:
    name=stocks-sink
    connector.class=com.datastax.kafkaconnector.DseSinkConnector
    tasks.max=1
    topics=stocks_topic
    topic.stocks_topic.stocks_keyspace.stocks_table.mapping=symbol=value.symbol,ts=value.ts,exchange=value.exchange,industry=value.industry,name=key.name,value=value.value
  Note: See the DataStax Apache Kafka Connector configuration parameter reference for additional parameters. When the contactPoints parameter is missing, the connector uses localhost; this assumes the database is co-located on the DataStax Apache Kafka Connector node.
- Update the configuration on a running worker or deploy the DataStax Connector for the first time.
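For distributed mode, the final step is typically done through the Kafka Connect REST interface. The Python sketch below builds a PUT request to /connectors/stocks-sink/config, which creates the connector if it does not exist and updates its configuration otherwise. The helper name build_deploy_request is hypothetical, and the worker address http://localhost:8083 is an assumption (8083 is the default Kafka Connect REST port); adjust both to your environment. The mapping reads the timestamp from the ts field of the sample payload.

```python
import json
import urllib.request


def build_deploy_request(name, config, worker_url="http://localhost:8083"):
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    return urllib.request.Request(
        url=f"{worker_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Minimum settings from the example configuration for stocks_topic.
config = {
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "tasks.max": "1",
    "topics": "stocks_topic",
    "topic.stocks_topic.stocks_keyspace.stocks_table.mapping": (
        "symbol=value.symbol, ts=value.ts, exchange=value.exchange, "
        "industry=value.industry, name=key.name, value=value.value"
    ),
}

request = build_deploy_request("stocks-sink", config)
print(request.get_method(), request.full_url)
# PUT http://localhost:8083/connectors/stocks-sink/config

# Uncomment to send the request to a running Kafka Connect worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Building the request separately from sending it makes it possible to inspect the URL and body before anything reaches the worker.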