Map messages that contain JSON

When the data format for the key or value is JSON, the DataStax Apache Pulsar™ connector mapping can include individual fields in the JSON structure.

JSON records in Pulsar can have a schema associated with them, but this isn’t required.

Mapping requirements

When mapping fields to columns, the following requirements apply:

  • The data in the Pulsar field must be compatible with the mapped column’s data type. For example, a string field cannot map to a number column.

  • A Pulsar field mapped to a table’s primary key (PK) column must always contain data because primary key columns don’t allow nulls.

  • All primary key columns must have a mapping because primary key columns don’t allow nulls.
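
The two primary key rules above can be checked before a message is produced. The following is a minimal Python sketch, not connector code: the function name check_primary_key, the simplified column-to-field dictionary, and the sample column names are assumptions made for illustration.

```python
from typing import Any, List, Mapping, Sequence


def check_primary_key(
    mapping: Mapping[str, str],
    record: Mapping[str, Any],
    primary_key: Sequence[str],
) -> List[str]:
    """Return a list of problems; an empty list means both rules hold.

    mapping     -- column name -> field name (hypothetical, simplified form)
    record      -- field name -> value taken from one message
    primary_key -- names of the table's primary key columns
    """
    problems = []
    for column in primary_key:
        field = mapping.get(column)
        if field is None:
            # Rule: every primary key column must have a mapping.
            problems.append(f"primary key column '{column}' has no mapping")
        elif record.get(field) is None:
            # Rule: a field mapped to a primary key column must contain data.
            problems.append(
                f"field '{field}' mapped to primary key column '{column}' is null or missing"
            )
    return problems


# Hypothetical table with primary key (id, ts) and one regular column.
mapping = {"id": "id", "ts": "ts", "note": "note"}
print(check_primary_key(mapping, {"id": "a1", "ts": "2018-11-26T19:26:27.483"}, ["id", "ts"]))
print(check_primary_key(mapping, {"id": "a1", "ts": None, "note": "x"}, ["id", "ts"]))
print(check_primary_key({"id": "id"}, {"id": "a1"}, ["id", "ts"]))
```

The first call returns an empty list. The second and third each report one problem: a null primary key field and an unmapped primary key column, respectively.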

Map messages that contain basic and JSON fields

When the data format for the message key or value is JSON, the DataStax Apache Pulsar™ connector mapping can include individual fields in the JSON structure.

In the following example, the key is a text field and the value is JSON. The key is mapped to the name column, and each JSON field is mapped to a separate column in the table.

Each record is shown as the message key followed by its JSON value:

APPLE

{
  "symbol":"APPL",
  "value":208,
  "exchange":"NASDAQ",
  "industry":"TECH",
  "ts":"2018-11-26T19:26:27.483"
}

EXXON MOBIL

{
  "symbol":"M",
  "value":80,
  "exchange":"NYSE",
  "industry":"ENERGY",
  "ts":"2018-11-26T19:26:27.483"
}

GENERAL MOTORS

{
  "symbol":"GM",
  "value":38,
  "exchange":"NYSE",
  "industry":"AUTO",
  "ts":"2018-11-26T19:26:27.483"
}

AT&T

{
  "symbol":"AT&T",
  "value":33,
  "exchange":"NYSE",
  "industry":"TELECOM",
  "ts":"2018-11-26T19:26:27.483"
}

FORD MOTORS

{
  "symbol":"F",
  "value":10,
  "exchange":"NYSE",
  "industry":"AUTO",
  "ts":"2018-11-26T19:26:27.483"
}

  1. Create a keyspace in your database.

    This example creates a keyspace named stocks_keyspace:

    cqlsh -e "CREATE KEYSPACE stocks_keyspace \
      WITH replication = {'class': 'NetworkTopologyStrategy',\
      'Cassandra': 1};"

    For DSE and Cassandra clusters, make sure that the keyspace is replicated to a datacenter set in the connector’s contactPoints parameter. DSE datacenter names are case sensitive. Use nodetool ring to get a list of datacenters.

  2. Create a table.

    This example creates a table named stocks_table with six columns:

    cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
      symbol text, \
      ts timestamp, \
      exchange text, \
      industry text, \
      name text, \
      value double, \
      PRIMARY KEY (symbol, ts));"

  3. For DSE, verify that all nodes have the same schema version using nodetool describering.

    For example, this command verifies the schema for stocks_keyspace:

    nodetool describering -- stocks_keyspace

  4. In the DataStax connector configuration file, add the Pulsar topic name to topics, and then define the field-to-column mapping.

    The following example maps stocks_topic to stocks_table using the minimum required settings:

    tasks.max: 1
    topics: stocks_topic
    topic:
      stocks_topic:
        stocks_keyspace:
          stocks_table:
            mapping: 'symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value'

  5. Use pulsar-admin sinks update to apply the new configuration.
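
To make the mapping string easier to read, the following Python sketch shows how each column=path pair could be resolved against one of the sample records. It is an illustration only, not the connector's implementation: parse_mapping, resolve, and to_row are hypothetical helper names, and the real mapping parser may accept syntax that this sketch ignores.

```python
import json

MAPPING = (
    "symbol=value.symbol, ts=value.ts, exchange=value.exchange, "
    "industry=value.industry, name=key, value=value.value"
)


def parse_mapping(spec):
    """Split 'column=path, column=path, ...' into a {column: path} dict."""
    pairs = (item.split("=", 1) for item in spec.split(","))
    return {column.strip(): path.strip() for column, path in pairs}


def resolve(path, key, value):
    """Resolve 'key', 'value', 'key.field', or 'value.field' for one message."""
    root, _, field = path.partition(".")
    source = {"key": key, "value": value}[root]
    # A bare 'key' or 'value' refers to the whole key or value.
    return source[field] if field else source


def to_row(spec, key, value_json):
    """Build a {column: data} dict from a text key and a JSON value."""
    value = json.loads(value_json)
    return {
        column: resolve(path, key, value)
        for column, path in parse_mapping(spec).items()
    }


row = to_row(
    MAPPING,
    "APPLE",
    '{"symbol":"APPL","value":208,"exchange":"NASDAQ",'
    '"industry":"TECH","ts":"2018-11-26T19:26:27.483"}',
)
print(row)
```

In this sketch, name=key takes the whole text key, while value=value.value takes the value field from inside the JSON value. The sketch does not convert data types. In the actual table, ts is a timestamp column and value is a double column.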

Map JSON-only messages

The DataStax Apache Pulsar connector supports mapping JSON messages with or without a schema.

In this example, the key is plain JSON without a schema, and the value is JSON that contains a schema and a payload. The payload type is map, and the connector can access the individual fields of the map.

Each record is shown as the JSON message key followed by its JSON value:

{"name":"APPLE"}

{
  "schema": {
    "type": "map",
    "fields": [
      { "type": "string", "optional": false, "field": "symbol" },
      { "type": "int32", "optional": true, "field": "value" },
      { "type": "string", "optional": true, "field": "exchange" },
      { "type": "string", "optional": true, "field": "industry" },
      { "type": "string", "optional": false, "field": "ts" }
    ],
    "optional": false,
    "name": "stocksdata"
  },
  "payload": {
    "symbol": "APPL",
    "value": 208,
    "exchange": "NASDAQ",
    "industry": "TECH",
    "ts": "2018-11-26T19:26:27.483"
  }
}

{"name":"EXXON MOBIL"}

{
  "schema": {
    "type": "map",
    "fields": [
      { "type": "string", "optional": false, "field": "symbol" },
      { "type": "int32", "optional": true, "field": "value" },
      { "type": "string", "optional": true, "field": "exchange" },
      { "type": "string", "optional": true, "field": "industry" },
      { "type": "string", "optional": false, "field": "ts" }
    ],
    "optional": false,
    "name": "stocksdata"
  },
  "payload": {
    "symbol": "M",
    "value": 80,
    "exchange": "NYSE",
    "industry": "ENERGY",
    "ts": "2018-11-26T19:26:27.483"
  }
}

{"name":"GENERAL MOTORS"}

{
  "schema": {
    "type": "map",
    "fields": [
      { "type": "string", "optional": false, "field": "symbol" },
      { "type": "int32", "optional": true, "field": "value" },
      { "type": "string", "optional": true, "field": "exchange" },
      { "type": "string", "optional": true, "field": "industry" },
      { "type": "string", "optional": false, "field": "ts" }
    ],
    "optional": false,
    "name": "stocksdata"
  },
  "payload": {
    "symbol": "GM",
    "value": 38,
    "exchange": "NYSE",
    "industry": "AUTO",
    "ts": "2018-11-26T19:26:27.483"
  }
}

{"name":"AT&T"}

{
  "schema": {
    "type": "map",
    "fields": [
      { "type": "string", "optional": false, "field": "symbol" },
      { "type": "int32", "optional": true, "field": "value" },
      { "type": "string", "optional": true, "field": "exchange" },
      { "type": "string", "optional": true, "field": "industry" },
      { "type": "string", "optional": false, "field": "ts" }
    ],
    "optional": false,
    "name": "stocksdata"
  },
  "payload": {
    "symbol": "AT&T",
    "value": 33,
    "exchange": "NYSE",
    "industry": "TELECOM",
    "ts": "2018-11-26T19:26:27.483"
  }
}

{"name":"FORD MOTOR"}

{
  "schema": {
    "type": "map",
    "fields": [
      { "type": "string", "optional": false, "field": "symbol" },
      { "type": "int32", "optional": true, "field": "value" },
      { "type": "string", "optional": true, "field": "exchange" },
      { "type": "string", "optional": true, "field": "industry" },
      { "type": "string", "optional": false, "field": "ts" }
    ],
    "optional": false,
    "name": "stocksdata"
  },
  "payload": {
    "symbol": "F",
    "value": 10,
    "exchange": "NYSE",
    "industry": "AUTO",
    "ts": "2018-11-26T19:26:27.483"
  }
}
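
Because each value above carries its own schema, a producer can check a payload against that schema before sending it. The following Python sketch is one way to do that, under stated assumptions: check_envelope and PY_TYPES are hypothetical names, only the string and int32 type names from this example are handled, and nothing here describes how the connector itself validates messages.

```python
import json
from typing import Any, Dict, List

# Python types for the two schema type names used in this example only.
PY_TYPES = {"string": str, "int32": int}


def check_envelope(envelope: Dict[str, Any]) -> List[str]:
    """Compare the payload with the schema's field list and report problems."""
    problems = []
    payload = envelope["payload"]
    for spec in envelope["schema"]["fields"]:
        name = spec["field"]
        value = payload.get(name)
        if value is None:
            # Only fields declared with "optional": false must be present.
            if not spec["optional"]:
                problems.append(f"required field '{name}' is missing or null")
            continue
        if type(value) is not PY_TYPES[spec["type"]]:
            problems.append(f"field '{name}' is not of type {spec['type']}")
    return problems


# Shortened version of one sample record (three of the five fields).
envelope = json.loads("""
{
  "schema": {
    "type": "map",
    "fields": [
      { "type": "string", "optional": false, "field": "symbol" },
      { "type": "int32", "optional": true, "field": "value" },
      { "type": "string", "optional": false, "field": "ts" }
    ],
    "optional": false,
    "name": "stocksdata"
  },
  "payload": { "symbol": "GM", "value": 38, "ts": "2018-11-26T19:26:27.483" }
}
""")
print(check_envelope(envelope))
```

For this record the function returns an empty list. Removing symbol from the payload, or sending value as the string "38", would each add one entry to the result.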

  1. Create a keyspace in your database.

    This example creates a keyspace named stocks_keyspace:

    cqlsh -e "CREATE KEYSPACE stocks_keyspace \
      WITH replication = {'class': 'NetworkTopologyStrategy',\
      'Cassandra': 1};"

    For DSE and Cassandra clusters, make sure that the keyspace is replicated to a datacenter set in the connector’s contactPoints parameter. DSE datacenter names are case sensitive. Use nodetool ring to get a list of datacenters.

  2. Create a table.

    This example creates a table named stocks_table with six columns:

    cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
      symbol text, \
      ts timestamp, \
      exchange text, \
      industry text, \
      name text, \
      value double, \
      PRIMARY KEY (symbol, ts));"

  3. For DSE, verify that all nodes have the same schema version using nodetool describering.

    For example, this command verifies the schema for stocks_keyspace:

    nodetool describering -- stocks_keyspace

  4. In the DataStax connector configuration file, add the Pulsar topic name to topics, and then define the field-to-column mapping.

    The following example maps stocks_topic to stocks_table using the minimum required settings:

    tasks.max: 1
    topics: stocks_topic
    topic:
      stocks_topic:
        stocks_keyspace:
          stocks_table:
            mapping: 'symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key.name, value=value.value'

  5. Use pulsar-admin sinks update to apply the new configuration.
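
The following Python sketch illustrates how paths such as key.name and value.symbol could be resolved when the key is plain JSON and the value is a schema and payload envelope. It assumes that value.<field> refers to fields inside the payload, which is what the mapping in this example implies. The helpers unwrap and lookup are hypothetical names, and the field name ts is used as it appears in the sample payload.

```python
import json


def unwrap(message_json):
    """Parse a JSON message; return the payload if it is a schema/payload envelope."""
    data = json.loads(message_json)
    if isinstance(data, dict) and set(data) == {"schema", "payload"}:
        return data["payload"]
    return data


def lookup(path, key, value):
    """Resolve 'key.field' or 'value.field' against the parsed key and value."""
    root, _, field = path.partition(".")
    source = key if root == "key" else value
    return source[field] if field else source


key = unwrap('{"name":"GENERAL MOTORS"}')
# The schema's field list is left empty here for brevity.
value = unwrap("""
{
  "schema": {"type": "map", "fields": [], "optional": false, "name": "stocksdata"},
  "payload": {"symbol": "GM", "value": 38, "exchange": "NYSE",
              "industry": "AUTO", "ts": "2018-11-26T19:26:27.483"}
}
""")

columns = {
    "symbol": "value.symbol",
    "ts": "value.ts",
    "exchange": "value.exchange",
    "industry": "value.industry",
    "name": "key.name",
    "value": "value.value",
}
row = {column: lookup(path, key, value) for column, path in columns.items()}
print(row)
```

Unlike the text-key example, the key here is a JSON object, so the name column is filled from key.name rather than from the whole key.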
