Cloud Storage

Each public cloud persists data to its storage system in its own way, with its own approach to formatting and storing the bytes. The Cloud Storage sink connector is a general interface to your chosen cloud storage: it exports data from a Pulsar topic to that system in the format you choose.

The supported cloud storage systems are:

  • Google Cloud Storage (GCP)

  • S3 (AWS)

  • Azure Blob (Azure)

(see below for supported data formats)

Get Started

Set the required variables using any of the methods below.

export TENANT=<replace-me>
export INPUT_TOPIC=<replace-me>
export NAMESPACE=default
export SINK_NAME=cloud-storage-sink
  • Pulsar Admin

  • cURL

  • Sample Config Data

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sinks create \
  --sink-type cloud-storage \
  --name "$SINK_NAME" \
  --inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
  --tenant "$TENANT" \
  --processing-guarantees EFFECTIVELY_ONCE \
  --sink-config '{ <see below reference for storage specifics> }'
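As an illustration of what the --sink-config placeholder might hold, the sketch below assembles a config for AWS S3 from parameters listed in the connector reference. The bucket name, endpoint, and credential values are hypothetical placeholders, and the keys you actually need depend on your provider.

```shell
# Hypothetical placeholder values: replace them with your own.
BUCKET="my-example-bucket"
ENDPOINT="https://s3.us-east-1.amazonaws.com"
ACCESS_KEY_ID="<replace-me>"
SECRET_ACCESS_KEY="<replace-me>"

# Assemble the JSON passed to --sink-config (or to "configs" in the REST body).
SINK_CONFIG=$(cat <<EOF
{
  "provider": "aws-s3",
  "bucket": "$BUCKET",
  "endpoint": "$ENDPOINT",
  "accessKeyId": "$ACCESS_KEY_ID",
  "secretAccessKey": "$SECRET_ACCESS_KEY",
  "formatType": "json",
  "partitionerType": "partition",
  "batchSize": "10",
  "batchTimeMs": "1000"
}
EOF
)

# Print the config so it can be reviewed before use.
printf '%s\n' "$SINK_CONFIG"
```

The resulting string can then be passed as --sink-config "$SINK_CONFIG" in the create command.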

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
  --form 'sinkConfig="{
    \"archive\":\"builtin:\/\/cloud-storage\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"parallelism\": 1,
    \"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"],
    \"configs\":{ <see below reference for storage specifics> }
  }"'
{
  "archive": "builtin://cloud-storage",
  "autoAck": true,
  "className": "org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink",
  "cleanupSubscription": false,
  "configs": {
    "batchSize": "10",
    "batchTimeMs": "1000",
    "bucket": "S3",
    "formatType": "json",
    "maxBatchBytes": "10000000",
    "partitionerType": "partition",
    "partitionerUseIndexAsOffset": false,
    "pendingQueueSize": "10",
    "provider": "AWS",
    "skipFailedMessages": false,
    "sliceTopicPartitionPath": false,
    "useHumanReadableMessageId": false,
    "useHumanReadableSchemaVersion": false,
    "withMetadata": false,
    "withTopicPartitionNumber": true
  },
  "customRuntimeOptions": "internal_data",
  "deadLetterTopic": null,
  "inputSpecs": {
    "persistent://homelab/default/clue-sensors": {
      "consumerProperties": {},
      "cryptoConfig": null,
      "poolMessages": false,
      "receiverQueueSize": null,
      "regexPattern": false,
      "schemaProperties": {},
      "schemaType": null,
      "serdeClassName": null
    }
  },
  "inputs": [
    "persistent://homelab/default/clue-sensors"
  ],
  "maxMessageRetries": null,
  "name": "cloud-storage-sink",
  "namespace": "default",
  "negativeAckRedeliveryDelayMs": null,
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "resources": {
    "cpu": 0.25,
    "disk": 1000000000,
    "ram": 1000000000
  },
  "retainKeyOrdering": false,
  "retainOrdering": false,
  "runtimeFlags": null,
  "secrets": null,
  "sourceSubscriptionName": null,
  "sourceSubscriptionPosition": "Latest",
  "tenant": "homelab",
  "timeoutMs": 5000,
  "topicToSchemaProperties": null,
  "topicToSchemaType": null,
  "topicToSerdeClassName": null,
  "topicsPattern": null,
  "transformFunction": null,
  "transformFunctionClassName": null,
  "transformFunctionConfig": null
}

Data format types

The Cloud Storage sink connector provides multiple output format options: JSON, Avro, Bytes, and Parquet. The default format is JSON. The current implementation has some limitations for different formats:

Pulsar Schema types supported by the writers:

Pulsar Schema Writer: Avro Writer: JSON Writer: Parquet Writer: Bytes

Primitive

✅ *

Avro

Json

Protobuf **

ProtobufNative

✅ ***

*The JSON writer tries to convert data with a String or Bytes schema to JSON-format data, if convertible.

**The Protobuf schema is based on the Avro schema. It uses Avro as an intermediate format, so it may not provide the best conversion.

***The ProtobufNative record holds the Protobuf descriptor and the message. When writing to Avro format, the connector uses avro-protobuf to do the conversion.

Supported withMetadata configurations for different writer formats:

Writer Format withMetadata

Avro

JSON

Parquet

✅ *

Bytes

*When using Parquet with the PROTOBUF_NATIVE format, the connector writes the messages in DynamicMessage format. When withMetadata is set to true, the connector adds __message_metadata__ to the messages in PulsarIOCSCProtobufMessageMetadata format.

For example, if a message User has the following schema:

syntax = "proto3";
message User {
 string name = 1;
 int32 age = 2;
}

When withMetadata is set to true, the connector writes the message as a DynamicMessage with the following schema:

syntax = "proto3";
message PulsarIOCSCProtobufMessageMetadata {
 map<string, string> properties = 1;
 string schema_version = 2;
 string message_id = 3;
}
message User {
 string name = 1;
 int32 age = 2;
 PulsarIOCSCProtobufMessageMetadata __message_metadata__ = 3;
}

By default, when the connector receives a message with an unsupported schema type, it fails the message. To skip unsupported messages instead, set skipFailedMessages to true.
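The format-related settings can be sketched as a small helper that prints the matching "configs" keys for a chosen formatType. The function name format_config is hypothetical, the lowercase format values are assumed from the defaults in the connector reference, and the codec values are simply the documented defaults.

```shell
# Print the format-related "configs" keys for a given formatType.
# Hypothetical helper: the codec values are the defaults from the reference tables.
format_config() {
  case "$1" in
    avro)       printf '"formatType": "avro", "avroCodec": "snappy"' ;;
    parquet)    printf '"formatType": "parquet", "parquetCodec": "gzip", "withMetadata": true' ;;
    json|bytes) printf '"formatType": "%s"' "$1" ;;
    *)          echo "unsupported formatType: $1" >&2; return 1 ;;
  esac
}

# Example: Parquet output with metadata, skipping messages the writer cannot handle.
printf '{ %s, "skipFailedMessages": true }\n' "$(format_config parquet)"
```

The printed fragment still needs the provider, bucket, and credential keys for your storage system before it is a complete config.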

Dead-letter topics

To use a dead-letter topic, set skipFailedMessages to false in the cloud provider config. Then, using either pulsar-admin or curl, set --max-redeliver-count and --dead-letter-topic (maxMessageRetries and deadLetterTopic in the REST sink config). For more information about dead-letter topics, see the Pulsar documentation. If a message fails to be sent to the Cloud Storage sink and a dead-letter topic is configured, the connector sends the message to that topic.
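A minimal sketch of the dead-letter setup with pulsar-admin follows. The wrapper function create_sink_with_dlq, the redelivery count of 3, and the "-dlq" topic suffix are illustrative assumptions, and the --sink-config value shown is incomplete: it still needs the storage settings for your provider.

```shell
# Placeholder defaults so the sketch is self-contained; override them in your shell.
TENANT="${TENANT:-my-tenant}"
NAMESPACE="${NAMESPACE:-default}"
INPUT_TOPIC="${INPUT_TOPIC:-my-topic}"
SINK_NAME="${SINK_NAME:-cloud-storage-sink}"
# Hypothetical dead-letter topic name.
DLQ_TOPIC="${DLQ_TOPIC:-$TENANT/$NAMESPACE/${INPUT_TOPIC}-dlq}"
# Path to the CLI; set PULSAR_ADMIN=echo to preview the command without running it.
PULSAR_ADMIN="${PULSAR_ADMIN:-./bin/pulsar-admin}"

# Create the sink with a dead-letter topic and a redelivery limit.
create_sink_with_dlq() {
  "$PULSAR_ADMIN" sinks create \
    --sink-type cloud-storage \
    --name "$SINK_NAME" \
    --inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
    --tenant "$TENANT" \
    --max-redeliver-count 3 \
    --dead-letter-topic "$DLQ_TOPIC" \
    --sink-config '{ "skipFailedMessages": false }'
}
```

Running PULSAR_ADMIN=echo in a subshell before calling create_sink_with_dlq prints the assembled arguments, which is a convenient way to check them before creating the sink.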

Managing the Connector

Start

  • Pulsar Admin

  • cURL

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Start all instances of a connector
./bin/pulsar-admin sinks start \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

# optionally add --instance-id to only start an individual instance

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Start all instances of a connector
curl -sS --fail --location --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/start' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Start an individual instance of a connector (set SINK_INSTANCEID to the instance ID first)
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/start" \
  -H "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

Stop

  • Pulsar Admin

  • cURL

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Stop all instances of a connector
./bin/pulsar-admin sinks stop \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

# optionally add --instance-id to only stop an individual instance

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Stop all instances of a connector
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/stop' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Stop an individual instance of a connector (set SINK_INSTANCEID to the instance ID first)
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/stop" \
  -H "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

Restart

  • Pulsar Admin

  • cURL

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Restart all instances of a connector
./bin/pulsar-admin sinks restart \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

# optionally add --instance-id to only restart an individual instance

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Restart all instances of a connector
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/restart' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Restart an individual instance of a connector (set SINK_INSTANCEID to the instance ID first)
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/restart" \
  -H "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
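The start, stop, and restart REST calls differ only in the final path segment and in whether an instance ID is included. The sketch below captures that pattern in a hypothetical helper, sink_action_url; the default values are placeholders so the block runs on its own.

```shell
# Placeholder defaults; override them with your own values.
WEB_SERVICE_URL="${WEB_SERVICE_URL:-https://example.invalid}"
TENANT="${TENANT:-my-tenant}"
NAMESPACE="${NAMESPACE:-default}"
SINK_NAME="${SINK_NAME:-cloud-storage-sink}"

# Print the REST URL for an action (start, stop, or restart).
# $1 = action, $2 = optional instance ID
sink_action_url() {
  if [ -n "${2:-}" ]; then
    printf '%s/admin/v3/sinks/%s/%s/%s/%s/%s\n' \
      "$WEB_SERVICE_URL" "$TENANT" "$NAMESPACE" "$SINK_NAME" "$2" "$1"
  else
    printf '%s/admin/v3/sinks/%s/%s/%s/%s\n' \
      "$WEB_SERVICE_URL" "$TENANT" "$NAMESPACE" "$SINK_NAME" "$1"
  fi
}

sink_action_url restart      # URL that targets all instances
sink_action_url restart 0    # URL that targets instance 0 only

# Example call (not executed here):
# curl -sS --fail --request POST "$(sink_action_url stop 0)" \
#   --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
```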

Update

  • Pulsar Admin

  • cURL

  • Sample Config Data

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sinks update \
  --sink-type cloud-storage \
  --name "$SINK_NAME" \
  --inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
  --tenant "$TENANT" \
  --parallelism 2

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
curl -sS --fail --request PUT ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
  --form 'sinkConfig="{
    \"archive\":\"builtin:\/\/cloud-storage\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"parallelism\": 2,
    \"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"]
  }"'
{
  "archive": "builtin://cloud-storage",
  "autoAck": true,
  "className": "org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink",
  "cleanupSubscription": false,
  "configs": {
    "batchSize": "10",
    "batchTimeMs": "1000",
    "bucket": "S3",
    "formatType": "json",
    "maxBatchBytes": "10000000",
    "partitionerType": "partition",
    "partitionerUseIndexAsOffset": false,
    "pendingQueueSize": "10",
    "provider": "AWS",
    "skipFailedMessages": false,
    "sliceTopicPartitionPath": false,
    "useHumanReadableMessageId": false,
    "useHumanReadableSchemaVersion": false,
    "withMetadata": false,
    "withTopicPartitionNumber": true
  },
  "customRuntimeOptions": "internal_data",
  "deadLetterTopic": null,
  "inputSpecs": {
    "persistent://homelab/default/clue-sensors": {
      "consumerProperties": {},
      "cryptoConfig": null,
      "poolMessages": false,
      "receiverQueueSize": null,
      "regexPattern": false,
      "schemaProperties": {},
      "schemaType": null,
      "serdeClassName": null
    }
  },
  "inputs": [
    "persistent://homelab/default/clue-sensors"
  ],
  "maxMessageRetries": null,
  "name": "cloud-storage-sink",
  "namespace": "default",
  "negativeAckRedeliveryDelayMs": null,
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "resources": {
    "cpu": 0.25,
    "disk": 1000000000,
    "ram": 1000000000
  },
  "retainKeyOrdering": false,
  "retainOrdering": false,
  "runtimeFlags": null,
  "secrets": null,
  "sourceSubscriptionName": null,
  "sourceSubscriptionPosition": "Latest",
  "tenant": "homelab",
  "timeoutMs": 5000,
  "topicToSchemaProperties": null,
  "topicToSchemaType": null,
  "topicToSerdeClassName": null,
  "topicsPattern": null,
  "transformFunction": null,
  "transformFunctionClassName": null,
  "transformFunctionConfig": null
}

Delete

  • Pulsar Admin

  • cURL

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Delete all instances of a connector
./bin/pulsar-admin sinks delete \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Delete all instances of a connector
curl -sS --fail --location --request DELETE ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

Monitoring the Connector

Info

  • Pulsar Admin

  • cURL

  • Response

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Get information about connector
./bin/pulsar-admin sinks get \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
curl -sS --fail --location ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
{
  "tenant": "string",
  "namespace": "string",
  "name": "string",
  "className": "string",
  "sourceSubscriptionName": "string",
  "sourceSubscriptionPosition": "Latest",
  "inputs": [
    "string"
  ],
  "topicToSerdeClassName": {
    "property1": "string",
    "property2": "string"
  },
  "topicsPattern": "string",
  "topicToSchemaType": {
    "property1": "string",
    "property2": "string"
  },
  "topicToSchemaProperties": {
    "property1": "string",
    "property2": "string"
  },
  "inputSpecs": {
    "property1": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    },
    "property2": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    }
  },
  "maxMessageRetries": 0,
  "deadLetterTopic": "string",
  "configs": {
    "property1": {},
    "property2": {}
  },
  "secrets": {
    "property1": {},
    "property2": {}
  },
  "parallelism": 0,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": true,
  "retainKeyOrdering": true,
  "resources": {
    "cpu": 0,
    "ram": 0,
    "disk": 0
  },
  "autoAck": true,
  "timeoutMs": 0,
  "negativeAckRedeliveryDelayMs": 0,
  "sinkType": "string",
  "archive": "string",
  "cleanupSubscription": true,
  "runtimeFlags": "string",
  "customRuntimeOptions": "string",
  "transformFunction": "string",
  "transformFunctionClassName": "string",
  "transformFunctionConfig": "string"
}

Health

  • Pulsar Admin

  • cURL

  • Response

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Check connector status
./bin/pulsar-admin sinks status \
  --instance-id "$SINK_INSTANCEID" \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Get the status of all connector instances
curl -sS --fail --location ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/status' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Get the status of an individual connector instance (set SINK_INSTANCEID to the instance ID first)
curl "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/status" \
  -H "accept: application/json" \
  -H "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

Status response for all connector instances

{
 "numInstances": 0,
 "numRunning": 0,
 "instances": [
   {
     "instanceId": 0,
     "status": {
       "running": true,
       "error": "string",
       "numRestarts": 0,
       "numReadFromPulsar": 0,
       "numSystemExceptions": 0,
       "latestSystemExceptions": [
         {
           "exceptionString": "string",
           "timestampMs": 0
         }
       ],
       "numSinkExceptions": 0,
       "latestSinkExceptions": [
         {
           "exceptionString": "string",
           "timestampMs": 0
         }
       ],
       "numWrittenToSink": 0,
       "lastReceivedTime": 0,
       "workerId": "string"
     }
   }
 ]
}

Status response for individual connector instance

{
 "running": true,
 "error": "string",
 "numRestarts": 0,
 "numReadFromPulsar": 0,
 "numSystemExceptions": 0,
 "latestSystemExceptions": [
   {
     "exceptionString": "string",
     "timestampMs": 0
   }
 ],
 "numSinkExceptions": 0,
 "latestSinkExceptions": [
   {
     "exceptionString": "string",
     "timestampMs": 0
   }
 ],
 "numWrittenToSink": 0,
 "lastReceivedTime": 0,
 "workerId": "string"
}
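One way to turn the all-instances status response into a simple health check is to compare numRunning with numInstances. The sketch below uses a hypothetical helper, json_number, that extracts a numeric field with a plain-text match; the sample response is made up for illustration, and a real JSON parser is a better choice for anything beyond these flat fields.

```shell
# Extract the first occurrence of a numeric field from JSON text on stdin.
# $1 = field name. This is a text match, not a JSON parser.
json_number() {
  sed -n 's/.*"'"$1"'"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Made-up sample response; in practice, capture the output of the status call.
STATUS_JSON='{"numInstances": 2, "numRunning": 1, "instances": []}'

instances=$(printf '%s' "$STATUS_JSON" | json_number numInstances)
running=$(printf '%s' "$STATUS_JSON" | json_number numRunning)

# Report whether every instance is running.
if [ "$running" -lt "$instances" ]; then
  echo "degraded: $running of $instances instances running"
else
  echo "healthy: $running of $instances instances running"
fi
```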

Metrics

Astra Streaming exposes Prometheus formatted metrics for every connector. Refer to the scrape metrics with Prometheus page for more detail.

Connector Reference

The Cloud Storage sink has two sets of parameters: the Astra Streaming parameters, and the parameters specific to your chosen cloud store.

Astra Streaming

| Name | Required | Default | Description |
| --- | --- | --- | --- |
| archive | true | | The connector type, like 'builtin://cloud-storage'. |
| autoAck | true | false | Whether the framework automatically acknowledges messages. |
| className | true | | The connector type's class reference, like 'org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink'. |
| cleanupSubscription | false | false | Whether the subscriptions that the function created or used are deleted when the function is deleted. |
| configs | false | {} | A key/value map of config properties specific to the type of connector. See the reference table below for values. |
| customRuntimeOptions | false | | A string that encodes options to customize the runtime. See the Apache Pulsar docs for the configured runtime for details. |
| deadLetterTopic | false | | Name of the dead-letter topic where failing messages are sent. |
| inputSpecs | false | | The map of input topics to their consumer configuration. Each configuration has the schema {"schemaType": "type-x", "serdeClassName": "name-x", "isRegexPattern": true, "receiverQueueSize": 5}. |
| inputs | true | [] | The input topic or topics of the sink (specified as a JSON array). |
| maxMessageRetries | false | | Maximum number of times a message is redelivered before being sent to the dead-letter queue. |
| name | true | | The sink name, used for later reference. It must start with a lowercase alphabetic character and can contain only lowercase alphanumeric characters and hyphens (kebab-case). |
| namespace | true | | The namespace to create the sink under. |
| negativeAckRedeliveryDelayMs | false | | The negative-ack message redelivery delay in milliseconds. |
| parallelism | true | 1 | The number of sink instances to run. |
| processingGuarantees | true | ATLEAST_ONCE | The delivery semantics applied to the Pulsar sink. Values are 'ATLEAST_ONCE', 'ATMOST_ONCE', 'EFFECTIVELY_ONCE'. |
| resources | false | | The compute resources to allocate per instance (applicable only to the process), as a JSON string. Example: {"cpu": 0.25,"disk":1000000000,"ram":500000000} |
| retainKeyOrdering | false | true | Whether the sink consumes and processes messages in key order. |
| retainOrdering | false | false | Whether the Pulsar sink consumes and processes messages in order. |
| runtimeFlags | false | | A string that encodes options to customize the runtime. See the Apache Pulsar docs for the configured runtime for details. |
| secrets | false | | A map of secretName (how the secret is accessed in the function via context) to an object that encapsulates how the secret is fetched by the underlying secrets provider. The type of a value here can be found with the SecretProviderConfigurator.getSecretObjectType() method. |
| sourceSubscriptionName | false | | The Pulsar source subscription name, if you want a specific subscription name for the input-topic consumer. |
| sourceSubscriptionPosition | false | Earliest | The position to begin reading from the source. |
| tenant | true | | The tenant to create the sink under. |
| timeoutMs | false | 5000 | The message timeout in milliseconds. |
| topicToSchemaProperties | false | | |
| topicToSchemaType | false | | The map of input topics to schema types or class names (specified as a JSON object). |
| topicToSerdeClassName | false | | The map of input topics to SerDe class names (specified as a JSON object). |
| topicsPattern | false | | A topics pattern to consume from the list of topics under a namespace that match the pattern. [input] and [topicsPattern] are mutually exclusive. Add the SerDe class name for a pattern in customSerdeInputs (supported for Java functions only). |

Cloud specific parameters (configs)

Choose the storage provider and set the parameter values in the "configs" area.

  • Google Cloud Storage

  • AWS S3 Storage

  • Azure Blob Storage

| Name | Required | Default | Description |
| --- | --- | --- | --- |
| bucket | true | null | The Cloud Storage bucket. |
| provider | true | null | The Cloud Storage type. Google Cloud Storage supports only the google-cloud-storage provider. |
| avroCodec | false | snappy | Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
| batchSize | false | 10 | The number of records submitted in a batch. |
| batchTimeMs | false | 1000 | The interval for batch submission, in milliseconds. |
| bytesFormatTypeSeparator | false | 0x10 | The separator inserted between records when formatType is bytes. An input record that contains the separator looks like multiple records in the output object. |
| formatType | false | json | The data format type. Available options are JSON, Avro, Bytes, or Parquet. |
| gcsServiceAccountKeyFileContent | false | | The contents of the JSON service key file. If empty, credentials are read from the gcsServiceAccountKeyFilePath file. |
| gcsServiceAccountKeyFilePath | false | | Path to the GCS credentials file. If empty, the credentials file is read from the GOOGLE_APPLICATION_CREDENTIALS environment variable. |
| jsonAllowNaN | false | false | Recognize 'NaN', 'INF', and '-INF' as legal floating-point values when formatType=json. Because the JSON specification does not allow such values, this is a non-standard feature and is disabled by default. |
| maxBatchBytes | false | 10000000 | The maximum number of bytes in a batch. |
| parquetCodec | false | gzip | Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| partitionerType | false | partition | The partitioning type, either by topic partitions or by time. |
| partitionerUseIndexAsOffset | false | false | Whether to use Pulsar's message index as the offset instead of the record sequence. Recommended if incoming messages may be batched. Brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See PIP-70 for more details. |
| pathPrefix | false | false | If set, output files are stored in a folder under the given bucket path. The pathPrefix must be in the format xx/xxx/. |
| pendingQueueSize | false | 10 | The number of records buffered in the queue. By default, it is equal to batchSize. You can set it manually. |
| skipFailedMessages | false | false | Whether to skip a message that fails to be processed. If set to true, the connector skips failed messages by acknowledging them. Otherwise, the connector fails the message. |
| sliceTopicPartitionPath | false | false | When set to true, split the partitioned topic name into separate folders in the bucket path. |
| timePartitionDuration | false | 86400000 | The time interval for time-based partitioning. Supports a formatted interval string, such as 30d, 24h, 30m, or 10s, as well as a number in milliseconds, such as 86400000 (24h or 1d). |
| timePartitionPattern | false | yyyy-MM-dd | The format pattern for time-based partitioning. For details, refer to the Java date and time format. |
| useHumanReadableMessageId | false | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a hex-encoded string. |
| useHumanReadableSchemaVersion | false | false | Use a human-readable format string for the schema version in the message metadata. If set to true, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
| withMetadata | false | false | Save message attributes to metadata. |
| withTopicPartitionNumber | false | true | When set to true, include the topic partition number in the object path. |
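As an illustration of the Google Cloud Storage parameters, the sketch below assembles a config that partitions output by time. The bucket name is a hypothetical placeholder, and the literal value "time" for partitionerType is an assumption based on the description of that parameter; only the default, partition, is stated explicitly.

```shell
BUCKET="my-example-bucket"   # hypothetical bucket name

# Assemble a "configs" value that writes one folder per day.
GCS_CONFIG=$(cat <<EOF
{
  "provider": "google-cloud-storage",
  "bucket": "$BUCKET",
  "gcsServiceAccountKeyFileContent": "<replace-me>",
  "formatType": "json",
  "partitionerType": "time",
  "timePartitionDuration": "24h",
  "timePartitionPattern": "yyyy-MM-dd"
}
EOF
)

# Print the config so it can be reviewed before use.
printf '%s\n' "$GCS_CONFIG"
```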

The suggested permission policies for AWS S3 are:

  • s3:AbortMultipartUpload

  • s3:GetObject*

  • s3:PutObject*

  • s3:List*

If you do not want to provide a region in the configuration, you should enable s3:GetBucketLocation permission policy as well.
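The permissions above could be expressed as an IAM policy document along the following lines. This is a sketch using the standard IAM JSON policy layout: the bucket name is a hypothetical placeholder, the Resource scoping to a single bucket and its objects is an assumption, and s3:GetBucketLocation can be dropped if you provide a region.

```shell
BUCKET="my-example-bucket"   # hypothetical bucket name

# Build the policy document as a string.
IAM_POLICY=$(cat <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetObject*",
        "s3:PutObject*",
        "s3:List*",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::$BUCKET",
        "arn:aws:s3:::$BUCKET/*"
      ]
    }
  ]
}
EOF
)

# Print the policy so it can be reviewed or saved to a file.
printf '%s\n' "$IAM_POLICY"
```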

Name Required Default Description

accessKeyId

true

null

The Cloud Storage access key ID. It requires permission to write objects.

bucket

true

null

The Cloud Storage bucket.

endpoint

true

null

The Cloud Storage endpoint.

provider

true

null

The Cloud Storage type, such as aws-s3,s3v2(s3v2 uses the AWS client but not the JCloud client).

secretAccessKey

true

null

The Cloud Storage secret access key.

avroCodec

false

snappy

Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy.

avroCodec

false

snappy

Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy.

batchSize

false

10

The number of records submitted in batch.

batchTimeMs

false

1000

The interval for batch submission.

bytesFormatTypeSeparator

false

0x10

It is inserted between records for the formatType of bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object.

formatType

false

json

The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON.

jsonAllowNaN

false

false

Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=json. Since JSON specification does not allow such values this is a non-standard feature and disabled by default.

jsonAllowNaN

false

false

Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=json. Since JSON specification does not allow such values this is a non-standard feature and disabled by default.

maxBatchBytes

false

10000000

The maximum number of bytes in a batch.

parquetCodec

false

gzip

Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd.

parquetCodec

false

gzip

Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd.

partitionerType

false

partition

The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions.

partitionerUseIndexAsOffset

false

false

Whether to use the Pulsar’s message index as offset or the record sequence. It’s recommended if the incoming messages may be batched. The brokers may or not expose the index metadata and, if it’s not present on the record, the sequence will be used. See PIP-70 for more details.

pathPrefix

false

false

If it is set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/.

pendingQueueSize

false

10

The number of records buffered in queue. By default, it is equal tobatchSize. You can set it manually.

role

false

null

The Cloud Storage role.

roleSessionName

false

null

The Cloud Storage role session name.

skipFailedMessages

false

false

Configure whether to skip a message which it fails to be processed. If it is set to true, the connector will skip the failed messages by ack it. Otherwise, the connector will fail the message.

sliceTopicPartitionPath

false

false

When it is set to true, split the partitioned topic name into separate folders in the bucket path.

timePartitionDuration

false

86400000

The time interval for time-based partitioning. Support formatted interval string, such as 30d, 24h, 30m, 10s, and also support number in milliseconds precision, such as 86400000 refers to 24h or 1d.

timePartitionPattern

false

yyyy-MM-dd

The format pattern of the time-based partitioning. For details, refer to the Java date and time format.

useHumanReadableMessageId

false

false

Use a human-readable format string for messageId in message metadata. If it is set to true, the messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a hex-encoded string.

useHumanReadableSchemaVersion

false

false

Use a human-readable format string for the schema version in the message metadata. If it is set to true, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format.

withMetadata

false

false

Save message attributes to metadata.

withTopicPartitionNumber

false

true

When it is set to true, include the topic partition number in the object path.
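The partitioning options in the table above can be combined into a sink configuration fragment. The following is a minimal sketch, not a definitive configuration: it assumes that "time" is the partitionerType value that selects time-based partitioning (the table only lists the default, partition), and the pathPrefix value is a hypothetical placeholder. The required storage settings (such as bucket and provider) are omitted and must be supplied alongside these keys.

```shell
# The default timePartitionDuration is expressed in milliseconds.
# 24 hours * 60 minutes * 60 seconds * 1000 ms = 86400000, matching the default.
DAY_MS=$((24 * 60 * 60 * 1000))
echo "$DAY_MS"

# Partitioning-related keys only. ASSUMPTION: "time" selects time-based
# partitioning. "exports/orders/" is a hypothetical prefix in the xx/xxx/ form.
SINK_CONFIG=$(cat <<'EOF'
{
  "partitionerType": "time",
  "timePartitionDuration": "24h",
  "timePartitionPattern": "yyyy-MM-dd",
  "pathPrefix": "exports/orders/",
  "withTopicPartitionNumber": true
}
EOF
)

# Print the fragment so it can be reviewed before use.
printf '%s\n' "$SINK_CONFIG"
```

Once merged with the required storage settings, a value like this could be passed to the --sink-config option shown in Get Started.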

Name Required Default Description

azureStorageAccountConnectionString

true

The Azure Blob Storage connection string. Required when authenticating via connection string.

azureStorageAccountKey

true

The Azure Blob Storage account key. Required when authenticating via account name and account key.

azureStorageAccountName

true

The Azure Blob Storage account name. Required when authenticating via account name and account key.

azureStorageAccountSASToken

true

The Azure Blob Storage account SAS token. Required when authenticating via SAS token.

bucket

true

null

The Cloud Storage bucket.

endpoint

true

null

The Azure Blob Storage endpoint.

provider

true

null

The Cloud Storage type. Azure Blob Storage only supports the azure-blob-storage provider.

avroCodec

false

snappy

Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy.

batchSize

false

10

The number of records submitted in batch.

batchTimeMs

false

1000

The interval for batch submission.

bytesFormatTypeSeparator

false

0x10

The separator inserted between records when formatType=bytes. By default, it is set to '0x10'. An input record that contains the separator looks like multiple records in the output object.

formatType

false

json

The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON.

jsonAllowNaN

false

false

Recognize 'NaN', 'INF', and '-INF' as legal floating-point values when formatType=json. Because the JSON specification does not allow such values, this is a non-standard feature and is disabled by default.

maxBatchBytes

false

10000000

The maximum number of bytes in a batch.

parquetCodec

false

gzip

Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd.

partitionerType

false

partition

The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions.

partitionerUseIndexAsOffset

false

false

Whether to use Pulsar's message index as the offset instead of the record sequence. This is recommended if the incoming messages may be batched. Brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See PIP-70 for more details.

pathPrefix

false

false

If it is set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/.

pendingQueueSize

false

10

The number of records buffered in the queue. By default, it is equal to batchSize. You can set it manually.

skipFailedMessages

false

false

Whether to skip a message that fails to be processed. If it is set to true, the connector skips the failed message by acknowledging it. Otherwise, the connector fails the message.

sliceTopicPartitionPath

false

false

When it is set to true, split the partitioned topic name into separate folders in the bucket path.

timePartitionDuration

false

86400000

The time interval for time-based partitioning. Supports formatted interval strings, such as 30d, 24h, 30m, and 10s, as well as a number in milliseconds, such as 86400000 (equivalent to 24h or 1d).

timePartitionPattern

false

yyyy-MM-dd

The format pattern of the time-based partitioning. For details, refer to the Java date and time format.

useHumanReadableMessageId

false

false

Use a human-readable format string for messageId in message metadata. If it is set to true, the messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a hex-encoded string.

useHumanReadableSchemaVersion

false

false

Use a human-readable format string for the schema version in the message metadata. If it is set to true, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format.

withMetadata

false

false

Save message attributes to metadata.

withTopicPartitionNumber

false

true

When it is set to true, include the topic partition number in the object path.
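As an illustration of how the Azure Blob Storage settings above fit together, the sketch below builds a sink configuration that authenticates with an account name and account key. Every value is a hypothetical placeholder, and two details are assumptions rather than statements from the table: that bucket refers to the Azure container name, and that endpoint follows the usual https://<account>.blob.core.windows.net form. The remaining keys use the defaults listed in the table.

```shell
# Hypothetical placeholders: replace with your own account details.
AZURE_ACCOUNT_NAME="myaccount"
AZURE_ACCOUNT_KEY="replace-me"
AZURE_CONTAINER="my-container"

# Account name + account key authentication. The connection string and SAS
# token options are alternatives and are left out of this sketch.
# ASSUMPTION: "bucket" maps to the container and "endpoint" uses the
# standard Azure Blob Storage endpoint form.
SINK_CONFIG=$(cat <<EOF
{
  "provider": "azure-blob-storage",
  "azureStorageAccountName": "${AZURE_ACCOUNT_NAME}",
  "azureStorageAccountKey": "${AZURE_ACCOUNT_KEY}",
  "bucket": "${AZURE_CONTAINER}",
  "endpoint": "https://${AZURE_ACCOUNT_NAME}.blob.core.windows.net",
  "formatType": "json",
  "batchSize": 10,
  "batchTimeMs": 1000
}
EOF
)

# Print the configuration so it can be reviewed before use.
printf '%s\n' "$SINK_CONFIG"
```

The resulting value could then be supplied as --sink-config "$SINK_CONFIG" in the pulsar-admin sinks create command shown in Get Started.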

What’s next?

Learn more about Google’s Cloud Storage.

Learn more about Azure Blob Storage.

Learn more about AWS S3.
