Cloud Storage

The Cloud Storage sink connector reads messages from Pulsar topics and writes them to cloud storage using specified format, such as Avro or Parquet.

The following cloud storage providers are supported:

Create the connector

  1. Optional: If you are using the pulsar-admin CLI or Pulsar Admin API, set the following commonly-used environment variables:

    export TENANT="TENANT_NAME"
    export TOPIC="INPUT_TOPIC_NAME"
    export NAMESPACE="NAMESPACE_NAME" # or default
    export SINK_NAME="SINK_CONNECTOR_UNIQUE_NAME"
    export PULSAR_TOKEN="TENANT_PULSAR_TOKEN" # API only
    export WEB_SERVICE_URL="TENANT_PULSAR_WEB_SERVICE_URL" # API only

    SINK_NAME is the name for your new sink connector. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: cloud-storage-sink-prod-us-east-1.

  2. Create the connector using JSON-formatted connector configuration data. You can pass the configuration directly or with a configuration file.

    pulsar-admin CLI
    ./bin/pulsar-admin sinks create \
      --sink-type cloud-storage \
      --name "$SINK_NAME" \
      --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
      --tenant "$TENANT" \
      --sink-config-file configs.json
    Pulsar Admin API
    curl -sS --fail -L -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
      --header "Authorization: Bearer $PULSAR_TOKEN" \
      --form "sinkConfig=@configs.json;type=application/json"
    Example configuration data structure
    {
      "archive": "builtin://cloud-storage",
      "autoAck": true,
      "className": "org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink",
      "cleanupSubscription": false,
      "configs": {
        "batchSize": "10",
        "batchTimeMs": "1000",
        "bucket": "S3",
        "formatType": "json",
        "maxBatchBytes": "10000000",
        "partitionerType": "partition",
        "partitionerUseIndexAsOffset": false,
        "pendingQueueSize": "10",
        "provider": "AWS",
        "skipFailedMessages": false,
        "sliceTopicPartitionPath": false,
        "useHumanReadableMessageId": false,
        "useHumanReadableSchemaVersion": false,
        "withMetadata": false,
        "withTopicPartitionNumber": true
      },
      "customRuntimeOptions": "internal_data",
      "deadLetterTopic": null,
      "inputSpecs": {
        "persistent://${TENANT}/${NAMESPACE}/${TOPIC}": {
          "consumerProperties": {},
          "cryptoConfig": null,
          "poolMessages": false,
          "receiverQueueSize": null,
          "regexPattern": false,
          "schemaProperties": {},
          "schemaType": null,
          "serdeClassName": null
        }
      },
      "inputs": [
        "persistent://${TENANT}/${NAMESPACE}/${TOPIC}"
      ],
      "maxMessageRetries": null,
      "name": "${SINK_NAME}",
      "namespace": "${NAMESPACE}",
      "negativeAckRedeliveryDelayMs": null,
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE",
      "resources": {
        "cpu": 0.25,
        "disk": 1000000000,
        "ram": 1000000000
      },
      "retainOrdering": false,
      "retainKeyOrdering": true,
      "runtimeFlags": null,
      "secrets": {},
      "sourceSubscriptionName": null,
      "sourceSubscriptionPosition": "Latest",
      "tenant": "${TENANT}",
      "timeoutMs": 5000,
      "topicToSchemaProperties": null,
      "topicToSchemaType": null,
      "topicToSerdeClassName": null,
      "transformFunction": null,
      "transformFunctionClassName": null,
      "transformFunctionConfig": null
    }

Edit the connector

To update a connector, pass the new configuration definition to the connector. For example, if you created the connector with a configuration file, you can pass an updated configuration file.

You can include the entire configuration or only the properties that you want to change. Additionally, some properties can be modified with specific arguments, such as --parallelism.

To get the current configuration, see Get sink connector configuration data.

pulsar-admin CLI
./bin/pulsar-admin sinks update \
  --sink-type cloud-storage \
  --name "$SINK_NAME" \
  --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
  --tenant "$TENANT" \
  --parallelism 2
Pulsar Admin API
curl -sS --fail -L -X PUT "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $PULSAR_TOKEN" \
  --form "sinkConfig=@configs.json;type=application/json"

Manage the connector

See Create and manage connectors for details on how to manage connectors after you create them, including:

  • Get connector status

  • Get existing connectors

  • Get connector configuration details

  • Start connectors

  • Stop connectors

  • Restart connectors

  • Delete connectors

Connector configuration reference

To configure Astra Streaming Pulsar connectors, you use a combination of common Pulsar properties and provider-specific properties.

