Operate and maintain the DataStax Apache Pulsar™ connector

To operate and maintain the DataStax Apache Pulsar™ connector, use the Apache Pulsar administration tools, which include the pulsar-admin CLI and the Pulsar admin API.

Get the connector’s status

Use the pulsar-admin sinks status command to get the status of a Pulsar sink:

bin/pulsar-admin sinks status --name SINK_NAME

Replace SINK_NAME with the name of the sink that you want to check. For example, to check the status of a sink named cass-sink-kv, run bin/pulsar-admin sinks status --name cass-sink-kv.

Result
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 0,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ ],
      "numWrittenToSink" : 0,
      "lastReceivedTime" : 0,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}

Get the connector’s configuration

Use the pulsar-admin sinks get command to retrieve the configuration details for a Pulsar sink:

bin/pulsar-admin sinks get --name SINK_NAME

Replace SINK_NAME with the name of the sink that you want to check. For example, to retrieve the configuration for a sink named dse-sink-kv, run bin/pulsar-admin sinks get --name dse-sink-kv.

The configuration is returned in JSON format.

Result
{
  "tenant": "public",
  "namespace": "default",
  "name": "cass-sink-kv",
  "className": "com.datastax.oss.sink.pulsar.StringCassandraSinkTask",
  "inputSpecs": {
    "persistent://public/default/example_topic": {
      "isRegexPattern": false,
      "schemaProperties": {},
      "consumerProperties": {}
    }
  },
  "configs": {
    "loadBalancing.localDc": "Cassandra",
    "queryExecutionTimeout": 30,
    "auth": {
      "provider": "None",
      "gssapi": {
        "service": "dse"
      }
    },
    "tasks.max": 1,
    "topics": "example_topic",
    "contactPoints": "localhost",
    "batchFlushTimeoutMs": 1000,
    "maxConcurrentRequests": 500,
    "ignoreErrors": "None",
    "ssl": {
      "provider": "None",
      "hostnameValidation": true,
      "keystore": {},
      "openssl": {},
      "truststore": {}
    },
    "verbose": false,
    "connectionPoolLocalSize": 4,
    "jmx": true,
    "port": 9042,
    "maxNumberOfRecordsInBatch": 32,
    "topic": {
      "example_topic": {
        "pulsar_qs": {
          "pulsar_kv": {
            "mapping": "key\u003dkey,content\u003dvalue",
            "consistencyLevel": "LOCAL_ONE",
            "ttl": -1,
            "ttlTimeUnit": "SECONDS",
            "timestampTimeUnit": "MICROSECONDS",
            "nullToUnset": true,
            "deletesEnabled": true
          }
        },
        "codec": {
          "locale": "en_US",
          "timeZone": "UTC",
          "timestamp": "CQL_TIMESTAMP",
          "date": "ISO_LOCAL_DATE",
          "time": "ISO_LOCAL_TIME",
          "unit": "MILLISECONDS"
        }
      }
    },
    "batchSize": 3000,
    "compression": "None"
  },
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": false,
  "autoAck": true,
  "archive": "builtin://cassandra-enhanced"
}

Apply configuration changes or edit sink settings

Use the pulsar-admin sinks update command to update the configuration of an active Pulsar sink. You can use this command to apply changes made to the connector’s configuration file, and you can edit sink settings that aren’t set in the configuration file.

pulsar-admin sinks update --name SINK_NAME OPTIONS

Replace the following:

  • SINK_NAME: The name of the sink associated with your DataStax Pulsar connector.

  • OPTIONS: Pulsar sink configuration options that you want to update, such as --sink-config-file, --parallelism, or --timeout-ms.

For example, after modifying the connector’s configuration file, run the following command to apply the new configuration. Replace CONFIG_FILE with the name or path to your configuration file.

bin/pulsar-admin sinks update --name SINK_NAME --sink-config-file CONFIG_FILE

The sinks update command automatically validates the changes, and then redeploys the sink with the new configuration. It shuts down the existing instances and starts new ones. To verify the updated configuration, use pulsar-admin sinks get.

Restart the connector

Use the pulsar-admin sinks restart command to restart a sink that was previously running:

bin/pulsar-admin sinks restart --name SINK_NAME

Replace SINK_NAME with the name of the sink that you want to restart. For example, to restart a sink named dse-sink-kv, run bin/pulsar-admin sinks restart --name dse-sink-kv.

By default, the command restarts all instances of the sink. To restart a single instance, use the --instance-id option:

bin/pulsar-admin sinks restart --name SINK_NAME --instance-id ID

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2025 DataStax, an IBM Company | Privacy policy | Terms of use | Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com