Mapping JSON messages

Supports mapping JSON messages with or without a schema.

The DataStax Apache Kafka™ Connector supports mapping JSON messages with or without a schema. In this example, the key is plain JSON without a schema. The value is also JSON but contains both a schema and a payload. The payload's type is Map, so the connector can access the individual fields of that map.

key value
{"name":"APPLE"} {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:26:27.483"}}
{"name":"EXXON MOBIL"} {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"M","value":80,"exchange":"NYSE","industry":"ENERGY","ts":"2018-11-26T19:26:27.483"}}
{"name":"GENERAL MOTORS"} {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"GM","value":38,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}}
{"name":"AT&T"} {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"AT&T","value":33,"exchange":"NYSE","industry":"TELECOM","ts":"2018-11-26T19:26:27.483"}}
{"name":"FORD MOTOR"} {"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"F","value":10,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}}
See the DataStax Kafka Examples for a full example.
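The schema-and-payload envelope shown above can be illustrated with a short sketch (Python here, purely for illustration; the connector itself performs this parsing internally):

```python
import json

# A Kafka record in the schema-and-payload envelope shown above
# (abbreviated to two schema fields for readability).
key = json.loads('{"name":"APPLE"}')
value = json.loads(
    '{"schema":{"type":"map","fields":['
    '{"type":"string","optional":false,"field":"symbol"},'
    '{"type":"int32","optional":true,"field":"value"}],'
    '"optional":false,"name":"stocksdata"},'
    '"payload":{"symbol":"APPL","value":208}}'
)

# The connector reads field types from "schema" and data from "payload",
# so a mapping entry such as symbol=value.symbol resolves against "payload":
payload = value["payload"]
print(key["name"], payload["symbol"], payload["value"])  # APPLE APPL 208
```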

Procedure

  1. Verify that the correct converter is set in the key.converter and value.converter of the connect-distributed.properties file.
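For this example the key carries no schema and the value embeds one, so the JSON converter settings would look along these lines (a sketch; adjust to your deployment):

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```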
  2. Set up the DataStax database table.
    1. Create the keyspace. Ensure that the keyspace is replicated to a datacenter reachable from the hosts listed in the DataStax Connector contactPoints parameter.
      For example, create the stocks_keyspace:
      cqlsh -e "CREATE KEYSPACE stocks_keyspace \
      WITH replication = {'class': 'NetworkTopologyStrategy',\
      'Cassandra': 1};"
      Note: The datacenter name is case sensitive. Use nodetool ring to get a list of datacenters.
    2. Create the table.
      For example, create the stocks_table:
      cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
        symbol text, \
        ts timestamp, \
        exchange text, \
        industry text, \
        name text, \
        value double, \
        PRIMARY KEY (symbol, ts));"
    3. Verify that all nodes have the same schema version using nodetool describering.
      Replace keyspace_name:
      nodetool describering -- keyspace_name
  3. In the DataStax Connector configuration file:
    1. Add the topic name to topics.
    2. Define the topic-to-table map prefix.
    3. Define the field-to-column map.
    Example configurations for stocks_topic to stocks_table using the minimum required settings:
    • JSON for distributed mode:
      {
        "name": "stocks-sink",
        "config": {
          "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
          "tasks.max": "1",
          "topics": "stocks_topic",
      "topic.stocks_topic.stocks_keyspace.stocks_table.mapping":
          "symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key.name, value=value.value"
        }
      }
    • Properties file for standalone mode:
      name=stocks-sink
      connector.class=com.datastax.kafkaconnector.DseSinkConnector
      tasks.max=1
      topics=stocks_topic
topic.stocks_topic.stocks_keyspace.stocks_table.mapping=symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key.name, value=value.value
    Note: See the DataStax Apache Kafka Connector configuration parameter reference for additional parameters. When the contactPoints parameter is missing, the connector defaults to localhost; this assumes the database is co-located on the Apache Kafka Connect node.
  4. Update configuration on a running worker or deploy the DataStax Connector for the first time.
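For distributed mode, both operations can be performed through the Kafka Connect REST API. A sketch, assuming the JSON configuration above is saved as stocks-sink.json (hypothetical file names; the worker is assumed to listen on the default port 8083):

```shell
# Register the connector for the first time; the body is the full
# {"name": ..., "config": {...}} document shown above.
curl -X POST -H "Content-Type: application/json" \
  -d @stocks-sink.json http://localhost:8083/connectors

# Update a running connector; PUT .../config takes only the inner
# "config" object, not the name/config wrapper.
curl -X PUT -H "Content-Type: application/json" \
  -d @stocks-sink-config.json http://localhost:8083/connectors/stocks-sink/config
```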