Astra DB and Apache Cassandra

The open-source Apache Pulsar™ Cassandra sink connector (cassandra) reads messages from Pulsar topics and writes them to Apache Cassandra databases.

The DataStax Apache Pulsar sink connector (cassandra-enhanced) extends the open-source connector by adding support for DataStax product connections. This includes Astra DB connections that use a Secure Connect Bundle (SCB) and an Astra application token.

This means that you can use this sink connector to write messages to tables in Astra DB or self-managed Cassandra clusters.

You can configure this connector in the Astra Portal or programmatically. The Astra Portal provides a guided configuration process, but it is only suitable for connecting to Astra DB databases. Using a programmatic approach, you can connect to Astra DB or a self-managed Cassandra cluster.

Create the connector

  1. Optional: If you are using the pulsar-admin CLI or Pulsar Admin API, set the following commonly used environment variables:

    export TENANT="TENANT_NAME"
    export TOPIC="INPUT_TOPIC_NAME"
    export NAMESPACE="NAMESPACE_NAME" # or default
    export SINK_NAME="SINK_CONNECTOR_UNIQUE_NAME"
    export PULSAR_TOKEN="TENANT_PULSAR_TOKEN" # API only
    export WEB_SERVICE_URL="TENANT_PULSAR_WEB_SERVICE_URL" # API only

    SINK_NAME is the name for your new sink connector. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: cassandra-enhanced-sink-prod-us-east-1.

  2. Create the connector using JSON-formatted connector configuration data. You can pass the configuration directly or with a configuration file.

    pulsar-admin CLI
    ./bin/pulsar-admin sinks create \
      --sink-type cassandra-enhanced \
      --name "$SINK_NAME" \
      --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
      --tenant "$TENANT" \
      --namespace "$NAMESPACE" \
      --sink-config-file configs.json
    Pulsar Admin API
    curl -sS --fail -L -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
      --header "Authorization: Bearer $PULSAR_TOKEN" \
      --form "sinkConfig=@configs.json;type=application/json"
    Example configuration data structure
    {
      "tenant": "${TENANT}",
      "namespace": "${NAMESPACE}",
      "name": "${SINK_NAME}",
      "className": "org.apache.pulsar.io.cassandra.sink.CassandraEnhancedSink",
      "sourceSubscriptionName": "string",
      "sourceSubscriptionPosition": "Latest",
      "inputs": [
        "persistent://${TENANT}/${NAMESPACE}/${TOPIC}"
      ],
      "topicToSerdeClassName": {
        "property1": "string",
        "property2": "string"
      },
      "topicToSchemaType": {
        "property1": "string",
        "property2": "string"
      },
      "topicToSchemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "inputSpecs": {
        "property1": {
          "schemaType": "string",
          "serdeClassName": "string",
          "schemaProperties": {
            "property1": "string",
            "property2": "string"
          },
          "consumerProperties": {
            "property1": "string",
            "property2": "string"
          },
          "receiverQueueSize": 0,
          "cryptoConfig": {
            "cryptoKeyReaderClassName": "string",
            "cryptoKeyReaderConfig": {
              "property1": {},
              "property2": {}
            },
            "encryptionKeys": [
              "string"
            ],
            "producerCryptoFailureAction": "FAIL",
            "consumerCryptoFailureAction": "FAIL"
          },
          "poolMessages": true,
          "regexPattern": false
        }
      },
      "maxMessageRetries": 0,
      "deadLetterTopic": "string",
      "configs": {
        "topics": "${TOPIC}",
        "contactPoints": "cassandra",
        "loadBalancing.localDc": "datacenter1",
        "port": 9042,
        "cloud.secureConnectBundle": null,
        "ignoreErrors": "None",
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "jmx": true,
        "compression": "None",
        "auth": {
          "provider": "None",
          "username": null,
          "password": null,
          "gssapi": {
            "keyTab": null,
            "principal": null,
            "service": "dse"
          }
        },
        "ssl": {
          "provider": "None",
          "hostnameValidation": true,
          "keystore": {
            "password": null,
            "path": null
          },
          "openssl": {
            "keyCertChain": null,
            "privateKey": null
          },
          "truststore": {
            "password": null,
            "path": null
          },
          "cipherSuites": null
        },
        "topic": {
          "${TOPIC}": {
            "${KEYSPACE_NAME}": {
              "${TABLE_NAME}": {
                "mapping": "name=value.name",
                "consistencyLevel": "LOCAL_ONE",
                "ttl": -1,
                "ttlTimeUnit": "SECONDS",
                "timestampTimeUnit": "MICROSECONDS",
                "nullToUnset": true,
                "deletesEnabled": true
              }
            },
            "codec": {
              "locale": "en_US",
              "timeZone": "UTC",
              "timestamp": "CQL_TIMESTAMP",
              "date": "ISO_LOCAL_DATE",
              "time": "ISO_LOCAL_TIME",
              "unit": "MILLISECONDS"
            }
          }
        }
      },
      "secrets": {},
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE",
      "retainOrdering": false,
      "retainKeyOrdering": true,
      "resources": {
        "cpu": 0.25,
        "disk": 1000000000,
        "ram": 1000000000
      },
      "autoAck": true,
      "timeoutMs": 5000,
      "negativeAckRedeliveryDelayMs": 0,
      "archive": "builtin://cassandra-enhanced",
      "cleanupSubscription": true,
      "runtimeFlags": "string",
      "customRuntimeOptions": "string",
      "transformFunction": "string",
      "transformFunctionClassName": "string",
      "transformFunctionConfig": "string"
    }

The preceding example connects to a self-managed Cassandra cluster.

For an Astra DB example, see Topic-to-table mapping properties (topic).
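The example configuration uses ${TENANT}-style placeholders, but pulsar-admin and curl upload configs.json as written, so nothing substitutes those placeholders for you. The following Bash sketch assumes the environment variables from step 1 plus two hypothetical variables, KEYSPACE_NAME and TABLE_NAME, and renders a trimmed configs.json with the values filled in. The function names are hypothetical, and the file contains only a subset of the properties shown in the example above; add whatever else your deployment needs.

```shell
#!/usr/bin/env bash
# Sketch: render a trimmed sink configuration with shell variables filled in.
# pulsar-admin and curl upload configs.json as-is, so a literal "${TENANT}"
# in the file is not expanded. Function names and KEYSPACE_NAME/TABLE_NAME
# are hypothetical; the JSON is a subset of the full example, not a complete
# configuration.

# Return nonzero (and report each name) if any listed variable is unset or empty.
require_vars() {
  local missing=0 var
  for var in "$@"; do
    if [ -z "${!var:-}" ]; then
      echo "missing environment variable: $var" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Write the configuration to the given path (default: configs.json).
write_sink_config() {
  local out="${1:-configs.json}"
  require_vars TENANT NAMESPACE TOPIC SINK_NAME KEYSPACE_NAME TABLE_NAME || return 1
  # The heredoc delimiter is unquoted, so the shell expands ${...} inside it.
  cat > "$out" <<EOF
{
  "tenant": "${TENANT}",
  "namespace": "${NAMESPACE}",
  "name": "${SINK_NAME}",
  "className": "org.apache.pulsar.io.cassandra.sink.CassandraEnhancedSink",
  "archive": "builtin://cassandra-enhanced",
  "inputs": ["persistent://${TENANT}/${NAMESPACE}/${TOPIC}"],
  "parallelism": 1,
  "configs": {
    "topics": "${TOPIC}",
    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",
    "port": 9042,
    "topic": {
      "${TOPIC}": {
        "${KEYSPACE_NAME}": {
          "${TABLE_NAME}": {
            "mapping": "name=value.name"
          }
        }
      }
    }
  }
}
EOF
}
```

For example, run write_sink_config configs.json before the pulsar-admin sinks create command. The unquoted heredoc delimiter is what enables substitution; quoting it (<<'EOF') would write the placeholders literally.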

Edit the connector

To update a connector, pass the new configuration definition to the connector. For example, if you created the connector with a configuration file, you can pass an updated configuration file.

You can include the entire configuration or only the properties that you want to change. Additionally, some properties can be modified with specific arguments, such as --parallelism.

To get the current configuration, see Get sink connector configuration data.

pulsar-admin CLI
./bin/pulsar-admin sinks update \
  --sink-type cassandra-enhanced \
  --name "$SINK_NAME" \
  --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --parallelism 2
Pulsar Admin API
curl -sS --fail -L -X PUT "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $PULSAR_TOKEN" \
  --form "sinkConfig=@configs.json;type=application/json"
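Because an update can carry only the properties you want to change, a partial file is enough for a change such as parallelism. This is a minimal Bash sketch, assuming the environment variables from the Create the connector section: one helper writes a configuration that sets only parallelism, and another uploads any configuration file with the same PUT request shown above. The helper names and the update.json file name are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: partial update that changes only parallelism.
# Helper names and the update.json default are hypothetical.

# Write {"parallelism": N} to a file; reject values that are not positive integers.
write_parallelism_update() {
  local n="$1" out="${2:-update.json}"
  if ! [[ "$n" =~ ^[1-9][0-9]*$ ]]; then
    echo "parallelism must be a positive integer: $n" >&2
    return 1
  fi
  printf '{\n  "parallelism": %s\n}\n' "$n" > "$out"
}

# Upload a configuration file to an existing sink with the Pulsar Admin API.
# Requires WEB_SERVICE_URL, PULSAR_TOKEN, TENANT, NAMESPACE, and SINK_NAME.
update_sink() {
  local file="$1"
  curl -sS --fail -L -X PUT \
    "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
    --header "Authorization: Bearer $PULSAR_TOKEN" \
    --form "sinkConfig=@${file};type=application/json"
}
```

For example: write_parallelism_update 2 update.json && update_sink update.json.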

Manage the connector

See Create and manage connectors for details on how to manage connectors after you create them, including:

  • Get connector status

  • Get existing connectors

  • Get connector configuration details

  • Start connectors

  • Stop connectors

  • Restart connectors

  • Delete connectors

Connector configuration reference

To configure Astra Streaming Pulsar connectors, you use a combination of common Pulsar properties and provider-specific properties.

Because Astra Streaming and the Astra Streaming Pulsar connectors are based on a specific version of the open-source Apache Pulsar project and connectors, some properties and values might be unsupported or ignored by Astra Streaming.

Pulsar sink connector properties

Pulsar connectors and functions both run on Pulsar Functions workers, so some function configuration properties also apply to connectors.

The following table lists Astra Streaming Pulsar sink connector configuration properties by JSON field name. For the equivalent command-line arguments and more information about these properties, see the Pulsar documentation.

Pulsar configuration properties for sink connectors
Each entry gives the field name, whether the field is required (Yes or No), and a description.

archive

Yes

The type of built-in Astra Streaming Pulsar connector that you want to deploy. Formatted as builtin://CONNECTOR, such as builtin://kafka.

autoAck

Yes

Whether the framework automatically acknowledges messages.

Default: false (automatic acknowledgment disabled)

className

Yes

The fully qualified class name of the connector implementation, beginning with org.apache.pulsar, such as org.apache.pulsar.io.kafka.connect.KafkaConnectSink.

cleanupSubscription

No

Whether to delete subscriptions that are created or used by a sink when the sink is deleted.

Default: false

configs

Yes

A JSON-formatted key-value map containing configuration properties specific to the connector type, including provider-specific authentication and integration settings. For available subproperties, see DataStax Apache Pulsar sink connector properties (configs).

Default: {} (an empty map; the connector uses default values where they exist and fails otherwise)

customRuntimeOptions

No

A string that encodes options to configure the Apache Pulsar function runtime.

deadLetterTopic

No

The name of the topic that receives unacknowledged messages, such as those that exceed the maximum number of retries or fail to be processed completely.

If null or not set, unacknowledged messages are discarded.

See also maxMessageRetries and negativeAckRedeliveryDelayMs.

inputSpecs

No

A map of input topics to consumer configuration. By default, most values are null or empty. For example:

  "inputSpecs": {
    "persistent://$TENANT/$NAMESPACE/$TOPIC": {
      "schemaType": null,
      "serdeClassName": null,
      "schemaProperties": {},
      "consumerProperties": {},
      "receiverQueueSize": null,
      "cryptoConfig": null,
      "poolMessages": false,
      "regexPattern": false
    }
  },

inputs

Yes

An array of input topics that the sink consumes messages from, such as ["persistent://$TENANT/$NAMESPACE/$TOPIC"].

To consume all topics matching a naming pattern, use topicsPattern instead. inputs and topicsPattern are mutually exclusive.

Default: []

maxMessageRetries

No

The maximum number of delivery attempts for a message before it is sent to the dead letter topic as an unacknowledged message.

See also negativeAckRedeliveryDelayMs and deadLetterTopic.

name

Yes

The name for the connector. It must start with a lowercase letter and contain only lowercase letters, numbers, and hyphens (-). DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: cassandra-enhanced-prod-us-east-1.

namespace

Yes

The namespace in your Pulsar tenant where you want to create the connector.

negativeAckRedeliveryDelayMs

No

The amount of time, in milliseconds, to wait before attempting redelivery if message delivery times out or fails.

See also maxMessageRetries and deadLetterTopic.

parallelism

Yes

The number of Pulsar function instances to run.

Default: 1

processingGuarantees

Yes

The message delivery semantics for the connector: ATLEAST_ONCE, ATMOST_ONCE, or EFFECTIVELY_ONCE.

Whether the processing guarantee is honored depends on the connector implementation.

For more information, see the Pulsar documentation on Function processing guarantees and Processing guarantees in I/O connectors.

Default: ATLEAST_ONCE

resources

No

A JSON object describing the compute resources to allocate to each Pulsar function instance, where cpu is in cores and disk and ram are in bytes. For example: {"cpu":0.25,"disk":1000000000,"ram":500000000}.

retainKeyOrdering

No

Whether the sink consumes and processes messages in key order.

Default: true

retainOrdering

No

Whether the sink consumes and processes messages in the order they were written to the topic.

Default: false

runtimeFlags

No

A string that encodes flags to pass to the Apache Pulsar function runtime. Only applicable to process and Kubernetes runtimes.

secrets

No

If security is enabled on your function workers, you can provide a map of secret names (secretName) to secrets provider configuration objects. The connector uses secretName to reference the secret. The mapped object contains the required properties to fetch the secret from the secrets provider.

To get the types for the values in this mapping, use the SecretProviderConfigurator.getSecretObjectType() method.

This is separate from connector-specific security settings in configs and the Pulsar authentication token used to connect to your Pulsar cluster.

sourceSubscriptionName

No

The name of a specific Pulsar source subscription, if required by your input topic consumer.

sourceSubscriptionPosition

No

The position to begin reading from in the source when sourceSubscriptionName is set: Latest or Earliest.

Default: Latest

tenant

Yes

The Pulsar tenant where you want to create the connector.

timeoutMs

No

The message timeout, in milliseconds.

Default: 5000

topicsPattern

No

A regular expression that selects which topics in a namespace the sink consumes from.

To consume an exact list of topics, use inputs instead. inputs and topicsPattern are mutually exclusive.

topicToSchemaProperties

No

A map of input topics to schema properties specified as a JSON object.

topicToSchemaType

No

A map of input topics to schema types or class names specified as a JSON object.

topicToSerdeClassName

No

A map of input topics to SerDe class names specified as a JSON object.
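To catch naming mistakes before a create request fails, you can check a proposed connector name locally. This Bash sketch encodes only the rule stated for the name field (a lowercase first letter, then only lowercase letters, numbers, and hyphens). The function name is hypothetical, and the service might enforce further limits, such as a maximum length, that this check does not cover.

```shell
#!/usr/bin/env bash
# Sketch: validate a connector name against the documented naming rule.
# The function name is hypothetical; length limits are not checked.
is_valid_connector_name() {
  # Force the C locale so [a-z] means only ASCII lowercase letters.
  local LC_ALL=C
  [[ "$1" =~ ^[a-z][a-z0-9-]*$ ]]
}
```

For example, is_valid_connector_name "$SINK_NAME" || echo "invalid name: $SINK_NAME" could run before pulsar-admin sinks create.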

DataStax Apache Pulsar sink connector properties (configs)

Set these properties in the configs section of the connector configuration.

The relevant and required properties depend on whether you are connecting to Astra DB or a self-managed Cassandra cluster, as well as the cluster configuration.

The following table includes commonly used properties. For all properties and more information, see the DataStax Apache Pulsar sink connector documentation.

Each entry gives the property name, whether it is required, its default value, and a description.

auth

Yes

{}

Contains the authentication parameters to connect to the database.

cloud.secureConnectBundle

Yes for Astra DB

null

The path to your Astra DB database’s SCB zip file, or a base64 encoding of the SCB zip file (base64:ENCODED_STRING).

Only use this property for Astra DB connections.

For self-managed clusters, use ssl.

compression

Yes

None

The compression algorithm to use.

connectionPoolLocalSize

Yes

4

The number of connections to each node in the local datacenter.

contactPoints

Yes for self-managed clusters

[localhost]

Either [localhost] or a comma-separated list of hostnames or IP addresses for your self-managed Cassandra cluster nodes.

If contactPoints is set to a list of hosts, then loadBalancing.localDc is required.

If cloud.secureConnectBundle is set, then contactPoints is ignored. The contact points are set by the SCB.

ignoreErrors

Yes

None

Sets the error handling behavior.

jmx

Yes

true

Whether to collect and report metrics using Java Management Extensions (JMX).

maxConcurrentRequests

Yes

500

The maximum number of concurrent requests that can be sent to each connected node.

maxNumberOfRecordsInBatch

Yes

32

The maximum number of records to include in a batch write to the database.

queryExecutionTimeout

Yes

30

The CQL statement execution timeout, in seconds.

ssl

Depends on cluster configuration

{}

Contains SSL-encryption parameters for self-managed clusters with client-to-node encryption enabled.

For Astra DB, use cloud.secureConnectBundle, which includes the SSL encryption configuration. If cloud.secureConnectBundle is set, then ssl is ignored.

topic

Yes

{}

Contains the topic-to-table mapping properties.

topics

Yes

[]

The names of topics to subscribe to.

For each topic in topics, there must be a corresponding entry in the topic section.
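The cloud.secureConnectBundle property accepts either a path to the SCB zip file or the inline form base64:ENCODED_STRING. A minimal Bash sketch of producing the inline value, assuming the standard base64 utility is available; the function name and the example path are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: convert an SCB zip file into a "base64:ENCODED_STRING" value for
# cloud.secureConnectBundle. The function name is hypothetical.
scb_to_config_value() {
  local scb_path="$1"
  if [ ! -f "$scb_path" ]; then
    echo "SCB file not found: $scb_path" >&2
    return 1
  fi
  # GNU base64 wraps output at 76 columns by default; delete the newlines so
  # the value fits in a single JSON string. Reading from stdin works with both
  # GNU and BSD base64.
  printf 'base64:%s\n' "$(base64 < "$scb_path" | tr -d '\n')"
}
```

For example, SCB_VALUE="$(scb_to_config_value path/to/scb.zip)" produces a string you can place in "cloud.secureConnectBundle". The encoded bundle is a long string, so passing a file path is often easier to read when the path is reachable by the connector.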

Topic-to-table mapping properties (topic)

topic is a subproperty of configs that contains topic-to-table mapping properties.

This mapping determines which table receives messages from a Pulsar topic, and how the message value fields are written to the table columns.

There are many subproperties and values available in topic to support different write options, value translations, and data formats.

The following example shows a truncated configs section that maps one topic to a table in an Astra DB database:

  "configs": {
    "auth": {
      "username": "token",
      "password": "${ASTRA_APPLICATION_TOKEN}"
    },
    "cloud.secureConnectBundle": "path/to/scb.zip",
    "topics": "${TOPIC}",
    // ...
    // Other configs subproperties
    // ...
    "topic": {
      "${TOPIC}": {
        "${KEYSPACE_NAME}": {
          "${TABLE_NAME}": {
            "mapping": "column1=value.fieldA,column2=value.fieldC,column3=value.fieldG,column8=value.fieldW"
            // ...
            // Other topic subproperties
            // ...
          }
        }
      }
    }
  }

The mapping string is a comma-separated list of column=field pairs, where each pair maps a table column to a message value field. You can use other subproperties for other write options and mapping configuration, such as codec and nullToUnset. For more information, including all subproperties and examples, see the DataStax Apache Pulsar sink connector documentation.
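When a mapping string grows long, listing its pairs one per line can make a mistyped column or field easier to spot. This Bash sketch assumes only the simple column=value.field form shown in the example (no quoted names and no commas or equals signs inside an expression); the connector itself accepts richer syntax than this, and the function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: print each column=field pair of a simple mapping string on its own
# line. Handles only the plain form from the example; function names are
# hypothetical.

# Strip leading and trailing whitespace from a string.
trim() {
  local s="$1"
  s="${s#"${s%%[![:space:]]*}"}"   # drop leading whitespace
  s="${s%"${s##*[![:space:]]}"}"   # drop trailing whitespace
  printf '%s' "$s"
}

print_mapping_pairs() {
  local mapping="$1" pair column field
  local -a pairs=()
  if [ -z "$(trim "$mapping")" ]; then
    echo "empty mapping" >&2
    return 1
  fi
  # Split on commas only; IFS applies to this read command alone.
  IFS=',' read -r -a pairs <<< "$mapping"
  for pair in "${pairs[@]}"; do
    if [[ "$pair" != *=* ]]; then
      echo "malformed mapping entry (no '='): $pair" >&2
      return 1
    fi
    column="$(trim "${pair%%=*}")"   # text before the first '='
    field="$(trim "${pair#*=}")"     # text after the first '='
    if [ -z "$column" ] || [ -z "$field" ]; then
      echo "malformed mapping entry: $pair" >&2
      return 1
    fi
    printf '%s -> %s\n' "$column" "$field"
  done
}
```

For example, print_mapping_pairs "column1=value.fieldA,column2=value.fieldC" prints one line per pair, in the order they appear in the string.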


© Copyright IBM Corporation 2026

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.
