Map basic messages

When messages are created using a basic or primitive serializer, the message contains a key-value pair. Map the key and value to table columns.

Mapping requirements

When mapping fields to columns, the following requirements apply:

  • The data in the Pulsar field must be compatible with the mapped column’s data type. For example, a string field cannot map to a number column.

  • A Pulsar field mapped to a table’s primary key (PK) column must always contain data because primary key columns don’t allow nulls.

  • All primary key columns must have a mapping because primary key columns don’t allow nulls.

Basic mapping example

In this example, a Pulsar topic named world_topic contains five records with a key-value pair in each. The key is an integer and the value is text.

  1. Create a keyspace in your database.

    This example creates a keyspace named world_keyspace:

    cqlsh -e "CREATE KEYSPACE world_keyspace \
      WITH replication = {'class': 'NetworkTopologyStrategy', \
      'Cassandra': 1};"

    For DSE and Cassandra clusters, make sure that the keyspace is replicated to a datacenter set in the connector’s contactPoints parameter. DSE datacenter names are case sensitive. Use nodetool ring to get a list of datacenters.

  2. Create a table.

    This example creates a table named world_table with two columns:

    cqlsh -e "CREATE TABLE world_table (recordid int PRIMARY KEY, continent text);"
  3. For DSE, verify that all nodes have the same schema version using nodetool describering.

    For example, this command verifies the schema for world_keyspace:

    nodetool describering -- world_keyspace
  4. In the DataStax connector configuration file, add the Pulsar topic name to topics, and define the field-to-column mapping.

    The following example is maps the world_topic to the world_table using the minimum required settings:

    tasks.max: 1
    topics: world_topic
    topic:
      world_topic:
        world_keyspace:
          world_table:
            mapping: 'recordid=key, continent=value'
  5. Use pulsar-admin sinks update to apply the new configuration.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2025 DataStax, an IBM Company | Privacy policy | Terms of use | Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com