Because Astra Streaming and the Astra Streaming Pulsar connectors are based on a specific version of the open-source Apache Pulsar project and connectors, some properties and values might be unsupported or ignored by Astra Streaming.

Pulsar sink connector properties

Pulsar connectors and functions both use Pulsar functions workers. Therefore, some function configuration properties are also used to configure connectors.

The following table lists Astra Streaming Pulsar sink connector configuration properties by JSON field name. For the equivalent command line arguments and more information about these properties, see the following:

Pulsar configuration properties for sink connectors
Field name Required Description

archive

Yes

The type of built-in Astra Streaming Pulsar connector that you want to deploy. Formatted as builtin://CONNECTOR, such as builtin://kafka.

autoAck

Yes

Whether the framework automatically acknowledges messages.

Default: false (automatic acknowledgment disabled)

className

Yes

The connector type’s class reference beginning with org.apache.pulsar, such as org.apache.pulsar.common.schema.KeyValue or org.apache.pulsar.io.kafka.connect.KafkaConnectSink.

cleanupSubscription

No

Whether to delete subscriptions that are created or used by a sink when the sink is deleted.

Default: false

configs

Yes

A JSON-formatted key-value map containing configuration properties specific to the connector type, including provider-specific authentication and integration settings. For available subproperties, see Cloud Storage sink connector properties (configs).

Default: {} (Empty map, uses defaults if they exist, fails otherwise)

customRuntimeOptions

No

A string that encodes options to configure the Apache Pulsar function runtime.

deadLetterTopic

No

The name of the topic that receives unacknowledged messages, such as those that exceed the maximum number of retries or fail to be processed completely.

If null or not set, unacknowledged messages are discarded.

See also maxMessageRetries and negativeAckRedeliveryDelayMs.

inputSpecs

No

A map of input topics to consumer configuration. By default, most values are null or empty. For example:

  "inputSpecs": {
    "persistent://$TENANT/$NAMESPACE/$TOPIC": {
      "schemaType": null,
      "serdeClassName": null,
      "schemaProperties": {},
      "consumerProperties": {},
      "receiverQueueSize": null,
      "cryptoConfig": null,
      "poolMessages": false,
      "regexPattern": false
    }
  },

inputs

Yes

An array of input topics that the sink consumes messages from, such as ["persistent://$TENANT/$NAMESPACE/$TOPIC"].

To consume all topics matching a naming pattern, use topicsPattern instead. inputs and topicsPattern are mutually exclusive.

Default: []

maxMessageRetries

No

Maximum number of times that a message attempts to be delivered before being sent to the dead letter queue as an unacknowledged message.

See also negativeAckRedeliveryDelayMs and deadLetterTopic.

name

Yes

The name for the connector. It must start with a lowercase letter, and contain only numbers, hyphens (-), and lowercase letters. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: cloud-storage-prod-us-east-1.

namespace

Yes

The namespace in your Pulsar tenant where you want to create the connector.

negativeAckRedeliveryDelayMs

No

The amount of time, in milliseconds, to wait before attempting redelivery if message delivery times out or fails.

See also maxMessageRetries and deadLetterTopic.

parallelism

Yes

The number of Pulsar function instances to run.

Default: 1

processingGuarantees

Yes

The messaging delivery semantic to use when writing to topics: ATLEAST_ONCE, ATMOST_ONCE, or EFFECTIVELY_ONCE.

Respect for the processing guarantee depends on the connector implementation.

For more information, see the Pulsar documentation on Function processing guarantees and Processing guarantees in I/O connectors.

Default: ATLEAST_ONCE

resources

No

A JSON string describing the compute resources to allocate to each Pulsar function instance. For example: {"cpu":0.25,"disk":1000000000,"ram":500000000}.

retainKeyOrdering

No

Whether the sink consumes and processes messages in key order.

Default: true

retainOrdering

No

Whether the sink consumes and processes messages in the order they were written to the topic.

Default: false

runtimeFlags

No

A string that encodes flags to pass to the Apache Pulsar function runtime. Only applicable to process and Kubernetes runtimes.

secrets

No

If security is enabled on your function workers, you can provide a map of secret names (secretName) to secrets provider configuration objects. The secretName is used by the connector to reference the secret. The mapped object contains the required properties to fetch the secret from the secrets provider.

To get the types for the values in this mapping, use the SecretProviderConfigurator.getSecretObjectType() method.

This is separate from connector-specific security settings in configs and the Pulsar authentication token used to connect to your Pulsar cluster.

sourceSubscriptionName

No

The name of a specific Pulsar source subscription, if required by your input topic consumer.

sourceSubscriptionPosition

No

The position to begin reading from in the source, if sourceSubscriptionName is set.

Default: Latest

tenant

Yes

The Pulsar tenant where you want to create the connector.

