Kinesis

The Kinesis source connector pulls data from Amazon Kinesis and persists data into an Apache Pulsar™ topic.

Get Started

Set the following environment variables, which the pulsar-admin and curl examples on this page use:

export TENANT=<replace-me>
export DESTINATION_TOPIC=<replace-me>
export NAMESPACE=default
export SOURCE_NAME=kinesis-src
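You can check these variables before running any command, so that a command fails early instead of sending an empty tenant or topic name. The following is a minimal bash sketch. `require_vars` is a hypothetical helper and is not part of pulsar-admin or Astra Streaming.

```shell
# Hypothetical helper: report every named variable that is unset or empty,
# and return non-zero if any are missing.
require_vars() {
  local missing=0 name
  for name in "$@"; do
    # ${!name} is bash indirect expansion: the value of the variable whose
    # name is stored in $name.
    if [ -z "${!name:-}" ]; then
      echo "error: $name is not set" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example usage:
# require_vars TENANT NAMESPACE DESTINATION_TOPIC SOURCE_NAME || exit 1
```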

Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sources create \
  --source-type kinesis \
  --name "$SOURCE_NAME" \
  --destination-topic-name "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC" \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --source-config '{
    "awsEndpoint": "https://some.endpoint.aws",
    "awsRegion": "us-east-1",
    "awsKinesisStreamName": "my-stream",
    "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
    "applicationName": "My test application",
    "checkpointInterval": "30000",
    "backoffTime": "4000",
    "numRetries": "3",
    "receiveQueueSize": 2000,
    "initialPositionInStream": "TRIM_HORIZON",
    "startAtTime": "2020-08-09T19:28:58.000Z"
    }'
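Quoting awsCredentialPluginParam by hand is error-prone because it is a JSON document stored inside a JSON string. The following hypothetical bash helper prints the Kinesis settings with that inner JSON escaped. It is a sketch that assumes the values contain no double quotes or backslashes, and it takes the AWS keys as arguments instead of hardcoding them.

```shell
# Hypothetical helper: print the Kinesis configs as a JSON object.
# Assumes no argument contains a double quote or a backslash, because the
# values are inserted without JSON escaping.
kinesis_source_config() {
  local stream="$1" region="$2" access_key="$3" secret_key="$4"
  local cred_param
  # awsCredentialPluginParam is a JSON document stored as a string, so its
  # inner quotes are written as \" (printf turns each \\ into one backslash).
  cred_param=$(printf '{\\"accessKey\\":\\"%s\\",\\"secretKey\\":\\"%s\\"}' \
    "$access_key" "$secret_key")
  # %s copies cred_param verbatim, keeping its backslashes intact.
  printf '{"awsRegion":"%s","awsKinesisStreamName":"%s","awsCredentialPluginParam":"%s"}\n' \
    "$region" "$stream" "$cred_param"
}

# Example usage (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are assumed to be set):
# ./bin/pulsar-admin sources create \
#   --source-type kinesis \
#   --name "$SOURCE_NAME" \
#   --destination-topic-name "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC" \
#   --tenant "$TENANT" \
#   --namespace "$NAMESPACE" \
#   --source-config "$(kinesis_source_config my-stream us-east-1 "$AWS_ACCESS_KEY_ID" "$AWS_SECRET_ACCESS_KEY")"
```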

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

  1. In the Astra Portal, click Streaming tenants.

  2. Click your tenant’s name, and then click the Settings tab.

  3. Click Create Token.

  4. Copy the token, store it securely, and then click Close.

  5. Click the Connect tab, and then copy the Web Service URL.

  6. Create environment variables for your tenant’s token and web service URL:

    export WEB_SERVICE_URL=<replace-me>
    export ASTRA_STREAMING_TOKEN=<replace-me>

    Refer to the complete Pulsar sources REST API spec for all available options.
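To confirm that the token and web service URL work before you create anything, you can list the sources in your namespace and check the HTTP status code. The following bash function is a hypothetical helper, not part of Astra Streaming tooling. It assumes your token is allowed to list sources in $NAMESPACE and reuses the Authorization header format from the examples on this page.

```shell
# Hypothetical helper: list the sources in the namespace and report whether
# the token was accepted, based only on the HTTP status code.
check_astra_auth() {
  local code
  code=$(curl -sS -o /dev/null -w '%{http_code}' \
    "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE" \
    -H "Authorization: $ASTRA_STREAMING_TOKEN")
  case "$code" in
    200) echo "ok: token accepted" ;;
    401|403) echo "error: token rejected (HTTP $code)" >&2; return 1 ;;
    *) echo "error: unexpected HTTP status $code" >&2; return 1 ;;
  esac
}

# Example usage:
# check_astra_auth || exit 1
```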

curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/astrasources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN" \
  -d '{
        "tenant": "'$TENANT'",
        "topicName": "persistent://'$TENANT'/'$NAMESPACE'/'$DESTINATION_TOPIC'",
        "name": "'$SOURCE_NAME'",
        "namespace": "'$NAMESPACE'",
        "archive": "builtin://kinesis",
        "parallelism": 1,
        "processingGuarantees": "ATLEAST_ONCE",
        "configs": {
          "awsEndpoint": "https://some.endpoint.aws",
          "awsRegion": "us-east-1",
          "awsKinesisStreamName": "my-stream",
          "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
          "applicationName": "My test application",
          "checkpointInterval": "30000",
          "backoffTime": "4000",
          "numRetries": "3",
          "receiveQueueSize": 2000,
          "initialPositionInStream": "TRIM_HORIZON",
          "startAtTime": "2020-08-09T19:28:58.000Z"
        }
      }'

Sample config data:

{
    "awsEndpoint": "https://some.endpoint.aws",
    "awsRegion": "us-east-1",
    "awsKinesisStreamName": "my-stream",
    "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
    "applicationName": "My test application",
    "checkpointInterval": "30000",
    "backoffTime": "4000",
    "numRetries": "3",
    "receiveQueueSize": 2000,
    "initialPositionInStream": "TRIM_HORIZON",
    "startAtTime": "2020-08-09T19:28:58.000Z"
}
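The create request above splices each variable into the JSON body by closing and reopening single quotes. A printf-based builder can be easier to read and can serve both the create (POST) and update (PUT) calls. This sketch is hypothetical: `astra_source_body` is not an Astra Streaming tool, it hardcodes parallelism 1 and ATLEAST_ONCE to match the example, and it assumes the values contain no double quotes or backslashes.

```shell
# Hypothetical helper: build the JSON body for the astrasources endpoint.
# $1 is a JSON object holding the Kinesis settings for "configs".
astra_source_body() {
  local configs_json="$1"
  printf '{"tenant":"%s","namespace":"%s","name":"%s","topicName":"persistent://%s/%s/%s","archive":"builtin://kinesis","parallelism":1,"processingGuarantees":"ATLEAST_ONCE","configs":%s}\n' \
    "$TENANT" "$NAMESPACE" "$SOURCE_NAME" \
    "$TENANT" "$NAMESPACE" "$DESTINATION_TOPIC" \
    "$configs_json"
}

# Example usage:
# curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/astrasources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
#   -H "Content-Type: application/json" \
#   -H "Authorization: $ASTRA_STREAMING_TOKEN" \
#   -d "$(astra_source_body '{"awsRegion":"us-east-1","awsKinesisStreamName":"my-stream"}')"
```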

Managing the Connector

Start


Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Start all instances of a connector
./bin/pulsar-admin sources start \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

# Optionally, add --instance-id to start only an individual instance

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

Create the token and set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. To target a single instance, also set SOURCE_INSTANCEID to its instance ID (instance IDs start at 0).

Refer to the complete Pulsar sources REST API spec for all available options.

Start all instances of a connector:

curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/start" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

Start an individual instance of a connector:

curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/start" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"
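Because the start, stop, and restart endpoints differ only in the final path segment and an optional instance ID, you can wrap them in one function. This bash sketch uses hypothetical function names and assumes the variables from Get Started are set.

```shell
# Hypothetical helper: print the REST URL for an action (start, stop, or
# restart). Pass an instance ID as the optional second argument.
source_action_url() {
  local action="$1" instance="${2:-}"
  local base="$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME"
  if [ -n "$instance" ]; then
    printf '%s/%s/%s\n' "$base" "$instance" "$action"
  else
    printf '%s/%s\n' "$base" "$action"
  fi
}

# Send the POST request for the chosen action.
source_action() {
  curl -sS --fail -X POST "$(source_action_url "$@")" \
    -H "Authorization: $ASTRA_STREAMING_TOKEN"
}

# Example usage:
# source_action start      # all instances
# source_action start 0    # instance 0 only
```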

Stop


Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Stop all instances of a connector
./bin/pulsar-admin sources stop \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

# Optionally, add --instance-id to stop only an individual instance

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

Create the token and set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. To target a single instance, also set SOURCE_INSTANCEID to its instance ID (instance IDs start at 0).

Refer to the complete Pulsar sources REST API spec for all available options.

Stop all instances of a connector:

curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/stop" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

Stop an individual instance of a connector:

curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/stop" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"
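After a stop request, you may want to wait until no instance reports as running before you continue, for example before deleting the source. The following hypothetical sketch polls the all-instances status endpoint. It parses numRunning with grep and sed instead of jq, so it assumes the field appears as in the sample status response under Health.

```shell
# Hypothetical helper: poll the all-instances status endpoint until
# numRunning is 0, checking up to $1 times (default 30) every $2 seconds
# (default 2).
wait_until_stopped() {
  local attempts="${1:-30}" interval="${2:-2}" running i
  for ((i = 0; i < attempts; i++)); do
    # Extract the number from "numRunning": <n> without needing jq.
    running=$(curl -sS --fail \
      "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/status" \
      -H "Authorization: $ASTRA_STREAMING_TOKEN" |
      grep -o '"numRunning" *: *[0-9]*' | sed 's/[^0-9]//g')
    if [ "$running" = "0" ]; then
      echo "stopped"
      return 0
    fi
    sleep "$interval"
  done
  echo "still running after $attempts checks" >&2
  return 1
}
```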

Restart


Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Restart all instances of a connector
./bin/pulsar-admin sources restart \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

# Optionally, add --instance-id to restart only an individual instance

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

Create the token and set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. To target a single instance, also set SOURCE_INSTANCEID to its instance ID (instance IDs start at 0).

Refer to the complete Pulsar sources REST API spec for all available options.

# Restart all instances of a connector
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/restart" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

# Restart an individual instance of a connector
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/restart" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"
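When a source runs more than one instance, restarting instances one at a time keeps the other instances running while each one restarts. The following hypothetical bash sketch calls the per-instance restart endpoint in turn. It assumes instance IDs run from 0 to parallelism minus 1 and that a fixed pause is long enough for an instance to come back.

```shell
# Hypothetical helper: rolling restart of instances 0..$1-1, pausing $2
# seconds (default 10) between instances.
rolling_restart() {
  local parallelism="$1" pause="${2:-10}" id
  for ((id = 0; id < parallelism; id++)); do
    curl -sS --fail -X POST \
      "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$id/restart" \
      -H "Authorization: $ASTRA_STREAMING_TOKEN" || return 1
    echo "restarted instance $id"
    # Wait before the next instance, but not after the last one.
    if [ "$id" -lt $((parallelism - 1)) ]; then sleep "$pause"; fi
  done
}

# Example usage (a source created with parallelism 2):
# rolling_restart 2 30
```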

Update


Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sources update \
  --source-type kinesis \
  --name "$SOURCE_NAME" \
  --destination-topic-name "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC" \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --parallelism 2 \
  --source-config '{}'

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

Create the token and set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started.

Refer to the complete Pulsar sources REST API spec for all available options.

curl -sS --fail -X PUT "$WEB_SERVICE_URL/admin/v3/astrasources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN" \
  -d '{
        "tenant": "'$TENANT'",
        "topicName": "persistent://'$TENANT'/'$NAMESPACE'/'$DESTINATION_TOPIC'",
        "name": "'$SOURCE_NAME'",
        "namespace": "'$NAMESPACE'",
        "archive": "builtin://kinesis",
        "parallelism": 1,
        "processingGuarantees": "ATLEAST_ONCE",
        "configs": {
          "awsEndpoint": "https://some.endpoint.aws",
          "awsRegion": "us-east-1",
          "awsKinesisStreamName": "my-stream",
          "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
          "applicationName": "My test application",
          "checkpointInterval": "30000",
          "backoffTime": "4000",
          "numRetries": "3",
          "receiveQueueSize": 2000,
          "initialPositionInStream": "TRIM_HORIZON",
          "startAtTime": "2020-08-09T19:28:58.000Z"
        }
      }'
Result
 {
   "tenant": "string",
   "namespace": "string",
   "name": "string",
   "className": "string",
   "topicName": "string",
   "producerConfig": {
     "maxPendingMessages": 0,
     "maxPendingMessagesAcrossPartitions": 0,
     "useThreadLocalProducers": true,
     "cryptoConfig": {
       "cryptoKeyReaderClassName": "string",
       "cryptoKeyReaderConfig": {
         "property1": {},
         "property2": {}
       },
       "encryptionKeys": [
         "string"
       ],
       "producerCryptoFailureAction": "FAIL",
       "consumerCryptoFailureAction": "FAIL"
     },
     "batchBuilder": "string"
   },
   "serdeClassName": "string",
   "schemaType": "string",
   "configs": {
     "property1": {},
     "property2": {}
   },
   "secrets": {
     "property1": {},
     "property2": {}
   },
   "parallelism": 0,
   "processingGuarantees": "ATLEAST_ONCE",
   "resources": {
     "cpu": 0,
     "ram": 0,
     "disk": 0
   },
   "archive": "string",
   "runtimeFlags": "string",
   "customRuntimeOptions": "string",
   "batchSourceConfig": {
     "discoveryTriggererClassName": "string",
     "discoveryTriggererConfig": {
       "property1": {},
       "property2": {}
     }
   },
   "batchBuilder": "string"
}

Delete


Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Delete all instances of a connector
./bin/pulsar-admin sources delete \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

Create the token and set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started.

Refer to the complete Pulsar sources REST API spec for all available options.

# Delete all instances of a connector
curl -sS --fail -X DELETE "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"
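In teardown scripts it is often convenient for a delete to succeed when the source is already gone. The following hypothetical sketch checks the HTTP status instead of using --fail. It assumes the API answers 404 for a source that does not exist.

```shell
# Hypothetical helper: delete the source and treat "not found" as success,
# so the command can be rerun safely.
delete_source() {
  local code
  code=$(curl -sS -o /dev/null -w '%{http_code}' -X DELETE \
    "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
    -H "Authorization: $ASTRA_STREAMING_TOKEN")
  case "$code" in
    200|204) echo "deleted $SOURCE_NAME" ;;
    404) echo "$SOURCE_NAME does not exist; nothing to delete" ;;
    *) echo "error: delete failed with HTTP $code" >&2; return 1 ;;
  esac
}
```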

Monitoring the Connector

Info


Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Get information about a connector
./bin/pulsar-admin sources get \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

Create the token and set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started, then use them to form curl commands to the REST API, for example:

    # Get a connector's information
    curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
      -H "accept: application/json" \
      -H "Authorization: $ASTRA_STREAMING_TOKEN"

Sample config data:

{
    "awsEndpoint": "https://some.endpoint.aws",
    "awsRegion": "us-east-1",
    "awsKinesisStreamName": "my-stream",
    "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
    "applicationName": "My test application",
    "checkpointInterval": "30000",
    "backoffTime": "4000",
    "numRetries": "3",
    "receiveQueueSize": 2000,
    "initialPositionInStream": "TRIM_HORIZON",
    "startAtTime": "2020-08-09T19:28:58.000Z"
}

Health


Refer to the complete pulsar-admin sources spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Check the status of one connector instance (omit --instance-id to check all instances)
./bin/pulsar-admin sources status \
  --instance-id "$SOURCE_INSTANCEID" \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"

You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.

Create the token and set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. To check a single instance, also set SOURCE_INSTANCEID to its instance ID (instance IDs start at 0). Then use these values to form curl commands to the REST API, for example:

    # Get the status of all connector instances
    curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/status" \
      -H "accept: application/json" \
      -H "Authorization: $ASTRA_STREAMING_TOKEN"
    
    # Get the status of an individual connector instance
    curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/status" \
      -H "accept: application/json" \
      -H "Authorization: $ASTRA_STREAMING_TOKEN"
Result

Status response for all connector instances:

{
  "numInstances": 0,
  "numRunning": 0,
  "instances": [
    {
      "instanceId": 0,
      "status": {
        "running": true,
        "error": "string",
        "numRestarts": 0,
        "numReceivedFromSource": 0,
        "numSystemExceptions": 0,
        "latestSystemExceptions": [
          {
            "exceptionString": "string",
            "timestampMs": 0
          }
        ],
        "numSourceExceptions": 0,
        "latestSourceExceptions": [
          {
            "exceptionString": "string",
            "timestampMs": 0
          }
        ],
        "numWritten": 0,
        "lastReceivedTime": 0,
        "workerId": "string"
      }
    }
  ]
}

Status response for an individual connector instance:

{
  "running": true,
  "error": "string",
  "numRestarts": 0,
  "numReceivedFromSource": 0,
  "numSystemExceptions": 0,
  "latestSystemExceptions": [
    {
      "exceptionString": "string",
      "timestampMs": 0
    }
  ],
  "numSourceExceptions": 0,
  "latestSourceExceptions": [
    {
      "exceptionString": "string",
      "timestampMs": 0
    }
  ],
  "numWritten": 0,
  "lastReceivedTime": 0,
  "workerId": "string"
}
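To use the all-instances status response in a script or health check, you can compare numRunning with numInstances. This hypothetical sketch reads that response on stdin and parses it with grep and sed, so it assumes the field layout of the sample response; jq is a sturdier choice where it is installed.

```shell
# Hypothetical helper: succeed only when at least one instance exists and
# every instance is running.
all_instances_running() {
  local status instances running
  status=$(cat)
  instances=$(printf '%s' "$status" | grep -o '"numInstances" *: *[0-9]*' | sed 's/[^0-9]//g')
  running=$(printf '%s' "$status" | grep -o '"numRunning" *: *[0-9]*' | sed 's/[^0-9]//g')
  echo "${running:-?}/${instances:-?} instances running"
  [ -n "$instances" ] && [ "$instances" -gt 0 ] && [ "$running" = "$instances" ]
}

# Example usage:
# curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/status" \
#   -H "Authorization: $ASTRA_STREAMING_TOKEN" | all_instances_running
```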

Metrics

Astra Streaming exposes Prometheus-formatted metrics for every connector. For more details, see the Scrape metrics with Prometheus page.

Connector Reference

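Source connectors accept two sets of parameters: Astra Streaming parameters, which apply to every source connector, and Kinesis configuration options, which are specific to this connector.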

Astra Streaming

Name Required Default Description

archive

true

The connector type, like 'builtin://debezium-mysql'

batchBuilder

false

BatchBuilder provides two batch construction methods, DEFAULT and KEY_BASED. The default value is DEFAULT.

batchSourceConfig

false

Batch source config key/value (as a JSON string)

className

true

The connector type’s class reference, like 'org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource'

configs

false

{}

JSON key/value config of source type specific settings. Example: {"property1":"1234","property2":{"subProperty":"asdf"}}

customRuntimeOptions

false

A string that encodes options to customize the runtime. For details, see the Apache Pulsar documentation for your configured runtime.

name

true

Give your source a good name for later reference. The name must start with a lowercase letter and can contain only lowercase alphanumeric characters and hyphens (kebab-case).

namespace

true

The namespace you’d like the source created under

parallelism

true

1

The number of Pulsar source instances to run.

processingGuarantees

true

ATLEAST_ONCE

The delivery semantics applied to the Pulsar source. Valid values are 'ATLEAST_ONCE', 'ATMOST_ONCE', and 'EFFECTIVELY_ONCE'.

producerConfig

false

The custom producer configuration (as a JSON string)

resources

false

The compute resources to allocate per source instance, as a JSON string (applicable only to the process runtime). Example: {"cpu": 0.25,"disk":1000000000,"ram":500000000}

runtimeFlags

false

A string that encodes options to customize the runtime. For details, see the Apache Pulsar documentation for your configured runtime.

schemaType

false

The schema type, either a built-in schema such as 'avro' or 'json', or the class name of a custom schema, used to encode messages emitted from the Pulsar source.

secrets

false

A map from each secret name (the name used to access the secret in the function through the context) to an object that describes how the underlying secrets provider fetches the secret. The SecretProviderConfigurator.getSecretObjectType() method determines the type of each value.

serdeClassName

false

The SerDe class name for the Pulsar source.

tenant

true

The tenant you’d like the source created under

topicName

true

The name of an existing topic in Astra Streaming to which messages are published, in the format [non-]persistent://<tenant>/<namespace>/<topic-name>.
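Two of the Astra Streaming parameters above have format rules that you can check locally before calling the API: name and topicName. The following bash functions are hypothetical helpers that encode those rules as regular expressions. They are format checks only and do not confirm that a tenant, namespace, or topic exists.

```shell
# Hypothetical helper: a source name starts with a lowercase letter and
# contains only lowercase letters, digits, and hyphens.
valid_source_name() {
  # Spell out the letters instead of using [a-z], whose meaning can vary
  # with the locale in bash regular expressions.
  local lower='abcdefghijklmnopqrstuvwxyz'
  local re="^[$lower][${lower}0123456789-]*\$"
  [[ "$1" =~ $re ]]
}

# Hypothetical helper: [non-]persistent://<tenant>/<namespace>/<topic-name>
valid_topic_name() {
  local re='^(non-)?persistent://[^/]+/[^/]+/[^/]+$'
  [[ "$1" =~ $re ]]
}

# Example usage:
# valid_source_name "$SOURCE_NAME" || echo "invalid source name" >&2
# valid_topic_name "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC" || echo "invalid topic" >&2
```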

Kinesis configuration options

Set these values in the configs object:

Name Type Required Default Description

initialPositionInStream

InitialPositionInStream

false

LATEST

The position in the stream from which the connector starts reading. The available options are:
* AT_TIMESTAMP: start from the record at or after the specified timestamp.
* LATEST: start after the most recent data record.
* TRIM_HORIZON: start from the oldest available data record.

startAtTime

Date

false

"" (empty string)

If initialPositionInStream is set to AT_TIMESTAMP, startAtTime specifies the point in time at which to start consumption.

applicationName

String

false

Pulsar IO connector

The name of the Amazon Kinesis application.
By default, the application name is included in the user agent string used to make AWS requests. This can help with troubleshooting, for example, to distinguish requests made by separate connector instances.

checkpointInterval

long

false

60000

The interval between Kinesis stream checkpoints, in milliseconds.

backoffTime

long

false

3000

The time, in milliseconds, to wait between requests when the connector encounters a throttling exception from AWS Kinesis.

numRetries

int

false

3

The number of retries when the connector encounters an exception while trying to set a checkpoint.

receiveQueueSize

int

false

1000

The maximum number of AWS records that can be buffered inside the connector.
Once the receiveQueueSize is reached, the connector does not consume any messages from Kinesis until some messages in the queue are successfully consumed.

dynamoEndpoint

String

false

"" (empty string)

The DynamoDB endpoint URL, as listed in the AWS service endpoints documentation.

cloudwatchEndpoint

String

false

"" (empty string)

The CloudWatch endpoint URL, as listed in the AWS service endpoints documentation.

useEnhancedFanOut

boolean

false

true

If set to true, the connector uses Kinesis enhanced fan-out. If set to false, it uses polling.

awsEndpoint

String

false

"" (empty string)

The Kinesis endpoint URL, as listed in the AWS service endpoints documentation.

awsRegion

String

false

"" (empty string)

The AWS region, for example us-west-1 or us-west-2.

awsKinesisStreamName

String

true

"" (empty string)

The Kinesis stream name.

awsCredentialPluginParam

String

false

"" (empty string)

The JSON parameter used to initialize the AWS credential provider plugin.

awsCredentialPluginName

String

false

"" (empty string)

The fully qualified class name of an implementation of AwsCredentialProviderPlugin (pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java in the Apache Pulsar GitHub repository).
AwsCredentialProviderPlugin has the following built-in plugins:
* org.apache.pulsar.io.kinesis.AwsDefaultProviderChainPlugin: this plugin uses the default AWS provider chain. For more information, see using the default credential provider chain.
* org.apache.pulsar.io.kinesis.STSAssumeRoleProviderPlugin: this plugin takes a configuration through awsCredentialPluginParam that describes a role to assume when running the KCL.
JSON configuration example:
{"roleArn": "arn…", "roleSessionName": "name"}
awsCredentialPluginName is a factory class that creates the AWSCredentialsProvider used by the Kinesis source.
If awsCredentialPluginName is empty, the Kinesis source creates a default AWSCredentialsProvider that accepts a JSON map of credentials in awsCredentialPluginParam.
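Two of the options above are easy to get wrong by hand: startAtTime, which only applies together with AT_TIMESTAMP, and awsCredentialPluginParam, which is JSON nested inside a JSON string. The following hypothetical bash helpers print the corresponding configs entries. The timestamp check assumes the UTC format used in the samples on this page (for example, 2020-08-09T19:28:58.000Z), and the role ARN in the usage comment is a placeholder.

```shell
# Hypothetical helper: print the configs entries for starting at a given
# time. startAtTime only takes effect when initialPositionInStream is
# AT_TIMESTAMP. The check assumes the UTC format used on this page.
at_timestamp_settings() {
  local ts="$1"
  local re='^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]{3})?Z$'
  if [[ ! "$ts" =~ $re ]]; then
    echo "error: expected a UTC timestamp such as 2020-08-09T19:28:58.000Z" >&2
    return 1
  fi
  printf '"initialPositionInStream": "AT_TIMESTAMP",\n"startAtTime": "%s"\n' "$ts"
}

# Hypothetical helper: print the configs entries that select the
# STSAssumeRoleProviderPlugin. awsCredentialPluginParam holds JSON inside a
# JSON string, so its quotes are escaped. Assumes the arguments contain no
# double quotes or backslashes.
sts_credential_settings() {
  local role_arn="$1" session_name="$2"
  printf '"awsCredentialPluginName": "%s",\n' \
    "org.apache.pulsar.io.kinesis.STSAssumeRoleProviderPlugin"
  printf '"awsCredentialPluginParam": "{\\"roleArn\\": \\"%s\\", \\"roleSessionName\\": \\"%s\\"}"\n' \
    "$role_arn" "$session_name"
}

# Example usage (the ARN is a placeholder):
# at_timestamp_settings 2020-08-09T19:28:58.000Z
# sts_credential_settings "arn:aws:iam::123456789012:role/example" pulsar-kinesis
```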

The Astra Streaming Kinesis source connector supports all configuration properties provided by Apache Pulsar. For a complete list, see the Kinesis source connector properties.


© 2025 DataStax, an IBM Company | Privacy policy | Terms of use | Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com