
Mapping messages to a table that has a User Defined Type

The DataStax Apache Pulsar™ Connector writes JSON, Avro, and complex-type message values directly into a user-defined type (UDT) column in a supported database. The field names embedded in the Pulsar message must match the UDT field names.

In the example stocks_topic, the key is a basic string and the value is regular JSON.

key              value
APPLE            {"symbol":"APPL", "value":208, "exchange":"NASDAQ", "industry":"TECH", "ts":"2018-11-26T19:26:27.483"}
EXXON MOBIL      {"symbol":"M", "value":80, "exchange":"NYSE", "industry":"ENERGY", "ts":"2018-11-26T19:26:27.483"}
GENERAL MOTORS   {"symbol":"GM", "value":38, "exchange":"NYSE", "industry":"AUTO", "ts":"2018-11-26T19:26:27.483"}
AT&T             {"symbol":"AT&T", "value":33, "exchange":"NYSE", "industry":"TELECOM", "ts":"2018-11-26T19:26:27.483"}
FORD MOTOR       {"symbol":"F", "value":10, "exchange":"NYSE", "industry":"AUTO", "ts":"2018-11-26T19:26:27.483"}

The keyspace name is stocks_keyspace. In that keyspace, create the user-defined type stocks_type:

CREATE TYPE stocks_keyspace.stocks_type (
    symbol text,
    ts timestamp,
    exchange text,
    industry text,
    value double);
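
Because the message field names and the UDT field names must match, it can help to compare the two sets of names before deploying. The following is a minimal Python sketch of such a check, not part of the connector. The helper name unmatched_fields and the hand-copied UDT_FIELDS set are assumptions made for illustration.

```python
import json

# Field names of stocks_keyspace.stocks_type, copied by hand from the
# CREATE TYPE statement (assumption: kept in sync manually).
UDT_FIELDS = {"symbol", "ts", "exchange", "industry", "value"}


def unmatched_fields(message_value: str, udt_fields: set) -> dict:
    """Return field names that appear on only one side (hypothetical helper)."""
    message_fields = set(json.loads(message_value))
    return {
        "only_in_message": sorted(message_fields - udt_fields),
        "only_in_udt": sorted(udt_fields - message_fields),
    }


# One value from the example stocks_topic.
sample = '{"symbol":"APPL", "value":208, "exchange":"NASDAQ", "industry":"TECH", "ts":"2018-11-26T19:26:27.483"}'
print(unmatched_fields(sample, UDT_FIELDS))
```

Two empty lists mean that every field in the message has a UDT field of the same name, and the reverse.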

Define a table named stocks_udt_table with a text column, name, for the primitive string key and a second column, stocks, that uses the stocks_type type:

CREATE TABLE stocks_keyspace.stocks_udt_table (
    name text primary key,
    stocks FROZEN<stocks_type>);

Configure the connector settings and use the following mapping specification:

topic:
    stocks_topic:
        stocks_keyspace:
            stocks_udt_table:
                mapping: 'name=key,stocks=value'
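
To make the mapping string concrete, the Python sketch below reads 'name=key,stocks=value' as column=source pairs and applies it to one record from stocks_topic. It illustrates what the mapping means and is not the connector's implementation. The helpers parse_mapping and apply_mapping are hypothetical, and only the plain key and value sources are handled.

```python
import json


def parse_mapping(spec: str) -> dict:
    """Split 'col=source,col=source' into {column: source} (hypothetical helper)."""
    pairs = (item.split("=", 1) for item in spec.split(","))
    return {column.strip(): source.strip() for column, source in pairs}


def apply_mapping(spec: str, key: str, value: str) -> dict:
    """Build the row implied by the mapping for one record (illustration only)."""
    sources = {"key": key, "value": json.loads(value)}
    return {column: sources[source] for column, source in parse_mapping(spec).items()}


row = apply_mapping(
    "name=key,stocks=value",
    key="FORD MOTOR",
    value='{"symbol":"F", "value":10, "exchange":"NYSE", "industry":"AUTO", "ts":"2018-11-26T19:26:27.483"}',
)
print(row["name"])    # goes to the name primary key column
print(row["stocks"])  # the fields destined for the stocks_type column
```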

Mapping complex user defined types

The DataStax Apache Pulsar™ Connector also supports mapping user-defined types that contain map, set, and list fields.

In stocks_keyspace, create the user-defined type stocks_complex_type:

CREATE TYPE stocks_keyspace.stocks_complex_type (
    symbol text,
    ts timestamp,
    exchange text,
    industry text,
    value double,
    trailing_five_days_value frozen<list<double>>,
    similar_symbols frozen<set<text>>,
    stats frozen<map<text, double>>);

  • trailing_five_days_value frozen<list<double>> contains the stock's value on each of the last five days, for example [10.0, 11.0, 12.0, 23.0, 10.0]. Values in a list do not need to be unique.

  • similar_symbols frozen<set<text>> contains a set of symbols in the same industry, or performing similarly, for example ["M", "GM", "F"]. Values in a set must be unique.

  • stats frozen<map<text, double>> contains a map of key-value pairs, for example {"open": 11.0, "high": 20.0, "low": 9.0}.
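
A message value for this type carries the collection fields alongside the scalar ones. The Python sketch below builds one such JSON value. The scalar fields come from the FORD MOTOR record and the collection values from the examples above; combining them in a single message is an assumption made for illustration.

```python
import json

message_value = {
    "symbol": "F",
    "ts": "2018-11-26T19:26:27.483",
    "exchange": "NYSE",
    "industry": "AUTO",
    "value": 10,
    # list<double>: order is kept and duplicates are allowed
    "trailing_five_days_value": [10.0, 11.0, 12.0, 23.0, 10.0],
    # set<text>: JSON has no set type, so the set is written as an array of unique values
    "similar_symbols": ["M", "GM", "F"],
    # map<text, double>: a JSON object of key-value pairs
    "stats": {"open": 11.0, "high": 20.0, "low": 9.0},
}

# Check set uniqueness before serializing.
symbols = message_value["similar_symbols"]
assert len(symbols) == len(set(symbols)), "similar_symbols must not contain duplicates"

print(json.dumps(message_value))
```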

Define a table named stocks_complex_udt_table with a text column, name, for the primitive string key and a second column, stocks_complex, that uses the stocks_complex_type type:

CREATE TABLE stocks_keyspace.stocks_complex_udt_table (
    name text primary key,
    stocks_complex FROZEN<stocks_complex_type>);

Configure the connector settings and use the following mapping specification:

topic:
    stocks_topic:
        stocks_keyspace:
            stocks_complex_udt_table:
                mapping: 'name=key,stocks_complex=value'