Mapping basic messages to table columns

When messages are created using a Basic or primitive serializer, the message contains a key-value pair. Map the key and value to table columns. Ensure that the data types of the message field are compatible with the data type of the target column.

For example, the Pulsar world topic (world_topic) contains 5 records with a key-value pair in each. The key is an integer and the value is text.

Table requirements

  1. Ensure the following when mapping fields to columns:

    • Data in the Pulsar field is compatible with the database table column data type.

    • Pulsar field mapped to a database primary key (PK) column always contains data. Null values are not allowed in PK columns.

  2. Set up the supported database table.

  3. Create the keyspace. Ensure that keyspace is replicated to a datacenter that is set in the DataStax Apache Pulsar Connector contactPoints parameter. For example, create the world_keyspace:

    cqlsh -e "CREATE KEYSPACE world_keyspace WITH replication = {'class': 'NetworkTopologyStrategy','Cassandra': 1};"
    The datacenter name is case sensitive. Use nodetool ring to get a list of datacenters.
  4. Create the table. For example, create the world\_table:

    cqlsh -e "CREATE TABLE world_table (recordid int PRIMARY KEY, continent text);"
  5. Verify that all nodes have the same schema version using [nodetool describering](https://docs.datastax.com/en/dse/6.8/dse-admin/datastax_enterprise/tools/nodetool/toolsDescribeRing.html). Replace keyspace_name:

    nodetool describering -- keyspace_name
  6. In the DataStax Apache Pulsar™ Connector configuration file:

    1. Add the topic name to topics.

    2. Define the topic-to-table map prefix.

    3. Define the field-to-column map.

      Example configurations for world_topic to world_table using the minimum required settings:

      tasks.max: 1
      topics: world_topic
      topic:
        world_topic:
          world_keyspace:
            world_table:
              mapping: 'recordid=key, continent=value'

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com