Pulsar topic-to-table parameters

The connector writes messages from your Apache Pulsar™ topics to tables in your database using the mapping and topic properties set in your connector’s configuration YAML file.

These properties include the topic name, keyspace name, table name, schema mapping, codec options, and other write options.

Use the following syntax in the configuration YAML file:

topics: topic_name, topic_name1, topic_name2
topic:
  topic_name:
    keyspace_name:
      table_name:
        mapping: 'record=key, content=value'
        consistencyLevel: LOCAL_ONE
        ttlTimeUnit: SECONDS
        ttl: -1
        nullToUnset: true
        deletesEnabled: true
    codec:
      locale: en_US
      timeZone: UTC
      timestamp: CQL_TIMESTAMP
      date: ISO_LOCAL_DATE
      time: ISO_LOCAL_TIME
      unit: MILLISECONDS

Core parameters

The essential, top-level parameters are topics and topic, together with the topic, keyspace, and table names nested under topic.

topics: topic_name, topic_name1, topic_name2
topic:
  topic_name:
    keyspace_name:
      table_name:

topics

A comma-separated list of all topics to which the DataStax connector subscribes.

topic

A container for the configuration details for each subscribed topic. For each subscribed topic, provide the topic, keyspace, and table name nested under topic. For example:

topic:
  my_topic:
    my_keyspace:
      my_table:

Make sure that the keyspace and table are the correct targets for the topic. This is where the connector writes the messages from the topic.
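As an illustration, a configuration that subscribes to two topics and routes each one to its own table might look like the following sketch. The topic, keyspace, and table names (orders_topic, payments_topic, shop, orders, payments) and the column names in the mappings are hypothetical placeholders.

```yaml
# Hypothetical names throughout; replace them with your own topics, keyspace, and tables.
topics: orders_topic, payments_topic
topic:
  orders_topic:
    shop:
      orders:
        mapping: 'order_id=key, details=value'
  payments_topic:
    shop:
      payments:
        mapping: 'payment_id=key, details=value'
```

Every name listed in topics has a matching entry under topic, and each entry names exactly one keyspace and table as its write target.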

Table parameters

These parameters are nested under the table name in the configuration YAML file. They define the mapping and write options for the records inserted into the table.

topics: topic_name
topic:
  topic_name:
    keyspace_name:
      table_name:
        mapping: 'record=key, content=value'
        consistencyLevel: LOCAL_ONE
        ttlTimeUnit: SECONDS
        ttl: -1
        nullToUnset: true
        deletesEnabled: true

consistencyLevel

Query consistency level for writes. Valid values depend on your database provider and use case.

Default: LOCAL_ONE

deletesEnabled

This parameter controls the handling of records that cause all columns to contain null values except for the primary key (PK) columns.

If false (disabled), records are always applied as INSERT/UPDATE statements, even if all non-PK columns would become null.

If true (enabled, default), a record that results in all-null values (except for PK columns) is applied as a DELETE, rather than an INSERT/UPDATE that would write nulls to all non-PK columns.

If true, the mapping must include all columns in order for the setting to function. If the mapping doesn’t include all columns, this parameter is treated as false.

The behavior applies to the post-mapping result. For example, if a row has a mix of null and non-null values, and the new record nullifies all of the non-null values, the result is a DELETE.
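For example, to keep rows whose non-PK columns are all null instead of deleting them, you could disable this behavior for a table. The following is a sketch with placeholder names (my_topic, my_keyspace, my_table, and the columns and fields in the mapping).

```yaml
topics: my_topic
topic:
  my_topic:
    my_keyspace:
      my_table:
        # Assumption: my_table has only the columns col1 (the PK) and col2.
        mapping: 'col1=key.f1, col2=value.f1'
        # A record whose value.f1 is null is applied as an INSERT/UPDATE, not a DELETE.
        deletesEnabled: false
```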

mapping

Required. A mapping of Pulsar fields to table columns. The exact contents depend on the message format and table schema. For examples and options, see the other mapping topics in this documentation and Mapping properties and functions.

nullToUnset

Whether to treat null values in Pulsar as UNSET in the database table.

This parameter controls the handling of updates versus overrides.

If true, the DataStax Pulsar connector treats null values as unset fields. When a new record arrives, it only updates fields that aren’t null in the incoming message. DataStax recommends setting this to true to avoid creating unnecessary tombstones.

If false, null values in the incoming message are written to the table as nulls, which overwrites any existing values in those columns and creates tombstones.

Default: true
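As a sketch of the recommended setting, the following fragment sets nullToUnset explicitly. All topic, keyspace, table, column, and field names are placeholders.

```yaml
topics: my_topic
topic:
  my_topic:
    my_keyspace:
      my_table:
        mapping: 'col1=key.f1, col2=value.f1, col3=value.f2'
        # If value.f2 is null in an incoming record, col3 is left as it is
        # and only the non-null fields are updated.
        nullToUnset: true
```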

query

See Use CQL queries in mappings.

ttl, ttlTimeUnit, __ttl

The default time-to-live (TTL) is -1, which means that TTL is disabled. To use TTL, set it in your connector configuration in one of two ways: static TTL or dynamic TTL.

Static TTL: In the table configuration, set ttl to the number of seconds that a record remains in the table before it is automatically deleted.

      table_name:
        mapping: 'col1=key.f1, col2=value.f1'
        ttlTimeUnit: SECONDS
        ttl: 80000

Dynamic TTL: In the mapping parameter, use the __ttl property to specify the record field that supplies the TTL value for the record being inserted into the database table. For example:

      table_name:
        mapping: 'col1=key.f1, col2=value.f1, __ttl=value.f2'
        ttlTimeUnit: SECONDS

If you set both static and dynamic TTL, the dynamic TTL (in mapping) takes precedence, and the connector writes a warning message to the log file.

If the dynamic TTL value is negative, an error is thrown when writing the record to the table. However, because the connector is a streaming system, it still inserts the record, without a TTL.

If TTL is set (either statically or dynamically), the same TTL limit is applied to all rows in the mapped table. The DataStax Pulsar connector appends AND TTL <seconds> to all INSERT statements for the table.

With either static or dynamic TTL, you can optionally set ttlTimeUnit to specify the time unit for the ttl value. The default time unit is SECONDS. If you specify a different time unit, such as HOURS, the DataStax Pulsar connector automatically converts the value to SECONDS for compatibility.
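To make the unit conversion concrete, here is a sketch of a static TTL expressed in hours. The names are placeholders, and the value 24 is chosen only for illustration.

```yaml
topics: my_topic
topic:
  my_topic:
    my_keyspace:
      my_table:
        mapping: 'col1=key.f1, col2=value.f1'
        ttlTimeUnit: HOURS
        # 24 hours is converted to 24 * 3600 = 86400 seconds.
        ttl: 24
```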

Mapping properties and functions

The mapping parameter accepts additional properties that you can use to modify how the connector writes records to the table:

header

In the mapping parameter, you can extract values from the message properties, and then write those values to a database table.

For example, the following mapping extracts the header property f4 from the message properties, and writes it to the col3 column in the specified table:

topic:
  topic_name:
    keyspace_name:
      table_name:
        mapping: 'col1=key.f1, col2=value.f1, __ttl=value.f2, __timestamp=value.f3, col3=header.f4'

Use one or more individual mapping statements to extract the desired values and write them to table columns.
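For instance, a mapping that extracts two message properties into two separate columns could be sketched as follows. The property f5 and the column col4 are hypothetical additions used for illustration.

```yaml
topics: topic_name
topic:
  topic_name:
    keyspace_name:
      table_name:
        # One mapping statement per extracted header property.
        mapping: 'col1=key.f1, col2=value.f1, col3=header.f4, col4=header.f5'
```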

__timestamp, timestampTimeUnit

In the mapping parameter, you can use the optional __timestamp property to specify which record field supplies the writetime timestamp when a record is written to the database. The specified __timestamp field must be a number type. For example:

        mapping: 'col1=key.f1, col2=value.f1, __timestamp=value.f2'

By default, the database internally tracks the writetime timestamp of records inserted from Pulsar. Use the __timestamp property when your Pulsar records have an explicit timestamp field that you want to use as the writetime for the database record produced by the connector.

If you set __timestamp, you can also set timestampTimeUnit to specify the time unit for the provided timestamp value:

      table_name:
        timestampTimeUnit: SECONDS
        mapping: 'col1=key.f1, col2=value.f1, __timestamp=value.f2'

The default timestamp time unit is MICROSECONDS. If you specify a different time unit, such as SECONDS, the DataStax Pulsar connector automatically converts the value to MICROSECONDS for compatibility.
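As a further sketch of the conversion, the following fragment treats the timestamp field as milliseconds. It assumes that timestampTimeUnit accepts MILLISECONDS in the same way that it accepts SECONDS; the names are placeholders.

```yaml
topics: my_topic
topic:
  my_topic:
    my_keyspace:
      my_table:
        # Assumption: MILLISECONDS is an accepted value for timestampTimeUnit.
        timestampTimeUnit: MILLISECONDS
        # A value.f2 of 1523520000000 (milliseconds) becomes
        # 1523520000000000 (microseconds) when used as the writetime.
        mapping: 'col1=key.f1, col2=value.f1, __timestamp=value.f2'
```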

__ttl

See ttl.

now()

In the mapping parameter, you can use the optional now() function to set a column to the current timeuuid when a record is written to the database. The function generates a TIMEUUID value, which is written to the mapped column.

You can use the now() function on one or more columns, for example:

      table_name:
        mapping: 'col1=now(), col2=now(), col3=now()'

A column mapped to now() must exist in the table definition, and it must have the type timeuuid.

If mapped to multiple columns, each occurrence of now() is evaluated and returns a different TIMEUUID.

If deletesEnabled is true and the record causes all non-primary key columns to be null, then the called now() function is ignored, and the record is deleted. This prevents the table from containing rows with only a primary key and timestamp.
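A sketch of now() combined with ordinary field mappings follows. The column names (id, created_id, payload) are hypothetical, and created_id is assumed to be a timeuuid column in the table definition.

```yaml
topics: my_topic
topic:
  my_topic:
    my_keyspace:
      my_table:
        # Assumption: id is the primary key, and created_id has type timeuuid.
        mapping: 'id=key.f1, created_id=now(), payload=value.f1'
        # With deletes enabled, a record whose value.f1 is null is applied
        # as a DELETE, and now() is ignored for that record.
        deletesEnabled: true
```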

Codec parameters

Use the codec parameters to configure date and time conversion for each Pulsar topic if you don’t want to use the default settings:

topics: topic_name
topic:
  topic_name:
    codec:
      locale: en_US
      timeZone: UTC
      timestamp: CQL_TIMESTAMP
      date: ISO_LOCAL_DATE
      time: ISO_LOCAL_TIME
      unit: MILLISECONDS
    keyspace_name:
      table_name:

Note that the codec parameters are nested under the topic name (topic.topic_name.codec), and they apply to that topic only.

In addition to the following settings, the connector supports any public static field in java.time.format.DateTimeFormatter as an option.

timestamp

The temporal pattern to use for string-to-CQL timestamp conversion. Allowed values include the following:

  • A date-time pattern, such as yyyy-MM-dd HH:mm:ss

  • A pre-defined formatter, such as ISO_ZONED_DATE_TIME or ISO_INSTANT

  • A special formatter, CQL_TIMESTAMP, that accepts all valid CQL literal formats for the timestamp type.

Default: CQL_TIMESTAMP

If the Pulsar records are strings that contain only digits that cannot be parsed by the timestamp format, use the unit parameter to specify the time unit of the parsed value.
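For example, a topic whose timestamp strings follow a custom date-time pattern could be configured along these lines. This is a sketch with placeholder topic, keyspace, table, and column names.

```yaml
topics: my_topic
topic:
  my_topic:
    codec:
      # Intended for strings such as "2018-04-12 10:15:30".
      timestamp: 'yyyy-MM-dd HH:mm:ss'
    my_keyspace:
      my_table:
        mapping: 'col1=key.f1, col2=value.f1'
```

The pattern is quoted so that the colons inside it are read as part of the string value.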

date

The temporal pattern to use for string-to-CQL date conversion. Allowed values include the following:

  • A date-time pattern, such as yyyy-MM-dd

  • A pre-defined formatter, such as ISO_LOCAL_DATE

Default: ISO_LOCAL_DATE

For example, to write a Pulsar string field like "2018-04-12" to a date column, set date: "yyyy-MM-dd".

time

The temporal pattern to use for string-to-CQL time conversion. Allowed values include the following:

  • A date-time pattern, such as HH:mm:ss

  • A pre-defined formatter, such as ISO_LOCAL_TIME

Default: ISO_LOCAL_TIME

For example, to write a Pulsar string field like "10:15:30" to a time column, the default ISO_LOCAL_TIME correctly converts the time format.

unit

If the Pulsar records are strings that contain only digits that cannot be parsed by the timestamp format, use the unit parameter to specify the time unit to apply to the parsed values.

unit can be any TimeUnit enum constant.

Default: MILLISECONDS
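As a sketch, a topic that carries timestamps as digit-only strings counted in seconds might use a codec block like the following. The names are placeholders, and the sample value in the comment is only an illustration.

```yaml
topics: my_topic
topic:
  my_topic:
    codec:
      timestamp: CQL_TIMESTAMP
      # Digit-only strings such as "1523520000" are read as a count of seconds
      # rather than the default milliseconds.
      unit: SECONDS
    my_keyspace:
      my_table:
        mapping: 'col1=key.f1, col2=value.f1'
```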

timeZone

The time zone to use for temporal conversions that don’t convey any explicit time zone information.

Default: UTC

For example, to write a Pulsar string field that contains values like 2018-03-09T17:12:32.584+01:00[Europe/Paris] to a timestamp column, set timestamp: ISO_ZONED_DATE_TIME. Because these values carry their own time zone, the timeZone setting doesn’t affect them.
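By contrast, when timestamp strings carry no zone information, timeZone determines how they are interpreted. The following sketch assumes that timeZone accepts region IDs such as Europe/Paris in addition to UTC; the names are placeholders.

```yaml
topics: my_topic
topic:
  my_topic:
    codec:
      # Strings such as "2018-03-09 17:12:32" have no explicit zone,
      # so they are interpreted in the configured time zone.
      timestamp: 'yyyy-MM-dd HH:mm:ss'
      # Assumption: region IDs like Europe/Paris are accepted here.
      timeZone: Europe/Paris
    my_keyspace:
      my_table:
        mapping: 'col1=key.f1, col2=value.f1'
```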

locale

Locale to use for locale-sensitive conversions.

Default: en_US
