Google BigQuery

The Pulsar BigQuery sink connector reads messages from Pulsar topics and writes them to Google BigQuery.

This connector doesn’t integrate directly with BigQuery. Instead, it uses the Apache Pulsar Kafka Connect adaptor to transform Pulsar message data into a Kafka-compatible format, and then it uses the Kafka Connect BigQuery Sink for the BigQuery integration.

The adaptor provides a flexible and extensible framework for data transformation and processing, including support for common data formats and in-transit data transformations.

This connector doesn’t require a Kafka instance.

Although the connector uses Kafka-related libraries, it operates entirely within Pulsar.

Classes, properties, and configuration options that reference Kafka are translation points that map Pulsar concepts to Kafka concepts in the connector.

Create the connector

  1. Optional: If you are using the pulsar-admin CLI or Pulsar Admin API, set the following commonly used environment variables:

    export TENANT="TENANT_NAME"
    export TOPIC="INPUT_TOPIC_NAME"
    export NAMESPACE="NAMESPACE_NAME" # or default
    export SINK_NAME="SINK_CONNECTOR_UNIQUE_NAME"
    export PULSAR_TOKEN="TENANT_PULSAR_TOKEN" # API only
    export WEB_SERVICE_URL="TENANT_PULSAR_WEB_SERVICE_URL" # API only

    SINK_NAME is the name for your new sink connector. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: bigquery-sink-prod-us-east-1.

  2. Create the connector using JSON-formatted connector configuration data. You can pass the configuration directly or with a configuration file.

    pulsar-admin CLI
    ./bin/pulsar-admin sinks create \
      --sink-type bigquery \
      --name "$SINK_NAME" \
      --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
      --tenant "$TENANT" \
      --sink-config-file configs.json
    Pulsar Admin API
    curl -sS --fail -L -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
      --header "Authorization: Bearer $PULSAR_TOKEN" \
      --form "sinkConfig=@configs.json;type=application/json"
    Example configuration data structure
    {
      "archive": "builtin://bigquery",
      "autoAck": true,
      "className": "org.apache.pulsar.io.kafka.connect.KafkaConnectSink",
      "cleanupSubscription": false,
      "configs": {
        "batchSize": "1000",
        "kafkaConnectorConfigProperties": {
          "name": "${SINK_NAME}",
          "project": "my-bigquery-project",
          "defaultDataset": "BQ_CONNECTOR_TEST",
          "autoCreateBucket": true,
          "autoCreateTables": false,
          "keySource": "JSON",
          "queueSize": "-1",
          "sanitizeTopics": false,
          "topics": "${TOPIC}",
          "keyfile": {
            "type": "service_account",
            "project_id": "XXXXXX",
            "private_key_id": "XXXXXXXXX",
            "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w ... U=\n-----END PRIVATE KEY-----\n",
            "client_email": "XXXXXXXXX",
            "client_id": "XXXXXX",
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
            "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/XXXXXX"
          }
        },
        "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "lingerTimeMs": "1000",
        "offsetStorageTopic": "${TOPIC}-offsets",
        "sanitizeTopicName": true,
        "topic": "${TOPIC}"
      },
      "customRuntimeOptions": "internal_data",
      "deadLetterTopic": null,
      "inputSpecs": {
        "${TENANT}/${NAMESPACE}/${TOPIC}": {
          "consumerProperties": {},
          "cryptoConfig": null,
          "poolMessages": false,
          "receiverQueueSize": null,
          "regexPattern": false,
          "schemaProperties": {},
          "schemaType": null,
          "serdeClassName": null
        }
      },
      "inputs": [
        "${TENANT}/${NAMESPACE}/${TOPIC}"
      ],
      "maxMessageRetries": null,
      "name": "${SINK_NAME}",
      "namespace": "${NAMESPACE}",
      "negativeAckRedeliveryDelayMs": null,
      "parallelism": 1,
      "processingGuarantees": "EFFECTIVELY_ONCE",
      "resources": {
        "cpu": 0.25,
        "disk": 1000000000,
        "ram": 1000000000
      },
      "retainOrdering": false,
      "retainKeyOrdering": true,
      "runtimeFlags": null,
      "secrets": {},
      "sourceSubscriptionName": null,
      "sourceSubscriptionPosition": "Latest",
      "tenant": "${TENANT}",
      "timeoutMs": 5000,
      "topicToSchemaProperties": null,
      "topicToSchemaType": null,
      "topicToSerdeClassName": null,
      "transformFunction": null,
      "transformFunctionClassName": null,
      "transformFunctionConfig": null
    }

Edit the connector

To update a connector, pass the new configuration definition to the connector. For example, if you created the connector with a configuration file, you can pass an updated configuration file.

You can include the entire configuration or only the properties that you want to change. Additionally, some properties can be modified with specific arguments, such as --parallelism.

To get the current configuration, see Get sink connector configuration data.

pulsar-admin CLI
./bin/pulsar-admin sinks update \
  --sink-type bigquery \
  --name "$SINK_NAME" \
  --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
  --tenant "$TENANT" \
  --parallelism 2
Pulsar Admin API
curl -sS --fail -L -X PUT "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $PULSAR_TOKEN" \
  --form "sinkConfig=@configs.json;type=application/json"

Manage the connector

See Create and manage connectors for details on how to manage connectors after you create them, including:

  • Get connector status

  • Get existing connectors

  • Get connector configuration details

  • Start connectors

  • Stop connectors

  • Restart connectors

  • Delete connectors

Connector configuration reference

To configure Astra Streaming Pulsar connectors, you use a combination of common Pulsar properties and provider-specific properties.

Because Astra Streaming and the Astra Streaming Pulsar connectors are based on a specific version of the open-source Apache Pulsar project and connectors, some properties and values might be unsupported or ignored by Astra Streaming.

Pulsar sink connector properties

Pulsar connectors and functions both use Pulsar functions workers. Therefore, some function configuration properties are also used to configure connectors.

The following table lists Astra Streaming Pulsar sink connector configuration properties by JSON field name. For the equivalent command-line arguments and more information about these properties, see the Apache Pulsar documentation.

Pulsar configuration properties for sink connectors
Field name Required Description

archive

Yes

The type of built-in Astra Streaming Pulsar connector that you want to deploy. Formatted as builtin://CONNECTOR, such as builtin://kafka.

autoAck

Yes

Whether the framework automatically acknowledges messages.

Default: false (automatic acknowledgment disabled)

className

Yes

The connector type’s class reference beginning with org.apache.pulsar, such as org.apache.pulsar.common.schema.KeyValue or org.apache.pulsar.io.kafka.connect.KafkaConnectSink.

cleanupSubscription

No

Whether to delete subscriptions that are created or used by a sink when the sink is deleted.

Default: false

configs

Yes

A JSON-formatted key-value map containing configuration properties specific to the connector type, including provider-specific authentication and integration settings. For available subproperties, see Kafka Connect adaptor properties (configs).

Default: {} (Empty map, uses defaults if they exist, fails otherwise)

customRuntimeOptions

No

A string that encodes options to configure the Apache Pulsar function runtime.

deadLetterTopic

No

The name of the topic that receives unacknowledged messages, such as those that exceed the maximum number of retries or fail to be processed completely.

If null or not set, unacknowledged messages are discarded.

See also maxMessageRetries and negativeAckRedeliveryDelayMs.
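
For example, a hypothetical fragment of the sink configuration that retries a message five times before sending it to a dead letter topic (the topic name is illustrative):

  "maxMessageRetries": 5,
  "negativeAckRedeliveryDelayMs": 1000,
  "deadLetterTopic": "persistent://my-tenant/my-namespace/my-topic-dlq",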

inputSpecs

No

A map of input topics to consumer configuration. By default, most values are null or empty. For example:

  "inputSpecs": {
    "persistent://$TENANT/$NAMESPACE/$TOPIC": {
      "schemaType": null,
      "serdeClassName": null,
      "schemaProperties": {},
      "consumerProperties": {},
      "receiverQueueSize": null,
      "cryptoConfig": null,
      "poolMessages": false,
      "regexPattern": false
    }
  },

inputs

Yes

An array of input topics that the sink consumes messages from, such as ["persistent://$TENANT/$NAMESPACE/$TOPIC"].

To consume all topics matching a naming pattern, use topicsPattern instead. inputs and topicsPattern are mutually exclusive.

Default: []

maxMessageRetries

No

The maximum number of delivery attempts for a message before it is sent to the dead letter topic as an unacknowledged message.

See also negativeAckRedeliveryDelayMs and deadLetterTopic.

name

Yes

The name for the connector. It must start with a lowercase letter, and contain only numbers, hyphens (-), and lowercase letters. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: bigquery-prod-us-east-1.

namespace

Yes

The namespace in your Pulsar tenant where you want to create the connector.

negativeAckRedeliveryDelayMs

No

The amount of time, in milliseconds, to wait before attempting redelivery if message delivery times out or fails.

See also maxMessageRetries and deadLetterTopic.

parallelism

Yes

The number of Pulsar function instances to run.

Default: 1

processingGuarantees

Yes

The message delivery semantics to use when writing to topics: ATLEAST_ONCE, ATMOST_ONCE, or EFFECTIVELY_ONCE.

Whether the processing guarantee is honored depends on the connector implementation.

For more information, see the Pulsar documentation on Function processing guarantees and Processing guarantees in I/O connectors.

Default: ATLEAST_ONCE

resources

No

A JSON string describing the compute resources to allocate to each Pulsar function instance. For example: {"cpu":0.25,"disk":1000000000,"ram":500000000}.

retainKeyOrdering

No

Whether the sink consumes and processes messages in key order.

Default: true

retainOrdering

No

Whether the sink consumes and processes messages in the order they were written to the topic.

Default: false

runtimeFlags

No

A string that encodes flags to pass to the Apache Pulsar function runtime. Only applicable to process and Kubernetes runtimes.

secrets

No

If security is enabled on your function workers, you can provide a map of secret names (secretName) to secrets provider configuration objects. The secretName is used by the connector to reference the secret. The mapped object contains the required properties to fetch the secret from the secrets provider.

To get the types for the values in this mapping, use the SecretProviderConfigurator.getSecretObjectType() method.

This is separate from connector-specific security settings in configs and the Pulsar authentication token used to connect to your Pulsar cluster.

sourceSubscriptionName

No

The name of a specific Pulsar source subscription, if required by your input topic consumer.

sourceSubscriptionPosition

No

The position to begin reading from in the source, if sourceSubscriptionName is set.

Default: Latest

tenant

Yes

The Pulsar tenant where you want to create the connector.

timeoutMs

No

The message timeout in milliseconds.

Default: 5000

topicsPattern

No

A topic naming pattern that selects the topics to consume from all topics in a namespace.

To consume an exact list of topics, use inputs instead. inputs and topicsPattern are mutually exclusive.
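
For example, a hypothetical pattern that consumes from every matching topic in one namespace (the tenant, namespace, and pattern are illustrative):

  "topicsPattern": "persistent://my-tenant/my-namespace/orders-.*",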

topicToSchemaProperties

No

A map of input topics to schema properties specified as a JSON object.

topicToSchemaType

No

A map of input topics to schema types or class names specified as a JSON object.

topicToSerdeClassName

No

A map of input topics to SerDe class names specified as a JSON object.

Kafka Connect adaptor properties (configs)

The properties in the configs section define how data is streamed from Pulsar to BigQuery using the Kafka Connect adaptor.

For more information, see PulsarKafkaConnectSinkConfig.java.

Name Required Description

batchSize

No

The maximum size, in bytes, of messages that the sink attempts to batch together before flushing.

Default: 16384

collapsePartitionedTopics

No

Whether to omit the -partition- suffix from the names of partitioned topics. This determines how many BigQuery tables are created from partitioned topics:

  • true: Collapse partitioned topics by removing the -partition- suffix from topic names. All partitions of a partitioned topic are written to the same BigQuery table.

  • false (default): Maintain partitions by preserving the -partition- suffix when creating BigQuery tables from topic names. Each partition of a partitioned topic is written to its own BigQuery table.

kafkaConnectorConfigProperties

No

A key-value map of Kafka Connect BigQuery Sink properties.

kafkaConnectorSinkClass

Yes

The Kafka connector sink class to use. For Astra Streaming, use com.wepay.kafka.connect.bigquery.BigQuerySinkConnector unless you have your own custom class.

lingerTimeMs

No

The duration, in milliseconds, that the sink attempts to batch messages before flushing.

Default: 2147483647

maxBatchBitsForOffset

No

Number of bits to use for the index of messages in a batch, which are translated into offsets (UUIDs). Must be an integer between 0 and 20.

If 0, the behavior is disabled, and all messages in a batch have the same offset.

Default: 12

offsetStorageTopic

Yes

The Pulsar topic where you want to store message offsets (UUIDs). This is an additional topic that helps track the connector’s progress as messages are written to BigQuery.

sanitizeTopicName

Yes

Whether to replace unprocessable characters in topic names with underscores. For example, a Pulsar topic named persistent://a/b/topic becomes persistent___a_b_topic when this property is true.

For BigQuery, you must set sanitizeTopicName to true to prevent errors from invalid characters in topic names. BigQuery table names can include only letters, numbers, and underscores, and the Kafka Connect BigQuery Sink doesn’t sanitize topic names before creating BigQuery tables.

Be aware that this can cause naming collisions if two topics resolve to the same sanitized name. For example, topic_a and topic.a are both sanitized as topic_a.

Default: false

topic

Yes

The name of the Pulsar topic that you want the sink to read from. Only provide the topic name, not the whole address.

unwrapKeyValueIfAvailable

No

Whether to unwrap KeyValue<> data if the message value is of type Record<KeyValue<>>:

  • true (default): For Record<KeyValue<>> data, extract KeyValue<> from the Record when writing to BigQuery.

  • false: For Record<KeyValue<>> data, use the Record key (the entire KeyValue<> object) for writing to BigQuery.

useIndexAsOffset

No

Whether to use the message index as the offset, instead of the message sequenceId:

  • true (default): Use the message index as the offset, if an index is available. Otherwise, use the message sequenceId.

  • false: Use the message sequenceId as the offset.

Requires AppendIndexMetadataInterceptor and exposingBrokerEntryMetadataToClientEnabled=true on brokers.

useOptionalPrimitives

No

Pulsar schemas don’t indicate whether a schema is optional, but Kafka schemas can include this information.

Set this property to true to force all primitive schemas to be optional when converting Pulsar schemas to Kafka schemas.

Default: false

Kafka Connect BigQuery Sink properties (kafkaConnectorConfigProperties)

kafkaConnectorConfigProperties is a subproperty of configs that contains a map of configuration properties for the Kafka Connect BigQuery Sink.

After the Kafka Connect adaptor transforms the Pulsar data into a Kafka-compatible format, the Kafka Connect BigQuery Sink writes the data to BigQuery according to the properties in kafkaConnectorConfigProperties.

Some of the Kafka Connect BigQuery Sink properties are the same as the Kafka Connect adaptor properties. In some cases, the duplicate properties require matching values. In other cases, you can use duplicate properties to perform additional data transformation or schema modification before writing to BigQuery.

Name Required Description

allBQFieldsNullable

No

Whether the BigQuery schemas generated by the connector allow null values in all fields.

  • false (default): All fields in the BigQuery schemas are REQUIRED.

  • true: No fields in the BigQuery schemas are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays).

allowBigQueryRequiredFieldRelaxation

No

If allBQFieldsNullable is false, you can set this property to true to allow BigQuery schema updates to change REQUIRED fields to NULLABLE. This only applies to schema updates that happen after initial table creation.

Default: false

allowNewBigQueryFields

No

Whether BigQuery schema updates can add fields:

  • false (default): Schema updates cannot add fields to BigQuery tables.

  • true: Schema updates can add new fields to BigQuery tables.

This only applies to schema updates that happen after initial table creation.

allowSchemaUnionization

No

Whether to enable schema unionization during schema updates:

  • false (default): The record of the last schema in a batch is used for any necessary table creation and schema update attempts.

  • true: The existing table schema, if present, is unionized with new record schemas during schema updates.

For more information about this parameter, how it interacts with other parameters, and important considerations, see allowSchemaUnionization in the Kafka Connect BigQuery Sink configuration reference.
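
For example, a hypothetical kafkaConnectorConfigProperties fragment that lets later schema updates add fields and relax REQUIRED fields, without unionizing schemas:

  "allowNewBigQueryFields": true,
  "allowBigQueryRequiredFieldRelaxation": true,
  "allowSchemaUnionization": false,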

autoCreateBucket

No

Whether to automatically create the bucket if it doesn’t exist.

Default: true

autoCreateTables

No

Whether to automatically create BigQuery tables if they don’t already exist.

Tables are named after the sanitized Pulsar topic names unless overridden by topic2TableMap.

Default: false

avroDataCacheSize

No

The size of the cache to use when converting schemas from Avro to Kafka Connect.

Default: 100

bigQueryMessageTimePartitioning

No

Whether to use the message time when inserting records. By default, the connector processing time is used.

Default: false

bigQueryPartitionDecorator

No

Whether to append the partition decorator to the BigQuery table name when inserting records:

  • true (default): Append the partition decorator to the table name, such as table$yyyyMMdd.

  • false: Bypass the logic to append the partition decorator, and use the raw table name for inserts.

bigQueryRetry

No

The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error.

Default: 0 (no retries)

bigQueryRetryWait

No

The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error.

Default: 1000

clusteringPartitionFieldNames

No

Comma-separated list of fields where data is clustered in BigQuery.

convertDoubleSpecialValues

No

Whether to make the following conversions to ensure successful delivery to BigQuery:

  • +Infinity to Double.MAX_VALUE

  • -Infinity and NaN to Double.MIN_VALUE

Default: false (No conversion)

defaultDataset

Yes

The default BigQuery dataset to write to.

deleteEnabled

No

Whether to enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes.

A delete is performed when a record with a null value (a tombstone) is read. This feature doesn’t work with single message transforms (SMTs) that change the name of the topic.

Default: false

enableBatchLoad (Beta)

No

A subset of topics to be batch loaded through GCS to BigQuery. If set, you can also set the following properties:

  • batchLoadIntervalSec: The frequency, in seconds, that the connector attempts to run GCS to BigQuery load jobs.

  • gcsBucketName: The name of the GCS bucket where blobs are staged for batch loading to BigQuery.

This is a beta feature that might have unexpected results or suboptimal performance.
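
For example, a hypothetical batch load configuration; the topic, interval, and bucket name are illustrative:

  "enableBatchLoad": "persistent___my_tenant_my_namespace_my_topic",
  "batchLoadIntervalSec": "120",
  "gcsBucketName": "my-gcs-staging-bucket",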

includeKafkaData

No

Whether to include an extra block containing the original topic, offset, and partition information in the resulting BigQuery rows.

If true, you can use kafkaDataFieldName to specify the field name for this block.

Default: false
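
For example, a hypothetical fragment that writes the extra metadata block to a field named kafka_metadata (an illustrative field name):

  "includeKafkaData": true,
  "kafkaDataFieldName": "kafka_metadata",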

intermediateTableSuffix

No

A suffix that is used when creating intermediate tables from BigQuery destination tables.

The names of intermediate tables are constructed from the destination table name, the intermediateTableSuffix, and an additional suffix if needed for uniqueness. For example, using the default suffix, a destination table named bqTable would have intermediate tables named bqTable.tmp, bqTable.tmp_1, bqTable.tmp_2, and so on.

Default: .tmp

keyfile

Yes

A string representation of Google credentials JSON, or the path to a Google credentials JSON file.

String format is required when creating this connector in the Astra Portal.

keySource

Yes

If keyfile is a file path, set keySource to FILE.

If keyfile is a string, set keySource to JSON.

JSON is required when creating this connector in the Astra Portal.

Default: FILE
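
For example, if the credentials file is available on the connector’s local filesystem (a hypothetical path), you could use the FILE form instead of the inline JSON string shown in the example configuration:

  "keySource": "FILE",
  "keyfile": "/path/to/service-account.json",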

mergeIntervalMs

No

The merge flush interval in milliseconds, if upsert/delete is enabled.

Set to -1 to disable periodic flushing.

Default: 60000 (one minute)

See also mergeRecordsThreshold.

mergeRecordsThreshold

No

The number of records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled.

The default is -1, which disables merge flushing based on record count.

See also mergeIntervalMs.
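
For example, a hypothetical fragment that enables deletes with a one-minute merge flush and no record-count threshold:

  "deleteEnabled": true,
  "mergeIntervalMs": "60000",
  "mergeRecordsThreshold": "-1",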

name

Yes

You must set this to the same value as the name in the Pulsar sink connector properties.

project

Yes

The BigQuery project to write to.

queueSize

No

The maximum size of the worker queue for BigQuery write requests before all topics are paused.

This is a soft limit; the queue size can exceed the maximum before topics are paused. All topics resume after a flush, or when the queue size is less than half of the maximum.

The default is -1, which means the queue size is unlimited.

sanitizeTopics

Yes

In kafkaConnectorConfigProperties, you must set this property to false.

Topic names are sanitized by the Kafka Connect adaptor with sanitizeTopicName in configs.

Default: false

schemaRetriever

No

A class that can be used to create tables and update schemas automatically.

Default: com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever

threadPoolSize

No

The size of the BigQuery write thread pool. This determines the maximum number of concurrent writes to BigQuery.

Default: 10

timePartitioningType

No

The time partitioning type to use when creating tables: DAY (default), HOUR, MONTH, or YEAR. This applies to new tables only; existing tables aren’t altered to this partitioning type.

timestampPartitionFieldName

No

The name of the field in the message value that contains a timestamp to use for partitioning in BigQuery, and to enable timestamp partitioning on each table.

If empty (default), tables use ingestion-time partitioning.
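
For example, a hypothetical fragment that partitions new tables by day on an event_time field (an illustrative field name) from the message value:

  "timePartitioningType": "DAY",
  "timestampPartitionFieldName": "event_time",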

topic2TableMap

No

Optional map of topic names to table names formatted as comma-separated tuples, such as topic1:table1,topic2:table2,topic3:table3.

If you use this property, you must use the sanitized topic names, not the original Pulsar topic names. Because sanitizeTopicName is true in configs, the Kafka Connect BigQuery Sink receives sanitized topic names from the Kafka Connect adaptor. For example, a Pulsar topic named persistent://a/b/c-d must be mapped with the sanitized name persistent___a_b_c_d.
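
For example, a hypothetical mapping that writes that sanitized topic to a table named my_table:

  "topic2TableMap": "persistent___a_b_c_d:my_table",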

topics

Yes

You must set this to the same value as the topic in the Pulsar sink connector properties.

upsertEnabled

No

Whether to enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes.

Row matching is based on record keys. Like deleteEnabled, this feature doesn’t work with SMTs that change the name of the topic.

Default: false

See also mergeIntervalMs and mergeRecordsThreshold.
