Debezium MongoDB
The Debezium MongoDB source connector tracks either a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections. It streams those changes as messages to a Pulsar topic.
The connector automatically handles the following:
- Addition and removal of shards in a sharded cluster.
- Changes in membership for each replica set.
- Elections within each replica set.
- Resolution of communications issues with replica set members.
Compatibility
Astra Streaming supports Apache Pulsar™ 3.1, which uses Debezium 1.7 libraries.
For supported database versions, see Debezium 1.7 compatibility.
Create the connector
- Optional: If you are using the pulsar-admin CLI or Pulsar Admin API, set the following commonly-used environment variables:

      export TENANT="TENANT_NAME"
      export TOPIC="OUTPUT_TOPIC_NAME"
      export NAMESPACE="NAMESPACE_NAME" # or default
      export SOURCE_NAME="SOURCE_CONNECTOR_UNIQUE_NAME"
      export PULSAR_TOKEN="TENANT_PULSAR_TOKEN" # API only
      export WEB_SERVICE_URL="TENANT_PULSAR_WEB_SERVICE_URL" # API only

  SOURCE_NAME is the name for your new source connector. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: debezium-mongodb-source-prod-us-east-1.

- Create the connector using JSON-formatted connector configuration data. You can pass the configuration directly or with a configuration file.

  pulsar-admin CLI

      ./bin/pulsar-admin sources create \
        --source-type debezium-mongodb \
        --name "$SOURCE_NAME" \
        --destination-topic-name "persistent://$TENANT/$NAMESPACE/$TOPIC" \
        --tenant "$TENANT" \
        --source-config-file configs.json

  Pulsar Admin API

      curl -sS --fail -L -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
        --header "Authorization: Bearer $PULSAR_TOKEN" \
        --form "sourceConfig=@mynetty-source-config.json;type=application/json"

  Example configuration data structure

      {
        "tenant": "${TENANT}",
        "namespace": "${NAMESPACE}",
        "topicName": "persistent://${TENANT}/${NAMESPACE}/${TOPIC}",
        "name": "${SOURCE_NAME}",
        "producerConfig": {
          "maxPendingMessages": 0,
          "maxPendingMessagesAcrossPartitions": 0,
          "useThreadLocalProducers": true,
          "cryptoConfig": {
            "cryptoKeyReaderClassName": "string",
            "cryptoKeyReaderConfig": {
              "property1": {},
              "property2": {}
            },
            "encryptionKeys": [
              "string"
            ],
            "producerCryptoFailureAction": "FAIL",
            "consumerCryptoFailureAction": "FAIL"
          },
          "batchBuilder": "string"
        },
        "serdeClassName": "string",
        "schemaType": "string",
        "configs": {
          "database.whitelist": "usersdb",
          "mongodb.hosts": "host:port",
          "mongodb.name": "streaming_prod",
          "mongodb.password": "${PASSWORD}",
          "mongodb.task.id": "123",
          "mongodb.user": "mongodb"
        },
        "secrets": {},
        "parallelism": 1,
        "processingGuarantees": "ATLEAST_ONCE",
        "resources": {
          "cpu": 0.25,
          "disk": 1000000000,
          "ram": 1000000000
        },
        "archive": "builtin://debezium-mongodb",
        "runtimeFlags": "string",
        "customRuntimeOptions": "string",
        "batchSourceConfig": {
          "discoveryTriggererClassName": "string",
          "discoveryTriggererConfig": {
            "property1": {},
            "property2": {}
          }
        },
        "batchBuilder": "string"
      }
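As a minimal sketch of the configuration-file approach, the following writes a trimmed-down configs.json that uses only fields from the example configuration data structure. It assumes that the environment variables from the optional step are already exported and that the MongoDB password is available in a PASSWORD variable, matching the placeholder in the example. The database, host, and connector values are the example's placeholders, not recommendations.

```shell
# Sketch only: generate a minimal source connector configuration file.
# Assumes TENANT, NAMESPACE, TOPIC, SOURCE_NAME, and PASSWORD are set.
# Values are substituted as-is, so they must not contain double quotes
# or backslashes, which would produce invalid JSON.
cat > configs.json <<EOF
{
  "tenant": "${TENANT}",
  "namespace": "${NAMESPACE}",
  "name": "${SOURCE_NAME}",
  "topicName": "persistent://${TENANT}/${NAMESPACE}/${TOPIC}",
  "archive": "builtin://debezium-mongodb",
  "parallelism": 1,
  "configs": {
    "database.whitelist": "usersdb",
    "mongodb.hosts": "host:port",
    "mongodb.name": "streaming_prod",
    "mongodb.user": "mongodb",
    "mongodb.password": "${PASSWORD}",
    "mongodb.task.id": "123"
  }
}
EOF
```

The resulting file can then be passed with --source-config-file configs.json, as in the pulsar-admin command.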
Edit the connector
To update a connector, pass the new configuration definition to the connector. For example, if you created the connector with a configuration file, you can pass an updated configuration file.
You can include the entire configuration or only the properties that you want to change.
Additionally, some properties can be modified with specific arguments, such as --parallelism.
To get the current configuration, see Get source connector configuration data.
pulsar-admin CLI

    ./bin/pulsar-admin sources update \
      --source-type debezium-mongodb \
      --name "$SOURCE_NAME" \
      --destination-topic-name "persistent://$TENANT/$NAMESPACE/$TOPIC" \
      --tenant "$TENANT" \
      --parallelism 2

Pulsar Admin API

    curl -sS --fail -L -X PUT "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
      --header "Authorization: Bearer $PULSAR_TOKEN" \
      --form "sourceConfig=@mynetty-source-config.json;type=application/json"
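If you created the connector from a configuration file, an update with an edited file might look like the following sketch. PULSAR_ADMIN and update_source_from_file are hypothetical names used for illustration. The sketch assumes the same environment variables as the create step and a configuration file that you have already edited.

```shell
# Sketch only: update the connector from an edited configuration file.
# PULSAR_ADMIN and update_source_from_file are hypothetical names.
PULSAR_ADMIN="${PULSAR_ADMIN:-./bin/pulsar-admin}"

update_source_from_file() {
  # $1: path to the updated configuration file (defaults to configs.json)
  "$PULSAR_ADMIN" sources update \
    --tenant "$TENANT" \
    --namespace "$NAMESPACE" \
    --name "$SOURCE_NAME" \
    --source-config-file "${1:-configs.json}"
}
```

For example, after editing the file, run update_source_from_file configs.json.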
Manage the connector
See Create and manage connectors for details on how to manage connectors after you create them, including:
- Get connector status
- Get existing connectors
- Get connector configuration details
- Start connectors
- Stop connectors
- Restart connectors
- Delete connectors
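For quick reference, the management tasks listed above map onto pulsar-admin sources subcommands. The following is a rough sketch rather than the documented procedure: source_cmd, list_sources, and PULSAR_ADMIN are hypothetical helper names, and the environment variables are the ones from the create step.

```shell
# Sketch only: thin wrappers around pulsar-admin sources subcommands.
# PULSAR_ADMIN, source_cmd, and list_sources are hypothetical names.
PULSAR_ADMIN="${PULSAR_ADMIN:-./bin/pulsar-admin}"

# Run a per-connector subcommand: status, get, start, stop, restart, or delete.
source_cmd() {
  "$PULSAR_ADMIN" sources "$1" \
    --tenant "$TENANT" \
    --namespace "$NAMESPACE" \
    --name "$SOURCE_NAME"
}

# List the source connectors that exist in the namespace.
list_sources() {
  "$PULSAR_ADMIN" sources list \
    --tenant "$TENANT" \
    --namespace "$NAMESPACE"
}
```

For example, source_cmd status reports the connector status, source_cmd get prints the current configuration, and source_cmd restart restarts the connector.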
Connector configuration reference
To configure Astra Streaming Pulsar connectors, you use a combination of common Pulsar properties and provider-specific properties.
Because Astra Streaming and the Astra Streaming Pulsar connectors are based on a specific version of the open-source Apache Pulsar project and connectors, some properties and values might be unsupported or ignored by Astra Streaming.
Pulsar source connector properties
Pulsar connectors and functions both use Pulsar functions workers. Therefore, some function configuration properties are also used to configure connectors.
The following table lists Astra Streaming Pulsar source connector configuration properties by JSON field name. For the equivalent command line arguments and more information about these properties, see the following:
| Field name | Required | Description |
|---|---|---|
| archive | Yes | The type of built-in Astra Streaming Pulsar connector that you want to deploy. Formatted as a builtin:// reference, such as builtin://debezium-mongodb. |
| batchBuilder | No | The batch construction method: DEFAULT or KEY_BASED. Default: DEFAULT |
| batchSourceConfig | No | The batch source configuration, as a JSON string of key-value pairs. |
| configs | Yes | A JSON-formatted key-value map containing configuration properties specific to the connector type, including provider-specific authentication and integration settings. For available subproperties, see Debezium MongoDB source connector properties (configs). |
| customRuntimeOptions | No | A string that encodes options to configure the Apache Pulsar function runtime. |
| name | Yes | The name for the connector. It must start with a lowercase letter. The remaining characters are restricted to a limited set that includes numbers and hyphens (-). |
| namespace | Yes | The namespace in your Pulsar tenant where you want to create the connector. |
| parallelism | Yes | The number of Pulsar function instances to run. Default: 1 |
| processingGuarantees | Yes | The messaging delivery semantic to use when writing to topics: ATLEAST_ONCE, ATMOST_ONCE, or EFFECTIVELY_ONCE. Respect for the processing guarantee depends on the connector implementation. For more information, see the Pulsar documentation on Function processing guarantees and Processing guarantees in I/O connectors. Default: ATLEAST_ONCE |
| producerConfig | No | A JSON string containing a custom producer configuration. |
| resources | No | A JSON string describing the compute resources to allocate to each Pulsar function instance. For example: {"cpu": 0.25, "disk": 1000000000, "ram": 1000000000} |
| runtimeFlags | No | A string that encodes flags to pass to the Apache Pulsar function runtime. Only applicable to process and Kubernetes runtimes. |
| schemaType | No | The schema type used to encode messages emitted from a Pulsar source. Accepts either a built-in schema, such as avro or json, or the class name of a custom schema. |
| secrets | No | If security is enabled on your function workers, you can provide a map of secret names. The value types in this map depend on the secrets provider configured for the function workers. This is separate from connector-specific security settings in configs. |
| serdeClassName | No | The SerDe class name for the source. |
| tenant | Yes | The Pulsar tenant where you want to create the connector. |
| topicName | Yes | An existing topic in your Astra Streaming tenant where messages are published. Must be formatted as a full persistent or non-persistent topic name, such as persistent://${TENANT}/${NAMESPACE}/${TOPIC}. |
| className | Yes | The class reference for the connector type. |
Debezium MongoDB source connector properties (configs)
Set these properties in the configs section of the connector configuration.
The following table lists some commonly used properties. For more information and additional properties, see the documentation for the Debezium connector for MongoDB and the OSS Apache Pulsar Debezium source connector.
| Name | Required | Description |
|---|---|---|
| database.whitelist | No if alternative properties are used | A list of databases hosted by the server monitored by the connector. There are multiple properties you can use to specify (inclusively or exclusively) the databases and collections to monitor. |
| mongodb.hosts | Yes | A comma-separated list of hosts (either as host or host:port) for the MongoDB servers in the replica set. If mongodb.members.auto.discover is false, prefix each host and port pair with the replica set name, such as rs0/localhost:27017. |
| mongodb.name | Yes | A unique name that identifies the connector, MongoDB replica set, or sharded cluster that the source connector monitors. Use only one Debezium MongoDB source connector for each MongoDB server. |
| mongodb.password | Yes if authentication is enabled | The password to authenticate to the MongoDB server. |
| mongodb.task.id | Yes | The task ID of the MongoDB connector. |
| mongodb.user | Yes if authentication is enabled | The name of the database user to authenticate to the MongoDB server. |
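Before you create or update the connector, it can help to confirm that the configuration file mentions every required property. The following rough sketch assumes that the rows marked Yes in the table correspond to mongodb.hosts, mongodb.name, and mongodb.task.id, as named in the example configuration. check_required_props is a hypothetical helper that performs a plain text search and does not parse or validate the JSON.

```shell
# Sketch only: check that a configuration file mentions each required property.
# check_required_props is a hypothetical helper. It searches the file as text,
# so it cannot tell whether a property sits inside the configs section.
check_required_props() {
  file="${1:-configs.json}"
  missing=0
  for prop in mongodb.hosts mongodb.name mongodb.task.id; do
    # -F treats the quoted property name as a fixed string, so dots match literally.
    if ! grep -qF "\"$prop\"" "$file"; then
      echo "missing required property: $prop" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, check_required_props configs.json returns a nonzero status and names each missing property on standard error if any of the three is absent.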