DataStax Pulsar Connector

Mapping a message that contains both basic and JSON fields

When the data format for the message key or value is JSON, the DataStax Apache Pulsar™ connector mapping can include individual fields in the JSON structure.

In the following example, the key is a text field and the value is JSON. The key is mapped to the name column, and each JSON field is mapped to a separate column in the table.

key             value
APPLE           {"symbol":"APPL", "value":208, "exchange":"NASDAQ", "industry":"TECH", "ts":"2018-11-26T19:26:27.483"}
EXXON MOBIL     {"symbol":"M", "value":80, "exchange":"NYSE", "industry":"ENERGY", "ts":"2018-11-26T19:26:27.483"}
GENERAL MOTORS  {"symbol":"GM", "value":38, "exchange":"NYSE", "industry":"AUTO", "ts":"2018-11-26T19:26:27.483"}
AT&T            {"symbol":"AT&T", "value":33, "exchange":"NYSE", "industry":"TELECOM", "ts":"2018-11-26T19:26:27.483"}
FORD MOTOR      {"symbol":"F", "value":10, "exchange":"NYSE", "industry":"AUTO", "ts":"2018-11-26T19:26:27.483"}

JSON records in Pulsar can also have a schema associated with them.

Table requirements

Ensure the following when mapping fields to columns:

  • Data in the Apache Pulsar™ field is compatible with the database table column data type.

  • A Pulsar field mapped to a database primary key (PK) column always contains data. Null values are not allowed in PK columns.
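
The two requirements above can be checked before a record reaches the connector. The following is a minimal sketch, not part of the connector: the helper names missing_pk_fields and is_timestamp_compatible are hypothetical, and the primary key (symbol, ts) is assumed from the stocks_table used in the procedure in this topic. The timestamp check only confirms that the text parses as an ISO-8601 date-time in Python, which is an approximation of what the database accepts.

```python
from datetime import datetime

# Assumption: primary key columns of the target table (stocks_table in this topic).
PRIMARY_KEY = ("symbol", "ts")


def missing_pk_fields(row, primary_key=PRIMARY_KEY):
    """Return the primary key columns whose data is absent or None."""
    return [col for col in primary_key if row.get(col) is None]


def is_timestamp_compatible(text):
    """Return True if the text parses as an ISO-8601 date-time string."""
    try:
        datetime.fromisoformat(text)
        return True
    except (TypeError, ValueError):
        # TypeError: not a string (for example None); ValueError: not a date-time.
        return False


record = {"symbol": "GM", "ts": "2018-11-26T19:26:27.483", "value": 38}
print(missing_pk_fields(record))               # PK columns with no data
print(is_timestamp_compatible(record["ts"]))   # can ts be read as a date-time?
```

A record for which missing_pk_fields returns a non-empty list would violate the primary key requirement and should be corrected or rejected upstream.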

    1. Set up the supported database table.

    2. Create the keyspace. Ensure that the keyspace is replicated to a datacenter that is set in the DataStax Apache Pulsar™ Connector contactPoints parameter. For example, create the stocks_keyspace:

      cqlsh -e "CREATE KEYSPACE stocks_keyspace \
        WITH replication = {'class': 'NetworkTopologyStrategy',\
        'Cassandra': 1};"
      Note: The datacenter name (Cassandra in this example) is case-sensitive. Use nodetool ring to get a list of datacenters.
    3. Create the table. For example, create the stocks_table:

      cqlsh -e "CREATE TABLE stocks_keyspace.stocks_table ( \
        symbol text, \
        ts timestamp, \
        exchange text, \
        industry text, \
        name text, \
        value double, \
        PRIMARY KEY (symbol, ts));"
    4. Verify that all nodes have the same schema version using nodetool describering. Replace keyspace_name with the name of your keyspace:

      nodetool describering -- keyspace_name
    5. In the DataStax Connector configuration file:

      1. Add the topic name to topics.

      2. Define the field-to-column mapping.

        Example configuration that maps stocks_topic to stocks_table using the minimum required settings:

        tasks.max: 1
        topics: stocks_topic
        topic:
          stocks_topic:
            stocks_keyspace:
              stocks_table:
                mapping: 'symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value'
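
To make the mapping setting concrete, the sketch below shows how each column=source pair in the example mapping string could be resolved against one message. It illustrates the semantics only and is not the connector's implementation: parse_mapping, resolve, and to_row are hypothetical helper names, and the sketch handles only the two source forms used in this topic (key and value.<field>).

```python
import json

# Mapping string copied from the example configuration in this topic.
MAPPING = ("symbol=value.symbol, ts=value.ts, exchange=value.exchange, "
           "industry=value.industry, name=key, value=value.value")


def parse_mapping(mapping):
    """Split 'column=source' pairs into a dict of column -> source path."""
    pairs = (item.split("=", 1) for item in mapping.split(","))
    return {column.strip(): source.strip() for column, source in pairs}


def resolve(source, key, value):
    """Return the data a source path points at: 'key' or 'value.<field>'."""
    if source == "key":
        return key
    root, _, field = source.partition(".")
    if root != "value" or not field:
        raise ValueError(f"unsupported source in this sketch: {source}")
    return value.get(field)


def to_row(key, raw_value, mapping=MAPPING):
    """Build a column -> data dict for one message (text key, JSON value)."""
    value = json.loads(raw_value)
    return {col: resolve(src, key, value)
            for col, src in parse_mapping(mapping).items()}


row = to_row("APPLE", '{"symbol":"APPL", "value":208, "exchange":"NASDAQ", '
                      '"industry":"TECH", "ts":"2018-11-26T19:26:27.483"}')
print(row)
```

For the APPLE message, the text key ends up in the name column and each JSON field ends up in the column of the same name, which matches the table layout of stocks_table.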
