Pulsar topic-to-table parameters
The connector writes messages from your Apache Pulsar™ topics to tables in your database using the mapping and topic properties set in your connector’s configuration YAML file.
This includes the topic name, keyspace name, table name, schema mapping, codec options, and other write options.
Use the following syntax for a standalone properties file:
topics: topic_name, topic_name1, topic_name2
topic:
  topic_name:
    keyspace_name:
      table_name:
        mapping: 'record=key, content=value'
        consistencyLevel: LOCAL_ONE
        ttlTimeUnit: SECONDS
        ttl: -1
        nullToUnset: true
        deletesEnabled: true
    codec:
      locale: en_US
      timeZone: UTC
      timestamp: CQL_TIMESTAMP
      date: ISO_LOCAL_DATE
      time: ISO_LOCAL_TIME
      unit: MILLISECONDS
Core parameters
The essential parameters are the top-level topics and topic keys, plus the topic names, keyspace names, and table names nested under topic.
topics: topic_name, topic_name1, topic_name2
topic:
  topic_name:
    keyspace_name:
      table_name:
- topics
A comma-separated list of all topics that the DataStax connector subscribes to.
- topic
A container for the configuration details of each subscribed topic. For each subscribed topic, nest the topic name, keyspace name, and table name under topic. For example:
topic:
  my_topic:
    my_keyspace:
      my_table:
Make sure that the keyspace and table are the correct targets for the topic, because this is where the connector writes the messages from the topic.
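To illustrate how the nesting under topic resolves to concrete write targets, here is a minimal Python sketch that walks a configuration already loaded into a plain dict (for example, by a YAML parser). The helper name list_write_targets and the check that every subscribed topic has an entry under topic are illustrative assumptions, not part of the connector. It also skips the codec key, which sits at the same level as the keyspace names.

```python
from typing import Any, Dict, List, Tuple


def list_write_targets(config: Dict[str, Any]) -> List[Tuple[str, str, str]]:
    """Return (topic, keyspace, table) triples from an already-loaded config dict.

    Hypothetical helper that mirrors the topics/topic nesting; not part of the connector.
    """
    # "topics" is one comma-separated string of topic names.
    subscribed = [name.strip() for name in config["topics"].split(",") if name.strip()]
    per_topic = config.get("topic") or {}
    targets: List[Tuple[str, str, str]] = []
    for topic_name in subscribed:
        if topic_name not in per_topic:
            raise ValueError(f"no entry under 'topic' for subscribed topic {topic_name!r}")
        for keyspace, tables in (per_topic[topic_name] or {}).items():
            if keyspace == "codec":
                continue  # codec settings sit beside the keyspaces, not inside them
            for table in tables or {}:
                targets.append((topic_name, keyspace, table))
    return targets


# Usage with the my_topic/my_keyspace/my_table structure from the topic example.
config = {
    "topics": "my_topic",
    "topic": {"my_topic": {"my_keyspace": {"my_table": {}}}},
}
print(list_write_targets(config))  # [('my_topic', 'my_keyspace', 'my_table')]
```

Raising an error for a subscribed topic with no entry under topic is a design choice for the sketch; it surfaces a misconfigured target early instead of silently dropping messages.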
Table parameters
These parameters are nested under the table name in the configuration YAML file. They define the mapping and write options for the records inserted into the table.
topics: topic_name
topic:
  topic_name:
    keyspace_name:
      table_name:
        mapping: 'record=key, content=value'
        consistencyLevel: LOCAL_ONE
        ttlTimeUnit: SECONDS
        ttl: -1
        nullToUnset: true
        deletesEnabled: true
- consistencyLevel
Query consistency level for writes. Valid values depend on your database provider and use case.
Default: LOCAL_ONE
- deletesEnabled
This parameter controls the handling of records that leave every column except the primary key (PK) columns with a null value.
If false (disabled), records are always applied as INSERT/UPDATE statements, even if all non-PK columns would become null.
If true (enabled, default), a record that results in all-null values (except for PK columns) is applied as a DELETE, rather than an INSERT/UPDATE that would write nulls to all non-PK columns.
If true, the mapping must include all columns in order for the setting to function. If the mapping doesn't include all columns, this parameter is treated as false.
The behavior applies to the post-mapping result. For example, if a row has a mix of null and non-null values, and the new record nullifies all of the non-null values, the result is a DELETE.
- mapping
Required. A mapping of Pulsar fields to table columns. The exact contents depend on the message format and table schema. For examples and options, see the other mapping topics in this documentation and Mapping properties and functions.
- nullToUnset
Whether to treat null values in Pulsar as UNSET in the database table. This parameter controls the handling of updates versus overwrites.
If true, the DataStax Pulsar connector treats null values as unset fields. When a new record arrives, it only updates fields that aren't null in the incoming message. DataStax recommends setting this to true to avoid creating unnecessary tombstones.
If false, null values are written to the table as nulls, which overwrites any existing values in those columns.
Default: true
- query
- ttl, ttlTimeUnit, __ttl
The default time-to-live (TTL) is -1, which means that TTL is disabled. If you want to use TTL, there are two ways to set it in your connector configuration:
Static TTL: In the table configuration, set ttl to how long a record remains in the table before it is automatically deleted, expressed in ttlTimeUnit (SECONDS by default):
table_name:
  mapping: 'col1=key.f1, col2=value.f1'
  ttlTimeUnit: SECONDS
  ttl: 80000
Dynamic TTL: In the mapping parameter, use the __ttl property to specify the message field that supplies the TTL value for each record inserted into the database table. For example:
table_name:
  mapping: 'col1=key.f1, col2=value.f1, __ttl=value.f2'
  ttlTimeUnit: SECONDS
If you set both static and dynamic TTL, the dynamic TTL (in mapping) takes precedence, and the connector writes a warning message to the log file.
If the dynamic TTL value is negative, an error is thrown when writing records to the table. However, because the Pulsar connector is a streaming system, it still inserts the record, without the TTL.
If TTL is set (either statically or dynamically), it applies to every row that the connector writes to the mapped table: the DataStax Pulsar connector appends AND TTL <seconds> to all INSERT statements for the table.
With either static or dynamic TTL, you can optionally set ttlTimeUnit to specify the time unit of the ttl value. The default time unit is SECONDS. If you specify a different time unit, such as HOURS, the DataStax Pulsar connector automatically converts the value to SECONDS for compatibility.
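The table parameters above interact for each record after mapping. The following Python sketch is a simplified, hypothetical model of that interaction, not the connector's code: the names plan_write and ttl_seconds, the dict-based "statement plan", and the truncating unit conversion (modeled on Java's TimeUnit.convert) are all assumptions for illustration. "INSERT" here stands for the INSERT/UPDATE case described under deletesEnabled.

```python
from typing import Any, Dict, Optional, Sequence

# Nanoseconds per unit, keyed by Java TimeUnit names (assumption: only these are handled).
_NANOS = {
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def ttl_seconds(static_ttl: int, dynamic_ttl: Optional[int] = None,
                unit: str = "SECONDS") -> Optional[int]:
    """Effective TTL in seconds, or None when the row is written without a TTL."""
    # A dynamic TTL (__ttl in mapping) takes precedence over the static ttl setting.
    ttl = dynamic_ttl if dynamic_ttl is not None else static_ttl
    if ttl < 0:
        # -1 disables TTL; a negative __ttl is an error, but the row is still written without TTL.
        return None
    # Truncating conversion to seconds, similar to Java's TimeUnit.convert.
    return ttl * _NANOS[unit] // _NANOS["SECONDS"]


def plan_write(row: Dict[str, Any], table_columns: Sequence[str], pk_columns: Sequence[str],
               deletes_enabled: bool = True, null_to_unset: bool = True) -> Dict[str, Any]:
    """Describe the statement for one already-mapped row (simplified model)."""
    non_pk = [c for c in table_columns if c not in pk_columns]
    # deletesEnabled only takes effect when the mapping covers every table column.
    covers_all = set(table_columns) <= set(row)
    if deletes_enabled and covers_all and all(row[c] is None for c in non_pk):
        return {"statement": "DELETE", "bind": {c: row[c] for c in pk_columns}}
    # nullToUnset: leave null fields unbound (UNSET) instead of writing nulls.
    bind = {c: v for c, v in row.items() if not (null_to_unset and v is None)}
    return {"statement": "INSERT", "bind": bind}


print(plan_write({"id": 1, "a": None, "b": None}, ["id", "a", "b"], ["id"]))
# {'statement': 'DELETE', 'bind': {'id': 1}}
print(plan_write({"id": 1, "a": 5, "b": None}, ["id", "a", "b"], ["id"]))
# {'statement': 'INSERT', 'bind': {'id': 1, 'a': 5}}
print(ttl_seconds(-1, unit="HOURS"), ttl_seconds(2, unit="HOURS"))  # None 7200
```

Modeling UNSET as "not bound" mirrors how a prepared statement leaves a column untouched, which is why nullToUnset avoids tombstones while null_to_unset=False binds the None values explicitly.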
Mapping properties and functions
The mapping parameter accepts additional properties and functions that you can use to modify how the connector writes records to the table:
- header
In the mapping parameter, you can extract values from the message properties and write those values to a database table.
For example, the following mapping extracts the header property f4 from the message properties and writes it to the col3 column in the specified table:
topic:
  topic_name:
    keyspace_name:
      table_name:
        mapping: 'col1=key.f1, col2=value.f1, __ttl=value.f2, __timestamp=value.f3, col3=header.f4'
Use one or more individual mapping statements to extract the desired values and write them to table columns.
- __timestamp, timestampTimeUnit
In the mapping parameter, you can use the optional __timestamp property to specify which field to use as the writetime timestamp when a record is written to the database. The field mapped to __timestamp must have a numeric type. For example:
mapping: 'col1=key.f1, col2=value.f1, __timestamp=value.f2'
By default, the database internally tracks the writetime timestamp of records inserted from Pulsar. Use the __timestamp property when your Pulsar records have an explicit timestamp field that you want to use as the writetime for the database record produced by the connector.
If you set __timestamp, you can also set timestampTimeUnit to specify the time unit of the provided timestamp value:
table_name:
  timestampTimeUnit: SECONDS
  mapping: 'col1=key.f1, col2=value.f1, __timestamp=value.f2'
The default timestamp time unit is MICROSECONDS. If you specify a different time unit, such as SECONDS, the DataStax Pulsar connector automatically converts the value to MICROSECONDS for compatibility.
- __ttl
See ttl.
- now()
In the mapping parameter, you can use the optional now() function to set a column to the current timeuuid when a record is written to the database. The function generates a TIMEUUID, which is written to the mapped column.
You can use the now() function on one or more columns, for example:
table_name:
  mapping: 'col1=now(), col2=now(), col3=now()'
A column mapped to now() must exist in the table definition, and it must have the type timeuuid.
If now() is mapped to multiple columns, each occurrence is evaluated separately and returns a different TIMEUUID.
If deletesEnabled is true and the record causes all non-primary key columns to be null, then the now() function is ignored, and the record is deleted. This prevents the table from containing rows with only a primary key and timestamp.
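The mapping properties and functions above can be pictured as a small evaluator over a message that has key, value, and header (message properties) parts. This Python sketch is a hypothetical model, not the connector's parser: the names parse_mapping, resolve, and map_record, the dict-shaped message, and the subset of time units are assumptions. It uses Python's uuid.uuid1(), which produces version-1 (time-based) UUIDs, to stand in for now().

```python
import uuid
from typing import Any, Dict, Optional

# Nanoseconds per unit for the subset of TimeUnit names handled here (assumption).
_NANOS = {"NANOSECONDS": 1, "MICROSECONDS": 1_000, "MILLISECONDS": 1_000_000, "SECONDS": 1_000_000_000}


def parse_mapping(mapping: str) -> Dict[str, str]:
    """Split 'col1=key.f1, col2=value.f1' into {'col1': 'key.f1', 'col2': 'value.f1'}."""
    pairs = (item.split("=", 1) for item in mapping.split(",") if item.strip())
    return {col.strip(): expr.strip() for col, expr in pairs}


def resolve(expr: str, message: Dict[str, Any]) -> Any:
    """Evaluate one mapping expression against a message with key, value, and header parts."""
    if expr == "now()":
        return uuid.uuid1()  # a new version-1 (time-based) UUID for every occurrence
    source, _, field = expr.partition(".")  # e.g. "header.f4" -> ("header", "f4")
    part = message[source]
    return part[field] if field else part


def map_record(mapping: str, message: Dict[str, Any],
               timestamp_unit: str = "MICROSECONDS") -> Dict[str, Any]:
    """Split a mapped message into column values and per-record write options."""
    columns: Dict[str, Any] = {}
    ttl: Optional[int] = None
    writetime_us: Optional[int] = None
    for col, expr in parse_mapping(mapping).items():
        value = resolve(expr, message)
        if col == "__ttl":
            ttl = value
        elif col == "__timestamp":
            # Convert the numeric field to microseconds, the default timestamp unit.
            writetime_us = value * _NANOS[timestamp_unit] // _NANOS["MICROSECONDS"]
        else:
            columns[col] = value
    return {"columns": columns, "ttl": ttl, "writetime_us": writetime_us}


message = {
    "key": {"f1": "k1"},
    "value": {"f1": "v1", "f2": 3600, "f3": 1_700_000_000},
    "header": {"f4": "h1"},  # message properties
}
result = map_record("col1=key.f1, col2=value.f1, __ttl=value.f2, __timestamp=value.f3, col3=header.f4",
                    message, timestamp_unit="SECONDS")
print(result)
# {'columns': {'col1': 'k1', 'col2': 'v1', 'col3': 'h1'}, 'ttl': 3600, 'writetime_us': 1700000000000000}
```

Keeping __ttl and __timestamp out of the column dict reflects that they configure the write itself rather than naming table columns.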
Codec parameters
Use the codec parameters to configure date and time conversion for each Pulsar topic if you don't want to use the default settings:
topics: topic_name
topic:
  topic_name:
    codec:
      locale: en_US
      timeZone: UTC
      timestamp: CQL_TIMESTAMP
      date: ISO_LOCAL_DATE
      time: ISO_LOCAL_TIME
      unit: MILLISECONDS
    keyspace_name:
      table_name:
Note that the codec parameters are nested under the topic name (topic.topic_name.codec), and they apply to that topic only.
In addition to the following settings, the connector supports any public static field in |
- timestamp
The temporal pattern to use for string-to-CQL timestamp conversion. Allowed values include the following:
  - A date-time pattern, such as yyyy-MM-dd HH:mm:ss
  - A pre-defined formatter, such as ISO_ZONED_DATE_TIME or ISO_INSTANT
  - A special formatter, CQL_TIMESTAMP, that accepts all valid CQL literal formats for the timestamp type
Default: CQL_TIMESTAMP
If the Pulsar records are strings that contain only digits that cannot be parsed by the timestamp format, use the unit parameter to specify the time unit of the parsed value.
- date
The temporal pattern to use for string-to-CQL date conversion. Allowed values include the following:
  - A date-time pattern, such as yyyy-MM-dd
  - A pre-defined formatter, such as ISO_LOCAL_DATE
Default: ISO_LOCAL_DATE
For example, to write a Pulsar string field like "2018-04-12" to a date column, set date: "yyyy-MM-dd".
- time
The temporal pattern to use for string-to-CQL time conversion. Allowed values include the following:
  - A date-time pattern, such as HH:mm:ss
  - A pre-defined formatter, such as ISO_LOCAL_TIME
Default: ISO_LOCAL_TIME
For example, to write a Pulsar string field like "10:15:30" to a time column, the default ISO_LOCAL_TIME correctly converts the time format.
- unit
If the Pulsar records are strings that contain only digits that cannot be parsed by the timestamp format, use the unit parameter to specify the time unit to apply to the parsed values. unit can be any TimeUnit enum constant.
Default: MILLISECONDS
- timeZone
The time zone to use for temporal conversions that don't convey any explicit time zone information.
Default: UTC
Values that already carry zone information are handled by the timestamp format instead. For example, to write a Pulsar string field that contains values like 2018-03-09T17:12:32.584+01:00[Europe/Paris] to a timestamp column, set timestamp: ISO_ZONED_DATE_TIME.
- locale
Locale to use for locale-sensitive conversions.
Default: en_US
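As a rough illustration of how the timestamp, unit, and timeZone codec settings interact, the following Python sketch converts a string field to a timezone-aware datetime. It is a hypothetical model, not the connector's codec: Python's strptime directives stand in for the Java-style patterns (%Y-%m-%d %H:%M:%S corresponds to yyyy-MM-dd HH:mm:ss), digits-only input is assumed to count units from the Unix epoch, and only a few TimeUnit names are handled.

```python
from datetime import datetime, timedelta, timezone

# Milliseconds per unit for a few TimeUnit names (assumption: subset only).
_MILLIS = {"MILLISECONDS": 1, "SECONDS": 1_000, "MINUTES": 60_000}
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_timestamp(text: str, pattern: str = "%Y-%m-%d %H:%M:%S",
                    unit: str = "MILLISECONDS", default_tz: timezone = timezone.utc) -> datetime:
    """Convert a string field to a timezone-aware datetime (simplified codec model)."""
    try:
        parsed = datetime.strptime(text, pattern)
    except ValueError:
        if not text.isdigit():
            raise
        # Digits the pattern can't parse: read them as a count of `unit` since the epoch.
        return _EPOCH + timedelta(milliseconds=int(text) * _MILLIS[unit])
    # timeZone only applies when the input carries no zone information of its own.
    return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=default_tz)


print(parse_timestamp("2018-04-12 10:15:30"))            # 2018-04-12 10:15:30+00:00
print(parse_timestamp("1500", unit="SECONDS"))           # 1970-01-01 00:25:00+00:00
print(parse_timestamp("2018-03-09 17:12:32+0100", pattern="%Y-%m-%d %H:%M:%S%z"))
# 2018-03-09 17:12:32+01:00
```

Trying the pattern first and only then falling back to the digits-only path follows the wording above: unit matters only for digit strings that the timestamp format cannot parse.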