Operate and maintain the DataStax Apache Pulsar™ connector
To operate and maintain the DataStax Apache Pulsar™ connector, use the Apache Pulsar administration tools, which include the pulsar-admin CLI and the Pulsar admin API.
Get the connector’s status
Use the pulsar-admin sinks status
command to get the status of a Pulsar sink:
bin/pulsar-admin sinks status --name SINK_NAME
Replace SINK_NAME
with the name of the sink that you want to check.
For example, to check the status of a sink named cass-sink-kv
, run bin/pulsar-admin sinks status --name cass-sink-kv
.
Result
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReadFromPulsar" : 0,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSinkExceptions" : 0,
"latestSinkExceptions" : [ ],
"numWrittenToSink" : 0,
"lastReceivedTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
Get the connector’s configuration
Use the pulsar-admin sinks get
command to retrieve the configuration details for a Pulsar sink:
bin/pulsar-admin sinks get --name SINK_NAME
Replace SINK_NAME
with the name of the sink that you want to check.
For example, to retrieve the configuration for a sink named dse-sink-kv
, run bin/pulsar-admin sinks get --name dse-sink-kv
.
The configuration is returned in JSON format.
Result
{
"tenant": "public",
"namespace": "default",
"name": "cass-sink-kv",
"className": "com.datastax.oss.sink.pulsar.StringCassandraSinkTask",
"inputSpecs": {
"persistent://public/default/example_topic": {
"isRegexPattern": false,
"schemaProperties": {},
"consumerProperties": {}
}
},
"configs": {
"loadBalancing.localDc": "Cassandra",
"queryExecutionTimeout": 30,
"auth": {
"provider": "None",
"gssapi": {
"service": "dse"
}
},
"tasks.max": 1,
"topics": "example_topic",
"contactPoints": "localhost",
"batchFlushTimeoutMs": 1000,
"maxConcurrentRequests": 500,
"ignoreErrors": "None",
"ssl": {
"provider": "None",
"hostnameValidation": true,
"keystore": {},
"openssl": {},
"truststore": {}
},
"verbose": false,
"connectionPoolLocalSize": 4,
"jmx": true,
"port": 9042,
"maxNumberOfRecordsInBatch": 32,
"topic": {
"example_topic": {
"pulsar_qs": {
"pulsar_kv": {
"mapping": "key\u003dkey,content\u003dvalue",
"consistencyLevel": "LOCAL_ONE",
"ttl": -1,
"ttlTimeUnit": "SECONDS",
"timestampTimeUnit": "MICROSECONDS",
"nullToUnset": true,
"deletesEnabled": true
}
},
"codec": {
"locale": "en_US",
"timeZone": "UTC",
"timestamp": "CQL_TIMESTAMP",
"date": "ISO_LOCAL_DATE",
"time": "ISO_LOCAL_TIME",
"unit": "MILLISECONDS"
}
}
},
"batchSize": 3000,
"compression": "None"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true,
"archive": "builtin://cassandra-enhanced"
}
Apply configuration changes or edit sink settings
Use the pulsar-admin sinks update
command to update the configuration of an active Pulsar sink.
You can use this command to apply changes made to the connector’s configuration file, and you can edit sink settings that aren’t set in the configuration file.
pulsar-admin sinks update --name SINK_NAME OPTIONS
Replace the following:
-
SINK_NAME
: The name of the sink associated with your DataStax Pulsar connector. -
OPTIONS
: Pulsar sink configuration options that you want to update, such as--sink-config-file
,--parallelism
, or--timeout-ms
.
For example, after modifying the connector’s configuration file, run the following command to apply the new configuration.
Replace CONFIG_FILE
with the name or path to your configuration file.
bin/pulsar-admin sinks update --name SINK_NAME --sink-config-file CONFIG_FILE
The sinks update
command automatically validates the changes, and then redeploys the sink with the new configuration.
It shuts down the existing instances and starts new ones.
To verify the updated configuration, use pulsar-admin sinks get
.
Restart the connector
Use the pulsar-admin sinks restart
command to restart a sink that was previously running:
bin/pulsar-admin sinks restart --name SINK_NAME
Replace SINK_NAME
with the name of the sink that you want to restart.
For example, to restart a sink named dse-sink-kv
, run bin/pulsar-admin sinks restart --name dse-sink-kv
.
By default, the command restarts all instances of the sink.
To restart a single instance, use the --instance-id
option:
bin/pulsar-admin sinks restart --name SINK_NAME --instance-id ID