Google BigQuery
The Pulsar BigQuery sink connector reads messages from Pulsar topics and writes them to Google BigQuery.
This connector doesn’t integrate directly with BigQuery. Instead, it uses the Apache Pulsar Kafka Connect adaptor to transform Pulsar message data into a Kafka-compatible format, and then it uses the Kafka Connect BigQuery Sink for the BigQuery integration.
The adaptor provides a flexible and extensible framework for data transformation and processing, including support for common data formats and in-transit data transformations.
> **Note:** This connector doesn’t require a Kafka instance. Although the connector uses Kafka-related libraries, it operates entirely within Pulsar. Classes, properties, and configuration options that reference Kafka are translation points that map Pulsar concepts to Kafka concepts in the connector.
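Because the connector is deployed as a built-in sink (`builtin://bigquery`), you can confirm that the `bigquery` sink type is available in your cluster before you begin. For example, with the pulsar-admin CLI:

```bash
# List the sink types built into this Pulsar installation.
# "bigquery" should appear in the output.
./bin/pulsar-admin sinks available-sinks
```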
Create the connector
- Optional: If you are using the pulsar-admin CLI or Pulsar Admin API, set the following commonly used environment variables:

  ```bash
  export TENANT="TENANT_NAME"
  export TOPIC="INPUT_TOPIC_NAME"
  export NAMESPACE="NAMESPACE_NAME" # or default
  export SINK_NAME="SINK_CONNECTOR_UNIQUE_NAME"
  export PULSAR_TOKEN="TENANT_PULSAR_TOKEN" # API only
  export WEB_SERVICE_URL="TENANT_PULSAR_WEB_SERVICE_URL" # API only
  ```

  `SINK_NAME` is the name for your new sink connector. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose, such as `bigquery-sink-prod-us-east-1`.

- Create the connector using JSON-formatted connector configuration data. You can pass the configuration directly or with a configuration file.

  pulsar-admin CLI:

  ```bash
  ./bin/pulsar-admin sinks create \
    --sink-type bigquery \
    --name "$SINK_NAME" \
    --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
    --tenant "$TENANT" \
    --sink-config-file configs.json
  ```

  Pulsar Admin API:

  ```bash
  curl -sS --fail -L -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
    --header "Authorization: Bearer $PULSAR_TOKEN" \
    --form "sinkConfig=@configs.json;type=application/json"
  ```

  Example configuration data structure:

  ```json
  {
    "archive": "builtin://bigquery",
    "autoAck": true,
    "className": "org.apache.pulsar.io.kafka.connect.KafkaConnectSink",
    "cleanupSubscription": false,
    "configs": {
      "batchSize": "1000",
      "kafkaConnectorConfigProperties": {
        "name": "${SINK_NAME}",
        "project": "my-bigquery-project",
        "defaultDataset": "BQ_CONNECTOR_TEST",
        "autoCreateBucket": true,
        "autoCreateTables": false,
        "keySource": "JSON",
        "queueSize": "-1",
        "sanitizeTopics": false,
        "topics": "${TOPIC}",
        "keyfile": {
          "type": "service_account",
          "project_id": "XXXXXX",
          "private_key_id": "XXXXXXXXX",
          "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w ... U=\n-----END PRIVATE KEY-----\n",
          "client_email": "XXXXXXXXX",
          "client_id": "XXXXXX",
          "auth_uri": "https://accounts.google.com/o/oauth2/auth",
          "token_uri": "https://oauth2.googleapis.com/token",
          "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
          "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/XXXXXX"
        }
      },
      "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
      "lingerTimeMs": "1000",
      "offsetStorageTopic": "${TOPIC}-offsets",
      "sanitizeTopicName": true,
      "topic": "${TOPIC}"
    },
    "customRuntimeOptions": "internal_data",
    "deadLetterTopic": null,
    "inputSpecs": {
      "${TENANT}/${NAMESPACE}/${TOPIC}": {
        "consumerProperties": {},
        "cryptoConfig": null,
        "poolMessages": false,
        "receiverQueueSize": null,
        "regexPattern": false,
        "schemaProperties": {},
        "schemaType": null,
        "serdeClassName": null
      }
    },
    "inputs": [
      "${TENANT}/${NAMESPACE}/${TOPIC}"
    ],
    "maxMessageRetries": null,
    "name": "${SINK_NAME}",
    "namespace": "${NAMESPACE}",
    "negativeAckRedeliveryDelayMs": null,
    "parallelism": 1,
    "processingGuarantees": "EFFECTIVELY_ONCE",
    "resources": {
      "cpu": 0.25,
      "disk": 1000000000,
      "ram": 1000000000
    },
    "retainOrdering": false,
    "retainKeyOrdering": true,
    "runtimeFlags": null,
    "secrets": {},
    "sourceSubscriptionName": null,
    "sourceSubscriptionPosition": "Latest",
    "tenant": "${TENANT}",
    "timeoutMs": 5000,
    "topicToSchemaProperties": null,
    "topicToSchemaType": null,
    "topicToSerdeClassName": null,
    "transformFunction": null,
    "transformFunctionClassName": null,
    "transformFunctionConfig": null
  }
  ```
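After the create request succeeds, you can verify that the sink is running. A minimal check with the pulsar-admin CLI, reusing the environment variables set earlier:

```bash
# Report the sink's running instances and any recent errors.
./bin/pulsar-admin sinks status \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME"
```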
Edit the connector
To update a connector, pass the new configuration definition to the connector. For example, if you created the connector with a configuration file, you can pass an updated configuration file.
You can include the entire configuration or only the properties that you want to change.
Additionally, some properties can be modified with specific arguments, such as `--parallelism`.
To get the current configuration, see Get sink connector configuration data.
pulsar-admin CLI:

```bash
./bin/pulsar-admin sinks update \
  --sink-type bigquery \
  --name "$SINK_NAME" \
  --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
  --tenant "$TENANT" \
  --parallelism 2
```

Pulsar Admin API:

```bash
curl -sS --fail -L -X PUT "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $PULSAR_TOKEN" \
  --form "sinkConfig=@configs.json;type=application/json"
```
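For example, to update only the batching behavior and parallelism, the configs.json passed to the update call can contain just the properties to change. This is an illustrative partial configuration; the values shown are hypothetical:

```json
{
  "parallelism": 2,
  "configs": {
    "batchSize": "2000",
    "lingerTimeMs": "500"
  }
}
```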
Manage the connector
See Create and manage connectors for details on how to manage connectors after you create them, including:
- Get connector status
- Get existing connectors
- Get connector configuration details
- Start connectors
- Stop connectors
- Restart connectors
- Delete connectors
Connector configuration reference
To configure Astra Streaming Pulsar connectors, you use a combination of common Pulsar properties and provider-specific properties.
Because Astra Streaming and the Astra Streaming Pulsar connectors are based on a specific version of the open-source Apache Pulsar project and connectors, some properties and values might be unsupported or ignored by Astra Streaming.
Pulsar sink connector properties
> **Note:** Pulsar connectors and functions both use Pulsar functions workers. Therefore, some function configuration properties are also used to configure connectors.
The following table lists Astra Streaming Pulsar sink connector configuration properties by JSON field name. For the equivalent command-line arguments and more information about these properties, see the pulsar-admin sinks documentation.
| Field name | Required | Description |
|---|---|---|
| `archive` | Yes | The type of built-in Astra Streaming Pulsar connector that you want to deploy. Formatted as `builtin://TYPE`, which is `builtin://bigquery` for this connector. |
| `autoAck` | Yes | Whether the framework automatically acknowledges messages. Default: `true`. |
| `className` | Yes | The connector type’s class reference, beginning with `org.apache.pulsar`. For this connector, use `org.apache.pulsar.io.kafka.connect.KafkaConnectSink`. |
| `cleanupSubscription` | No | Whether to delete subscriptions that are created or used by a sink when the sink is deleted. Default: `false`. |
| `configs` | Yes | A JSON-formatted key-value map containing configuration properties specific to the connector type, including provider-specific authentication and integration settings. For available subproperties, see Kafka Connect adaptor properties (`configs`). |
| `customRuntimeOptions` | No | A string that encodes options to configure the Apache Pulsar function runtime. |
| `deadLetterTopic` | No | The name of the topic that receives unacknowledged messages, such as those that exceed the maximum number of retries or fail to be processed completely. If unset, no dead letter topic is used. See also `maxMessageRetries`. |
| `inputSpecs` | No | A map of input topics to consumer configuration. By default, most values are `null` or empty, as shown in the example configuration data structure. |
| `inputs` | Yes | An array of input topics that the sink consumes messages from, such as `["persistent://TENANT/NAMESPACE/TOPIC"]`. To consume all topics matching a naming pattern, use `topicsPattern` instead. |
| `maxMessageRetries` | No | The maximum number of times that delivery of a message is attempted before the message is sent to the dead letter queue as an unacknowledged message. See also `deadLetterTopic`. |
| `name` | Yes | The name for the connector. It must start with a lowercase letter, and it can contain only lowercase letters, numbers, and hyphens (`-`). |
| `namespace` | Yes | The namespace in your Pulsar tenant where you want to create the connector. |
| `negativeAckRedeliveryDelayMs` | No | The amount of time, in milliseconds, to wait before attempting redelivery if message delivery times out or fails. See also `timeoutMs`. |
| `parallelism` | Yes | The number of Pulsar function instances to run. Default: `1`. |
| `processingGuarantees` | Yes | The message delivery semantic to use when writing to topics: `ATLEAST_ONCE`, `ATMOST_ONCE`, or `EFFECTIVELY_ONCE`. Whether the processing guarantee is respected depends on the connector implementation. For more information, see the Pulsar documentation on Function processing guarantees and Processing guarantees in I/O connectors. Default: `ATLEAST_ONCE`. |
| `resources` | No | A JSON string describing the compute resources to allocate to each Pulsar function instance, for example: `{"cpu": 0.25, "disk": 1000000000, "ram": 1000000000}`. |
| `retainKeyOrdering` | No | Whether the sink consumes and processes messages in key order. Default: `false`. |
| `retainOrdering` | No | Whether the sink consumes and processes messages in the order in which they were written to the topic. Default: `false`. |
| `runtimeFlags` | No | A string that encodes flags to pass to the Apache Pulsar function runtime. Only applicable to the process and Kubernetes runtimes. |
| `secrets` | No | If security is enabled on your function workers, you can provide a map of secret names to objects that describe how each secret is fetched by the underlying secrets provider. This is separate from connector-specific security settings in `configs`. |
| `sourceSubscriptionName` | No | The name of a specific Pulsar source subscription, if required by your input topic consumer. |
| `sourceSubscriptionPosition` | No | The position to begin reading from in the source: `Latest` or `Earliest`. Default: `Latest`. |
| `tenant` | Yes | The Pulsar tenant where you want to create the connector. |
| `timeoutMs` | No | The message timeout in milliseconds. |
| `topicsPattern` | No | A topic naming pattern that selects the topics to consume from all topics in a namespace. To consume an exact list of topics, use `inputs` instead. |
| `topicToSchemaProperties` | No | A map of input topics to schema properties, specified as a JSON object. |
| `topicToSchemaType` | No | A map of input topics to schema types or class names, specified as a JSON object. |
| `topicToSerdeClassName` | No | A map of input topics to SerDe class names, specified as a JSON object. |
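For example, a hypothetical sink that pins per-topic consumer settings through `inputSpecs` might include the following fragment. The tenant, namespace, topic, and values are invented for illustration:

```json
{
  "inputs": ["persistent://my-tenant/my-namespace/orders"],
  "inputSpecs": {
    "my-tenant/my-namespace/orders": {
      "receiverQueueSize": 1000,
      "schemaType": "AVRO"
    }
  }
}
```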
Kafka Connect adaptor properties (configs)
The properties in the `configs` section define how data is streamed from Pulsar to BigQuery using the Kafka Connect adaptor.
For more information, see PulsarKafkaConnectSinkConfig.java.
| Name | Required | Description |
|---|---|---|
| `batchSize` | No | The maximum size, in bytes, of messages that the sink attempts to batch before flushing. Default: `16384`. |
| `collapsePartitionedTopics` | No | Whether to omit the `-partition-N` suffix from topic names for partitioned topics when building Kafka records. Default: `false`. |
| `kafkaConnectorConfigProperties` | No | A key-value map of Kafka Connect BigQuery Sink properties. See Kafka Connect BigQuery Sink properties (`kafkaConnectorConfigProperties`). |
| `kafkaConnectorSinkClass` | Yes | The Kafka connector sink class to use. For Astra Streaming, use `com.wepay.kafka.connect.bigquery.BigQuerySinkConnector`. |
| `lingerTimeMs` | No | The duration, in milliseconds, that the sink attempts to batch messages before flushing. Default: `2147483647`. |
| `maxBatchBitsForOffset` | No | The number of bits to use for the index of messages in a batch, which are translated into offsets (UUIDs). Must be an integer between `0` and `20`. If `0`, messages from the same batch share the same offset, which can affect deduplication logic. Default: `12`. |
| `offsetStorageTopic` | Yes | The Pulsar topic where you want to store message offsets (UUIDs). This is an additional topic that helps track the connector’s progress as messages are written to BigQuery. |
| `sanitizeTopicName` | Yes | Whether to replace unprocessable characters in topic names with underscores. For example, a Pulsar topic named `my-topic` is sanitized to `my_topic`. For this connector, set this property to `true`. Default: `false`. |
| `topic` | Yes | The name of the Pulsar topic that you want the sink to read from. Provide only the topic name, not the whole address. |
| `unwrapKeyValueIfAvailable` | No | Whether to unwrap `KeyValue` messages when the message schema is a `KeyValue` schema. Default: `true`. |
| `useIndexAsOffset` | No | Whether to use the message index as the offset, instead of the message sequence ID, when the index is available. Requires brokers that expose the message index through broker entry metadata. Default: `true`. |
| `useOptionalPrimitives` | No | Pulsar schemas don’t contain information about optional schemas, but Kafka schemas can include this information. Set this property to `true` to force all primitive schemas to be optional for Kafka. Default: `false`. |
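Taken together, a minimal `configs` block for this connector might look like the following sketch. The values are illustrative, and `kafkaConnectorConfigProperties` is abbreviated here; see the next section for its contents:

```json
{
  "configs": {
    "topic": "my-input-topic",
    "offsetStorageTopic": "my-input-topic-offsets",
    "sanitizeTopicName": true,
    "batchSize": "1000",
    "lingerTimeMs": "1000",
    "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "kafkaConnectorConfigProperties": {}
  }
}
```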
Kafka Connect BigQuery Sink properties (kafkaConnectorConfigProperties)
`kafkaConnectorConfigProperties` is a subproperty of `configs` that contains a map of configuration properties for the Kafka Connect BigQuery Sink.
After the Kafka Connect adaptor transforms the Pulsar data into a Kafka-compatible format, the Kafka Connect BigQuery Sink writes the data to BigQuery according to the properties in `kafkaConnectorConfigProperties`.
Some of the Kafka Connect BigQuery Sink properties are the same as the Kafka Connect adaptor properties. In some cases, the duplicate properties require matching values. In other cases, you can use duplicate properties to perform additional data transformation or schema modification before writing to BigQuery.
For more information, see the Kafka Connect BigQuery Sink configuration reference and BigQuerySinkConfig.java.
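For example, the duplicated name and topic properties must agree across the two levels. A minimal sketch using the same placeholders as the earlier example configuration:

```json
{
  "name": "${SINK_NAME}",
  "configs": {
    "topic": "${TOPIC}",
    "kafkaConnectorConfigProperties": {
      "name": "${SINK_NAME}",
      "topics": "${TOPIC}"
    }
  }
}
```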
| Name | Required | Description |
|---|---|---|
| `allBQFieldsNullable` | No | Whether the BigQuery schemas generated by the connector mark all fields as `NULLABLE`. If `true`, non-nullable source fields are translated as `NULLABLE` (or `REPEATED`, for arrays) instead of `REQUIRED`. Default: `false`. |
| `allowBigQueryRequiredFieldRelaxation` | No | If `true`, fields in the BigQuery schema can be changed from `REQUIRED` to `NULLABLE`. Default: `false`. |
| `allowNewBigQueryFields` | No | Whether BigQuery schema updates can add fields: if `true`, new fields can be added to BigQuery tables during subsequent schema updates. This only applies to schema updates that happen after initial table creation. Default: `false`. |
| `allowSchemaUnionization` | No | Whether to enable schema unionization during schema updates: if `true`, the existing table schema and the schema of incoming records are unionized. For more information about this parameter, how it interacts with other parameters, and important considerations, see the Kafka Connect BigQuery Sink configuration reference. Default: `false`. |
| `autoCreateBucket` | No | Whether to automatically create the bucket if it doesn’t exist. Default: `true`. |
| `autoCreateTables` | No | Whether to automatically create BigQuery tables if they don’t already exist. Tables are named after the sanitized Pulsar topic names unless overridden by `topic2TableMap`. Default: `false`. |
| `avroDataCacheSize` | No | The size of the cache to use when converting schemas from Avro to Kafka Connect. Default: `100`. |
| `bigQueryMessageTimePartitioning` | No | Whether to use the message time when inserting records. By default, the connector processing time is used. Default: `false`. |
| `bigQueryPartitionDecorator` | No | Whether to append the partition decorator to the BigQuery table name when inserting records. Default: `true`. |
| `bigQueryRetry` | No | The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error. Default: `0`. |
| `bigQueryRetryWait` | No | The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error. Default: `1000`. |
| `clusteringPartitionFieldNames` | No | A comma-separated list of fields where data is clustered in BigQuery. |
| `convertDoubleSpecialValues` | No | Whether to convert `+Infinity` to the maximum double value, and `-Infinity` and `NaN` to the minimum double value, to ensure successful delivery to BigQuery. Default: `false`. |
| `defaultDataset` | Yes | The default BigQuery dataset to write to. |
| `deleteEnabled` | No | Whether to enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete is performed when a record with a null value (a tombstone) is read. This feature doesn’t work with SMTs that change the name of the topic. Default: `false`. |
| `enableBatchLoad` | No | A subset of topics to be batch loaded through GCS. This is a beta feature that might have unexpected results or suboptimal performance. Default: empty (disabled). |
| `includeKafkaData` | No | Whether to include an extra block containing the original topic, offset, and partition information in the resulting BigQuery rows. Default: `false`. |
| `intermediateTableSuffix` | No | A suffix that is used when creating intermediate tables from BigQuery destination tables. The names of intermediate tables are constructed from the destination table name, this suffix, and possibly an additional suffix. Default: `tmp`. |
| `keyfile` | Yes | A string representation of Google credentials JSON, or the path to a Google credentials JSON file. The string format is required when creating this connector in the Astra Portal. |
| `keySource` | Yes | Determines how `keyfile` is interpreted: if `FILE`, `keyfile` is the path to a Google credentials JSON file; if `JSON`, `keyfile` is a string representation of the credentials JSON. Default: `FILE`. |
| `mergeIntervalMs` | No | The merge flush interval in milliseconds, if upsert/delete is enabled. Set to `-1` to disable periodic flushing. Default: `60000`. See also `mergeRecordsThreshold`. |
| `mergeRecordsThreshold` | No | The number of records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. The default is `-1`, which disables record count-based flushing. See also `mergeIntervalMs`. |
| `name` | Yes | You must set this to the same value as the sink connector `name` property (`SINK_NAME` in these examples). |
| `project` | Yes | The BigQuery project to write to. |
| `queueSize` | No | The maximum size of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the queue size can exceed the maximum before topics are paused. All topics resume after a flush, or when the queue size is less than half of the maximum. The default is `-1`, which means the queue size is unlimited. |
| `sanitizeTopics` | Yes | In `kafkaConnectorConfigProperties`, set this property to `false`. Topic names are sanitized by the Kafka Connect adaptor with the `sanitizeTopicName` property instead. Default: `false`. |
| `schemaRetriever` | No | A class that can be used to create tables and update schemas automatically. Default: `com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever`. |
| `threadPoolSize` | No | The size of the BigQuery write thread pool. This determines the maximum number of concurrent writes to BigQuery. Default: `10`. |
| `timePartitioningType` | No | The time partitioning type to use when creating tables: `HOUR`, `DAY`, `MONTH`, or `YEAR`. Default: `DAY`. |
| `timestampPartitionFieldName` | No | The name of the field in the message value to use for timestamp-based partitioning of BigQuery tables. If empty (default), tables use ingestion-time partitioning. |
| `topic2TableMap` | No | An optional map of topic names to table names, formatted as comma-separated tuples, such as `topic1:table1,topic2:table2`. |
| `topics` | Yes | You must set this to the same value as the Kafka Connect adaptor `topic` property (`TOPIC` in these examples). |
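As an illustration of how several of these properties combine, the following hypothetical fragment enables automatic table creation with timestamp-based partitioning and clustering. The project, dataset, and field names are invented for the example, and the credential properties are omitted for brevity:

```json
{
  "kafkaConnectorConfigProperties": {
    "name": "${SINK_NAME}",
    "project": "my-bigquery-project",
    "defaultDataset": "my_dataset",
    "topics": "${TOPIC}",
    "keySource": "JSON",
    "autoCreateTables": true,
    "timePartitioningType": "DAY",
    "timestampPartitionFieldName": "event_time",
    "clusteringPartitionFieldNames": "customer_id,region"
  }
}
```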