timeoutMs

No

The message timeout in milliseconds

Default: 5000

topicsPattern

No

A topic naming pattern to select topics to consume from all topics in a namespace.

To consume an exact list of topics, use inputs instead. inputs and topicsPattern are mutually exclusive.

topicToSchemaProperties

No

A map of input topics to schema properties specified as a JSON object.

topicToSchemaType

No

A map of input topics to schema types or class names specified as a JSON object.

topicToSerdeClassName

No

A map of input topics to SerDe class names specified as a JSON object.

Cloud Storage sink connector properties (configs)

In the configs section of the connector configuration, set the following data transformation and cloud storage connection properties.

Name Required Description

azureStorageAccountConnectionString

Yes for Azure

Not used for GCS or AWS

The Azure Blob Storage connection string to authenticate with the storage account.

If not set, you must set either azureStorageAccountSASToken or both azureStorageAccountKey and azureStorageAccountName.

azureStorageAccountKey and azureStorageAccountName

Yes for Azure

Not used for GCS or AWS

The Azure Blob Storage account key and account name to authenticate with the storage account. You must provide both properties when using this authentication method.

If not set, you must set either azureStorageAccountSASToken or azureStorageAccountConnectionString.

azureStorageAccountSASToken

Yes for Azure

Not used for GCS or AWS

The Azure Blob Storage account SAS token to authenticate with the storage account.

If not set, you must set either azureStorageAccountConnectionString or both azureStorageAccountKey and azureStorageAccountName.

avroCodec

No

If formatType is avro, set the codec compression type: bzip2, deflate, snappy (default), xz, zstandard, or null (no compression).

batchSize

No

The number of records submitted in a batch.

By default, batchSize is 10, and batchSize and pendingQueueSize are equal.

batchTimeMs

No

The interval for batch submission.

Default: 1000 milliseconds (1 second)

bucket

Yes

The name of the cloud storage bucket or container to write to.

bytesFormatTypeSeparator

No

This separator is inserted between records for the formatType of bytes. An input record that contains the line separator will look like multiple records in the output object.

Default: 0x10 (ASCII Data Link Escape character)

endpoint

Yes for AWS and Azure

Not used for GCS

The endpoint for your AWS S3 or Azure Blob Storage service.

formatType

Yes

The desired format type for the output to cloud storage: json (default), avro, bytes, or parquet.

For supported schema conversions and limitations, see Data format types.

See also avroCodec, bytesFormatTypeSeparator, jsonAllowNaN, parquetCodec, and withMetadata.

gcsServiceAccountKeyFileContent

Yes for GCS

Not used for AWS or Azure

The contents of a JSON GCS service account key file. If empty or not set, the connector attempts to read credentials from gcsServiceAccountKeyFilePath.

gcsServiceAccountKeyFilePath

Yes for GCS

Not used for AWS or Azure

The path to a JSON GCS service account key file. If empty or not set, the connector attempts to read credentials from the system’s GOOGLE_APPLICATION_CREDENTIALS environment variable.

jsonAllowNaN

No

If formatType is json, set this property to true if you want the connector to treat 'NaN', 'INF', '-INF' as valid floating number values.

The default is disabled (false) because these values aren’t compliant with the JSON specification.

maxBatchBytes

No

The maximum number of bytes in a batch.

Default: 10000000

parquetCodec

No

If formatType is parquet, set the codec compression type: brotli, gzip (default), lzo, lz4, snappy, zstd, or null (no compression).

partitionerType

No

The partitioning type:

  • partition (default): Partitions are based on topic partitions.

  • time: Partitions are based on message publish time. If using time-based partitioning, you can also set timePartitionDuration and timePartitionPattern.

partitionerUseIndexAsOffset

No

Whether to use the the Pulsar message index as the offset (message UUID) instead of the record sequence.

  • false (default): Use the record sequence as the offset.

  • true: Use the message index as the offset, if available. This is recommended when using batching.

If the broker doesn’t expose index metadata, or the record doesn’t have index metadata, then the sequence is used.

pathPrefix

No

If empty or not set, the output files are stored at the root of the specified bucket.

If set, output files are stored in a folder at the path within the bucket. Format the path as DIRECTORY/SUBDIRECTORY/ with a trailing slash but no leading slash.

Default: Not set

pendingQueueSize

No

The number of records buffered in queue.

By default, pendingQueueSize is 10, and batchSize and pendingQueueSize are equal.

provider

Yes

Specify the provider type for your cloud storage service:

  • Azure: Only supports azure-blob-storage.

  • AWS S3: Supports aws-s3 or s3v2. s3v2 uses the AWS client, not the JCloud client.

  • GCS: Only supports google-cloud-storage.

