Mapping JSON messages

The DataStax Apache Pulsar™ Connector supports mapping JSON messages with or without a schema. In this example, the key is regular JSON without schema. The value is also JSON but contains a schema and a payload. The type of the payload is Map and the connector is able to access the individual fields of that map.

key value

{"name":"APPLE"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"APPL","value":208,"exchange":"NASDAQ","industry":"TECH","ts":"2018-11-26T19:26:27.483"}}

{"name":"EXXON MOBIL"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"M","value":80,"exchange":"NYSE","industry":"ENERGY","ts":"2018-11-26T19:26:27.483"}}

{"name":"GENERAL MOTORS`"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"GM","value":38,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}}

{"name":"AT&T`"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"AT&T","value":33,"exchange":"NYSE","industry":"TELECOM","ts":"2018-11-26T19:26:27.483"}}

{"name":"FORD MOTOR`"}

{"schema":{"type":"map","fields":[{"type":"string","optional":false,"field":"symbol"},{"type":"int32","optional":true,"field":"value"},{"type":"string","optional":true,"field":"exchange"},{"type":"string","optional":true,"field":"industry"},{"type":"string","optional":false,"field":"ts"}],"optional":false,"name":"stocksdata"},"payload":{"symbol":"F","value":10,"exchange":"NYSE","industry":"AUTO","ts":"2018-11-26T19:26:27.483"}}

See the DataStax Pulsar Examples for a full example.

  1. Set up the supported database table.

  2. Create the keyspace. Ensure that keyspace is replicated to a datacenter that is set in the DataStax Apache Pulsar Connector contactPoints parameter. For example, create the stocks_keyspace:

    cqlsh -e "CREATE KEYSPACE stocks_keyspace \
      WITH replication = {'class': 'NetworkTopologyStrategy',\
      'Cassandra': 1};"
    The datacenter name is case sensitive. Use nodetool ring to get a list of datacenters.
  3. Create the table. For example, create the stocks_table:

    cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
      symbol text, \
      ts timestamp, \
      exchange text, \
      industry text, \
      name text, \
      value double, \
      PRIMARY KEY (symbol, ts));"
  4. Verify that all nodes have the same schema version using nodetool describering. Replace keyspace\_name:

    nodetool describering -- keyspace\_name
  5. In the DataStax Apache Pulsar™ Connector configuration file:

    1. Add the topic name to topics.

    2. Define the topic-to-table map prefix.

    3. Define the field-to-column map.

      Example configurations for stocks_topic to stocks_table using the minimum required settings:

      tasks.max: 1
      topics: stocks_topic
      topic:
        stocks_topic:
          stocks_keyspace:
            stocks_table:
              mapping: 'symbol=value.symbol,ts=value.dateTime,exchange=value.exchange,industry=value.industry,name=key.name value=value.value'
  6. Update configuration on a running worker or deploy the DataStax Connector for the first time.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com