Mapping a message that contains both basic and JSON fields

When the data format for the message key or value is JSON, the connector mapping can include individual fields in the JSON structure. The DataStax Apache Kafka Connector supports JSON produced by both the JsonSerializer and the StringSerializer; the mapping semantics are the same.

In the following example, the key is a text field and the value is JSON. The key is mapped to the name column, and each field in the JSON value is mapped to a separate column in the table.
key             value
APPLE           {"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:26:27.483"}
EXXON MOBIL     {"symbol":"M","value":80,"exchange":"NYSE","industry":"ENERGY","ts":"2018-11-26T19:26:27.483"}
GENERAL MOTORS  {"symbol":"GM","value":38,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}
AT&T            {"symbol":"AT&T","value":33,"exchange":"NYSE","industry":"TELECOM","ts":"2018-11-26T19:26:27.483"}
FORD MOTOR      {"symbol":"F","value":10,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}
Note: JSON records in Kafka can also have a schema associated with them.
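
One way to produce records like these is the kafka-console-producer tool that ships with Kafka. The following is a minimal sketch, assuming a broker at localhost:9092 and using | as the key separator (any character that does not appear in the data works):

echo 'APPLE|{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:26:27.483"}' | \
  kafka-console-producer --broker-list localhost:9092 --topic stocks_topic \
  --property parse.key=true --property "key.separator=|"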

Table requirements

Ensure the following when mapping fields to columns:
  • The data in each Kafka field is compatible with the data type of the mapped database table column.
  • The Kafka field mapped to a database primary key (PK) column always contains data; null values are not allowed in PK columns (see the example after this list).
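
For example, with the stocks_table defined in the procedure below (PRIMARY KEY (symbol, ts)), every record must supply both symbol and ts: a value such as {"symbol":"F","value":10} cannot be written because ts is missing, and the JSON number in value must fit the CQL double column.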

Procedure

  1. Verify that the correct converters are set in the key.converter and value.converter settings of the connect-distributed.properties file.
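    For example, for this mapping (string keys, schemaless JSON values) the worker properties would typically be:
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false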
  2. Set up the supported database table.
    1. Create the keyspace. Ensure that the keyspace is replicated to a datacenter that is set in the DataStax Apache Kafka Connector contactPoints parameter.
      For example, create the stocks_keyspace:
      cqlsh -e "CREATE KEYSPACE stocks_keyspace \
      WITH replication = {'class': 'NetworkTopologyStrategy',\
      'Cassandra': 1};"
      Note: The datacenter name is case sensitive. Use nodetool ring to get a list of datacenters.
    2. Create the table.
      For example, create the stocks_table:
      cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
        symbol text, \
        ts timestamp, \
        exchange text, \
        industry text, \
        name text, \
        value double, \
        PRIMARY KEY (symbol, ts));"
    3. Verify that all nodes have the same schema version using nodetool describering.
      Replace keyspace_name:
      nodetool describering -- keyspace_name
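      For example, for stocks_keyspace:
      nodetool describering -- stocks_keyspace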
  3. In the DataStax Connector configuration file:
    1. Add the topic name to topics.
    2. Define the topic-to-table map prefix.
    3. Define the field-to-column map.
    Example configurations for stocks_topic to stocks_table using the minimum required settings:
    Note: See the DataStax Apache Kafka Connector configuration parameter reference for additional parameters. When the contactPoints parameter is missing, the connector defaults to localhost; this assumes the database is co-located on the DataStax Apache Kafka Connector node.
    • JSON for distributed mode:
      {
        "name": "stocks-sink",
        "config": {
          "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
          "tasks.max": "1",
          "topics": "stocks_topic",
          "topic.stocks_topic.stocks_keyspace.stocks_table.mapping": 
          "symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value"
        }
      }
    • Properties file for standalone mode:
      name=stocks-sink
      connector.class=com.datastax.kafkaconnector.DseSinkConnector
      tasks.max=1
      topics=stocks_topic
      topic.stocks_topic.stocks_keyspace.stocks_table.mapping = symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value
  4. Update the configuration on a running worker, or deploy the DataStax Connector for the first time; an example follows.
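    For example, in distributed mode the JSON configuration shown above can be submitted to a Connect worker with the Kafka Connect REST API. This sketch assumes the worker listens on localhost:8083 and that the JSON was saved as stocks-sink.json (a hypothetical file name):
    curl -X POST -H "Content-Type: application/json" \
      -d @stocks-sink.json \
      http://localhost:8083/connectors
    After messages are published to stocks_topic, spot-check the rows:
    cqlsh -e "SELECT * FROM stocks_keyspace.stocks_table LIMIT 10;"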