Astra DB (Apache Cassandra® enhanced)
The DataStax Astra DB Sink Connector is based on the open-source Apache Cassandra® sink connector for Apache Pulsar™. Depending on how you deploy the connector, it can sink topic messages to a table in Astra DB or to a table in a Cassandra cluster outside of Astra DB.
The Astra Streaming portal provides a simple way to connect this sink to a table in Astra DB using only a token. Using pulsar-admin or the REST API, you can configure the sink's Cassandra connection manually.
This reference assumes you are manually connecting to a Cassandra table.
If you would like to see the code, refer to the open-source project.
Get Started
Set the following environment variables before using pulsar-admin or curl:
export TENANT=<replace-me>
export INPUT_TOPIC=<replace-me>
export NAMESPACE=default
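export SINK_NAME=astra-db-sink
# Instance IDs start at 0; only the single-instance commands use this variable
export SINK_INSTANCEID=0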
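Every command on this page reads these variables, and a leftover `<replace-me>` placeholder tends to surface later as a confusing API error. As a minimal sketch (bash only; `check_env` is a hypothetical helper, not part of pulsar-admin or Astra Streaming), you can fail fast before running anything:

```bash
#!/usr/bin/env bash
# Hypothetical helper, not part of pulsar-admin or Astra Streaming.
# Returns non-zero if any named variable is unset, empty, or "<replace-me>".
check_env() {
  local var missing=0
  for var in "$@"; do
    # ${!var} is bash indirect expansion: the value of the variable named in $var.
    if [[ -z "${!var:-}" || "${!var}" == "<replace-me>" ]]; then
      echo "error: $var is not set" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage:
# check_env TENANT NAMESPACE INPUT_TOPIC SINK_NAME || exit 1
```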
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# NOTE: This is not a working example.
# Prerequisites: create the keyspace and table (run these statements in cqlsh, not in your shell):
#   CREATE KEYSPACE ks1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
#   CREATE TABLE ks1.table1 (name text, PRIMARY KEY (name));
# Create the input topic
./bin/pulsar-admin topics create "persistent://$TENANT/$NAMESPACE/pulsar-topic-ks1-table1"
# Create the sink
./bin/pulsar-admin sinks create \
  --sink-type cassandra-enhanced \
  --name "pulsar-sink-ks1-table1" \
  --inputs "persistent://$TENANT/$NAMESPACE/pulsar-topic-ks1-table1" \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --sink-config '{
    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",
    "port": 9042,
    "cloud.secureConnectBundle": null,
    "ignoreErrors": "None",
    "maxConcurrentRequests": 500,
    "maxNumberOfRecordsInBatch": 32,
    "queryExecutionTimeout": 30,
    "connectionPoolLocalSize": 4,
    "jmx": true,
    "compression": "None",
    "auth": {
      "provider": "None",
      "username": null,
      "password": null,
      "gssapi": {
        "keyTab": null,
        "principal": null,
        "service": "dse"
      }
    },
    "ssl": {
      "provider": "None",
      "hostnameValidation": true,
      "keystore": {
        "password": null,
        "path": null
      },
      "openssl": {
        "keyCertChain": null,
        "privateKey": null
      },
      "truststore": {
        "password": null,
        "path": null
      },
      "cipherSuites": null
    },
    "topic": {
      "pulsar-topic-ks1-table1": {
        "ks1": {
          "table1": {
            "mapping": "name=value.name",
            "consistencyLevel": "LOCAL_ONE",
            "ttl": -1,
            "ttlTimeUnit": "SECONDS",
            "timestampTimeUnit": "MICROSECONDS",
            "nullToUnset": true,
            "deletesEnabled": true
          }
        },
        "codec": {
          "locale": "en_US",
          "timeZone": "UTC",
          "timestamp": "CQL_TIMESTAMP",
          "date": "ISO_LOCAL_DATE",
          "time": "ISO_LOCAL_TIME",
          "unit": "MILLISECONDS"
        }
      }
    }
  }'
You need a Pulsar token for REST API authentication. This is different from your Astra DB application tokens.
- In the Astra Portal, click Streaming tenants.
- Click your tenant’s name, and then click the Settings tab.
- Click Create Token.
- Copy the token, store it securely, and then click Close.
- Click the Connect tab, and then copy the Web Service URL.
- Create environment variables for your tenant’s token and web service URL:
export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
Refer to the complete Pulsar sinks REST API spec for all available options.
# NOTE: This is not a working example.
# Prerequisites: create the keyspace and table (run these statements in cqlsh, not in your shell):
#   CREATE KEYSPACE ks1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
#   CREATE TABLE ks1.table1 (name text, PRIMARY KEY (name));
# Also create the input topic before creating the sink.
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME?opt=poweruser" \
  -H "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
  -F 'sinkConfig="{
    \"archive\":\"builtin:\/\/cassandra-enhanced\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"inputs\":[ \"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\" ],
    \"configs\":{
      ...
    }
  }"'
Sample config data (the full sink configuration schema with placeholder values):
{
  "tenant": "string",
  "namespace": "string",
  "name": "string",
  "className": "string",
  "sourceSubscriptionName": "string",
  "sourceSubscriptionPosition": "Latest",
  "inputs": [
    "string"
  ],
  "topicToSerdeClassName": {
    "property1": "string",
    "property2": "string"
  },
  "topicsPattern": "string",
  "topicToSchemaType": {
    "property1": "string",
    "property2": "string"
  },
  "topicToSchemaProperties": {
    "property1": "string",
    "property2": "string"
  },
  "inputSpecs": {
    "property1": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    },
    "property2": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    }
  },
  "maxMessageRetries": 0,
  "deadLetterTopic": "string",
  "configs": {
    "property1": {},
    "property2": {}
  },
  "secrets": {
    "property1": {},
    "property2": {}
  },
  "parallelism": 0,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": true,
  "retainKeyOrdering": true,
  "resources": {
    "cpu": 0,
    "ram": 0,
    "disk": 0
  },
  "autoAck": true,
  "timeoutMs": 0,
  "negativeAckRedeliveryDelayMs": 0,
  "sinkType": "string",
  "archive": "string",
  "cleanupSubscription": true,
  "runtimeFlags": "string",
  "customRuntimeOptions": "string",
  "transformFunction": "string",
  "transformFunctionClassName": "string",
  "transformFunctionConfig": "string"
}
Managing the Connector
Start
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Start all instances of a connector
./bin/pulsar-admin sinks start \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
# optionally add --instance-id to only start an individual instance
To use curl, set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. Refer to the complete Pulsar sinks REST API spec for all available options.
# Start all instances of a connector
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/start" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
# Start an individual instance of a connector
curl -sS --fail --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/start" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
Stop
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Stop all instances of a connector
./bin/pulsar-admin sinks stop \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
# optionally add --instance-id to only stop an individual instance
To use curl, set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. Refer to the complete Pulsar sinks REST API spec for all available options.
# Stop all instances of a connector
curl -sS --fail --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/stop" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
# Stop an individual instance of a connector
curl -sS --fail --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/stop" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
Restart
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Restart all instances of a connector
./bin/pulsar-admin sinks restart \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
# optionally add --instance-id to only restart an individual instance
To use curl, set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. Refer to the complete Pulsar sinks REST API spec for all available options.
# Restart all instances of a connector
curl -sS --fail --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/restart" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
# Restart an individual instance of a connector
curl -sS --fail --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/restart" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
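The start, stop, and restart REST calls above differ only in the last path segment and an optional instance ID. A minimal bash sketch of hypothetical `sink_url` and `sink_action` helpers that follow the URL layout used in these examples; they are not part of any Astra Streaming tooling:

```bash
#!/usr/bin/env bash
# Hypothetical helpers, not part of pulsar-admin or Astra Streaming.

# Print the REST endpoint for a sink action (start, stop, or restart).
# Pass an instance ID as the second argument to target a single instance.
sink_url() {
  local action="$1" instance="${2:-}"
  local base="$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME"
  if [[ -n "$instance" ]]; then
    printf '%s/%s/%s\n' "$base" "$instance" "$action"
  else
    printf '%s/%s\n' "$base" "$action"
  fi
}

# Send the POST request; --fail makes curl exit non-zero on HTTP errors.
sink_action() {
  curl -sS --fail -X POST "$(sink_url "$@")" \
    -H "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
}

# Usage:
# sink_action stop        # stop all instances
# sink_action restart 0   # restart only instance 0
```

Keeping the URL builder separate from the curl call makes it easy to print and check an endpoint before sending a request.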
Update
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# NOTE: This is not a working example.
./bin/pulsar-admin sinks update \
  --sink-type cassandra-enhanced \
  --name "pulsar-sink-ks1-table1" \
  --inputs "persistent://$TENANT/$NAMESPACE/pulsar-topic-ks1-table1" \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --sink-config '{
    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",
    "port": 9042,
    "cloud.secureConnectBundle": null,
    "ignoreErrors": "None",
    "maxConcurrentRequests": 500,
    "maxNumberOfRecordsInBatch": 32,
    "queryExecutionTimeout": 30,
    "connectionPoolLocalSize": 4,
    "jmx": true,
    "compression": "None",
    "auth": {
      "provider": "None",
      "username": null,
      "password": null,
      "gssapi": {
        "keyTab": null,
        "principal": null,
        "service": "dse"
      }
    },
    "ssl": {
      "provider": "None",
      "hostnameValidation": true,
      "keystore": {
        "password": null,
        "path": null
      },
      "openssl": {
        "keyCertChain": null,
        "privateKey": null
      },
      "truststore": {
        "password": null,
        "path": null
      },
      "cipherSuites": null
    },
    "topic": {
      "pulsar-topic-ks1-table1": {
        "ks1": {
          "table1": {
            "mapping": "name=value.name",
            "consistencyLevel": "LOCAL_ONE",
            "ttl": -1,
            "ttlTimeUnit": "SECONDS",
            "timestampTimeUnit": "MICROSECONDS",
            "nullToUnset": true,
            "deletesEnabled": true
          }
        },
        "codec": {
          "locale": "en_US",
          "timeZone": "UTC",
          "timestamp": "CQL_TIMESTAMP",
          "date": "ISO_LOCAL_DATE",
          "time": "ISO_LOCAL_TIME",
          "unit": "MILLISECONDS"
        }
      }
    }
  }'
To use curl, set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. Refer to the complete Pulsar sinks REST API spec for all available options.
curl -X PUT "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME?opt=poweruser" \
  -H "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
  -F 'sinkConfig="{
    \"archive\":\"builtin:\/\/cassandra-enhanced\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"inputs\":[ \"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\" ],
    \"configs\":{
      ...
    }
  }"'
Sample config data: the update request uses the same sink configuration schema as the sample config data shown in Get Started.
Delete
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Delete all instances of a connector
./bin/pulsar-admin sinks delete \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
To use curl, set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. Refer to the complete Pulsar sinks REST API spec for all available options.
# Delete all instances of a connector
curl -sS --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
Monitoring the Connector
Info
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Get information about a connector
./bin/pulsar-admin sinks get \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
To use curl, set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. Refer to the complete Pulsar sinks REST API spec for all available options.
# Get information about a connector
curl -sS --fail --location "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
Result: the response uses the same sink configuration schema as the sample config data shown in Get Started.
Health
Refer to the complete pulsar-admin sinks spec for all available options.
Assuming you have downloaded client.conf to the Pulsar folder:
# Check the status of a connector instance
# (omit --instance-id to get the status of all instances)
./bin/pulsar-admin sinks status \
  --instance-id "$SINK_INSTANCEID" \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
To use curl, set WEB_SERVICE_URL and ASTRA_STREAMING_TOKEN as described in Get Started. Refer to the complete Pulsar sinks REST API spec for all available options.
# Get the status of all connector instances
curl -sS --fail --location "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/status" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
# Get the status of an individual connector instance
curl -sS --fail "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/status" \
  --header "accept: application/json" \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
Result
Status response for all connector instances:
{
  "numInstances": 0,
  "numRunning": 0,
  "instances": [
    {
      "instanceId": 0,
      "status": {
        "running": true,
        "error": "string",
        "numRestarts": 0,
        "numReadFromPulsar": 0,
        "numSystemExceptions": 0,
        "latestSystemExceptions": [
          {
            "exceptionString": "string",
            "timestampMs": 0
          }
        ],
        "numSinkExceptions": 0,
        "latestSinkExceptions": [
          {
            "exceptionString": "string",
            "timestampMs": 0
          }
        ],
        "numWrittenToSink": 0,
        "lastReceivedTime": 0,
        "workerId": "string"
      }
    }
  ]
}
Status response for an individual connector instance:
{
  "running": true,
  "error": "string",
  "numRestarts": 0,
  "numReadFromPulsar": 0,
  "numSystemExceptions": 0,
  "latestSystemExceptions": [
    {
      "exceptionString": "string",
      "timestampMs": 0
    }
  ],
  "numSinkExceptions": 0,
  "latestSinkExceptions": [
    {
      "exceptionString": "string",
      "timestampMs": 0
    }
  ],
  "numWrittenToSink": 0,
  "lastReceivedTime": 0,
  "workerId": "string"
}
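After creating, starting, or restarting a connector, you may want to wait until every instance reports as running. One way, sketched below under stated assumptions, is to compare `numRunning` with `numInstances` in the status response for all instances. `all_instances_running` and `wait_for_sink` are hypothetical helpers; the regex extraction handles only these two top-level integer fields, and a JSON tool such as jq would be more robust. The 30 attempts and 5-second interval are arbitrary.

```bash
#!/usr/bin/env bash
# Hypothetical helpers, not part of pulsar-admin or Astra Streaming.

# Read a status document (the "all connector instances" shape) on stdin.
# Succeeds only if at least one instance exists and all of them are running.
all_instances_running() {
  local json total running
  local re_total='"numInstances"[[:space:]]*:[[:space:]]*([0-9]+)'
  local re_running='"numRunning"[[:space:]]*:[[:space:]]*([0-9]+)'
  json="$(cat)"
  [[ $json =~ $re_total ]] || return 1
  total="${BASH_REMATCH[1]}"
  [[ $json =~ $re_running ]] || return 1
  running="${BASH_REMATCH[1]}"
  (( total > 0 && running == total ))
}

# Poll the connector status until every instance runs, or give up.
wait_for_sink() {
  local attempts="${1:-30}" i
  for (( i = 0; i < attempts; i++ )); do
    if ./bin/pulsar-admin sinks status \
        --tenant "$TENANT" --namespace "$NAMESPACE" --name "$SINK_NAME" \
        | all_instances_running; then
      return 0
    fi
    sleep 5
  done
  return 1
}

# Usage:
# wait_for_sink 30 || echo "connector did not start" >&2
```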
Metrics
Astra Streaming exposes Prometheus-formatted metrics for every connector. Refer to the Scrape metrics with Prometheus page for more details.
Connector Reference
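Sink connectors use two sets of parameters: the Pulsar sink connector parameters, which apply to every sink, and the Cassandra connection parameters, which are specific to this connector.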
Pulsar sink connector parameters
Name | Required | Default | Description |
---|---|---|---|
archive | true | | The connector type, like 'builtin://elastic_search' |
autoAck | true | false | Whether the framework automatically acknowledges messages |
className | true | | The connector type’s class reference, like 'org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource' |
cleanupSubscription | false | false | Whether the subscriptions that the function created or used are deleted when the function is deleted |
configs | false | {} | A key/value map of config properties specific to the type of connector. See Cassandra Connection below for values. |
customRuntimeOptions | false | | A string that encodes options to customize the runtime; see the Apache Pulsar docs for configured runtime for details |
deadLetterTopic | false | | Name of the dead letter topic where failing messages are sent |
inputSpecs | false | | The map of input topics to their consumer configuration. Each configuration has the schema {"schemaType": "type-x", "serdeClassName": "name-x", "isRegexPattern": true, "receiverQueueSize": 5} |
inputs | true | [] | The input topic or topics of the sink (specified as a JSON array) |
maxMessageRetries | false | | Maximum number of times that a message is redelivered before being sent to the dead letter queue |
name | true | | A name for your sink, for later reference. The name must start with a lowercase alphabetic character and can contain only lowercase alphanumeric characters and hyphens (kebab-case). |
namespace | true | | The namespace to create the sink in |
negativeAckRedeliveryDelayMs | false | | The negative ack message redelivery delay in milliseconds |
parallelism | true | 1 | The number of sink instances to run |
processingGuarantees | true | ATLEAST_ONCE | The delivery semantics applied to the Pulsar sink. Values are 'ATLEAST_ONCE', 'ATMOST_ONCE', and 'EFFECTIVELY_ONCE'. |
resources | false | | The compute resources to allocate per instance (applicable only to the process), as a JSON string. Example: {"cpu": 0.25,"disk":1000000000,"ram":500000000} |
retainKeyOrdering | false | true | Whether the sink consumes and processes messages in key order |
retainOrdering | false | false | Whether the Pulsar sink consumes and processes messages in order |
runtimeFlags | false | | A string that encodes options to customize the runtime; see the Apache Pulsar docs for configured runtime for details |
secrets | false | | A map of secretName (how the secret is accessed in the function via context) to an object that encapsulates how the secret is fetched by the underlying secrets provider. The type of a value here can be found by the SecretProviderConfigurator.getSecretObjectType() method. |
sourceSubscriptionName | false | | Pulsar source subscription name, if you want a specific subscription name for the input topic consumer |
sourceSubscriptionPosition | false | Earliest | The position to begin reading from the source |
tenant | true | | The tenant to create the sink in |
timeoutMs | false | 5000 | The message timeout in milliseconds |
topicToSchemaProperties | false | | |
topicToSchemaType | false | | The map of input topics to schema types or class names (specified as a JSON object) |
topicToSerdeClassName | false | | The map of input topics to SerDe class names (specified as a JSON object) |
topicsPattern | false | | The pattern of topic names under a namespace to consume from. inputs and topicsPattern are mutually exclusive. Add the SerDe class name for a pattern in customSerdeInputs (supported for Java functions only). |
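Because the sink name has to follow the rule in the `name` row above, a quick local check can catch an invalid name before the API rejects it. A minimal sketch; `valid_sink_name` is a hypothetical helper that encodes only the stated rule:

```bash
#!/usr/bin/env bash
# Hypothetical helper: checks only the naming rule stated for "name" above
# (starts with a lowercase letter; then lowercase letters, digits, or hyphens).
valid_sink_name() {
  local re='^[[:lower:]][[:lower:][:digit:]-]*$'
  [[ $1 =~ $re ]]
}

# Usage:
# valid_sink_name "$SINK_NAME" || { echo "invalid sink name: $SINK_NAME" >&2; exit 1; }
```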
Cassandra Connection
These values are provided in the configs area:
Name | Required | Default | Description |
---|---|---|---|
auth | yes | | Refer to the Auth Properties reference. |
cloud.secureConnectBundle | yes | | Either a path to your database’s Secure Connect Bundle (SCB) zip or a base64 encoding of the zip. |
compression | yes | None | |
connectionPoolLocalSize | yes | 4 | |
ignoreErrors | yes | None | |
jmx | yes | true | |
maxConcurrentRequests | yes | 500 | |
maxNumberOfRecordsInBatch | yes | 32 | |
queryExecutionTimeout | yes | 30 | |
task.max | yes | 1 | |
tasks.max | yes | 1 | |
topic | yes | | Refer to the Topic Properties reference. |
topics | yes | | The topic name to watch. |
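The `cloud.secureConnectBundle` row allows a base64 encoding of the SCB zip instead of a path. The exact format expected is not spelled out here; the sketch below assumes a single-line string in standard base64, and `scb_to_base64` is a hypothetical helper:

```bash
#!/usr/bin/env bash
# Hypothetical helper: print a file (for example, a Secure Connect Bundle zip)
# as one line of standard base64. The output goes through tr because GNU
# base64 wraps lines at 76 characters by default.
scb_to_base64() {
  base64 < "$1" | tr -d '\n'
}

# Usage (the bundle file name is an example):
# SCB_BASE64="$(scb_to_base64 secure-connect-mydb.zip)"
```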
Auth Properties
These values are provided in the auth area of the preceding Cassandra connection parameters:
Name | Required | Default |
---|---|---|
gssapi | yes | |
password | yes | |
provider | yes | None |
username | yes | |
Topic Properties
These values are provided in the topic area of the preceding Cassandra connection parameters.
Refer to the official documentation for a connection properties reference.
Mapping topic data to table columns
An essential part of using this sink connector is mapping message values to table columns. Many factors influence how this is done and what is possible.
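A mapping is a comma-separated list of `column=field` pairs, where the field is the message key (`key`) or a field of the message value (`value.<field>`), as in the MAPPING_STRING example later on this page. When table columns share names with message value fields, a hypothetical helper like the one below can generate the string; add key mappings such as `name=key` by hand.

```bash
#!/usr/bin/env bash
# Hypothetical helper: build a mapping string in which each table column is
# populated from the message-value field of the same name.
mapping_from_columns() {
  local col out=""
  for col in "$@"; do
    # ${out:+, } inserts the ", " separator only after the first pair.
    out+="${out:+, }${col}=value.${col}"
  done
  printf '%s\n' "$out"
}

# Usage:
# mapping_from_columns symbol ts exchange
# prints: symbol=value.symbol, ts=value.ts, exchange=value.exchange
```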
While the preceding examples show how to configure the connector in one large command, it is easier to manage the configuration in a separate file. The following steps explain how to configure the connector using a JSON configuration file with the minimum required values.
For a more detailed example of this pattern, see the Pulsar Connector single instance quickstart.
1. Create a file named configs.json with the following content:
{
  "archive": "builtin://cassandra-enhanced",
  "tenant": "TENANT_NAME",
  "namespace": "NAMESPACE_NAME",
  "name": "CONNECTOR_NAME",
  "inputs": ["TOPIC_NAME"],
  "configs": {
    "topics": "TOPIC_NAME",
    "cloud.secureConnectBundle": "SCB",
    "topic": {
      "TOPIC_NAME": {
        "KEYSPACE_NAME": {
          "TABLE_NAME": {
            TABLE_CONNECTION_PROPERTIES,
            "mapping": "MAPPING_STRING"
          }
        }
      }
    }
  }
}
Replace the following:
- TENANT_NAME: Your tenant’s name.
- NAMESPACE_NAME: The tenant’s associated namespace name.
- CONNECTOR_NAME: The name of the connector.
- TOPIC_NAME: In inputs and configs.topics, specify the names of the topics to connect. Specify topic names only; don’t use the full topic addresses. You can specify multiple topics. Define one configs.topic object for each topic that you want to connect.
- SCB: The path to your database’s Secure Connect Bundle (SCB) zip or a base64 encoding of the SCB zip.
- KEYSPACE_NAME: The name of the keyspace in your database that contains the table you want to connect to a topic.
- TABLE_NAME: The name of the table to connect to a topic.
- TABLE_CONNECTION_PROPERTIES: Additional topic-to-table connection properties, if required. For more information, see Pulsar topic-to-table settings.
- MAPPING_STRING: The mapping string for the table columns, as a comma-separated list of column names and message key or value fields. For example:
  symbol=value.symbol, ts=value.ts, exchange=value.exchange, industry=value.industry, name=key, value=value.value
  For more mapping examples, see Mapping Pulsar topics to database tables.
2. Use the pulsar-admin CLI to create the connector with your JSON file:
./bin/pulsar-admin sinks create \
  --name dse-sink-kv \
  --classname com.datastax.oss.sink.pulsar.StringCassandraSinkTask \
  --sink-config-file configs.json \
  --sink-type cassandra-enhanced
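If you script connector deployments, you can generate configs.json from shell variables instead of editing the placeholders by hand. A minimal sketch: `write_configs_json` is hypothetical, it writes only the keys from the template above (no TABLE_CONNECTION_PROPERTIES), and it inserts values verbatim without JSON escaping, so values must not contain double quotes or backslashes.

```bash
#!/usr/bin/env bash
# Hypothetical helper: write a configs.json that follows the template above.
# Arguments: output file, tenant, namespace, connector name, topic,
# keyspace, table, SCB path or base64 string, mapping string.
# Values are inserted verbatim (no JSON escaping).
write_configs_json() {
  local out="$1" tenant="$2" namespace="$3" name="$4" topic="$5"
  local keyspace="$6" table="$7" scb="$8" mapping="$9"
  cat > "$out" <<EOF
{
  "archive": "builtin://cassandra-enhanced",
  "tenant": "${tenant}",
  "namespace": "${namespace}",
  "name": "${name}",
  "inputs": ["${topic}"],
  "configs": {
    "topics": "${topic}",
    "cloud.secureConnectBundle": "${scb}",
    "topic": {
      "${topic}": {
        "${keyspace}": {
          "${table}": {
            "mapping": "${mapping}"
          }
        }
      }
    }
  }
}
EOF
}

# Usage:
# write_configs_json configs.json "$TENANT" "$NAMESPACE" "$SINK_NAME" \
#   "$INPUT_TOPIC" ks1 table1 secure-connect-mydb.zip "name=value.name"
```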