Debezium MongoDB

Debezium’s MongoDB connector tracks a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections and records those changes as messages in an Apache Pulsar™ topic.

The connector automatically handles the addition or removal of shards in a sharded cluster, changes in membership of each replica set, elections within each replica set, and the resolution of communications problems.

Astra Streaming currently supports Apache Pulsar 2.10, which uses Debezium 1.7 libraries. For a list of supported MongoDB versions, please refer to the Debezium documentation.

Get Started

Set the required environment variables, and then create the connector with one of the methods below.

export TENANT=<replace-me>
export DESTINATION_TOPIC=<replace-me>
export NAMESPACE=default
export SOURCE_NAME=debezium-mongo-src
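Before running any of the commands on this page, it can help to confirm that every variable has a real value. The following is a minimal sketch in POSIX shell. check_vars is a hypothetical helper, not part of pulsar-admin or Astra Streaming.

```shell
# Hypothetical helper: check that each named variable is set and no longer <replace-me>.
# Prints a message for every problem and returns 1 if any variable still needs a value.
check_vars() {
  missing=0
  for var in "$@"; do
    # POSIX-compatible indirection: copy the value of the variable named by $var
    eval "value=\${$var-}"
    if [ -z "$value" ] || [ "$value" = "<replace-me>" ]; then
      echo "Set $var before continuing" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, run check_vars TENANT NAMESPACE DESTINATION_TOPIC SOURCE_NAME || exit 1 at the top of a script, and add WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN to the list when you use the cURL commands.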
Pulsar Admin

Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sources create \
  --source-type debezium-mongodb \
  --name "$SOURCE_NAME" \
  --destination-topic-name "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC" \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --source-config '{
    "database.whitelist": "<replace-me>",
    "mongodb.hosts": "<replace-me>",
    "": "<replace-me>",
    "mongodb.password": "<replace-me>",
    "": "123",
    "mongodb.user": "<replace-me>"
  }'

cURL

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, go to Streaming, click your tenant’s name, and then click the Settings tab.

  2. Click Create Token.

  3. Copy the token, store it securely, and then click Close.

  4. Click the Connect tab, and then copy the Web Service URL.

  5. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sources REST API spec for all available options.

curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN" \
  -d '{
        "tenant": "'$TENANT'",
        "topicName": "persistent://'$TENANT'/'$NAMESPACE'/'$DESTINATION_TOPIC'",
        "name": "'$SOURCE_NAME'",
        "namespace": "'$NAMESPACE'",
        "archive": "builtin://debezium-mongodb",
        "parallelism": 1,
        "processingGuarantees": "ATLEAST_ONCE",
        "configs": {
          "database.whitelist": "<replace-me>",
          "mongodb.hosts": "<replace-me>",
          "": "<replace-me>",
          "mongodb.password": "<replace-me>",
          "": "123",
          "mongodb.user": "<replace-me>"
        }
      }'

Sample Config Data

{
  "database.whitelist": "<replace-me>",
  "mongodb.hosts": "<replace-me>",
  "": "<replace-me>",
  "mongodb.password": "<replace-me>",
  "": "123",
  "mongodb.user": "<replace-me>"
}

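The cURL example above splices shell variables into a single-quoted JSON string. As an alternative, here is a minimal sketch that assembles the same request body with printf. source_body is a hypothetical helper, and it assumes that no value contains a double quote or backslash, because it performs no JSON escaping.

```shell
# Hypothetical helper: print the JSON body used by the create and update REST calls.
# $1: the "configs" object as a JSON string
# $2: parallelism (optional, defaults to 1)
# Values are inserted verbatim with no JSON escaping, so they must not contain
# double quotes or backslashes.
source_body() {
  printf '{"tenant":"%s","namespace":"%s","name":"%s","topicName":"persistent://%s/%s/%s","archive":"builtin://debezium-mongodb","parallelism":%s,"processingGuarantees":"ATLEAST_ONCE","configs":%s}\n' \
    "$TENANT" "$NAMESPACE" "$SOURCE_NAME" \
    "$TENANT" "$NAMESPACE" "$DESTINATION_TOPIC" \
    "${2:-1}" "$1"
}
```

Pass the result to curl with -d "$(source_body "$CONFIGS")", where CONFIGS is a hypothetical variable that holds the configs object. Keeping the quoting in one function makes the request easier to read, at the cost of the escaping limitation noted above.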
Managing the Connector


Start Connector

Pulsar Admin

Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Start all instances of a connector
./bin/pulsar-admin sources start \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

# Optionally, add --instance-id to start only an individual instance

cURL

These commands use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN variables created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.

# Start all instances of a connector
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/start" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

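# Start an individual instance of a connector
#curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/start" \
#  -H "Authorization: $ASTRA_STREAMING_TOKEN"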
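The start, stop, and restart REST calls on this page share one URL pattern, with an optional instance ID segment before the action. Here is a minimal sketch of a helper that builds those URLs from the exported variables. source_action_url is hypothetical, and the URL shapes are the ones used in the commands on this page.

```shell
# Hypothetical helper: print the REST URL for a lifecycle action on this source.
# $1: start, stop, or restart
# $2: optional instance ID (Pulsar numbers instances from 0); omit it to target all instances
source_action_url() {
  case "$1" in
    start|stop|restart) ;;
    *) echo "unsupported action: $1" >&2; return 1 ;;
  esac
  base="$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME"
  if [ -n "${2-}" ]; then
    printf '%s/%s/%s\n' "$base" "$2" "$1"
  else
    printf '%s/%s\n' "$base" "$1"
  fi
}
```

For example, curl -sS --fail -X POST "$(source_action_url stop 0)" -H "Authorization: $ASTRA_STREAMING_TOKEN" would stop only instance 0.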


Stop Connector

Pulsar Admin

Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Stop all instances of a connector
./bin/pulsar-admin sources stop \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

# Optionally, add --instance-id to stop only an individual instance

cURL

These commands use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN variables created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.

# Stop all instances of a connector
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/stop" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

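# Stop an individual instance of a connector
#curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/stop" \
#  -H "Authorization: $ASTRA_STREAMING_TOKEN"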


Restart Connector

Pulsar Admin

Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Restart all instances of a connector
./bin/pulsar-admin sources restart \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

# Optionally, add --instance-id to restart only an individual instance

cURL

These commands use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN variables created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.

# Restart all instances of a connector
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/restart" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

# Restart an individual instance of a connector
#curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/restart" \
#  -H "Authorization: $ASTRA_STREAMING_TOKEN"


Update Connector

Pulsar Admin

Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sources update \
  --source-type debezium-mongodb \
  --name "$SOURCE_NAME" \
  --destination-topic-name "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC" \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --parallelism 2 \
  --source-config '{}'

cURL

These commands use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN variables created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.

curl -sS --fail -X PUT "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN" \
  -d '{
        "tenant": "'$TENANT'",
        "topicName": "persistent://'$TENANT'/'$NAMESPACE'/'$DESTINATION_TOPIC'",
        "name": "'$SOURCE_NAME'",
        "namespace": "'$NAMESPACE'",
        "archive": "builtin://debezium-mongodb",
        "parallelism": 1,
        "processingGuarantees": "ATLEAST_ONCE",
        "configs": {
          "database.whitelist": "<replace-me>",
          "mongodb.hosts": "<replace-me>",
          "": "<replace-me>",
          "mongodb.password": "<replace-me>",
          "": "123",
          "mongodb.user": "<replace-me>"
        }
      }'

Response

{
  "tenant": "string",
  "namespace": "string",
  "name": "string",
  "className": "string",
  "topicName": "string",
  "producerConfig": {
    "maxPendingMessages": 0,
    "maxPendingMessagesAcrossPartitions": 0,
    "useThreadLocalProducers": true,
    "cryptoConfig": {
      "cryptoKeyReaderClassName": "string",
      "cryptoKeyReaderConfig": {
        "property1": {},
        "property2": {}
      },
      "encryptionKeys": [
        "string"
      ],
      "producerCryptoFailureAction": "FAIL",
      "consumerCryptoFailureAction": "FAIL"
    },
    "batchBuilder": "string"
  },
  "serdeClassName": "string",
  "schemaType": "string",
  "configs": {
    "property1": {},
    "property2": {}
  },
  "secrets": {
    "property1": {},
    "property2": {}
  },
  "parallelism": 0,
  "processingGuarantees": "ATLEAST_ONCE",
  "resources": {
    "cpu": 0,
    "ram": 0,
    "disk": 0
  },
  "archive": "string",
  "runtimeFlags": "string",
  "customRuntimeOptions": "string",
  "batchSourceConfig": {
    "discoveryTriggererClassName": "string",
    "discoveryTriggererConfig": {
      "property1": {},
      "property2": {}
    }
  },
  "batchBuilder": "string"
}


Delete Connector

Pulsar Admin

Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Delete all instances of a connector
./bin/pulsar-admin sources delete \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

cURL

These commands use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN variables created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.

# Delete all instances of a connector
curl -sS --fail -X DELETE "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

Monitoring the Connector


Connector Info

Pulsar Admin

Assuming you have downloaded client.conf to the Pulsar folder:

# Get information about a connector
./bin/pulsar-admin sources get \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

cURL

These commands use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN variables created in Get Started.

# Get a connector's information
curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
  -H "accept: application/json" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

Sample Config Data

{
  "database.whitelist": "<replace-me>",
  "mongodb.hosts": "<replace-me>",
  "": "<replace-me>",
  "mongodb.password": "<replace-me>",
  "": "123",
  "mongodb.user": "<replace-me>"
}


Connector Status

Pulsar Admin

Assuming you have downloaded client.conf to the Pulsar folder:

# Get the status of an individual connector instance (omit --instance-id to get the status of all instances)
./bin/pulsar-admin sources status \
  --instance-id "$SOURCE_INSTANCEID" \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

cURL

These commands use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN variables created in Get Started.

# Get the status of all connector instances
curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/status" \
  -H "accept: application/json" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

# Get the status of an individual connector instance
curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/status" \
  -H "accept: application/json" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

Response

Status response for all connector instances

{
  "numInstances": 0,
  "numRunning": 0,
  "instances": [
    {
      "instanceId": 0,
      "status": {
        "running": true,
        "error": "string",
        "numRestarts": 0,
        "numReceivedFromSource": 0,
        "numSystemExceptions": 0,
        "latestSystemExceptions": [
          {
            "exceptionString": "string",
            "timestampMs": 0
          }
        ],
        "numSourceExceptions": 0,
        "latestSourceExceptions": [
          {
            "exceptionString": "string",
            "timestampMs": 0
          }
        ],
        "numWritten": 0,
        "lastReceivedTime": 0,
        "workerId": "string"
      }
    }
  ]
}

Status response for an individual connector instance

{
  "running": true,
  "error": "string",
  "numRestarts": 0,
  "numReceivedFromSource": 0,
  "numSystemExceptions": 0,
  "latestSystemExceptions": [
    {
      "exceptionString": "string",
      "timestampMs": 0
    }
  ],
  "numSourceExceptions": 0,
  "latestSourceExceptions": [
    {
      "exceptionString": "string",
      "timestampMs": 0
    }
  ],
  "numWritten": 0,
  "lastReceivedTime": 0,
  "workerId": "string"
}

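To script a simple health check, here is a minimal sketch that reads the all-instances status response on stdin and succeeds only when numRunning equals numInstances. all_instances_running is a hypothetical helper. It pattern-matches the two top-level fields with sed rather than parsing JSON, so a real JSON parser such as jq is the more robust choice where one is available.

```shell
# Hypothetical helper: read an all-instances status response on stdin and succeed only
# when at least one instance exists and numRunning equals numInstances.
# It extracts the two top-level fields with sed instead of parsing the JSON.
all_instances_running() {
  status_json=$(cat)
  total=$(printf '%s\n' "$status_json" |
    sed -n 's/.*"numInstances"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1)
  running=$(printf '%s\n' "$status_json" |
    sed -n 's/.*"numRunning"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1)
  # Fail if a field is missing, if there are no instances, or if any instance is not running
  [ -n "$total" ] && [ -n "$running" ] && [ "$total" -gt 0 ] && [ "$running" -eq "$total" ]
}
```

For example, pipe the output of the all-instances status curl command into all_instances_running && echo "All instances are running". The helper does not understand the individual-instance response, which has no numInstances field.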

Metrics

Astra Streaming exposes Prometheus-formatted metrics for every connector. Refer to the Scrape metrics with Prometheus page for more detail.

Connector Reference

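Source connectors accept two sets of parameters: Astra Streaming parameters, which apply to every source, and connector-specific parameters, which are provided in the "configs" object.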

Astra Streaming

Name Required Default Description



The connector type, such as 'builtin://debezium-mongodb'



BatchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED. The default value is DEFAULT.



Batch source config key/value (as a JSON string)



The connector type’s class reference




JSON key/value config of source type specific settings. Example: {"property1":"1234","property2":{"subProperty":"asdf"}}



A string that encodes options to customize the runtime. See the Apache Pulsar documentation for the configured runtime for details.



A name for the source, used to reference it later. The name must start with a lowercase letter and can contain only lowercase alphanumeric characters and hyphens (kebab-case).



The namespace you’d like the source created under




The number of Pulsar source instances to run




The delivery semantics applied to the Pulsar source. Values are 'ATLEAST_ONCE', 'ATMOST_ONCE', and 'EFFECTIVELY_ONCE'.



The custom producer configuration (as a JSON string)



The compute resources to allocate per source instance (applicable only to the process), as a JSON string. Example: {"cpu": 0.25,"disk":1000000000,"ram":500000000}



A string that encodes options to customize the runtime. See the Apache Pulsar documentation for the configured runtime for details.



The schema type (either a built-in schema, such as 'avro' or 'json', or a custom schema class name) used to encode messages emitted from the Pulsar source



A map of secret names (the name used to access each secret in the function through its context) to objects that describe how the underlying secrets provider fetches each secret. The SecretProviderConfigurator.getSecretObjectType() method returns the type of each value.



The SerDe classname for the Pulsar Source



The tenant you’d like the source created under



The name of an existing topic in Astra Streaming to which messages are published. It should use the format [non-]persistent://<tenant>/<namespace>/<topic-name>.
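The source name and topic name rules above are simple enough to check locally before you submit a request. Here is a minimal sketch in POSIX shell. valid_source_name and valid_topic_name are hypothetical helpers that mirror only the documented rules, so the service may still reject a value for other reasons.

```shell
# Hypothetical helper: the source name must start with a lowercase letter and contain
# only lowercase letters, digits, and hyphens. Letters are listed explicitly to avoid
# locale-dependent ranges.
valid_source_name() {
  lower=abcdefghijklmnopqrstuvwxyz
  case "$1" in
    [$lower]*) ;;
    *) return 1 ;;
  esac
  case "$1" in
    *[!${lower}0123456789-]*) return 1 ;;
  esac
  return 0
}

# Hypothetical helper: the topic must be [non-]persistent://<tenant>/<namespace>/<topic-name>
# with three non-empty parts.
valid_topic_name() {
  case "$1" in
    persistent://*) rest=${1#persistent://} ;;
    non-persistent://*) rest=${1#non-persistent://} ;;
    *) return 1 ;;
  esac
  case "$rest" in
    /*|*/|*//*|*/*/*/*) return 1 ;;  # empty part or more than three parts
    */*/*) return 0 ;;                # exactly tenant/namespace/topic
    *) return 1 ;;
  esac
}
```

For example: valid_source_name "$SOURCE_NAME" && valid_topic_name "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC" || echo "Check SOURCE_NAME and DESTINATION_TOPIC".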

Debezium MongoDB (configs)

These values are provided in the "configs" object.

Name Required Default Description



A list of all databases hosted by this server that the connector monitors. This is optional; other properties are available for listing databases and tables to include in or exclude from monitoring.



The comma-separated list of hostname and port pairs ('host' or 'host:port') of the MongoDB servers in the replica set. If is set to false, prefix with the replica set name (e.g., rs0/localhost:27017).


A unique name that identifies the connector and/or the MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector.



Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.


The taskId of the MongoDB connector that attempts to use a separate task for each replica set.



Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
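The mongodb.hosts value described above is a comma-separated list of host or host:port entries, optionally prefixed with a replica set name, as in rs0/localhost:27017. Here is a minimal sketch of assembling that value in shell. mongodb_hosts is a hypothetical helper, and whether the replica set prefix is needed depends on the condition given in the mongodb.hosts description.

```shell
# Hypothetical helper: join host or host:port entries into a mongodb.hosts value.
# $1: replica set name to use as a prefix, or "" for no prefix
# Remaining arguments: the hosts, in order
mongodb_hosts() {
  prefix=$1
  shift
  hosts=
  for h in "$@"; do
    # Add a comma separator before every host except the first
    hosts="${hosts:+$hosts,}$h"
  done
  if [ -n "$prefix" ]; then
    printf '%s/%s\n' "$prefix" "$hosts"
  else
    printf '%s\n' "$hosts"
  fi
}
```

For example, mongodb_hosts rs0 db1:27017 db2:27017 prints rs0/db1:27017,db2:27017, which you can paste into the "mongodb.hosts" entry of the configs object.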

What’s next?

For a detailed explanation of Debezium’s MongoDB connector, read the Debezium documentation.

Learn more about Debezium’s MongoDB connector in the Apache Pulsar MongoDB documentation.


© 2025 DataStax | Privacy policy | Terms of use | Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000,