Cloud Storage
Each public cloud persists data to its storage system in its own way, with its own formats for storing the bytes. The Cloud Storage sink connector is a general interface to a chosen cloud storage system: it exports data from a Pulsar topic to that system in the format you choose.
The supported cloud storage systems are Google Cloud Storage, AWS S3, and Azure Blob Storage.
Get Started
Set the following environment variables for use with the pulsar-admin or curl examples:
export TENANT=<replace-me>
export INPUT_TOPIC=<replace-me>
export NAMESPACE=default
export SINK_NAME=cloud-storage-sink
-
Pulsar Admin
-
curl
-
Sample Config Data
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
./bin/pulsar-admin sinks create \
--sink-type cloud-storage \
--name "$SINK_NAME" \
--inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
--tenant "$TENANT" \
--processing-guarantees EFFECTIVELY_ONCE \
--sink-config '{ <see below reference for storage specifics> }'
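For example, a create command for an AWS S3 bucket might look like the following sketch. The provider-specific values (bucket, endpoint, credentials) are placeholders, not values from this guide; see the Connector Reference at the end of this page for each provider's options:
./bin/pulsar-admin sinks create \
--sink-type cloud-storage \
--name "$SINK_NAME" \
--inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
--tenant "$TENANT" \
--processing-guarantees EFFECTIVELY_ONCE \
--sink-config '{
  "provider": "aws-s3",
  "accessKeyId": "<replace-me>",
  "secretAccessKey": "<replace-me>",
  "bucket": "my-example-bucket",
  "endpoint": "https://s3.us-east-1.amazonaws.com",
  "formatType": "json",
  "partitionerType": "partition"
}'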
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
-
In the Astra Portal, click Streaming tenants.
-
Click your tenant’s name, and then click the Settings tab.
-
Click Create Token.
-
Copy the token, store it securely, and then click Close.
-
Click the Connect tab, and then copy the Web Service URL.
-
Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sinks REST API spec for all available options.
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
--header "Authorization: Bearer $PULSAR_TOKEN" \
--form 'sinkConfig="{
\"archive\":\"builtin:\/\/cloud-storage\",
\"tenant\":\"'$TENANT'\",
\"namespace\":\"'$NAMESPACE'\",
\"name\":\"'$SINK_NAME'\",
\"parallelism\": 1,
\"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"],
\"configs\":{ <see below reference for storage specifics> }
}"'
{
"archive": "builtin://cloud-storage",
"autoAck": true,
"className": "org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink",
"cleanupSubscription": false,
"configs": {
"batchSize": "10",
"batchTimeMs": "1000",
"bucket": "S3",
"formatType": "json",
"maxBatchBytes": "10000000",
"partitionerType": "partition",
"partitionerUseIndexAsOffset": false,
"pendingQueueSize": "10",
"provider": "AWS",
"skipFailedMessages": false,
"sliceTopicPartitionPath": false,
"useHumanReadableMessageId": false,
"useHumanReadableSchemaVersion": false,
"withMetadata": false,
"withTopicPartitionNumber": true
},
"customRuntimeOptions": "internal_data",
"deadLetterTopic": null,
"inputSpecs": {
"persistent://homelab/default/clue-sensors": {
"consumerProperties": {},
"cryptoConfig": null,
"poolMessages": false,
"receiverQueueSize": null,
"regexPattern": false,
"schemaProperties": {},
"schemaType": null,
"serdeClassName": null
}
},
"inputs": [
"persistent://homelab/default/clue-sensors"
],
"maxMessageRetries": null,
"name": "cloud-storage-sink",
"namespace": "default",
"negativeAckRedeliveryDelayMs": null,
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 0.25,
"disk": 1000000000,
"ram": 1000000000
},
"retainKeyOrdering": false,
"retainOrdering": false,
"runtimeFlags": null,
"secrets": null,
"sourceSubscriptionName": null,
"sourceSubscriptionPosition": "Latest",
"tenant": "homelab",
"timeoutMs": 5000,
"topicToSchemaProperties": null,
"topicToSchemaType": null,
"topicToSerdeClassName": null,
"topicsPattern": null,
"transformFunction": null,
"transformFunctionClassName": null,
"transformFunctionConfig": null
}
Data format types
The Cloud Storage sink connector provides multiple output format options, including JSON (default), Avro, Bytes, or Parquet. There are some limitations for certain formats, as explained in the following sections.
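For example, to switch the output from the default JSON to compressed Avro objects, the relevant entries in the sink's configs map might look like the following minimal sketch (the codec choice is illustrative; see the Connector Reference below for all format-related options):
{
  "formatType": "avro",
  "avroCodec": "zstandard"
}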
Pulsar Schema types supported by the writers
Pulsar Schema | Writer: Avro | Writer: JSON | Writer: Parquet | Writer: Bytes |
---|---|---|---|---|
Primitive | ❌ | ✅ The JSON writer tries to convert data with a String or Bytes schema to JSON-format data, if convertible. | ❌ | ✅ |
Avro | ✅ | ✅ | ✅ | ✅ |
Json | ✅ | ✅ | ✅ | ✅ |
Protobuf (The Protobuf schema is based on the Avro schema. It uses Avro as an intermediate format, so it may not provide the best-effort conversion.) | ✅ | ✅ | ✅ | ✅ |
ProtobufNative | ✅ The ProtobufNative record holds the Protobuf descriptor and the message. When writing to Avro format, the connector uses avro-protobuf to do the conversion. | ❌ | ✅ | ✅ |
Supported withMetadata configurations for writer formats
Writer Format | withMetadata |
---|---|
Avro | ✅ |
JSON | ✅ |
Parquet | ✅ |
Bytes | ❌ |
Parquet with PROTOBUF_NATIVE format
When using Parquet with PROTOBUF_NATIVE format, the connector writes the messages in the DynamicMessage format. When withMetadata is set to true, the connector adds message_metadata to the messages in the PulsarIOCSCProtobufMessageMetadata format.
For example, if a message User has the following schema:
syntax = "proto3";
message User {
string name = 1;
int32 age = 2;
}
When withMetadata is set to true, the connector writes the DynamicMessage with the following schema:
syntax = "proto3";
message PulsarIOCSCProtobufMessageMetadata {
map<string, string> properties = 1;
string schema_version = 2;
string message_id = 3;
}
message User {
string name = 1;
int32 age = 2;
PulsarIOCSCProtobufMessageMetadata __message_metadata__ = 3;
}
Skip unsupported messages
By default, when the connector receives a message with an unsupported schema type, it fails the message. If you want to skip unsupported messages instead, set skipFailedMessages to true.
Dead-letter topics
If a message fails to send to the Cloud Storage sink and a dead-letter topic is assigned, the connector can send the message to the dead-letter topic instead.
To use a dead-letter topic, set skipFailedMessages to false in the cloud provider config. Then, using either pulsar-admin or curl, set --max-redeliver-count and --dead-letter-topic.
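As a sketch, the dead-letter settings might be applied at creation time with pulsar-admin like this. The dead-letter topic name and redelivery count are placeholders; the storage-specific configs are omitted as in the earlier create example:
./bin/pulsar-admin sinks create \
--sink-type cloud-storage \
--name "$SINK_NAME" \
--inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
--tenant "$TENANT" \
--max-redeliver-count 3 \
--dead-letter-topic "$TENANT/$NAMESPACE/cloud-storage-sink-dlq" \
--sink-config '{ "skipFailedMessages": false, <see below reference for storage specifics> }'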
Managing the Connector
Start
-
Pulsar Admin
-
curl
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Start all instances of a connector
./bin/pulsar-admin sinks start \
--namespace "$NAMESPACE" \
--name "$SINK_NAME" \
--tenant "$TENANT"
# optionally add --instance-id to only start an individual instance
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
-
In the Astra Portal, click Streaming tenants.
-
Click your tenant’s name, and then click the Settings tab.
-
Click Create Token.
-
Copy the token, store it securely, and then click Close.
-
Click the Connect tab, and then copy the Web Service URL.
-
Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sinks REST API spec for all available options.
# Start all instances of a connector
curl -sS --fail --location --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/start' \
--header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
# Start an individual instance of a connector
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/start" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Stop
-
Pulsar Admin
-
curl
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Stop all instances of a connector
./bin/pulsar-admin sinks stop \
--namespace "$NAMESPACE" \
--name "$SINK_NAME" \
--tenant "$TENANT"
# optionally add --instance-id to only stop an individual instance
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
-
In the Astra Portal, click Streaming tenants.
-
Click your tenant’s name, and then click the Settings tab.
-
Click Create Token.
-
Copy the token, store it securely, and then click Close.
-
Click the Connect tab, and then copy the Web Service URL.
-
Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sinks REST API spec for all available options.
# Stop all instances of a connector
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/stop' \
--header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
# Stop an individual instance of a connector
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/stop" \
--H "Authorization: $ASTRA_STREAMING_TOKEN"
Restart
-
Pulsar Admin
-
curl
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Restart all instances of a connector
./bin/pulsar-admin sinks restart \
--namespace "$NAMESPACE" \
--name "$SINK_NAME" \
--tenant "$TENANT"
# optionally add --instance-id to only restart an individual instance
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
-
In the Astra Portal, click Streaming tenants.
-
Click your tenant’s name, and then click the Settings tab.
-
Click Create Token.
-
Copy the token, store it securely, and then click Close.
-
Click the Connect tab, and then copy the Web Service URL.
-
Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sinks REST API spec for all available options.
# Restart all instances of a connector
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/restart' \
--header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
# Restart an individual instance of a connector
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/restart" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Update
-
Pulsar Admin
-
curl
-
Sample Config Data
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
./bin/pulsar-admin sinks update \
--sink-type cloud-storage \
--name "$SINK_NAME" \
--inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
--tenant "$TENANT" \
--parallelism 2
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
-
In the Astra Portal, click Streaming tenants.
-
Click your tenant’s name, and then click the Settings tab.
-
Click Create Token.
-
Copy the token, store it securely, and then click Close.
-
Click the Connect tab, and then copy the Web Service URL.
-
Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sinks REST API spec for all available options.
curl -sS --fail --request PUT ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
--header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
--form 'sinkConfig="{
\"archive\":\"builtin:\/\/cloud-storage\",
\"tenant\":\"'$TENANT'\",
\"namespace\":\"'$NAMESPACE'\",
\"name\":\"'$SINK_NAME'\",
\"parallelism\": 2,
\"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"]
}"'
{
"archive": "builtin://cloud-storage",
"autoAck": true,
"className": "org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink",
"cleanupSubscription": false,
"configs": {
"batchSize": "10",
"batchTimeMs": "1000",
"bucket": "S3",
"formatType": "json",
"maxBatchBytes": "10000000",
"partitionerType": "partition",
"partitionerUseIndexAsOffset": false,
"pendingQueueSize": "10",
"provider": "AWS",
"skipFailedMessages": false,
"sliceTopicPartitionPath": false,
"useHumanReadableMessageId": false,
"useHumanReadableSchemaVersion": false,
"withMetadata": false,
"withTopicPartitionNumber": true
},
"customRuntimeOptions": "internal_data",
"deadLetterTopic": null,
"inputSpecs": {
"persistent://homelab/default/clue-sensors": {
"consumerProperties": {},
"cryptoConfig": null,
"poolMessages": false,
"receiverQueueSize": null,
"regexPattern": false,
"schemaProperties": {},
"schemaType": null,
"serdeClassName": null
}
},
"inputs": [
"persistent://homelab/default/clue-sensors"
],
"maxMessageRetries": null,
"name": "cloud-storage-sink",
"namespace": "default",
"negativeAckRedeliveryDelayMs": null,
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 0.25,
"disk": 1000000000,
"ram": 1000000000
},
"retainKeyOrdering": false,
"retainOrdering": false,
"runtimeFlags": null,
"secrets": null,
"sourceSubscriptionName": null,
"sourceSubscriptionPosition": "Latest",
"tenant": "homelab",
"timeoutMs": 5000,
"topicToSchemaProperties": null,
"topicToSchemaType": null,
"topicToSerdeClassName": null,
"topicsPattern": null,
"transformFunction": null,
"transformFunctionClassName": null,
"transformFunctionConfig": null
}
Delete
-
Pulsar Admin
-
curl
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Delete all instances of a connector
./bin/pulsar-admin sinks delete \
--namespace "$NAMESPACE" \
--name "$SINK_NAME" \
--tenant "$TENANT"
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
-
In the Astra Portal, click Streaming tenants.
-
Click your tenant’s name, and then click the Settings tab.
-
Click Create Token.
-
Copy the token, store it securely, and then click Close.
-
Click the Connect tab, and then copy the Web Service URL.
-
Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sinks REST API spec for all available options.
# Delete all instances of a connector
curl -sS --fail --location --request DELETE ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'' \
--header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
Monitoring the Connector
Info
-
Pulsar Admin
-
curl
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Get information about a connector
./bin/pulsar-admin sinks get \
--namespace "$NAMESPACE" \
--name "$SINK_NAME" \
--tenant "$TENANT"
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
-
In the Astra Portal, click Streaming tenants.
-
Click your tenant’s name, and then click the Settings tab.
-
Click Create Token.
-
Copy the token, store it securely, and then click Close.
-
Click the Connect tab, and then copy the Web Service URL.
-
Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sinks REST API spec for all available options.
# Get information about a connector
curl -sS --fail --location ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'' \
--header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
Result
{
"tenant": "string",
"namespace": "string",
"name": "string",
"className": "string",
"sourceSubscriptionName": "string",
"sourceSubscriptionPosition": "Latest",
"inputs": [
"string"
],
"topicToSerdeClassName": {
"property1": "string",
"property2": "string"
},
"topicsPattern": "string",
"topicToSchemaType": {
"property1": "string",
"property2": "string"
},
"topicToSchemaProperties": {
"property1": "string",
"property2": "string"
},
"inputSpecs": {
"property1": {
"schemaType": "string",
"serdeClassName": "string",
"schemaProperties": {
"property1": "string",
"property2": "string"
},
"consumerProperties": {
"property1": "string",
"property2": "string"
},
"receiverQueueSize": 0,
"cryptoConfig": {
"cryptoKeyReaderClassName": "string",
"cryptoKeyReaderConfig": {
"property1": {},
"property2": {}
},
"encryptionKeys": [
"string"
],
"producerCryptoFailureAction": "FAIL",
"consumerCryptoFailureAction": "FAIL"
},
"poolMessages": true,
"regexPattern": true
},
"property2": {
"schemaType": "string",
"serdeClassName": "string",
"schemaProperties": {
"property1": "string",
"property2": "string"
},
"consumerProperties": {
"property1": "string",
"property2": "string"
},
"receiverQueueSize": 0,
"cryptoConfig": {
"cryptoKeyReaderClassName": "string",
"cryptoKeyReaderConfig": {
"property1": {},
"property2": {}
},
"encryptionKeys": [
"string"
],
"producerCryptoFailureAction": "FAIL",
"consumerCryptoFailureAction": "FAIL"
},
"poolMessages": true,
"regexPattern": true
}
},
"maxMessageRetries": 0,
"deadLetterTopic": "string",
"configs": {
"property1": {},
"property2": {}
},
"secrets": {
"property1": {},
"property2": {}
},
"parallelism": 0,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": true,
"retainKeyOrdering": true,
"resources": {
"cpu": 0,
"ram": 0,
"disk": 0
},
"autoAck": true,
"timeoutMs": 0,
"negativeAckRedeliveryDelayMs": 0,
"sinkType": "string",
"archive": "string",
"cleanupSubscription": true,
"runtimeFlags": "string",
"customRuntimeOptions": "string",
"transformFunction": "string",
"transformFunctionClassName": "string",
"transformFunctionConfig": "string"
}
Health
-
Pulsar Admin
-
curl
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Check connector status
./bin/pulsar-admin sinks status \
--instance-id "$SINK_INSTANCEID" \
--namespace "$NAMESPACE" \
--name "$SINK_NAME" \
--tenant "$TENANT"
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
-
In the Astra Portal, click Streaming tenants.
-
Click your tenant’s name, and then click the Settings tab.
-
Click Create Token.
-
Copy the token, store it securely, and then click Close.
-
Click the Connect tab, and then copy the Web Service URL.
-
Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sinks REST API spec for all available options.
# Get the status of all connector instances
curl -sS --fail --location ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/status' \
--header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
# Get the status of an individual connector instance
curl "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/status" \
-H "accept: application/json" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Result
Status response for all connector instances:
{
"numInstances": 0,
"numRunning": 0,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "string",
"numRestarts": 0,
"numReadFromPulsar": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": [
{
"exceptionString": "string",
"timestampMs": 0
}
],
"numSinkExceptions": 0,
"latestSinkExceptions": [
{
"exceptionString": "string",
"timestampMs": 0
}
],
"numWrittenToSink": 0,
"lastReceivedTime": 0,
"workerId": "string"
}
}
]
}
Status response for individual connector instance:
{
"running": true,
"error": "string",
"numRestarts": 0,
"numReadFromPulsar": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": [
{
"exceptionString": "string",
"timestampMs": 0
}
],
"numSinkExceptions": 0,
"latestSinkExceptions": [
{
"exceptionString": "string",
"timestampMs": 0
}
],
"numWrittenToSink": 0,
"lastReceivedTime": 0,
"workerId": "string"
}
Metrics
Astra Streaming exposes Prometheus formatted metrics for every connector. Refer to the scrape metrics with Prometheus page for more detail.
Connector Reference
The Cloud Storage sink has two sets of parameters: Astra Streaming parameters and cloud storage provider parameters.
Astra Streaming parameters for Cloud Storage Sink
Name | Required | Default | Description |
---|---|---|---|
archive | true | | The connector type, like 'builtin://cloud-storage' |
autoAck | true | false | Boolean denoting whether the framework automatically acknowledges messages |
className | true | | The connector type's class reference, like 'org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink' |
cleanupSubscription | false | false | Boolean denoting whether the subscriptions the sink creates or uses should be deleted when the sink is deleted |
configs | false | {} | A key/value map of config properties specific to the type of connector. See the reference tables below for values. |
customRuntimeOptions | false | | A string that encodes options to customize the runtime. See the Apache Pulsar docs for the configured runtime for details. |
deadLetterTopic | false | | Name of the dead-letter topic where failing messages are sent |
inputSpecs | false | | The map of input topics to their consumer configurations. Each configuration has the schema {"schemaType": "type-x", "serdeClassName": "name-x", "isRegexPattern": true, "receiverQueueSize": 5} |
inputs | true | [] | The input topic or topics of the sink (specified as a JSON array) |
maxMessageRetries | false | | Maximum number of times a message is redelivered before being sent to the dead-letter topic |
name | true | | Give your sink a good name for later reference. The name must start with a lowercase alphabetic character and can only contain lowercase alphanumeric characters and hyphens (kebab-case). |
namespace | true | | The namespace you'd like the sink created under |
negativeAckRedeliveryDelayMs | false | | The negative-ack message redelivery delay in milliseconds |
parallelism | true | 1 | The number of sink instances to run |
processingGuarantees | true | ATLEAST_ONCE | The delivery semantics applied to the sink. Values are 'ATLEAST_ONCE', 'ATMOST_ONCE', 'EFFECTIVELY_ONCE' |
resources | false | | The compute resources to allocate per instance (applicable only to the process runtime), as a JSON string. Example: {"cpu": 0.25,"disk":1000000000,"ram":500000000} |
retainKeyOrdering | false | true | Sink consumes and processes messages in key order |
retainOrdering | false | false | Boolean denoting whether the sink consumes and processes messages in order |
runtimeFlags | false | | A string that encodes options to customize the runtime. See the Apache Pulsar docs for the configured runtime for details. |
secrets | false | | A map of secretName (how the secret is accessed in the function via context) to an object that encapsulates how the secret is fetched by the underlying secrets provider. The type of a value here can be found with the SecretProviderConfigurator.getSecretObjectType() method. |
sourceSubscriptionName | false | | Pulsar source subscription name, if you want a specific subscription name for the input-topic consumer |
sourceSubscriptionPosition | false | Earliest | The position to begin reading from the source |
tenant | true | | The tenant you'd like the sink created under |
timeoutMs | false | 5000 | The message timeout in milliseconds |
topicToSchemaProperties | false | | |
topicToSchemaType | false | | The map of input topics to schema types or class names (specified as a JSON object) |
topicToSerdeClassName | false | | The map of input topics to SerDe class names (specified as a JSON object) |
topicsPattern | false | | Topics pattern to consume from a list of topics under a namespace that match the pattern. [input] and [topicsPattern] are mutually exclusive. Add a SerDe class name for a pattern in customSerdeInputs (supported for Java functions only) |
Cloud storage provider parameters for Cloud Storage Sink
Set your cloud storage provider and other required values in the configs area.
-
Google Cloud Storage
-
AWS S3 Storage
-
Azure Blob Storage
Name | Required | Default | Description |
---|---|---|---|
bucket | yes | null | The Cloud Storage bucket. |
provider | yes | null | The Cloud Storage type. Google Cloud Storage only supports the google-cloud-storage provider. |
avroCodec | no | snappy | Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
batchSize | no | 10 | The number of records submitted in a batch. |
batchTimeMs | no | 1000 | The interval for batch submission. |
bytesFormatTypeSeparator | no | 0x10 | The separator inserted between records when formatType=bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object. |
formatType | no | json | The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON. |
gcsServiceAccountKeyFileContent | no | empty | The contents of the JSON service key file. If empty, credentials are read from the gcsServiceAccountKeyFilePath file. |
gcsServiceAccountKeyFilePath | no | empty | Path to the GCS credentials file. If empty, the credentials file path is read from the GOOGLE_APPLICATION_CREDENTIALS environment variable. |
jsonAllowNaN | no | false | Recognize 'NaN', 'INF', '-INF' as legal floating-point values when formatType=json. Because the JSON specification does not allow such values, this is a non-standard feature and is disabled by default. |
maxBatchBytes | no | 10000000 | The maximum number of bytes in a batch. |
parquetCodec | no | gzip | Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
partitionerType | no | partition | The partitioning type. Partitioning can be configured by topic partitions or by time. By default, it is configured by topic partitions. |
partitionerUseIndexAsOffset | no | false | Whether to use the Pulsar message index as the offset instead of the record sequence. Recommended if the incoming messages may be batched. The brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See PIP-70 for more details. |
pathPrefix | no | false | If set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/. |
pendingQueueSize | no | 10 | The number of records buffered in the queue. By default, it is equal to batchSize. You can set it manually. |
skipFailedMessages | no | false | Whether to skip a message that fails to be processed. If set to true, the connector skips the failed message by acknowledging it. Otherwise, the connector fails the message. |
sliceTopicPartitionPath | no | false | When set to true, splits the partitioned topic name into separate folders in the bucket path. |
timePartitionDuration | no | 86400000 | The time interval for time-based partitioning. Supports formatted interval strings, such as 30d, 24h, 30m, 10s, as well as numbers in milliseconds precision, such as 86400000, which refers to 24h or 1d. |
timePartitionPattern | no | yyyy-MM-dd | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
useHumanReadableMessageId | no | false | Use a human-readable format string for the messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a hex-encoded string. |
useHumanReadableSchemaVersion | no | false | Use a human-readable format string for the schema version in the message metadata. If set to true, the schema version is in plain string format. Otherwise, it is in hex-encoded string format. |
withMetadata | no | false | Save message attributes to metadata. |
withTopicPartitionNumber | no | true | When set to true, includes the topic partition number in the object path. |
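As a sketch, a Google Cloud Storage configs block might look like the following. The bucket name, key-file path, and path prefix are placeholders; see the table above for each option:
{
  "provider": "google-cloud-storage",
  "bucket": "my-example-bucket",
  "gcsServiceAccountKeyFilePath": "/path/to/gcs-key.json",
  "formatType": "json",
  "partitionerType": "partition",
  "pathPrefix": "pulsar-export/",
  "batchSize": 10,
  "batchTimeMs": 1000
}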
The suggested permission policies for AWS S3 are:
-
s3:AbortMultipartUpload
-
s3:GetObject*
-
s3:PutObject*
-
s3:List*
If you don’t want to provide a region in the configuration, then enable the s3:GetBucketLocation permission policy as well.
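As a sketch, an IAM policy granting those actions on a single bucket might look like the following (the bucket name is a placeholder):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetObject*",
        "s3:PutObject*",
        "s3:List*",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::my-example-bucket",
        "arn:aws:s3:::my-example-bucket/*"
      ]
    }
  ]
}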
Name | Required | Default | Description |
---|---|---|---|
accessKeyId | yes | null | The Cloud Storage access key ID. It requires permission to write objects. |
bucket | yes | null | The Cloud Storage bucket. |
endpoint | yes | null | The Cloud Storage endpoint. |
provider | yes | null | The Cloud Storage type, such as aws-s3 or s3v2 (s3v2 uses the AWS client rather than the JClouds client). |
secretAccessKey | yes | null | The Cloud Storage secret access key. |
avroCodec | no | snappy | Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
batchSize | no | 10 | The number of records submitted in a batch. |
batchTimeMs | no | 1000 | The interval for batch submission. |
bytesFormatTypeSeparator | no | 0x10 | The separator inserted between records when formatType=bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object. |
formatType | no | json | The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON. |
jsonAllowNaN | no | false | Recognize 'NaN', 'INF', '-INF' as legal floating-point values when formatType=json. Because the JSON specification does not allow such values, this is a non-standard feature and is disabled by default. |
maxBatchBytes | no | 10000000 | The maximum number of bytes in a batch. |
parquetCodec | no | gzip | Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
partitionerType | no | partition | The partitioning type. Partitioning can be configured by topic partitions or by time. By default, it is configured by topic partitions. |
partitionerUseIndexAsOffset | no | false | Whether to use the Pulsar message index as the offset instead of the record sequence. Recommended if the incoming messages may be batched. The brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See PIP-70 for more details. |
pathPrefix | no | false | If set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/. |
pendingQueueSize | no | 10 | The number of records buffered in the queue. By default, it is equal to batchSize. You can set it manually. |
role | no | null | The Cloud Storage role. |
roleSessionName | no | null | The Cloud Storage role session name. |
skipFailedMessages | no | false | Whether to skip a message that fails to be processed. If set to true, the connector skips the failed message by acknowledging it. Otherwise, the connector fails the message. |
sliceTopicPartitionPath | no | false | When set to true, splits the partitioned topic name into separate folders in the bucket path. |
timePartitionDuration | no | 86400000 | The time interval for time-based partitioning. Supports formatted interval strings, such as 30d, 24h, 30m, 10s, as well as numbers in milliseconds precision, such as 86400000, which refers to 24h or 1d. |
timePartitionPattern | no | yyyy-MM-dd | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
useHumanReadableMessageId | no | false | Use a human-readable format string for the messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a hex-encoded string. |
useHumanReadableSchemaVersion | no | false | Use a human-readable format string for the schema version in the message metadata. If set to true, the schema version is in plain string format. Otherwise, it is in hex-encoded string format. |
withMetadata | no | false | Save message attributes to metadata. |
withTopicPartitionNumber | no | true | When set to true, includes the topic partition number in the object path. |
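As a sketch, an AWS S3 configs block might look like the following. The bucket name, endpoint, and credential placeholders are illustrative; see the table above for each option:
{
  "provider": "aws-s3",
  "accessKeyId": "<replace-me>",
  "secretAccessKey": "<replace-me>",
  "bucket": "my-example-bucket",
  "endpoint": "https://s3.us-east-1.amazonaws.com",
  "formatType": "parquet",
  "parquetCodec": "gzip",
  "partitionerType": "partition"
}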
Name | Required | Default | Description |
---|---|---|---|
azureStorageAccountConnectionString | yes | | The Azure Blob Storage connection string. Required when authenticating via connection string. |
azureStorageAccountKey | yes | | The Azure Blob Storage account key. Required when authenticating via account name and account key. |
azureStorageAccountName | yes | | The Azure Blob Storage account name. Required when authenticating via account name and account key. |
azureStorageAccountSASToken | yes | | The Azure Blob Storage account SAS token. Required when authenticating via SAS token. |
bucket | yes | null | The Cloud Storage bucket. |
endpoint | yes | null | The Azure Blob Storage endpoint. |
provider | yes | null | The Cloud Storage type. Azure Blob Storage only supports the azure-blob-storage provider. |
avroCodec | no | snappy | Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
batchSize | no | 10 | The number of records submitted in a batch. |
batchTimeMs | no | 1000 | The interval for batch submission. |
bytesFormatTypeSeparator | no | 0x10 | The separator inserted between records when formatType=bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object. |
formatType | no | json | The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON. |
jsonAllowNaN | no | false | Recognize 'NaN', 'INF', '-INF' as legal floating-point values when formatType=json. Because the JSON specification does not allow such values, this is a non-standard feature and is disabled by default. |
maxBatchBytes | no | 10000000 | The maximum number of bytes in a batch. |
parquetCodec | no | gzip | Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
partitionerType | no | partition | The partitioning type. Partitioning can be configured by topic partitions or by time. By default, it is configured by topic partitions. |
partitionerUseIndexAsOffset | no | false | Whether to use the Pulsar message index as the offset instead of the record sequence. Recommended if the incoming messages may be batched. The brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See PIP-70 for more details. |
pathPrefix | no | false | If set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/. |
pendingQueueSize | no | 10 | The number of records buffered in the queue. By default, it is equal to batchSize. You can set it manually. |
skipFailedMessages | no | false | Whether to skip a message that fails to be processed. If set to true, the connector skips the failed message by acknowledging it. Otherwise, the connector fails the message. |
sliceTopicPartitionPath | no | false | When set to true, splits the partitioned topic name into separate folders in the bucket path. |
timePartitionDuration | no | 86400000 | The time interval for time-based partitioning. Supports formatted interval strings, such as 30d, 24h, 30m, 10s, as well as numbers in milliseconds precision, such as 86400000, which refers to 24h or 1d. |
timePartitionPattern | no | yyyy-MM-dd | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
useHumanReadableMessageId | no | false | Use a human-readable format string for the messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a hex-encoded string. |
useHumanReadableSchemaVersion | no | false | Use a human-readable format string for the schema version in the message metadata. If set to true, the schema version is in plain string format. Otherwise, it is in hex-encoded string format. |
withMetadata | no | false | Save message attributes to metadata. |
withTopicPartitionNumber | no | true | When set to true, includes the topic partition number in the object path. |
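As a sketch, an Azure Blob Storage configs block using account-name and account-key authentication might look like the following. The account, container, and endpoint values are placeholders; only one of the authentication options (connection string, account name and key, or SAS token) is needed, per the table above:
{
  "provider": "azure-blob-storage",
  "azureStorageAccountName": "<replace-me>",
  "azureStorageAccountKey": "<replace-me>",
  "endpoint": "https://<replace-me>.blob.core.windows.net",
  "bucket": "my-example-container",
  "formatType": "json",
  "partitionerType": "partition"
}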