Mapping a message that contains both basic and JSON fields
When the data format for the Kafka key or value is JSON, individual fields of that JSON structure can be specified in the connector mapping.
When the data format for the message key or value is JSON, the connector mapping can
include individual fields in the JSON structure. The DataStax Apache Kafka™ Connector supports JSON produced by both the JsonSerializer
and the StringSerializer; the mapping semantics are the same.
In the following example, the key is a text field and the value is JSON. The key is
mapped to the name column, and each of the JSON fields is mapped to a separate column in the
table.
key | value |
---|---|
APPLE | |
EXXON MOBIL | |
GENERAL MOTORS | |
AT&T | |
FORD MOTOR | |
Note: JSON records in Kafka can also have a schema associated with
them.
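The mapping described above can be sketched in a few lines of Python. This is only an illustration of the semantics, not the connector's implementation: the helper names `parse_mapping` and `apply_mapping` are hypothetical, the mapping string is the one used in the example configuration in this topic, and the JSON field values are invented placeholders.

```python
import json

# Mapping string as used in the example configuration in this topic.
MAPPING = ("symbol=value.symbol, ts=value.ts, exchange=value.exchange, "
           "industry=value.industry, name=key, value=value.value")


def parse_mapping(mapping):
    """Split 'column=source' pairs into a {column: source} dict (hypothetical helper)."""
    pairs = (item.strip().split("=", 1) for item in mapping.split(","))
    return {column.strip(): source.strip() for column, source in pairs}


def apply_mapping(mapping, key, value_json):
    """Resolve each source ('key' or 'value.<field>') against one record."""
    value = json.loads(value_json)
    row = {}
    for column, source in parse_mapping(mapping).items():
        if source == "key":
            row[column] = key
        elif source.startswith("value."):
            row[column] = value.get(source[len("value."):])
        else:
            raise ValueError(f"unsupported source: {source}")
    return row


# Hypothetical record: the key is a plain text field, the value is a JSON string.
# The field values below are placeholders, not data from this topic.
record_key = "APPLE"
record_value = json.dumps({
    "symbol": "AAPL",
    "ts": "2018-11-26T19:26:27.483",
    "exchange": "NASDAQ",
    "industry": "Tech",
    "value": 208.0,
})

row = apply_mapping(MAPPING, record_key, record_value)
print(row)
```

The text key ends up in the name column, and every other column is filled from the JSON field of the same name.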
Table requirements
Ensure the following when mapping fields to columns:
- Data in the Kafka field is compatible with the database table column data type.
- Kafka field mapped to a database primary key (PK) column always contains data. Null values are not allowed in PK columns.
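As a rough pre-flight check, the two requirements above could be tested on sample rows before data is produced to the topic. The sketch below is an assumption-laden illustration: the primary key columns (`symbol`, `ts`) and the per-column parsers are hypothetical stand-ins for a real table schema, and `check_row` is not part of the connector.

```python
from datetime import datetime

# Assumptions for illustration only: these primary key columns and parsers
# are hypothetical and must be replaced with the real table schema.
PRIMARY_KEY = ("symbol", "ts")
PARSERS = {"ts": datetime.fromisoformat, "value": float}


def check_row(row):
    """Return a list of problems found in one mapped row (empty if none)."""
    problems = []
    # Requirement: fields mapped to primary key columns always contain data.
    for column in PRIMARY_KEY:
        if row.get(column) is None:
            problems.append(f"{column}: primary key column must not be null")
    # Requirement: field data is compatible with the column data type.
    for column, parse in PARSERS.items():
        raw = row.get(column)
        if raw is None:
            continue  # nulls are handled by the primary key check above
        try:
            parse(raw)
        except (TypeError, ValueError):
            problems.append(f"{column}: {raw!r} is not compatible with the column type")
    return problems


good = {"symbol": "AAPL", "ts": "2018-11-26T19:26:27.483", "value": 208.0}
bad = {"symbol": "AAPL", "ts": None, "value": "n/a"}

print(check_row(good))
print(check_row(bad))
```

The first row passes both checks. The second is reported twice: once for the null primary key field and once for the non-numeric value.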
Procedure
- Verify that the correct converter is set in the key.converter and value.converter settings of the connect-distributed.properties file.
- Set up the supported database table.
- In the DataStax Connector configuration file:
  - Add the topic name to topics.
  - Define the topic-to-table map prefix.
  - Define the field-to-column map.
Example configurations that map stocks_topic to stocks_table using the minimum required settings:
Note: See DataStax Apache Kafka Connector configuration parameter reference for additional parameters. When the contactPoints parameter is missing, the connector uses localhost; this assumes the database is co-located on the DataStax Apache Kafka Connector node.
- JSON for distributed mode:
{
  "name": "stocks-sink",
  "config": {
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "tasks.max": "1",
    "topics": "stocks_topic",
    "topic.stocks_topic.stocks_keyspace.stocks_table.mapping": "symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value"
  }
}
- Properties file for standalone mode:
name=stocks-sink
connector.class=com.datastax.kafkaconnector.DseSinkConnector
tasks.max=1
topics=stocks_topic
topic.stocks_topic.stocks_keyspace.stocks_table.mapping = symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value
- Update configuration on a running worker or deploy the DataStax Connector for the first time.
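Because the mapping key embeds the topic, keyspace, and table names, it can be convenient to generate the distributed-mode JSON rather than write it by hand. The following is a minimal sketch under that assumption. The helpers `build_mapping` and `build_connector_config` are hypothetical names introduced here; the setting names and values are the ones shown in the example configuration above.

```python
import json


def build_mapping(columns):
    """Join {column: source} pairs into the 'column=source, ...' mapping string."""
    return ", ".join(f"{column}={source}" for column, source in columns.items())


def build_connector_config(name, topic, keyspace, table, columns):
    """Assemble the minimum settings shown in this topic as a Python dict."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
            "tasks.max": "1",
            "topics": topic,
            # Topic-to-table prefix followed by the field-to-column map.
            f"topic.{topic}.{keyspace}.{table}.mapping": build_mapping(columns),
        },
    }


# Column-to-field pairs from the example: the key feeds the name column,
# every other column comes from a field of the JSON value.
columns = {
    "symbol": "value.symbol",
    "ts": "value.ts",
    "exchange": "value.exchange",
    "industry": "value.industry",
    "name": "key",
    "value": "value.value",
}

config = build_connector_config(
    "stocks-sink", "stocks_topic", "stocks_keyspace", "stocks_table", columns
)
print(json.dumps(config, indent=2))
```

The printed JSON has the same shape as the distributed-mode example and could be saved to a file and used when deploying or updating the connector.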