There is no default.

role and roleSessionName

Yes for AWS if using IAM roles

Not used for GCS or Azure

If using AWS IAM roles for authentication to AWS S3, then set role to the IAM role name, and set roleSessionName to the IAM role instance identifier. You must also set secretAccessKey and accessKeyId.

secretAccessKey and accessKeyId

Yes for AWS

Not used for GCS or Azure

An AWS secret access key and access key ID that grant access to the specified bucket. If using AWS IAM roles, you must also set role and roleSessionName.

Permission to write objects to the specified bucket are required. The following policies are recommended:

  • s3:AbortMultipartUpload

  • s3:GetObject*

  • s3:PutObject*

  • s3:List*

  • s3:GetBucketLocation (only if endpoint doesn’t include the region)

skipFailedMessages

No

Whether to skip messages that the connector fails to process for any reason, such as unsupported schema types:

  • true: The connector skips failed messages by acknowledging them as though they were successfully processed.

  • false (default): The connector fails the message, and the message remains unacknowledged.

If you want the connector to send failed messages to a dead-letter topic, set skipFailedMessages to false, and set maxMessageRetries and deadLetterTopic in the Pulsar sink connector properties.

sliceTopicPartitionPath

No

Whether to expand partitioned topic names into multiple directories in the bucket:

  • true: Split the partitioned topic name into multiple directories in the bucket. For example, demo-partition-0 expands to demo/partition-0/.

  • false (default): Use the partitioned topic name as one directory name in the bucket path, such as demo-partition-0/.

See also withTopicPartitionNumber.

timePartitionDuration

No

If partitionerType is time, set the time interval for each partition. Accepts either a string (30d, 24h, or 30m) or a number of milliseconds (86400000).

Default: 86400000 (24 hours)

timePartitionPattern

No

If partitionerType is time, set the datetime format using Java date and time format.

Default: yyyy-MM-dd

useHumanReadableMessageId

No

Whether to use a human-readable string for messageId in message metadata:

  • true: The messageId is a plain, human-readable string, such as ledgerId:entryId:partitionIndex:batchIndex.

  • false (default): The messageId is a Hex-encoded string.

useHumanReadableSchemaVersion

No

Whether to use a human-readable string for the schema version in message metadata:

  • true: The schema version is a plain, human-readable string.

  • false (default): The schema version is a Hex-encoded string.

withMetadata

No

Whether to save message attributes as additional message metadata.

Default: false

Ignored if formatType is bytes.

For example, when writing to Parquet (formatType: "parquet") from the ProtobufNative schema, the connector writes messages with the DynamicMessage format. If withMetadata is true, then the connector adds __message_metadata__ to the messages with PulsarIOCSCProtobufMessageMetadata.

Therefore, given the following message:

syntax = "proto3";
message User {
 string name = 1;
 int32 age = 2;
}

The connector writes the following output with metadata:

syntax = "proto3";
message PulsarIOCSCProtobufMessageMetadata {
 map<string, string> properties = 1;
 string schema_version = 2;
 string message_id = 3;
}
message User {
 string name = 1;
 int32 age = 2;
 PulsarIOCSCProtobufMessageMetadata __message_metadata__ = 3;
}

withTopicPartitionNumber

No

Whether to include the topic partition number in the object path:

  • true (default): Include the partition number in the object path, such as topic-name/partition-0/.

  • false: Do not include the partition number in the object path, such as topic-name/. This can be used to collapse partitions.

See also sliceTopicPartitionPath.

Data format types

The formatType parameter supports JSON, Avro, Bytes, and Parquet formats.

The following table lists the supported conversions from Pulsar schema types to each writer format, as well as any limitations or comments.

Pulsar Schema Writer: Avro Writer: JSON Writer: Parquet Writer: Bytes Notes

Primitive

Not supported

Supported

Not supported

Supported

The JSON writer attempts to convert data with a String or Bytes schema to valid JSON, if possible.

Avro

Supported

Supported

Supported

Supported

JSON

Supported

Supported

Supported

Supported

Protobuf

Supported

Supported

Supported

Supported

The Protobuf schema is based on the Avro schema. It uses Avro as an intermediate format, so it might not provide the best possible conversion.

ProtobufNative

Supported

Not supported

Supported

Supported

The ProtobufNative schema holds the Protobuf descriptor and the message.

When writing to Avro format, the connector uses avro-protobuf to do the conversion.

For information about Protobuf to Parquet, see withMetadata in Cloud Storage sink connector properties (configs).

Was this helpful?

Give Feedback

How can we improve the documentation?

© Copyright IBM Corporation 2026 | Privacy policy | Terms of use Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: Contact IBM