Mapping JSON messages

The DataStax Apache Pulsar™ Connector supports mapping JSON messages with or without a schema. In this example, the key is plain JSON without a schema. The value is also JSON, but it wraps the data in a schema and a payload. The payload type is a map, and the connector can access the individual fields of that map.

Key and value of each example message (the key is listed first):

{"name":"APPLE"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:26:27.483"}}

{"name":"EXXON MOBIL"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"M","value":80,"exchange":"NYSE","industry":"ENERGY","ts":"2018-11-26T19:26:27.483"}}

{"name":"GENERAL MOTORS"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"GM","value":38,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}}

{"name":"AT&T"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"AT&T","value":33,"exchange":"NYSE","industry":"TELECOM","ts":"2018-11-26T19:26:27.483"}}

{"name":"FORD MOTOR"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"F","value":10,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}}
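To make the key/value split concrete, here is a minimal Python sketch (standard library only) of how a consumer could decode one of the messages above. The key is parsed as plain JSON, while the value is unwrapped into its schema and payload, and the payload is checked against the declared fields. The helper names (parse_key, parse_value) and the type table are hypothetical and cover only the string and int32 types used in this example. This is an illustration, not the connector's internal implementation.

```python
import json

# Hypothetical type table: only the schema types that appear in this example.
JSON_TYPES = {"string": str, "int32": int}


def parse_key(raw: str) -> dict:
    """The key is plain JSON with no schema envelope, so it is used as-is."""
    return json.loads(raw)


def parse_value(raw: str) -> dict:
    """Unwrap a {"schema": ..., "payload": ...} value and check the payload."""
    envelope = json.loads(raw)
    schema, payload = envelope["schema"], envelope["payload"]
    for field in schema["fields"]:
        name = field["field"]
        if payload.get(name) is None:
            # A missing or null field is allowed only when the schema marks it optional.
            if not field["optional"]:
                raise ValueError(f"required field {name!r} is missing")
            continue
        if not isinstance(payload[name], JSON_TYPES[field["type"]]):
            raise TypeError(f"field {name!r} is not of type {field['type']}")
    return payload


if __name__ == "__main__":
    key = parse_key('{"name":"GENERAL MOTORS"}')
    value = parse_value(
        '{"schema":{"type":"map","fields":['
        '{"type":"string","optional":false,"field":"symbol"},'
        '{"type":"int32","optional":true,"field":"value"},'
        '{"type":"string","optional":true,"field":"exchange"},'
        '{"type":"string","optional":true,"field":"industry"},'
        '{"type":"string","optional":false,"field":"ts"}],'
        '"optional":false,"name":"stocksdata"},'
        '"payload":{"symbol":"GM","value":38,"exchange":"NYSE",'
        '"industry":"AUTO","ts":"2018-11-26T19:26:27.483"}}'
    )
    print(key["name"], value["symbol"], value["value"])  # GENERAL MOTORS GM 38
```

Because the key carries no schema, nothing is validated for it. Only the value's payload fields are checked against the embedded field list.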

See the DataStax Pulsar Examples for a full example.

  1. Set up the supported database table.

  2. Create the keyspace. Ensure that the keyspace is replicated to a datacenter that is set in the DataStax Apache Pulsar Connector contactPoints parameter. For example, create stocks_keyspace:

    cqlsh -e "CREATE KEYSPACE stocks_keyspace \
      WITH replication = {'class': 'NetworkTopologyStrategy', \
      'Cassandra': 1};"

    The datacenter name ('Cassandra' in this example) is case-sensitive. Use nodetool ring to get a list of datacenters.
  3. Create the table. For example, create the stocks_table:

    cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
      symbol text, \
      ts timestamp, \
      exchange text, \
      industry text, \
      name text, \
      value double, \
      PRIMARY KEY (symbol, ts));"
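  4. Verify that all nodes have the same schema version using nodetool describecluster, and use nodetool describering to check the token ranges and replicas for the keyspace. Replace keyspace_name (for example, with stocks_keyspace):

    nodetool describecluster
    nodetool describering -- keyspace_name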
  5. In the DataStax Apache Pulsar™ Connector configuration file:

    1. Add the topic name to topics.

    2. Define the topic-to-table map prefix.

    3. Define the field-to-column map.

      Example configuration that maps stocks_topic to stocks_table using the minimum required settings:

      tasks.max: 1
      topics: stocks_topic
      topic:
        stocks_topic:
          stocks_keyspace:
            stocks_table:
              mapping: 'symbol=value.symbol,ts=value.ts,exchange=value.exchange,industry=value.industry,name=key.name,value=value.value'
  6. Update the configuration on a running worker, or deploy the DataStax Apache Pulsar Connector for the first time.
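The mapping setting is a comma-separated list of column=source.field entries, where source is key or value. The following minimal Python sketch (standard library only) parses such a string and applies it to a decoded key and payload. It shows why every entry needs a separating comma and why each referenced field must exist in the message (the timestamp field in these payloads is named ts). The functions parse_mapping and apply_mapping are hypothetical illustrations, not the connector's API, and they skip any type conversion (for example, from the int32 payload value to the double value column).

```python
# Hypothetical helpers illustrating the semantics of a
# 'column=key.field,column=value.field,...' mapping string.


def parse_mapping(mapping: str) -> dict:
    """Return {column: (source, field)} for each comma-separated entry."""
    parsed = {}
    for entry in mapping.split(","):
        column, _, path = entry.strip().partition("=")
        source, _, field = path.partition(".")
        # Each side must be a single identifier; a missing comma leaves
        # text like 'name value=value.value' in the field and fails here.
        if source not in ("key", "value") or not (column.isidentifier() and field.isidentifier()):
            raise ValueError(f"malformed mapping entry: {entry!r}")
        parsed[column] = (source, field)
    return parsed


def apply_mapping(parsed: dict, key: dict, value: dict) -> dict:
    """Build one row {column: field value} from a decoded key and value payload."""
    records = {"key": key, "value": value}
    row = {}
    for column, (source, field) in parsed.items():
        # Fail loudly instead of writing a null, e.g. for a misspelled field.
        if field not in records[source]:
            raise KeyError(f"{source}.{field} not found for column {column!r}")
        row[column] = records[source][field]
    return row


if __name__ == "__main__":
    mapping = parse_mapping(
        "symbol=value.symbol,ts=value.ts,exchange=value.exchange,"
        "industry=value.industry,name=key.name,value=value.value"
    )
    row = apply_mapping(
        mapping,
        {"name": "GENERAL MOTORS"},
        {"symbol": "GM", "value": 38, "exchange": "NYSE",
         "industry": "AUTO", "ts": "2018-11-26T19:26:27.483"},
    )
    print(row)
```

Raising on a missing field, rather than silently producing a null, mirrors the practical concern that symbol and ts form the primary key of stocks_table and cannot be empty.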