Debezium SQL Server

The Debezium SQL Server source connector reads data from SQL Server databases and tables, and then emits change data event messages to Pulsar topics.

It is based on the change data capture (CDC) feature available in SQL Server 2016 Service Pack 1 (SP1) and later editions.

The SQL Server capture process monitors designated databases and tables, and then stores changes into dedicated change tables with stored procedure facades.

Compatibility

Astra Streaming supports Apache Pulsar™ 3.1, which uses Debezium 1.7 libraries.

For supported database versions, see Debezium 1.7 compatibility.

Create the connector

Before you create a Debezium SQL Server source connector, you must prepare your SQL Server instance and Pulsar topics.

  1. If you don’t already have one, create a compatible SQL Server instance, such as Microsoft Azure SQL Server.

  2. In your SQL Server instance, select or create a database and table to use as the source of CDC events that the connector will stream to your Pulsar tenant.

  3. In your Pulsar namespace, create four topics using the following naming convention:

    • CONNECTOR_NAME-debezium-history-topic

    • CONNECTOR_NAME-debezium-offset-topic

    • SERVER_NAME

    • SERVER_NAME.DB_NAME.TABLE_NAME

    CONNECTOR_NAME is the name that you will use for your Debezium SQL Server source connector.

    SERVER_NAME, DB_NAME, and TABLE_NAME are the names of your SQL Server instance, database, and table.

    You must use these exact topic names because the connector will fail if it can’t find the expected topics. For more information, see Debezium SQL Server connector topic names.

    You must create the required topics before you create the connector because the connector cannot create the topics automatically.

  4. When you create the source connector in the next steps, make sure the following properties are set correctly:

    • The name property (or the $SOURCE_NAME environment variable) must match the CONNECTOR_NAME you used in the topic names.

    • In configs, the topic.prefix and database.server.name must be set to your SQL Server instance name.

    • In configs, the task.id value must be a string ("task.id": "0"). If it is an integer ("task.id": 0), then the connector throws a NullPointerException.

  5. Optional: If you are using the pulsar-admin CLI or Pulsar Admin API, set the following commonly used environment variables:

    export TENANT="TENANT_NAME"
    export TOPIC="OUTPUT_TOPIC_NAME"
    export NAMESPACE="NAMESPACE_NAME" # or default
    export SOURCE_NAME="SOURCE_CONNECTOR_UNIQUE_NAME"
    export PULSAR_TOKEN="TENANT_PULSAR_TOKEN" # API only
    export WEB_SERVICE_URL="TENANT_PULSAR_WEB_SERVICE_URL" # API only

    SOURCE_NAME is the name for your new source connector. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: debezium-sqlserver-source-prod-us-east-1.

  6. Create the connector using JSON-formatted connector configuration data. You can pass the configuration directly or with a configuration file.

    pulsar-admin CLI:

    ./bin/pulsar-admin sources create \
      --source-type debezium-sqlserver \
      --name "$SOURCE_NAME" \
      --destination-topic-name "persistent://$TENANT/$NAMESPACE/$TOPIC" \
      --tenant "$TENANT" \
      --source-config-file configs.json

    Pulsar Admin API:

    curl -sS --fail -L -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
      --header "Authorization: Bearer $PULSAR_TOKEN" \
      --form "sourceConfig=@configs.json;type=application/json"

    Example configuration data structure:

    {
      "tenant": "${TENANT}",
      "namespace": "${NAMESPACE}",
      "topicName": "persistent://${TENANT}/${NAMESPACE}/${TOPIC}",
      "name": "${SOURCE_NAME}",
      "producerConfig": {
        "maxPendingMessages": 0,
        "maxPendingMessagesAcrossPartitions": 0,
        "useThreadLocalProducers": true,
        "cryptoConfig": {
          "cryptoKeyReaderClassName": "string",
          "cryptoKeyReaderConfig": {
            "property1": {},
            "property2": {}
          },
          "encryptionKeys": [
            "string"
          ],
          "producerCryptoFailureAction": "FAIL",
          "consumerCryptoFailureAction": "FAIL"
        },
        "batchBuilder": "string"
      },
      "serdeClassName": "string",
      "schemaType": "string",
      "configs": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.dbname": "cdc_test",
        "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
        "database.history.name": "cdc_test_history",
        "database.history.pulsar.service.url": "pulsar+ssl://pulsar-azure-westus2.streaming.datastax.com:6651",
        "database.history.pulsar.token": "${PULSAR_TOKEN}",
        "database.history.pulsar.topic": "dbz-stream672-history-topic",
        "database.hostname": "sqlserver.database.windows.net",
        "database.include.list": "cdc_test",
        "database.names": "cdc_test",
        "database.password": "${PASSWORD}",
        "database.port": 1433,
        "database.server.name": "sqlserver",
        "database.ssl": true,
        "database.ssl.mode": "required",
        "database.tcpKeepAlive": true,
        "database.user": "debezium_user",
        "decimal.handling.mode": "double",
        "driver.encrypt": true,
        "driver.trustServerCertificate": true,
        "include.schema.changes": true,
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "name": "dbz-stream672",
        "pulsar.auth.token": "${PULSAR_TOKEN}",
        "pulsar.service.url": "pulsar+ssl://pulsar-azure-westus2.streaming.datastax.com:6651",
        "snapshot.mode": "always",
        "table.include.list": "dbo.accounts",
        "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
        "task.id": "0",
        "topic.namespace": "${TENANT}/${NAMESPACE}",
        "topic.prefix": "sqlserver",
        "typeClassName": "org.apache.pulsar.common.schema.KeyValue",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
      },
      "secrets": {},
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE",
      "resources": {
        "cpu": 0.25,
        "disk": 1000000000,
        "ram": 1000000000
      },
      "archive": "builtin://debezium-sqlserver",
      "runtimeFlags": "string",
      "customRuntimeOptions": "string",
      "batchSourceConfig": {
        "discoveryTriggererClassName": "string",
        "discoveryTriggererConfig": {
          "property1": {},
          "property2": {}
        }
      },
      "batchBuilder": "string"
    }

  7. To receive CDC events, subscribe your consumers to the events topic in your Pulsar namespace. This topic contains the CDC events emitted by Debezium for the configured tables.
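
The topic naming convention from step 3 can be sketched as a small shell helper. This is only an illustration under stated assumptions: the function names debezium_topic_names and create_debezium_topics are hypothetical, and the second function assumes that the pulsar-admin CLI is available at ./bin/pulsar-admin and can create non-partitioned topics in your tenant.

```shell
# Hypothetical helper: print the four topic names from step 3, one per line.
# Arguments: CONNECTOR_NAME SERVER_NAME DB_NAME TABLE_NAME
debezium_topic_names() {
  printf '%s\n' \
    "$1-debezium-history-topic" \
    "$1-debezium-offset-topic" \
    "$2" \
    "$2.$3.$4"
}

# Hypothetical helper: create each required topic in a tenant and namespace.
# Arguments: TENANT NAMESPACE CONNECTOR_NAME SERVER_NAME DB_NAME TABLE_NAME
# Assumes ./bin/pulsar-admin is installed and configured for your tenant.
create_debezium_topics() {
  debezium_topic_names "$3" "$4" "$5" "$6" | while IFS= read -r topic; do
    ./bin/pulsar-admin topics create "persistent://$1/$2/$topic"
  done
}
```

For example, calling create_debezium_topics "$TENANT" "$NAMESPACE" "$SOURCE_NAME" sqlserver cdc_test accounts would attempt to create the four topics for a connector named by $SOURCE_NAME. The server, database, and table values here are placeholders.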

Edit the connector

To update a connector, pass the new configuration definition to the connector. For example, if you created the connector with a configuration file, you can pass an updated configuration file.

You can include the entire configuration or only the properties that you want to change. Additionally, some properties can be modified with specific arguments, such as --parallelism.

To get the current configuration, see Get source connector configuration data.

pulsar-admin CLI:

./bin/pulsar-admin sources update \
  --source-type debezium-sqlserver \
  --name "$SOURCE_NAME" \
  --destination-topic-name "persistent://$TENANT/$NAMESPACE/$TOPIC" \
  --tenant "$TENANT" \
  --parallelism 2

Pulsar Admin API:

curl -sS --fail -L -X PUT "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
  --header "Authorization: Bearer $PULSAR_TOKEN" \
  --form "sourceConfig=@configs.json;type=application/json"

Manage the connector

See Create and manage connectors for details on how to manage connectors after you create them, including:

  • Get connector status

  • Get existing connectors

  • Get connector configuration details

  • Start connectors

  • Stop connectors

  • Restart connectors

  • Delete connectors
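
As a rough sketch of how these operations map to the Pulsar Admin API, the following shell helpers build the request URL and send an authenticated request. The function names are hypothetical. The sketch assumes that your deployment exposes the standard Apache Pulsar source endpoints (the base path used in Create the connector, plus optional /status, /start, /stop, and /restart suffixes) and that the environment variables from Create the connector are exported.

```shell
# Hypothetical helper: build a Pulsar Admin API URL for a source connector.
# Arguments: WEB_SERVICE_URL TENANT NAMESPACE SOURCE_NAME [ACTION]
# ACTION is an optional path suffix such as status, start, stop, or restart.
source_api_url() {
  url="$1/admin/v3/sources/$2/$3/$4"
  if [ -n "${5:-}" ]; then
    url="$url/$5"
  fi
  printf '%s\n' "$url"
}

# Hypothetical wrapper: send an authenticated request for the connector.
# Arguments: HTTP_METHOD [ACTION]
# Assumes WEB_SERVICE_URL, TENANT, NAMESPACE, SOURCE_NAME, and PULSAR_TOKEN
# are set in the environment.
source_api_request() {
  curl -sS --fail -L -X "$1" \
    "$(source_api_url "$WEB_SERVICE_URL" "$TENANT" "$NAMESPACE" "$SOURCE_NAME" "${2:-}")" \
    --header "Authorization: Bearer $PULSAR_TOKEN"
}
```

Under these assumptions, source_api_request GET status would fetch the connector status, source_api_request POST restart would restart it, and source_api_request DELETE would delete it.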

Connector configuration reference

To configure Astra Streaming Pulsar connectors, you use a combination of common Pulsar properties and provider-specific properties.

Because Astra Streaming and the Astra Streaming Pulsar connectors are based on a specific version of the open-source Apache Pulsar project and connectors, some properties and values might be unsupported or ignored by Astra Streaming.

Pulsar source connector properties

The name property must match the connector name you used in the topic names as explained in Create the connector.

Pulsar connectors and functions both use Pulsar functions workers. Therefore, some function configuration properties are also used to configure connectors.

The following table lists Astra Streaming Pulsar source connector configuration properties by JSON field name. For the equivalent command line arguments and more information about these properties, see the following:

Pulsar configuration properties for source connectors
Field name Required Description

archive

Yes

The type of built-in Astra Streaming Pulsar connector that you want to deploy. Formatted as builtin://CONNECTOR. For this connector, use builtin://debezium-sqlserver.

batchBuilder

No

The batch construction method: DEFAULT or KEY_BASED.

Default: DEFAULT

batchSourceConfig

No

The batch source configuration, as a JSON string of key-value pairs.

configs

Yes

A JSON-formatted key-value map containing configuration properties specific to the connector type, including provider-specific authentication and integration settings. For available subproperties, see Debezium SQL Server source connector properties (configs).

Default: {} (Empty map, uses defaults if they exist, fails otherwise)

customRuntimeOptions

No

A string that encodes options to configure the Apache Pulsar function runtime.

name

Yes

The name for the connector. It must start with a lowercase letter, and contain only numbers, hyphens (-), and lowercase letters. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: debezium-sqlserver-prod-us-east-1.

namespace

Yes

The namespace in your Pulsar tenant where you want to create the connector.

parallelism

Yes

The number of Pulsar function instances to run.

Default: 1

processingGuarantees

Yes

The messaging delivery semantic to use when writing to topics: ATLEAST_ONCE, ATMOST_ONCE, or EFFECTIVELY_ONCE.

Whether the processing guarantee is honored depends on the connector implementation.

For more information, see the Pulsar documentation on Function processing guarantees and Processing guarantees in I/O connectors.

Default: ATLEAST_ONCE

producerConfig

No

A JSON string containing a custom producer configuration. By default, most values are null or empty. For example:

  "producerConfig": {
    "maxPendingMessages": null,
    "maxPendingMessagesAcrossPartitions": null,
    "useThreadLocalProducers": false,
    "cryptoConfig": null,
    "batchBuilder": ""
  },

resources

No

A JSON string describing the compute resources to allocate to each Pulsar function instance. For example: {"cpu":0.25,"disk":1000000000,"ram":500000000}.

runtimeFlags

No

A string that encodes flags to pass to the Apache Pulsar function runtime. Only applicable to process and Kubernetes runtimes.

schemaType

No

The schema type used to encode messages emitted from a Pulsar source. Accepts either a built-in schema like avro or json, or a custom Schema class name.

secrets

No

If security is enabled on your function workers, you can provide a map of secret names (secretName) to secrets provider configuration objects. The secretName is used by the connector to reference the secret. The mapped object contains the required properties to fetch the secret from the secrets provider.

To get the types for the values in this mapping, use the SecretProviderConfigurator.getSecretObjectType() method.

This is separate from connector-specific security settings in configs and the Pulsar authentication token used to connect to your Pulsar cluster.

serdeClassName

No

The serializer/deserializer (SerDe) class name for the source.

tenant

Yes

The Pulsar tenant where you want to create the connector.

topicName

Yes

An existing topic in your Astra Streaming tenant where messages are published. Must be formatted as a full persistent or non-persistent topic name, such as persistent://$TENANT/$NAMESPACE/$TOPIC.

typeClassName

Yes

The class reference for the connector type. Begins with org.apache.pulsar, such as org.apache.pulsar.common.schema.KeyValue or org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource.
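
Two of the format rules in the preceding table, for name and topicName, can be checked locally before you submit a configuration. The following is a minimal sketch, not an official validator: the function names are hypothetical, and the regular expressions only encode the rules as they are worded in this table.

```shell
# Hypothetical check for the name rule: starts with a lowercase letter, then
# contains only lowercase letters, numbers, and hyphens.
is_valid_source_name() {
  printf '%s\n' "$1" | grep -Eq '^[a-z][a-z0-9-]*$'
}

# Hypothetical check for topicName: a full persistent or non-persistent topic
# name with tenant, namespace, and topic segments.
is_full_topic_name() {
  printf '%s\n' "$1" | grep -Eq '^(persistent|non-persistent)://[^/]+/[^/]+/[^/]+$'
}
```

Both functions return a zero exit status when the value matches, so they can be used in a conditional such as: if is_valid_source_name "$SOURCE_NAME"; then ... fi.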

Debezium SQL Server source connector properties (configs)

Make sure you set the topic.prefix, database.server.name, and task.id properties as explained in Create the connector.

Set these properties in the configs section of the connector configuration.

Generally, all properties provided in the Debezium connector for SQL Server and the OSS Apache Pulsar Debezium source connector are supported. Exceptions include properties that aren’t relevant to Astra Streaming and properties that are only present in incompatible versions.
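
Because an integer task.id causes a NullPointerException, it can be useful to check the configuration file before you submit it. The following shell sketch is a text heuristic, not a JSON parser, and the function name task_id_is_string is hypothetical. It only confirms that the task.id key is followed by a quoted value somewhere in the file.

```shell
# Hypothetical pre-flight check: succeed only if "task.id" has a quoted
# (string) value in the given file. This is a text match, not a JSON parse.
# Argument: path to the connector configuration file
task_id_is_string() {
  grep -Eq '"task\.id"[[:space:]]*:[[:space:]]*"[^"]*"' "$1"
}
```

For example, task_id_is_string configs.json succeeds for "task.id": "0" and fails for "task.id": 0 or when the key is missing.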


© Copyright IBM Corporation 2026

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.
