Google BigQuery

Google BigQuery is a fully managed enterprise data warehouse that helps you manage and analyze your data with built-in features like machine learning, geospatial analysis, and business intelligence. BigQuery’s serverless architecture lets you use SQL queries to answer your organization’s biggest questions with zero infrastructure management. BigQuery’s scalable, distributed analysis engine lets you query terabytes in seconds and petabytes in minutes.

The BigQuery Pulsar Sink does not integrate with BigQuery directly. It uses Pulsar's built-in Kafka Connect adapter library to transform message data into a Kafka-compatible format, and then the Kafka Connect BigQuery Sink acts as the actual BigQuery integration. The adapter provides a flexible and extensible framework for data transformation and processing. It supports various data formats, including JSON, Avro, and Protobuf, and lets you apply transformations to the data as it is streamed from Pulsar.

You will notice references to Kafka throughout the configuration. You don't need a running instance of Kafka to use this connector. The Kafka references are used as "translation points" by this connector.
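For example, once the sink is running, a JSON message published to the input topic is picked up by the adapter and written to BigQuery by the Kafka Connect sink. A minimal, illustrative publish with the pulsar-client CLI (the payload is a placeholder; the environment variables are defined in Get Started below):

# Publish a test JSON message to the sink's input topic
./bin/pulsar-client produce "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
  --messages '{"sensor_id":"clue-01","temperature":22.5}'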

Get Started

Set the following environment variables. They are used by the pulsar-admin and curl commands throughout this page:

export TENANT=<replace-me>
export INPUT_TOPIC=<replace-me>
export NAMESPACE=default
export SINK_NAME=bigquery-sink
  • Pulsar Admin

  • curl

  • Sample Config Data

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sinks create \
  --sink-type bigquery \
  --name "$SINK_NAME" \
  --inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
  --tenant "$TENANT" \
  --processing-guarantees EFFECTIVELY_ONCE \
  --sink-config '{
      "topic": "bq-test01",
      "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
      "offsetStorageTopic": "bq-sink-offsets01",
      "sanitizeTopicName": "true",
      "kafkaConnectorConfigProperties":
        "name": "bq-sink1",
        "topics": "bq-test01",
        "project": "my-bigquery-project",
        "defaultDataset": "BQ_CONNECTOR_TEST",
        "keyfile": "/Users/me/my-bigquery-key.json",
        "keySource": "FILE",
        "autoCreateTables": "true",
        "sanitizeTopics": "false"
      }'

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, click Streaming tenants.

  2. Click your tenant’s name, and then click the Settings tab.

  3. Click Create Token.

  4. Copy the token, store it securely, and then click Close.

  5. Click the Connect tab, and then copy the Web Service URL.

  6. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sinks REST API spec for all available options.
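Before creating the sink, you can optionally confirm that the token and web service URL work by listing the sinks already in the namespace (a quick sanity check, not part of the official steps):

curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

Then create the sink: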

curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
  --header "Authorization: Bearer $PULSAR_TOKEN" \
  --form 'sinkConfig="{
    \"archive\":\"builtin:\/\/bigquery\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"parallelism\": 1,
    \"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"],
    \"configs\":{
      \"topic\": \"bq-test01\",
      \"kafkaConnectorSinkClass\": \"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector\",
      \"offsetStorageTopic\": \"bq-sink-offsets01\",
      \"sanitizeTopicName\": \"true\",
      \"kafkaConnectorConfigProperties\":{
        \"name\": \"bq-sink1\",
        \"topics\": \"bq-test01\",
        \"project\": \"my-bigquery-project\",
        \"defaultDataset\": \"BQ_CONNECTOR_TEST\",
        \"keyfile\": \"{\"type\":\"service_account\",\"project_id\":\"XXXXXX\",\"private_key_id\":\"XXXXXXXXX\",\"private_key\":\"-----BEGIN PRIVATE KEY-----\\nMIIEvQIBADANBgkqhkiG9w … U=\\n-----END PRIVATE KEY-----\\n\",\"client_email\":\"XXXXXXXXX\",\"client_id\":\"XXXXXX\",\"auth_uri\":\"https://accounts.google.com/o/oauth2/auth\",\"token_uri\":\"https://oauth2.googleapis.com/token\",\"auth_provider_x509_cert_url\":\"https://www.googleapis.com/oauth2/v1/certs\",\"client_x509_cert_url\":\"https://www.googleapis.com/robot/v1/metadata/x509/XXXXXX\"}\",
        \"keySource\": \"JSON\",
        \"autoCreateTables\": \"true\",
        \"sanitizeTopics\": \"false\"
      }
    }
  }"'
{
  "archive": "builtin://bigquery",
  "autoAck": true,
  "className": "org.apache.pulsar.io.kafka.connect.KafkaConnectSink",
  "cleanupSubscription": false,
  "configs": {
    "batchSize": "1000",
    "kafkaConnectorConfigProperties": {
      "autoCreateBucket": true,
      "autoCreateTables": false,
      "keySource": "JSON",
      "queueSize": "-1",
      "sanitizeTopics": false,
      "topics": "homelab/default/clue-sensors"
    },
    "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "lingerTimeMs": "1000",
    "offsetStorageTopic": "homelab/default/clue-sensors",
    "sanitizeTopicName": true,
    "topic": "homelab/default/clue-sensors"
  },
  "customRuntimeOptions": "internal_data",
  "deadLetterTopic": null,
  "inputSpecs": {
    "homelab/default/clue-sensors": {
      "consumerProperties": {},
      "cryptoConfig": null,
      "poolMessages": false,
      "receiverQueueSize": null,
      "regexPattern": false,
      "schemaProperties": {},
      "schemaType": null,
      "serdeClassName": null
    }
  },
  "inputs": [
    "homelab/default/clue-sensors"
  ],
  "maxMessageRetries": null,
  "name": "bq-sink",
  "namespace": "default",
  "negativeAckRedeliveryDelayMs": null,
  "parallelism": 1,
  "processingGuarantees": "EFFECTIVELY_ONCE",
  "resources": {
    "cpu": 0.25,
    "disk": 1000000000,
    "ram": 1000000000
  },
  "retainKeyOrdering": false,
  "retainOrdering": true,
  "runtimeFlags": null,
  "secrets": null,
  "sourceSubscriptionName": null,
  "sourceSubscriptionPosition": "Latest",
  "tenant": "homelab",
  "timeoutMs": 2000,
  "topicToSchemaProperties": null,
  "topicToSchemaType": null,
  "topicToSerdeClassName": null,
  "topicsPattern": null,
  "transformFunction": null,
  "transformFunctionClassName": null,
  "transformFunctionConfig": null
}
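Once the sink is created, it helps to confirm that it is running before publishing data. A quick check with pulsar-admin, using the same environment variables as above:

# List the sinks in the namespace, then check the new sink's status
./bin/pulsar-admin sinks list --tenant "$TENANT" --namespace "$NAMESPACE"
./bin/pulsar-admin sinks status --tenant "$TENANT" --namespace "$NAMESPACE" --name "$SINK_NAME"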

Managing the Connector

Start

  • Pulsar Admin

  • curl

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Start all instances of a connector
./bin/pulsar-admin sinks start \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

# optionally add --instance-id to only start an individual instance

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, click Streaming tenants.

  2. Click your tenant’s name, and then click the Settings tab.

  3. Click Create Token.

  4. Copy the token, store it securely, and then click Close.

  5. Click the Connect tab, and then copy the Web Service URL.

  6. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sinks REST API spec for all available options.

# Start all instances of a connector
curl -sS --fail --location --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/start' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Start an individual instance of a connector
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/start" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"

Stop

  • Pulsar Admin

  • curl

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Stop all instances of a connector
./bin/pulsar-admin sinks stop \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

# optionally add --instance-id to only stop an individual instance

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, click Streaming tenants.

  2. Click your tenant’s name, and then click the Settings tab.

  3. Click Create Token.

  4. Copy the token, store it securely, and then click Close.

  5. Click the Connect tab, and then copy the Web Service URL.

  6. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sinks REST API spec for all available options.

# Stop all instances of a connector
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/stop' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Stop an individual instance of a connector
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/stop" \
  --H "Authorization: $ASTRA_STREAMING_TOKEN"

Restart

  • Pulsar Admin

  • curl

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Restart all instances of a connector
./bin/pulsar-admin sinks restart \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

# optionally add --instance-id to only restart an individual instance

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, click Streaming tenants.

  2. Click your tenant’s name, and then click the Settings tab.

  3. Click Create Token.

  4. Copy the token, store it securely, and then click Close.

  5. Click the Connect tab, and then copy the Web Service URL.

  6. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sinks REST API spec for all available options.

# Restart all instances of a connector
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/restart' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Restart an individual instance of a connector
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/restart" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"

Update

  • Pulsar Admin

  • curl

  • Sample Config Data

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sinks update \
  --sink-type bigquery \
  --name "$SINK_NAME" \
  --inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
  --tenant "$TENANT" \
  --parallelism 2

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, click Streaming tenants.

  2. Click your tenant’s name, and then click the Settings tab.

  3. Click Create Token.

  4. Copy the token, store it securely, and then click Close.

  5. Click the Connect tab, and then copy the Web Service URL.

  6. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sinks REST API spec for all available options.

curl -sS --fail --request PUT ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
  --form 'sinkConfig="{
    \"archive\":\"builtin:\/\/bigquery\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"parallelism\": 2,
    \"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"]
  }"'
{
  "archive": "builtin://bigquery",
  "autoAck": true,
  "className": "org.apache.pulsar.io.kafka.connect.KafkaConnectSink",
  "cleanupSubscription": false,
  "configs": {
    "batchSize": "1000",
    "kafkaConnectorConfigProperties": {
      "autoCreateBucket": true,
      "autoCreateTables": false,
      "keySource": "JSON",
      "queueSize": "-1",
      "sanitizeTopics": false,
      "topics": "homelab/default/clue-sensors"
    },
    "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "lingerTimeMs": "1000",
    "offsetStorageTopic": "homelab/default/clue-sensors",
    "sanitizeTopicName": true,
    "topic": "homelab/default/clue-sensors"
  },
  "customRuntimeOptions": "internal_data",
  "deadLetterTopic": null,
  "inputSpecs": {
    "homelab/default/clue-sensors": {
      "consumerProperties": {},
      "cryptoConfig": null,
      "poolMessages": false,
      "receiverQueueSize": null,
      "regexPattern": false,
      "schemaProperties": {},
      "schemaType": null,
      "serdeClassName": null
    }
  },
  "inputs": [
    "homelab/default/clue-sensors"
  ],
  "maxMessageRetries": null,
  "name": "bq-sink",
  "namespace": "default",
  "negativeAckRedeliveryDelayMs": null,
  "parallelism": 1,
  "processingGuarantees": "EFFECTIVELY_ONCE",
  "resources": {
    "cpu": 0.25,
    "disk": 1000000000,
    "ram": 1000000000
  },
  "retainKeyOrdering": false,
  "retainOrdering": true,
  "runtimeFlags": null,
  "secrets": null,
  "sourceSubscriptionName": null,
  "sourceSubscriptionPosition": "Latest",
  "tenant": "homelab",
  "timeoutMs": 2000,
  "topicToSchemaProperties": null,
  "topicToSchemaType": null,
  "topicToSerdeClassName": null,
  "topicsPattern": null,
  "transformFunction": null,
  "transformFunctionClassName": null,
  "transformFunctionConfig": null
}
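To confirm that the update was applied, fetch the sink definition and inspect the parallelism value (an optional check; jq is assumed to be installed):

curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" | jq '.parallelism'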

Delete

  • Pulsar Admin

  • curl

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Delete all instances of a connector
./bin/pulsar-admin sinks delete \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, click Streaming tenants.

  2. Click your tenant’s name, and then click the Settings tab.

  3. Click Create Token.

  4. Copy the token, store it securely, and then click Close.

  5. Click the Connect tab, and then copy the Web Service URL.

  6. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sinks REST API spec for all available options.

# Delete all instances of a connector
curl -sS --fail --location --request DELETE ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

Monitoring the Connector

Info

  • Pulsar Admin

  • curl

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Get information about a connector
./bin/pulsar-admin sinks get \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, click Streaming tenants.

  2. Click your tenant’s name, and then click the Settings tab.

  3. Click Create Token.

  4. Copy the token, store it securely, and then click Close.

  5. Click the Connect tab, and then copy the Web Service URL.

  6. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sinks REST API spec for all available options.

# Get information about a connector
curl -sS --fail --location ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

Result
{
  "tenant": "string",
  "namespace": "string",
  "name": "string",
  "className": "string",
  "sourceSubscriptionName": "string",
  "sourceSubscriptionPosition": "Latest",
  "inputs": [
    "string"
  ],
  "topicToSerdeClassName": {
    "property1": "string",
    "property2": "string"
  },
  "topicsPattern": "string",
  "topicToSchemaType": {
    "property1": "string",
    "property2": "string"
  },
  "topicToSchemaProperties": {
    "property1": "string",
    "property2": "string"
  },
  "inputSpecs": {
    "property1": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    },
    "property2": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    }
  },
  "maxMessageRetries": 0,
  "deadLetterTopic": "string",
  "configs": {
    "property1": {},
    "property2": {}
  },
  "secrets": {
    "property1": {},
    "property2": {}
  },
  "parallelism": 0,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": true,
  "retainKeyOrdering": true,
  "resources": {
    "cpu": 0,
    "ram": 0,
    "disk": 0
  },
  "autoAck": true,
  "timeoutMs": 0,
  "negativeAckRedeliveryDelayMs": 0,
  "sinkType": "string",
  "archive": "string",
  "cleanupSubscription": true,
  "runtimeFlags": "string",
  "customRuntimeOptions": "string",
  "transformFunction": "string",
  "transformFunctionClassName": "string",
  "transformFunctionConfig": "string"
}
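If you only need part of this response, filter it with jq; for example, to see just the connector-specific configuration (an optional step; jq is assumed to be installed):

curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" | jq '.configs'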

Health

  • Pulsar Admin

  • curl

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Check connector status
./bin/pulsar-admin sinks status \
  --instance-id "$SINK_INSTANCEID" \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, click Streaming tenants.

  2. Click your tenant’s name, and then click the Settings tab.

  3. Click Create Token.

  4. Copy the token, store it securely, and then click Close.

  5. Click the Connect tab, and then copy the Web Service URL.

  6. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sinks REST API spec for all available options.

# Get the status of all connector instances
curl -sS --fail --location ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/status' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Get the status of an individual connector instance
curl "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/status" \
  -H "accept: application/json" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"
Result

Status response for all connector instances:

{
 "numInstances": 0,
 "numRunning": 0,
 "instances": [
   {
     "instanceId": 0,
     "status": {
       "running": true,
       "error": "string",
       "numRestarts": 0,
       "numReadFromPulsar": 0,
       "numSystemExceptions": 0,
       "latestSystemExceptions": [
         {
           "exceptionString": "string",
           "timestampMs": 0
         }
       ],
       "numSinkExceptions": 0,
       "latestSinkExceptions": [
         {
           "exceptionString": "string",
           "timestampMs": 0
         }
       ],
       "numWrittenToSink": 0,
       "lastReceivedTime": 0,
       "workerId": "string"
     }
   }
 ]
}

Status response for individual connector instance:

{
 "running": true,
 "error": "string",
 "numRestarts": 0,
 "numReadFromPulsar": 0,
 "numSystemExceptions": 0,
 "latestSystemExceptions": [
   {
     "exceptionString": "string",
     "timestampMs": 0
   }
 ],
 "numSinkExceptions": 0,
 "latestSinkExceptions": [
   {
     "exceptionString": "string",
     "timestampMs": 0
   }
 ],
 "numWrittenToSink": 0,
 "lastReceivedTime": 0,
 "workerId": "string"
}
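For scripted health checks, you can reduce the status response to a pass/fail result (a sketch; jq is assumed to be installed):

# Exit non-zero if any instance is not running
curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/status" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
  | jq -e '.numRunning == .numInstances'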

Metrics

Astra Streaming exposes Prometheus formatted metrics for every connector. Refer to the scrape metrics with Prometheus page for more detail.

Connector Reference

The BigQuery sink takes three sets of parameters: the Astra Streaming parameters that define the sink itself, the Kafka Connect adapter parameters (configs), and the Google BigQuery connector parameters (kafkaConnectorConfigProperties). Together, these parameters control how data is streamed from Pulsar to BigQuery.
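The three sets nest inside a single sink configuration. A trimmed sketch of that structure, using values drawn from the examples above (the tenant, namespace, topic, and key path are placeholders):

{
  "archive": "builtin://bigquery",
  "tenant": "my-tenant",
  "namespace": "default",
  "name": "bigquery-sink",
  "inputs": ["my-tenant/default/my-topic"],
  "configs": {
    "topic": "bq-test01",
    "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "offsetStorageTopic": "bq-sink-offsets01",
    "sanitizeTopicName": "true",
    "kafkaConnectorConfigProperties": {
      "name": "bq-sink1",
      "topics": "bq-test01",
      "project": "my-bigquery-project",
      "defaultDataset": "BQ_CONNECTOR_TEST",
      "keySource": "FILE",
      "keyfile": "/path/to/my-bigquery-key.json"
    }
  }
}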

Astra Streaming

Name Required Default Description

archive

true

The connector type, like 'builtin://elastic_search'

autoAck

true

false

Boolean that denotes whether the framework automatically acknowledges messages

className

true

The connector type’s class reference, like 'org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource'

cleanupSubscription

false

false

Boolean that denotes whether the subscriptions the sink created or used should be deleted when the sink is deleted

configs

false

{}

A key/value map of config properties specific to the type of connector. See the reference table below for values.

customRuntimeOptions

false

A string that encodes options to customize the runtime; see the Apache Pulsar docs on configured runtimes for details

deadLetterTopic

false

Name of the dead letter topic where failing messages are sent

inputSpecs

false

A map of input topics to their consumer configuration. Each configuration has the schema {"schemaType": "type-x", "serdeClassName": "name-x", "isRegexPattern": true, "receiverQueueSize": 5}

inputs

true

[]

The input topic or topics of the Sink (specified as a JSON array)

maxMessageRetries

false

Maximum number of times that a message will be redelivered before being sent to the dead letter queue

name

true

Give your sink a name for later reference. The name must start with a lowercase alphabetic character and can only contain lowercase alphanumeric characters and hyphens (kebab-case).

namespace

true

The namespace you’d like the sink created under

negativeAckRedeliveryDelayMs

false

The negative ack message redelivery delay in milliseconds

parallelism

true

1

The number of sink instances to run

processingGuarantees

true

ATLEAST_ONCE

The delivery semantics applied to the Pulsar Sink. Values are 'ATLEAST_ONCE', 'ATMOST_ONCE', 'EFFECTIVELY_ONCE'

resources

false

The compute resources that need to be allocated per instance (applicable only to the process runtime), specified as a JSON string. Example: {"cpu": 0.25,"disk":1000000000,"ram":500000000}

retainKeyOrdering

false

true

Sink consumes and processes messages in key order

retainOrdering

false

false

Boolean that denotes whether the Pulsar sink consumes and processes messages in order

runtimeFlags

false

A string that encodes options to customize the runtime; see the Apache Pulsar docs on configured runtimes for details

secrets

false

A map of secretName (how the secret is accessed in the function via context) to an object that encapsulates how the secret is fetched by the underlying secrets provider. The type of a value here can be found by the SecretProviderConfigurator.getSecretObjectType() method

sourceSubscriptionName

false

Pulsar source subscription name, if you want a specific subscription name for the input topic consumer

sourceSubscriptionPosition

false

Earliest

The position to begin reading from the source

tenant

true

The tenant you’d like the sink created under

timeoutMs

false

5000

Denotes the message timeout in milliseconds

topicToSchemaProperties

false

topicToSchemaType

false

The map of input topics to Schema types or class names (specified as a JSON object)

topicToSerdeClassName

false

The map of input topics to SerDe class names (specified as a JSON object)

topicsPattern

false

Topics pattern for consuming from a list of topics under a namespace that match the pattern. [inputs] and [topicsPattern] are mutually exclusive. Add a SerDe class name for a pattern in customSerdeInputs (supported for Java functions only)

Kafka Connect Adapter Configuration (configs)

These values are provided in the configs area.

For the source code for these configurations, see PulsarKafkaConnectSinkConfig.java.

Name Required Default Description

kafkaConnectorSinkClass

yes

A Kafka-connector sink class to use. Unless you’ve developed your own, use the value com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.

offsetStorageTopic

yes

Pulsar topic where offsets are stored. This topic is in addition to the topic that carries the data going to BigQuery.

sanitizeTopicName

yes

Some connectors cannot handle Pulsar topic names like persistent://a/b/topic, and they won't sanitize the topic name themselves. If enabled, all non-alphanumeric characters in the topic name are replaced with underscores. In some cases this may result in topic name collisions (topic_a and topic.a both resolve to topic_a).

This value must be true. Any other value causes an error.

batchSize

no

16384

Maximum size, in bytes, of the messages the sink attempts to batch together before a flush.

collapsePartitionedTopics

no

false

Supply the Kafka record with a topic name that omits the -partition- suffix for partitioned topics.

kafkaConnectorConfigProperties

no

{}

A key/value map of config properties to pass to the Kafka connector. See the reference table below.

lingerTimeMs

no

2147483647L

Time interval, in milliseconds, during which the sink attempts to batch messages together before a flush.

maxBatchBitsForOffset

no

12

Number of bits (0 to 20) to use for the index of a message within its batch when translating it into an offset. Set to 0 to disable this behavior (messages from the same batch will then have the same offset, which can affect some connectors).

topic

yes

The Kafka topic name that is passed to the Kafka sink.

unwrapKeyValueIfAvailable

no

true

For Record<KeyValue<>> data, use the key from the KeyValue<> instead of the key from the Record.

useIndexAsOffset

no

true

Allows use of message index instead of message sequenceId as offset, if available. Requires AppendIndexMetadataInterceptor and exposingBrokerEntryMetadataToClientEnabled=true on brokers.

useOptionalPrimitives

no

false

Pulsar schemas do not indicate whether a schema is optional, but Kafka schemas do. This provides a way to force all primitive schemas to be optional for Kafka.
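For example, to flush smaller batches more often, the adapter's batching settings sit in configs alongside the required properties. A sketch only; the numbers below are illustrative, not recommendations:

"configs": {
  "topic": "bq-test01",
  "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "offsetStorageTopic": "bq-sink-offsets01",
  "sanitizeTopicName": "true",
  "batchSize": "1000",
  "lingerTimeMs": "1000",
  "kafkaConnectorConfigProperties": { ... }
}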

Google BigQuery Configuration (kafkaConnectorConfigProperties)

These values are provided in the kafkaConnectorConfigProperties area.

For the source code for these configurations, see BigQuerySinkConfig.java.

Name Required Default Description

allBQFieldsNullable

no

false

If true, no fields in any produced BigQuery schemas are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays).

allowBigQueryRequiredFieldRelaxation

no

false

If true, fields in BigQuery Schema can be changed from REQUIRED to NULLABLE.

allowNewBigQueryFields

no

false

If true, new fields can be added to BigQuery tables during subsequent schema updates.

allowSchemaUnionization

no

false

If true, the existing table schema (if one is present) is unionized with new record schemas during schema updates.

If false, the record of the last schema in a batch is used for any necessary table creation and schema update attempts.

Setting allowSchemaUnionization to false and allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true is equivalent to setting autoUpdateSchemas to true in older (pre-2.0.0) versions of this connector. In this case, if BigQuery raises a schema validation exception or a table doesn't exist when writing a batch, the connector tries to remediate by relaxing required fields and/or adding new fields.

If allowSchemaUnionization, allowNewBigQueryFields, and allowBigQueryRequiredFieldRelaxation are all true, then the connector creates or updates tables with a schema whose fields are a union of the existing table schema fields and the fields present in all of the records of the current batch.

The key difference is that with unionization disabled, new record schemas have to be a superset of the table schema in BigQuery.

allowSchemaUnionization is a useful tool for parsing. For example, if you’d like to remove fields from data upstream, the updated schemas still work in the connector. It is similarly useful when different tasks see records whose schemas contain different fields that are not in the table.

However, be aware that if allowSchemaUnionization is set to true, and some bad records are in the topic, then the BigQuery schema can be permanently changed. This presents two issues:

  • Since BigQuery doesn’t allow columns to be dropped from tables, they add unnecessary noise to the schema.

  • Since BigQuery doesn’t allow column types to be modified, they can break downstream pipelines where well-behaved records have schemas whose field names overlap with the accidentally-added columns in the table, but the types don’t match.

autoCreateBucket

no

true

Whether to automatically create the given bucket if it does not exist.

autoCreateTables

no

false

Automatically create BigQuery tables if they don’t already exist

avroDataCacheSize

no

100

The size of the cache to use when converting schemas from Avro to Kafka Connect.

batchLoadIntervalSec

no

120

The interval, in seconds, in which to attempt to run GCS to BigQuery load jobs. Only relevant if enableBatchLoad is configured.

bigQueryMessageTimePartitioning

no

false

Whether or not to use the message time when inserting records. Default uses the connector processing time.

bigQueryPartitionDecorator

no

true

Whether or not to append a partition decorator to the BigQuery table name when inserting records. Default is true. Setting this to true appends the partition decorator to the table name (for example, table$yyyyMMdd, depending on the partitioning configuration). Setting this to false bypasses this logic and uses the raw table name for inserts.

bigQueryRetry

no

0

The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error.

bigQueryRetryWait

no

1000

The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error.

clusteringPartitionFieldNames

no

Comma-separated list of fields where data is clustered in BigQuery.

convertDoubleSpecialValues

no

false

Designates whether +Infinity is converted to Double.MAX_VALUE and whether -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery.

defaultDataset

yes

The default dataset to be used

deleteEnabled

no

false

Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete will be performed when a record with a null value (that is, a tombstone record) is read. This feature will not work with SMTs that change the name of the topic.

enableBatchLoad

no

empty

Beta Feature. Use with caution. The sublist of topics to be batch loaded through GCS.

gcsBucketName

no

empty

The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. This is applicable only if enableBatchLoad is configured.

includeKafkaData

no

false

Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows.

intermediateTableSuffix

no

.tmp

A suffix that will be appended to the names of destination tables to create the names for the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table, but their names will always start with the name of the destination table, followed by this suffix, and possibly followed by an additional suffix.

kafkaDataFieldName

no

The Kafka data field name. The default value is null, which means the Kafka Data field will not be included.

kafkaKeyFieldName

no

The Kafka key field name. The default value is null, which means the Kafka Key field will not be included.

keyfile

yes

Can be either a string representation of the Google credentials file or the path to the Google credentials file itself.

When using the Astra Streaming UI, the string representation must be used. If using pulsar-admin with Astra Streaming, either the representation or file can be used.

keySource

yes

FILE

Determines whether the keyfile configuration is the path to the credentials JSON file or to the JSON itself. Available values are FILE and JSON.

When using the Astra Streaming UI, JSON will be the only option. If using pulsar-admin with Astra Streaming, either the representation or file can be used.

name

yes

The name of the connector. Use the same value as the Pulsar sink name.

mergeIntervalMs

no

60_000L

How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to -1 to disable periodic flushing.

mergeRecordsThreshold

no

-1

How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. If set to -1, then record count-based flushing is disabled.

project

yes

The BigQuery project to write to

queueSize

no

-1

The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size.

sanitizeTopics

yes

false

Designates whether to automatically sanitize topic names before using them as table names. If not enabled, topic names are used as table names.

The only accepted value is false. Providing any other value will result in an error.

schemaRetriever

no

com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever

A class that can be used for automatically creating tables and/or updating schemas.

threadPoolSize

no

10

The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery.

timePartitioningType

no

DAY

The time partitioning type to use when creating tables. Existing tables will not be altered to use this partitioning type. Valid Values: (case insensitive) [MONTH, YEAR, HOUR, DAY]

timestampPartitionFieldName

no

The name of the field in the value that contains the timestamp to partition by in BigQuery, which enables timestamp partitioning for each table. Leave this configuration blank to enable ingestion-time partitioning for each table.

topic2TableMap

no

Optional map of topics to tables in the format of comma-separated tuples, such as <topic-1>:<table-1>,<topic-2>:<table-2>,…​

Because sanitizeTopicName must be true, any non-alphanumeric character in a topic name is replaced with an underscore. Keep this in mind when creating the mapping to avoid overlapping names. For example, if the topic name is provided as persistent://a/b/c-d, then the mapped topic name would be persistent___a_b_c_d.

topics

yes

A list of Kafka topics to read from. Use the same name as the Pulsar topic. Only provide the topic name, not the whole address.

upsertEnabled

no

false

Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row-matching will be performed based on the contents of record keys. This feature won’t work with SMTs that change the name of the topic.
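Putting several of these options together, a kafkaConnectorConfigProperties block for a sink that auto-creates day-partitioned tables and maps its topic to a custom table name might look like the following sketch (the project, dataset, key path, and table names are placeholders):

"kafkaConnectorConfigProperties": {
  "name": "bigquery-sink",
  "topics": "bq-test01",
  "project": "my-bigquery-project",
  "defaultDataset": "BQ_CONNECTOR_TEST",
  "keySource": "FILE",
  "keyfile": "/path/to/my-bigquery-key.json",
  "autoCreateTables": "true",
  "sanitizeTopics": "false",
  "timePartitioningType": "DAY",
  "topic2TableMap": "bq_test01:sensor_readings"
}

Note that the topic2TableMap key uses the sanitized topic name (bq_test01) rather than the original bq-test01, as described for topic2TableMap above.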
