Kinesis
The Kinesis source connector pulls data from Amazon Kinesis and persists data into an Apache Pulsar™ topic.
For more, see Apache Pulsar’s Kinesis source documentation.
Get Started
Set the following environment variables before you use pulsar-admin or curl:
export TENANT=<replace-me>
export DESTINATION_TOPIC=<replace-me>
export NAMESPACE=default
export SOURCE_NAME=kinesis-src
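Before you run the commands below, it can help to confirm that every variable is set and to see the fully qualified topic name that the examples build. The following is a minimal POSIX shell sketch. `require_vars` and `fq_topic` are hypothetical helpers, not part of pulsar-admin or Astra Streaming. The check also treats the literal placeholder `<replace-me>` as unset.

```shell
# Hypothetical helper: return non-zero (and name the culprits) if any listed
# variable is unset, empty, or still holds the <replace-me> placeholder.
require_vars() {
  local name value missing=0
  for name in "$@"; do
    # POSIX-compatible indirect lookup of the variable whose name is in $name.
    eval "value=\${$name:-}"
    case "$value" in
      ''|'<replace-me>')
        echo "set $name before continuing" >&2
        missing=1 ;;
    esac
  done
  return "$missing"
}

# Hypothetical helper: print the fully qualified destination topic used below.
fq_topic() {
  printf 'persistent://%s/%s/%s\n' "$TENANT" "$NAMESPACE" "$DESTINATION_TOPIC"
}

# Example (not run here):
# require_vars TENANT NAMESPACE DESTINATION_TOPIC SOURCE_NAME && fq_topic
```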
Pulsar Admin
Refer to the complete pulsar-admin sources spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
./bin/pulsar-admin sources create \
--source-type kinesis \
--name "$SOURCE_NAME" \
--destination-topic-name "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC" \
--tenant "$TENANT" \
--namespace "$NAMESPACE" \
--source-config '{
"awsEndpoint": "https://some.endpoint.aws",
"awsRegion": "us-east-1",
"awsKinesisStreamName": "my-stream",
"awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
"applicationName": "My test application",
"checkpointInterval": "30000",
"backoffTime": "4000",
"numRetries": "3",
"receiveQueueSize": 2000,
"initialPositionInStream": "TRIM_HORIZON",
"startAtTime": "2020-08-09T19:28:58.000Z"
}'
curl
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
- In the Astra Portal, click Streaming tenants.
- Click your tenant’s name, and then click the Settings tab.
- Click Create Token.
- Copy the token, store it securely, and then click Close.
- Click the Connect tab, and then copy the Web Service URL.
- Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sources REST API spec for all available options.
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/astrasources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
-H "Authorization: $ASTRA_STREAMING_TOKEN" \
-d '{
"tenant": "'$TENANT'",
"topicName": "persistent://'$TENANT'/'$NAMESPACE'/'$DESTINATION_TOPIC'",
"name": "'$SOURCE_NAME'",
"namespace": "'$NAMESPACE'",
"archive": "builtin://kinesis",
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"configs": {
"awsEndpoint": "https://some.endpoint.aws",
"awsRegion": "us-east-1",
"awsKinesisStreamName": "my-stream",
"awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
"applicationName": "My test application",
"checkpointInterval": "30000",
"backoffTime": "4000",
"numRetries": "3",
"receiveQueueSize": 2000,
"initialPositionInStream": "TRIM_HORIZON",
"startAtTime": "2020-08-09T19:28:58.000Z"
}
}'
Sample Config Data
{
"awsEndpoint": "https://some.endpoint.aws",
"awsRegion": "us-east-1",
"awsKinesisStreamName": "my-stream",
"awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
"applicationName": "My test application",
"checkpointInterval": "30000",
"backoffTime": "4000",
"numRetries": "3",
"receiveQueueSize": 2000,
"initialPositionInStream": "TRIM_HORIZON",
"startAtTime": "2020-08-09T19:28:58.000Z"
}
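The awsCredentialPluginParam value is itself a JSON document stored as a string, so its inner quotes must be escaped inside the configs object. Here is a minimal sketch of building that escaped string in the shell. `cred_param` is a hypothetical helper, and it assumes that neither the access key nor the secret key contains a double quote or backslash.

```shell
# Hypothetical helper: print the escaped JSON string that awsCredentialPluginParam
# expects inside configs. Assumes neither value contains a double quote or backslash.
cred_param() {
  # In printf's format, \\ prints one backslash, so \\" yields the literal \" sequence.
  printf '{\\"accessKey\\":\\"%s\\",\\"secretKey\\":\\"%s\\"}' "$1" "$2"
}

# Example with the sample values from this page (not run here):
# cred_param myKey my-Secret
```

With the sample values, the output matches the awsCredentialPluginParam string shown above, so you can substitute it for that value in --source-config or in the REST request body.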
Managing the Connector
Start
Pulsar Admin
Refer to the complete pulsar-admin sources spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Start all instances of a connector
./bin/pulsar-admin sources start \
--namespace "$NAMESPACE" \
--name "$SOURCE_NAME" \
--tenant "$TENANT"
# optionally add --instance-id to only start an individual instance
curl
Use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN environment variables that you created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.
Start all instances of a connector:
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/start" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Start an individual instance of a connector, where SOURCE_INSTANCEID is the ID of the instance (instance IDs start at 0):
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/start" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
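The start, stop, and restart endpoints in this section share one URL pattern, with an optional instance ID segment before the action. The following POSIX shell sketch builds these URLs from the variables defined earlier. `source_action_url` is a hypothetical helper, and it assumes instance IDs are the zero-based integers Pulsar assigns.

```shell
# Hypothetical helper: print the REST URL for a source lifecycle action.
# Usage: source_action_url <start|stop|restart> [instance-id]
source_action_url() {
  local action instance base
  action=$1
  instance=${2:-}
  base="$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME"
  case "$action" in
    start|stop|restart) ;;
    *) echo "unsupported action: $action" >&2; return 1 ;;
  esac
  if [ -n "$instance" ]; then
    # Individual instance: .../<source-name>/<instance-id>/<action>
    printf '%s/%s/%s\n' "$base" "$instance" "$action"
  else
    # All instances: .../<source-name>/<action>
    printf '%s/%s\n' "$base" "$action"
  fi
}

# Example (not run here): stop instance 0
# curl -sS --fail -X POST "$(source_action_url stop 0)" -H "Authorization: $ASTRA_STREAMING_TOKEN"
```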
Stop
Pulsar Admin
Refer to the complete pulsar-admin sources spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Stop all instances of a connector
./bin/pulsar-admin sources stop \
--namespace "$NAMESPACE" \
--name "$SOURCE_NAME" \
--tenant "$TENANT"
# optionally add --instance-id to only stop an individual instance
curl
Use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN environment variables that you created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.
Stop all instances of a connector:
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/stop" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Stop an individual instance of a connector:
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/stop" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Restart
Pulsar Admin
Refer to the complete pulsar-admin sources spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Restart all instances of a connector
./bin/pulsar-admin sources restart \
--namespace "$NAMESPACE" \
--name "$SOURCE_NAME" \
--tenant "$TENANT"
# optionally add --instance-id to only restart an individual instance
curl
Use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN environment variables that you created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.
# Restart all instances of a connector
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/restart" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
# Restart an individual instance of a connector
curl -sS --fail -X POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/restart" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Update
Pulsar Admin
Refer to the complete pulsar-admin sources spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
./bin/pulsar-admin sources update \
--source-type kinesis \
--name "$SOURCE_NAME" \
--destination-topic-name "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC" \
--tenant "$TENANT" \
--namespace "$NAMESPACE" \
--parallelism 2 \
--source-config '{}'
curl
Use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN environment variables that you created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.
curl -sS --fail -X PUT "$WEB_SERVICE_URL/admin/v3/astrasources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
-H "Authorization: $ASTRA_STREAMING_TOKEN" \
-d '{
"tenant": "'$TENANT'",
"topicName": "persistent://'$TENANT'/'$NAMESPACE'/'$DESTINATION_TOPIC'",
"name": "'$SOURCE_NAME'",
"namespace": "'$NAMESPACE'",
"archive": "builtin://kinesis",
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"configs": {
"awsEndpoint": "https://some.endpoint.aws",
"awsRegion": "us-east-1",
"awsKinesisStreamName": "my-stream",
"awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
"applicationName": "My test application",
"checkpointInterval": "30000",
"backoffTime": "4000",
"numRetries": "3",
"receiveQueueSize": 2000,
"initialPositionInStream": "TRIM_HORIZON",
"startAtTime": "2020-08-09T19:28:58.000Z"
}
}'
Result
{
"tenant": "string",
"namespace": "string",
"name": "string",
"className": "string",
"topicName": "string",
"producerConfig": {
"maxPendingMessages": 0,
"maxPendingMessagesAcrossPartitions": 0,
"useThreadLocalProducers": true,
"cryptoConfig": {
"cryptoKeyReaderClassName": "string",
"cryptoKeyReaderConfig": {
"property1": {},
"property2": {}
},
"encryptionKeys": [
"string"
],
"producerCryptoFailureAction": "FAIL",
"consumerCryptoFailureAction": "FAIL"
},
"batchBuilder": "string"
},
"serdeClassName": "string",
"schemaType": "string",
"configs": {
"property1": {},
"property2": {}
},
"secrets": {
"property1": {},
"property2": {}
},
"parallelism": 0,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 0,
"ram": 0,
"disk": 0
},
"archive": "string",
"runtimeFlags": "string",
"customRuntimeOptions": "string",
"batchSourceConfig": {
"discoveryTriggererClassName": "string",
"discoveryTriggererConfig": {
"property1": {},
"property2": {}
}
},
"batchBuilder": "string"
}
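Splicing shell variables into a single-quoted JSON string, as in the update request above, is easy to get wrong. One alternative is a heredoc, which expands variables in place. The following is a minimal sketch. `update_body` is a hypothetical helper that takes the desired parallelism. It assumes the variables contain no characters that need JSON escaping. The configs values are the samples from this page and should be replaced with your own.

```shell
# Hypothetical helper: print the JSON body for the astrasources PUT request.
# $1 = parallelism (defaults to 1). Config values are this page's samples.
# In an unquoted heredoc, \" is kept literally, so the nested JSON stays escaped.
update_body() {
  cat <<EOF
{
  "tenant": "$TENANT",
  "namespace": "$NAMESPACE",
  "name": "$SOURCE_NAME",
  "topicName": "persistent://$TENANT/$NAMESPACE/$DESTINATION_TOPIC",
  "archive": "builtin://kinesis",
  "parallelism": ${1:-1},
  "processingGuarantees": "ATLEAST_ONCE",
  "configs": {
    "awsEndpoint": "https://some.endpoint.aws",
    "awsRegion": "us-east-1",
    "awsKinesisStreamName": "my-stream",
    "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
    "applicationName": "My test application",
    "checkpointInterval": "30000",
    "backoffTime": "4000",
    "numRetries": "3",
    "receiveQueueSize": 2000,
    "initialPositionInStream": "TRIM_HORIZON",
    "startAtTime": "2020-08-09T19:28:58.000Z"
  }
}
EOF
}

# Example (not run here): scale to two instances, reading the body from stdin.
# update_body 2 | curl -sS --fail -X PUT "$WEB_SERVICE_URL/admin/v3/astrasources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
#   -H "Content-Type: application/json" -H "Authorization: $ASTRA_STREAMING_TOKEN" -d @-
```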
Delete
Pulsar Admin
Refer to the complete pulsar-admin sources spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Delete all instances of a connector
./bin/pulsar-admin sources delete \
--namespace "$NAMESPACE" \
--name "$SOURCE_NAME" \
--tenant "$TENANT"
curl
Use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN environment variables that you created in Get Started. Refer to the complete Pulsar sources REST API spec for all available options.
# Delete all instances of a connector
curl -sS --fail -X DELETE "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Monitoring the Connector
Info
Pulsar Admin
Refer to the complete pulsar-admin sources spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Get information about connector
./bin/pulsar-admin sources get \
--namespace "$NAMESPACE" \
--name "$SOURCE_NAME" \
--tenant "$TENANT"
curl
Use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN environment variables that you created in Get Started to form curl commands to the REST API, for example:
# Get a connector's information
curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME" \
-H "accept: application/json" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Sample Config Data
{
"awsEndpoint": "https://some.endpoint.aws",
"awsRegion": "us-east-1",
"awsKinesisStreamName": "my-stream",
"awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
"applicationName": "My test application",
"checkpointInterval": "30000",
"backoffTime": "4000",
"numRetries": "3",
"receiveQueueSize": 2000,
"initialPositionInStream": "TRIM_HORIZON",
"startAtTime": "2020-08-09T19:28:58.000Z"
}
Health
Pulsar Admin
Refer to the complete pulsar-admin sources spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Check the status of one instance (omit --instance-id to check all instances)
./bin/pulsar-admin sources status \
--instance-id "$SOURCE_INSTANCEID" \
--namespace "$NAMESPACE" \
--name "$SOURCE_NAME" \
--tenant "$TENANT"
curl
Use the WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN environment variables that you created in Get Started to form curl commands to the REST API, for example:
# Get the status of all connector instances
curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/status" \
-H "accept: application/json" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
# Get the status of an individual connector instance
curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/$SOURCE_INSTANCEID/status" \
-H "accept: application/json" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"
Result
Status response for all connector instances:
{
"numInstances": 0,
"numRunning": 0,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "string",
"numRestarts": 0,
"numReceivedFromSource": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": [
{
"exceptionString": "string",
"timestampMs": 0
}
],
"numSourceExceptions": 0,
"latestSourceExceptions": [
{
"exceptionString": "string",
"timestampMs": 0
}
],
"numWritten": 0,
"lastReceivedTime": 0,
"workerId": "string"
}
}
]
}
Status response for an individual connector instance:
{
"running": true,
"error": "string",
"numRestarts": 0,
"numReceivedFromSource": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": [
{
"exceptionString": "string",
"timestampMs": 0
}
],
"numSourceExceptions": 0,
"latestSourceExceptions": [
{
"exceptionString": "string",
"timestampMs": 0
}
],
"numWritten": 0,
"lastReceivedTime": 0,
"workerId": "string"
}
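To script a health check, you can compare numRunning with numInstances in the status response for all instances. The following is a rough POSIX shell sketch. `json_int` and `all_running` are hypothetical helpers. The grep-based field extraction is a naive stand-in for a real JSON parser such as jq, and it only suits top-level integer fields like these two.

```shell
# Hypothetical helper: print the first integer value of field $1 in the JSON on stdin.
# Naive text matching, not a JSON parser; numInstances and numRunning appear once.
json_int() {
  grep -o "\"$1\": *[0-9][0-9]*" | head -n 1 | sed 's/.*: *//'
}

# Hypothetical helper: succeed only if at least one instance exists and all are running.
all_running() {
  local total running
  total=$(printf '%s\n' "$1" | json_int numInstances)
  running=$(printf '%s\n' "$1" | json_int numRunning)
  [ -n "$total" ] && [ "$total" -gt 0 ] && [ "$running" = "$total" ]
}

# Example (not run here): poll every 5 seconds until all instances report running.
# until all_running "$(curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE_NAME/status" \
#     -H "accept: application/json" -H "Authorization: $ASTRA_STREAMING_TOKEN")"; do sleep 5; done
```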
Metrics
Astra Streaming exposes Prometheus-formatted metrics for every connector. For more detail, see the Scrape metrics with Prometheus page.
Connector Reference
Two sets of parameters configure source connectors: the Astra Streaming parameters and the Kinesis configuration options.
Astra Streaming
Name | Required | Default | Description |
---|---|---|---|
archive | true | | The connector type, like 'builtin://debezium-mysql' |
batchBuilder | false | | BatchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED. The default value is DEFAULT |
batchSourceConfig | false | | Batch source config key/value (as a JSON string) |
className | true | | The connector type’s class reference, like 'org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource' |
configs | false | {} | JSON key/value config of source-type-specific settings. Example: {"property1":"1234","property2":{"subProperty":"asdf"}} |
customRuntimeOptions | false | | A string that encodes options to customize the runtime; see the Apache Pulsar docs for your configured runtime for details |
name | true | | A name for later reference to the source. The name must start with a lowercase letter and can contain only lowercase alphanumeric characters and hyphens (kebab-case). |
namespace | true | | The namespace to create the source under |
parallelism | true | 1 | The number of Pulsar source instances to run |
processingGuarantees | true | ATLEAST_ONCE | The delivery semantics applied to the Pulsar source. Values are 'ATLEAST_ONCE', 'ATMOST_ONCE', 'EFFECTIVELY_ONCE' |
producerConfig | false | | The custom producer configuration (as a JSON string) |
resources | false | | The compute resources to allocate per source instance (applicable only to the process runtime), as a JSON string. Example: {"cpu": 0.25,"disk":1000000000,"ram":500000000} |
runtimeFlags | false | | A string that encodes options to customize the runtime; see the Apache Pulsar docs for your configured runtime for details |
schemaType | false | | The schema type (either a built-in schema like 'avro' or 'json', or a custom Schema class name) used to encode messages emitted from the Pulsar source |
secrets | false | | A map of secretName (the name by which the secret is accessed in the function via context) to an object that encapsulates how the underlying secrets provider fetches the secret. The type of a value can be found with the SecretProviderConfigurator.getSecretObjectType() method |
serdeClassName | false | | The SerDe class name for the Pulsar source |
tenant | true | | The tenant to create the source under |
topicName | true | | The name of an existing topic in Astra Streaming that messages will be published to, in the format [non-]persistent://<tenant>/<namespace>/<topic-name> |
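You can check the naming rule for name before you submit a request. The following is a minimal POSIX shell sketch of that rule as stated in the table above. `valid_source_name` is a hypothetical helper. It is written for ASCII input, and it does not cover any additional limits the service may enforce, such as a maximum length.

```shell
# Hypothetical helper: succeed if $1 follows the naming rule for "name":
# starts with a lowercase letter and contains only lowercase letters, digits, and hyphens.
valid_source_name() {
  case "$1" in
    [[:lower:]]*) ;;                        # must start with a lowercase letter
    *) return 1 ;;                          # empty, or wrong first character
  esac
  case "$1" in
    *[![:lower:][:digit:]-]*) return 1 ;;   # reject any other character
  esac
  return 0
}

# Example (not run here):
# valid_source_name "$SOURCE_NAME" || echo "invalid source name: $SOURCE_NAME" >&2
```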
Kinesis configuration options
These values are provided in the configs object:
Name | Type | Required | Default | Description |
---|---|---|---|---|
initialPositionInStream | InitialPositionInStream | false | LATEST | The position where the connector starts from. Available options: AT_TIMESTAMP (start from the record at or after the time in startAtTime), LATEST (start after the most recent record), and TRIM_HORIZON (start from the oldest available record). |
startAtTime | Date | false | " " (empty string) | If initialPositionInStream is set to AT_TIMESTAMP, the point in time to start consumption from. |
applicationName | String | false | Pulsar IO connector | The name of the Amazon Kinesis application. |
checkpointInterval | long | false | 60000 | The frequency of the Kinesis stream checkpoint, in milliseconds. |
backoffTime | long | false | 3000 | The time to wait between requests, in milliseconds, when the connector encounters a throttling exception from AWS Kinesis. |
numRetries | int | false | 3 | The number of retries when the connector encounters an exception while trying to set a checkpoint. |
receiveQueueSize | int | false | 1000 | The maximum number of AWS records that can be buffered inside the connector. |
dynamoEndpoint | String | false | " " (empty string) | The DynamoDB endpoint URL, as listed in the AWS service endpoints reference. |
cloudwatchEndpoint | String | false | " " (empty string) | The CloudWatch endpoint URL, as listed in the AWS service endpoints reference. |
useEnhancedFanOut | boolean | false | true | If set to true, the connector uses Kinesis enhanced fan-out. If set to false, it uses polling. |
awsEndpoint | String | false | " " (empty string) | The Kinesis endpoint URL, as listed in the AWS service endpoints reference. |
awsRegion | String | false | " " (empty string) | The AWS region, for example us-east-1. |
awsKinesisStreamName | String | true | " " (empty string) | The Kinesis stream name. |
awsCredentialPluginParam | String | false | " " (empty string) | The JSON parameter used to initialize the AWS credential provider plugin. |
awsCredentialPluginName | String | false | " " (empty string) | The fully qualified class name of an implementation of org.apache.pulsar.io.aws.AwsCredentialProviderPlugin. |
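When initialPositionInStream is AT_TIMESTAMP, startAtTime takes a UTC timestamp in the same shape as the 2020-08-09T19:28:58.000Z value in the sample config. The following small sketch formats a Unix epoch time that way. `kinesis_start_time` is a hypothetical helper. It tries GNU date's `-d @SECONDS` first and falls back to BSD/macOS `date -r SECONDS`. Milliseconds are always written as .000.

```shell
# Hypothetical helper: format epoch seconds $1 as an ISO-8601 UTC timestamp with
# milliseconds fixed at .000, matching the startAtTime sample on this page.
kinesis_start_time() {
  date -u -d "@$1" '+%Y-%m-%dT%H:%M:%S.000Z' 2>/dev/null ||  # GNU / BusyBox date
    date -u -r "$1" '+%Y-%m-%dT%H:%M:%S.000Z'                # BSD / macOS date
}

# Example (not run here): start consuming from one hour ago.
# kinesis_start_time "$(( $(date +%s) - 3600 ))"
```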
The Astra Streaming Kinesis source connector supports all configuration properties provided by Apache Pulsar. For a complete list, see the Kinesis source connector properties.