API operations
This Astra Streaming workbook provides examples and practices for managing the Astra Streaming platform with the DevOps APIs. It covers the most commonly used APIs for managing Astra Streaming and Pulsar instances, including the required parameters and the expected output of each request. The workbook is designed to fill the gap between the detailed API reference docs and how-to guides, helping customers operate Astra Streaming and use the DevOps API to automate common tasks.
The workbook covers provisioning resources, monitoring, and troubleshooting. It provides instructions for operations such as creating tenants, namespaces, topics, geo-replication, and access tokens, setting up monitoring and alerting, and troubleshooting common issues.
By following the practices outlined in this workbook, customers can manage their streaming environment with the DevOps APIs and help keep their streaming applications secure, performant, and reliable.
Prerequisites
- An active Astra DB account with access to Astra Streaming
- Credentials and values required to form HTTP requests
Set environment variables
Due to their frequency in Astra Streaming API calls, you might find it helpful to set the following environment variables:
export PULSAR_TOKEN="PULSAR_TOKEN"
export WEB_SERVICE_URL="PULSAR_WEB_SERVICE_URL"
export NAMESPACE="STREAMING_NAMESPACE_NAME"
export TENANT="STREAMING_TENANT_NAME"
export TOPIC="STREAMING_TOPIC_NAME"
export NUM_OF_PARTITIONS="PARTITIONS"
export SUBSCRIPTION="SUBSCRIPTION_NAME"
export INSTANCE="TENANT_INSTANCE_NAME"
export SOURCE="SOURCE_CONNECTOR_NAME"
export SINK="SINK_NAME"
export FUNCTION="FUNCTION_DISPLAY_NAME"
export TOKENID="PULSAR_TOKEN_UUID"
export ASTRA_TOKEN="ASTRA_DB_APPLICATION_TOKEN"
The examples in this guide use environment variables for these values.
Additionally, some examples in this guide use python3 -mjson.tool to format the JSON response. This is optional; it is not required to execute API requests.
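Because most requests in this guide depend on these variables, a quick check before running a request can catch an unset value early. The following is a minimal sketch; require_vars is a hypothetical helper name, not part of any Astra Streaming or Pulsar tooling. It only detects unset or empty variables, so a placeholder value that was never replaced still passes.

```shell
# Hypothetical helper (an assumption for illustration): report any unset or
# empty variables from a list of variable names.
require_vars() {
  missing=""
  for name in "$@"; do
    # Indirect lookup of the variable named by $name (POSIX-compatible).
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      missing="$missing $name"
    fi
  done
  if [ -n "$missing" ]; then
    echo "Missing environment variables:$missing" >&2
    return 1
  fi
  return 0
}

# Example: verify the variables needed for namespace requests.
# require_vars PULSAR_TOKEN WEB_SERVICE_URL TENANT NAMESPACE || exit 1
```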
Astra Streaming DevOps API tenant operations
The following examples demonstrate how to use the Astra Streaming DevOps API to manage Astra Streaming resources.
Get tenant information
curl --location --request GET 'https://api.astra.datastax.com/v2/streaming/tenants' --header "Authorization: Bearer $ASTRA_TOKEN" | python3 -mjson.tool
Result
[
{
"id": "14b77c47-bdfd-4ba1. . .",
"tenantName": "mytenant",
"clusterName": "pulsar-aws-useast2",
"webServiceUrl": "https://pulsar-aws-useast2",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
"websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
"websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
"pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpX…..",
"plan": "payg",
"planCode": "1",
"astraOrgGUID": "b282a256-b129-......",
"cloudProvider": "aws",
"cloudProviderCode": "1",
"cloudRegion": "useast2",
"status": "active",
"jvmVersion": "JDK11",
"pulsarVersion": "2.10.2",
"regionZone": "na",
"Email": "",
"userMetricsUrl": "https://prometheus-aws-useast2….",
"pulsarInstance": "prod0"
},
{
"id": "e8bf25d8-a6a1-4169-. . .",
"tenantName": "mytenant2",
"clusterName": "pulsar-gcp-useast1",
"webServiceUrl": "https://pulsar-gcp-useast1",
"brokerServiceUrl": "pulsar+ssl://pulsar-gcp-useast1:6651",
"websocketUrl": "wss://pulsar-gcp-useast1m:8001/ws/v2",
"websocketQueryParamUrl": "wss://pulsar-gcp-useast1:8964/ws/v2",
"pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.ey. . .",
"plan": "payg",
"planCode": "1",
"astraOrgGUID": "b282a256-b129-43e9. . .",
"cloudProvider": "gcp",
"cloudProviderCode": "2",
"cloudRegion": "useast1",
"status": "active",
"jvmVersion": "JDK11",
"pulsarVersion": "2.10.2",
"regionZone": "na",
"Email": "",
"userMetricsUrl": "https://prometheus-gcp-useast1. . .",
"pulsarInstance": "prod0"
}
]
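The webServiceUrl, pulsarToken, and clusterName values in this response are the ones that the Pulsar requests later in this guide rely on. As a sketch, assuming python3 is available (as in the other examples), a hypothetical tenant_field helper could extract one field for a named tenant from the response:

```shell
# Hypothetical helper (an assumption for illustration): read the tenant list
# JSON on stdin and print one field of the tenant whose tenantName matches $1.
tenant_field() {
  python3 -c '
import json, sys
name, field = sys.argv[1], sys.argv[2]
for tenant in json.load(sys.stdin):
    if tenant.get("tenantName") == name:
        print(tenant.get(field, ""))
        break
' "$1" "$2"
}

# Example (not run here): set WEB_SERVICE_URL from the tenant list.
# export WEB_SERVICE_URL="$(curl -sS --location --request GET \
#   'https://api.astra.datastax.com/v2/streaming/tenants' \
#   --header "Authorization: Bearer $ASTRA_TOKEN" | tenant_field "$TENANT" webServiceUrl)"
```

If no tenant matches the name, the helper prints nothing, so an empty result is worth checking before exporting it.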
Get Astra Streaming cloud providers and regions
curl --location --request GET 'https://api.astra.datastax.com/v2/streaming/providers' --header "Authorization: Bearer $ASTRA_TOKEN" | python3 -mjson.tool
Result
{
"aws": [
"useast1",
"uswest2",
"useast2"
],
"azure": [
"westus2",
"eastus",
"australiaeast"
],
"gcp": [
"useast1",
"uscentral1",
"australiase1",
"europewest1",
"useast4"
]
}
Create a tenant
Create a tenant:
curl --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_TOKEN" --data-raw '{
"cloudProvider": "aws",
"cloudRegion": "useast2",
"tenantName": "mytenant",
"userEmail": "joshua@example.com"
}' | python3 -mjson.tool
Create a tenant with file input:
curl --fail --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_TOKEN" --data "@mytenant-config.json" | python3 -mjson.tool
Result
The output includes the "pulsarToken" which is the JWT for this Pulsar instance.
{
"namespace": "default",
"topic": "",
"id": "",
"tenantName": "mytenant",
"clusterName": "pulsar-aws-useast2",
"webServiceUrl": "https://pulsar-aws-useast2",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
"websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
"websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
"pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9. . .",
"plan": "payg",
"planCode": "",
"astraOrgGUID": "b282a256-b129-43e9. . .",
"cloudProvider": "aws",
"cloudProviderCode": "",
"cloudRegion": "useast2",
"status": "active",
"jvmVersion": "JDK11",
"pulsarVersion": "2.10.2",
"regionZone": "",
"Email": "",
"userMetricsUrl": "",
"pulsarInstance": ""
}
Delete a tenant
curl --location --request DELETE 'https://api.astra.datastax.com/v2/streaming/tenants/{tenant}/clusters/{cluster}' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_TOKEN"
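Replace {tenant} and {cluster} with the tenantName and clusterName values returned by Get tenant information. No response indicates success.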
Pulsar API namespace operations
To manage Astra Streaming namespaces, use the Pulsar REST APIs.
Get existing namespaces
curl --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"mytenant/default",
"mytenant/mynamespace"
]
Create a namespace
curl -sS --fail --location --request PUT --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
No response indicates success.
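Because a successful request prints nothing, it can help to check the HTTP status code explicitly when scripting these calls. The following is a sketch only: is_success_status is a hypothetical helper, and treating any 2xx code as success is an assumption rather than documented Astra Streaming behavior. The commented usage relies on curl's --write-out '%{http_code}' option.

```shell
# Hypothetical helper (an assumption for illustration): succeed when the
# given HTTP status code is in the 2xx range.
is_success_status() {
  case "$1" in
    2[0-9][0-9]) return 0 ;;
    *) return 1 ;;
  esac
}

# Example (not run here): capture only the status code of a create request.
# status="$(curl -sS --output /dev/null --write-out '%{http_code}' --location \
#   --request PUT --header "Authorization: Bearer $PULSAR_TOKEN" \
#   "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE")"
# if is_success_status "$status"; then
#   echo "namespace created"
# else
#   echo "request failed with HTTP $status" >&2
# fi
```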
Delete a namespace
curl -sS --fail --location --request DELETE --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
No response indicates success.
Get namespace message retention
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"retentionTimeInMinutes": 0,
"retentionSizeInMB": 0
}
Set namespace message retention
curl --location "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header 'Content-Type: application/json' --header "Authorization: Bearer $PULSAR_TOKEN" --data '{
"retentionTimeInMinutes": 360,
"retentionSizeInMB": 102
}'
No response indicates success.
Get namespace backlog quota
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuotaMap" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"destination_storage": {
"limit": -1,
"limitSize": 102400,
"limitTime": 3600,
"policy": "producer_exception"
}
}
Set namespace backlog quota settings
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuota" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data '{
"limit": -1,
"limitSize": 102400,
"limitTime": 3600,
"policy": "producer_exception"
}'
No response indicates success.
Get namespace message TTL
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
The response is a number, such as 3600.
Set namespace message TTL
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 3600
No response indicates success.
Set namespace AutoTopicCreation
The "topicType" input parameter must be either "non-partitioned" or "partitioned".
curl -sS --fail --location --request POST --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/autoTopicCreation" --header 'Content-Type: application/json' --data '{
"allowAutoTopicCreation": false,
"topicType": "non-partitioned"
}'
No response indicates success.
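Since "topicType" accepts only two values, a script can validate its inputs before sending the request. The sketch below uses a hypothetical auto_topic_creation_body helper that builds the JSON body shown above; the helper name and its error messages are assumptions for illustration.

```shell
# Hypothetical helper (an assumption for illustration): build the request body
# for the autoTopicCreation endpoint after validating both inputs.
# $1: true or false, $2: partitioned or non-partitioned
auto_topic_creation_body() {
  case "$1" in
    true|false) ;;
    *) echo "allowAutoTopicCreation must be true or false" >&2; return 1 ;;
  esac
  case "$2" in
    partitioned|non-partitioned) ;;
    *) echo "topicType must be partitioned or non-partitioned" >&2; return 1 ;;
  esac
  printf '{"allowAutoTopicCreation": %s, "topicType": "%s"}\n' "$1" "$2"
}

# Example (not run here):
# body="$(auto_topic_creation_body false non-partitioned)" || exit 1
# curl -sS --fail --location --request POST \
#   --header "Authorization: Bearer $PULSAR_TOKEN" \
#   --header 'Content-Type: application/json' --data "$body" \
#   "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/autoTopicCreation"
```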
Get namespace MaxConsumersPerTopic
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
The response is a number, such as 50.
Set namespace MaxConsumersPerTopic
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 100
Get namespace MaxTopicsPerNamespace
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxTopicsPerNamespace" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
The response is a number.
Set namespace MaxTopicsPerNamespace
curl -sS --fail --location "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxTopicsPerNamespace" --header 'Content-Type: application/json' --header "Authorization: Bearer $PULSAR_TOKEN" --data 1000
No response indicates success.
Pulsar Admin API topic operations
Get topics in a namespace
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"persistent://testtenant/ns0/mytopic-partition-0",
"persistent://testtenant/ns0/mytopic-partition-1",
"persistent://testtenant/ns0/topic1",
"persistent://testtenant/ns0/topic2",
"persistent://testtenant/ns0/tp1-partition-0",
"persistent://testtenant/ns0/tp1-partition-1",
"persistent://testtenant/ns0/tp1-partition-2",
"persistent://testtenant/ns0/tp1-partition-3"
]
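In this listing, each partition of a partitioned topic appears as its own entry with a -partition-N suffix. To see the distinct topic names, the suffix can be stripped and duplicates removed. This is a sketch under two assumptions: the input is the pretty-printed output of python3 -mjson.tool with one entry per line, and no non-partitioned topic has a name that itself ends in -partition-N. The base_topics name is hypothetical.

```shell
# Hypothetical helper (an assumption for illustration): read the pretty-printed
# topic list on stdin and print each distinct topic name once, with any
# -partition-N suffix removed.
base_topics() {
  # First sed: keep only the quoted entries and drop quotes and commas.
  # Second sed: strip the partition suffix.
  sed -n -e 's/^[[:space:]]*"\(.*\)",\{0,1\}[[:space:]]*$/\1/p' |
    sed -e 's/-partition-[0-9][0-9]*$//' |
    LC_ALL=C sort -u
}

# Example (not run here):
# curl -sS --fail --location --request GET \
#   "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE" \
#   --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool | base_topics
```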
Create non-partitioned topic
curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Create partitioned topic
curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN" --header "Content-Type: application/json" --data $NUM_OF_PARTITIONS
No response indicates success.
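The request above sends $NUM_OF_PARTITIONS unquoted as the request body, so an empty or non-numeric value produces a malformed request. A small guard can catch this before calling the API. The following is a sketch; is_positive_integer is a hypothetical helper, and requiring a value greater than zero is an assumption about what counts as a sensible partition count.

```shell
# Hypothetical helper (an assumption for illustration): succeed only when $1
# consists of digits and is greater than zero.
is_positive_integer() {
  case "$1" in
    ''|*[!0-9]*) return 1 ;;
  esac
  [ "$1" -gt 0 ]
}

# Example (not run here):
# if ! is_positive_integer "$NUM_OF_PARTITIONS"; then
#   echo "NUM_OF_PARTITIONS must be a positive integer" >&2
#   exit 1
# fi
```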
Delete a persistent topic
curl -sS --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Get InternalStats of non-partitioned topic
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/internalStats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"entriesAddedCounter": 0,
"numberOfEntries": 0,
"totalSize": 0,
"currentLedgerEntries": 0,
"currentLedgerSize": 0,
"lastLedgerCreatedTimestamp": "2023-04-25T15:35:45.136Z",
"waitingCursorsCount": 0,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "275812:-1",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 275812,
"entries": 0,
"size": 0,
"offloaded": false,
"underReplicated": false
}
],
"cursors": {},
"schemaLedgers": [],
"compactedLedger": {
"ledgerId": -1,
"entries": -1,
"size": -1,
"offloaded": false,
"underReplicated": false
}
}
Get stats of a non-partitioned topic
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"deduplicationStatus": "Disabled",
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
}
...TRUNCATED FOR READABILITY...
}
Get stats of a partitioned topic
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitioned-stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
},
"metadata": {
"partitions": 2,
"deleted": false
},
"partitions": {
"persistent://testcreate/ns0/mytopic-partition-1": {
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"deduplicationStatus": "Disabled",
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
}
},
"persistent://testcreate/ns0/mytopic-partition-0": {
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"deduplicationStatus": "Disabled",
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
}
}
}
...TRUNCATED FOR READABILITY...
}
Get stats of all topics
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/stats/topics/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"persistent://testcreate/ns0/mytopic3": {
"name": "persistent://testcreate/ns0/mytopic3",
"totalMessagesIn": 0,
"totalMessagesOut": 0,
"totalBytesIn": 0,
"totalBytesOut": 0,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 0,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 0,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252397617Z"
},
"persistent://testcreate/ns0/t1": {
"name": "persistent://testcreate/ns0/t1",
"totalMessagesIn": 0,
"totalMessagesOut": 0,
"totalBytesIn": 0,
"totalBytesOut": 0,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 0,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 0,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252466612Z"
},
"persistent://testcreate/ns0/t1-partition-0": {
"name": "persistent://testcreate/ns0/t1-partition-0",
"totalMessagesIn": 516,
"totalMessagesOut": 514,
"totalBytesIn": 637776,
"totalBytesOut": 637674,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 1899200,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252410963Z"
},
"persistent://testcreate/ns0/t1-partition-1": {
"name": "persistent://testcreate/ns0/t1-partition-1",
"totalMessagesIn": 534,
"totalMessagesOut": 531,
"totalBytesIn": 696340,
"totalBytesOut": 692347,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 2020678,
"backlogStorageByteSize": 2151,
"msgBacklogNumber": 3,
"updatedAt": "2023-04-25T16:00:24.252425482Z"
},
"persistent://testcreate/ns0/t1-partition-2": {
"name": "persistent://testcreate/ns0/t1-partition-2",
"totalMessagesIn": 522,
"totalMessagesOut": 519,
"totalBytesIn": 653487,
"totalBytesOut": 649286,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 1916574,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252438306Z"
},
"persistent://testcreate/ns0/t1-partition-3": {
"name": "persistent://testcreate/ns0/t1-partition-3",
"totalMessagesIn": 516,
"totalMessagesOut": 514,
"totalBytesIn": 631638,
"totalBytesOut": 631536,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 1890920,
"backlogStorageByteSize": 1586,
"msgBacklogNumber": 4,
"updatedAt": "2023-04-25T16:00:24.252452735Z"
...TRUNCATED FOR READABILITY...
}
...TRUNCATED FOR READABILITY...
}
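The msgBacklogNumber field in this response shows how many messages are waiting in each topic. As a sketch for monitoring scripts, assuming python3 is available and that the response is the JSON object keyed by topic name shown above, a hypothetical backlog_by_topic helper could list only the topics that have a backlog:

```shell
# Hypothetical helper (an assumption for illustration): read the namespace
# topic stats JSON on stdin and print "<backlog> <topic>" for every topic
# whose msgBacklogNumber is greater than zero.
backlog_by_topic() {
  python3 -c '
import json, sys
stats = json.load(sys.stdin)
for topic in sorted(stats):
    backlog = stats[topic].get("msgBacklogNumber", 0)
    if backlog > 0:
        print(backlog, topic)
'
}

# Example (not run here):
# curl -sS --fail --location --request GET \
#   "$WEB_SERVICE_URL/admin/v2/stats/topics/$TENANT/$NAMESPACE" \
#   --header "Authorization: Bearer $PULSAR_TOKEN" | backlog_by_topic
```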
Get topic subscriptions
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscriptions" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"mysub",
"subscript2"
]
Create a subscription for a topic
Create a replicated or non-replicated subscription. Set the "replicated" query parameter to "true" for a replicated subscription or "false" for a non-replicated subscription.
curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION?replicated=true" --header "Authorization: Bearer $PULSAR_TOKEN" --header "Content-Type: application/json"
No response indicates success.
Delete a subscription for a topic
curl -sS --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Clear a subscription for a topic
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION/skip_all" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Astra Streaming DevOps API geo-replication operations
Get status of geo-replication
curl --location --fail --request GET "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" --header "Authorization: Bearer $ASTRA_TOKEN" | python3 -mjson.tool
Result
{
"pulsarInstance": "prod0",
"tenant": "mytenant",
"namespace": "mynamespace",
"replications": {
"pulsar-aws-useast2": [
"pulsar-aws-uswest2",
"pulsar-aws-useast2"
],
"pulsar-aws-uswest2": [
"pulsar-aws-uswest2",
"pulsar-aws-useast2"
]
},
"clusters": {
"pulsar-aws-useast2": {
"clusterName": "pulsar-aws-useast2",
"cloudProvider": "aws",
"cloudRegion": "useast2",
"clusterType": "cloud",
"webServiceUrl": "https://pvt-pulsar-aws-useast2:8443",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
"websocketUrl": "",
"pulsarInstance": "prod0",
"regionZone": ""
},
"pulsar-aws-uswest2": {
"clusterName": "pulsar-aws-uswest2",
"cloudProvider": "aws",
"cloudRegion": "uswest2",
"clusterType": "cloud",
"webServiceUrl": "https://pvt-pulsar-aws-uswest2:8443",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-uswest2:6651",
"websocketUrl": "",
"pulsarInstance": "prod0",
"regionZone": ""
}
...TRUNCATED FOR READABILITY...
}
}
Create geo-replication between namespaces
To find the values for the JSON input parameters, see the Get tenant information and Get Astra Streaming cloud providers and regions sections of this guide.
curl --location --fail --request POST "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" --header "Content-Type: application/json" --header "Authorization: Bearer $ASTRA_TOKEN" --data-raw '{
"bidirection": true,
"destCluster": "pulsar-aws-uswest2",
"email": "joshua@example.com",
"namespace": "mynamespace",
"originCluster": "pulsar-aws-useast2"
}'
No response indicates success.
Delete geo-replication between namespaces
To find the values for the JSON input parameters, see the Get tenant information and Get Astra Streaming cloud providers and regions sections of this guide.
curl --location --fail --request DELETE "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $ASTRA_TOKEN" \
--data-raw '{
"bidirection": true,
"destCluster": "pulsar-aws-uswest2",
"email": "joshua@example.com",
"namespace": "ns0",
"originCluster": "pulsar-aws-useast2"
}'
No response indicates success.
Pulsar Admin API functions operations
List existing functions in a namespace
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"testfunction1"
]
Get status of a function
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"numInstances": 1,
"numRunning": 0,
"instances": [
{
"instanceId": 0,
"status": {
"running": false,
"error": "",
"numRestarts": 0,
"numReceived": 0,
"numSuccessfullyProcessed": 0,
"numUserExceptions": 0,
"latestUserExceptions": null,
"numSystemExceptions": 0,
"latestSystemExceptions": null,
"averageLatency": 0.0,
"lastInvocationTime": 0,
"workerId": "pulsar-aws-useast2-function-0"
}
}
]
}
Get stats of a function
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null,
"1min": {
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null
},
"lastInvocation": null,
"instances": [
{
"instanceId": 0,
"metrics": {
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null,
"1min": {
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null
},
"lastInvocation": null,
"userMetrics": {}
}
}
]
}
Get function details
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"runtimeFlags": null,
"tenant": "mytenant",
"namespace": "mynamespace",
"name": "testfunction1",
"className": "TransformFunction",
"inputs": null,
"customSerdeInputs": null,
"topicsPattern": null,
"customSchemaInputs": null,
"customSchemaOutputs": null,
"inputSpecs": {
"testcreate/ns0/tp1": {
"schemaType": null,
"serdeClassName": null,
"schemaProperties": {},
"consumerProperties": {},
"receiverQueueSize": null,
"cryptoConfig": null,
"poolMessages": false,
"regexPattern": false
}
},
"output": "mytenant/mynamespace/tp2",
"producerConfig": {
"maxPendingMessages": null,
"maxPendingMessagesAcrossPartitions": null,
"useThreadLocalProducers": false,
"cryptoConfig": null,
"batchBuilder": ""
},
"outputSchemaType": null,
"outputSerdeClassName": null,
"logTopic": null,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"retainKeyOrdering": false,
"batchBuilder": null,
"forwardSourceMessageProperty": true,
"userConfig": {
"steps": [
{
"schema-type": "STRING",
"type": "cast"
}
]
},
"secrets": null,
"runtime": "JAVA",
"autoAck": true,
"maxMessageRetries": null,
"deadLetterTopic": null,
"subName": null,
"parallelism": 1,
"resources": {
"cpu": 0.25,
"ram": 1000000000,
"disk": 1000000000
},
"fqfn": null,
"windowConfig": null,
"timeoutMs": 11000,
"jar": null,
"py": null,
"go": null,
"functionType": null,
"cleanupSubscription": false,
"customRuntimeOptions": "",
"maxPendingAsyncRequests": null,
"exposePulsarAdminClientEnabled": null,
"subscriptionPosition": "Latest"
}
Start a function
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/start" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Stop a function
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Restart a function
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/restart" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Astra Streaming DevOps API JWT operations
List existing token IDs
Get a list of token IDs for your cluster. With a token ID, you can then look up the Pulsar JWT string. The token IDs are also listed in the Astra Portal for that tenant and cluster.
The required "CLUSTER" value is the clusterName returned by the Get tenant information request.
curl --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "Authorization: Bearer $ASTRA_TOKEN" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" | python3 -mjson.tool
Result
[
{
"iat": 1679335276,
"iss": "datastax",
"sub": "client;b282a256-b129-43e9-b870. . .",
"tokenid": "cdb87797. . ."
}
]
List token string by ID
curl --fail --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens/$TOKENID" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_TOKEN"
Result
The response is the raw JWT string:
eyJhbGciOiJSUzI1NiIsI . . .
Create a JWT
Create a new Pulsar JWT. The new JWT will also be visible in the Astra Portal for that Tenant and Cluster.
The required "CLUSTER" value is the clusterName returned by the Get tenant information request.
curl --fail --location --request POST "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_TOKEN"
Result
The response is the new raw JWT string:
eyJhbGciOiJSUzI1NiIsI . . .
Delete a JWT
The required "CLUSTER" value is the clusterName returned by the Get tenant information request. To find the "TOKENID" value, see List existing token IDs.
curl --fail --location --request DELETE "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens/$TOKENID" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_TOKEN"
No response indicates success.
Pulsar Admin API IO connectors operations
Pulsar Sources and Sinks share a similar API structure for most methods.
List existing sources in a namespace
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"mysource1"
]
List existing sinks in a namespace
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"mysink1"
]
Get status of a source
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"numInstances": 1,
"numRunning": 1,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 0,
"lastReceivedTime": 0,
"workerId": "pulsar-aws-useast2-function-0"
}
}
]
}
Get status of a sink
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"numInstances": 1,
"numRunning": 0,
"instances": [
{
"instanceId": 0,
"status": {
"running": false,
"error": "",
"numRestarts": 0,
"numReadFromPulsar": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": null,
"numSinkExceptions": 0,
"latestSinkExceptions": null,
"numWrittenToSink": 0,
"lastReceivedTime": 0,
"workerId": "pulsar-useast-function-1"
}
}
]
}
Get source connector details
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"archive": "builtin://netty",
"batchBuilder": null,
"batchSourceConfig": null,
"className": "org.apache.pulsar.io.netty.NettySource",
"configs": {
"host": "127.0.0.1",
"numberOfThreads": "1",
"port": "10999",
"type": "tcp"
},
"customRuntimeOptions": "internal_data",
"name": "mysource",
"namespace": "ns0",
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"producerConfig": {
"batchBuilder": "",
"cryptoConfig": null,
"maxPendingMessages": null,
"maxPendingMessagesAcrossPartitions": null,
"useThreadLocalProducers": false
},
"resources": {
"cpu": 0.25,
"disk": 1000000000,
"ram": 1000000000
},
"runtimeFlags": null,
"schemaType": null,
"secrets": null,
"serdeClassName": null,
"tenant": "testcreate",
"topicName": "persistent://testcreate/ns0/t1"
}
Get sink details
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"archive": "builtin://data-generator",
"autoAck": true,
"className": "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink",
"cleanupSubscription": false,
"configs": {},
"customRuntimeOptions": "internal_data",
"deadLetterTopic": null,
"inputSpecs": {
"persistent://testcreate/ns0/tp1": {
"consumerProperties": {},
"cryptoConfig": null,
"poolMessages": false,
"receiverQueueSize": null,
"regexPattern": false,
"schemaProperties": {},
"schemaType": null,
"serdeClassName": null
}
},
"inputs": [
"persistent://testcreate/ns0/tp1"
],
"maxMessageRetries": null,
"name": "mysink1",
"namespace": "ns0",
"negativeAckRedeliveryDelayMs": null,
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 0.15,
"disk": 500000000,
"ram": 400000000
},
"retainKeyOrdering": false,
"retainOrdering": false,
"runtimeFlags": null,
"secrets": null,
"sourceSubscriptionName": null,
"sourceSubscriptionPosition": "Latest",
"tenant": "testcreate",
"timeoutMs": 5000,
"topicToSchemaProperties": null,
"topicToSchemaType": null,
"topicToSerdeClassName": null,
"topicsPattern": null,
"transformFunction": null,
"transformFunctionClassName": null,
"transformFunctionConfig": null
}
Start a source connector
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/start" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Start a sink
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/start" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Stop a source connector
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Stop a sink
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Create a source connector
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sourceConfig=@mynetty-source-config.json;type=application/json"
No response indicates success.
In the example above, a configuration file named mynetty-source-config.json is provided as input to curl. The file contains the configuration for the built-in "netty" source connector in Astra Streaming.
Delete a source connector
curl --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Create a sink
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sinkConfig=@mykafka-sink-config.json;type=application/json"
No response indicates success.
In the example above, a configuration file named mykafka-sink-config.json is provided as input to curl. The file has the following content for the built-in "kafka" sink connector in Astra Streaming:
{
"tenant": "testcreate",
"namespace": "ns0",
"name": "mykafkaconnector",
"archive": "builtin://kafka",
"parallelism": 1,
"autoAck": true,
"cleanupSubscription": false,
"configs": {
"acks": "1",
"batchSize": "16384",
"bootstrapServers": "localhost:55200,localhost:55201",
"maxRequestSize": "1048576",
"producerConfigProperties": {
"client.id": "astra-streaming-client",
"sasl.jaas.config": "sensitive_data_removed",
"sasl.mechanism": "PLAIN",
"sasl.password": "sensitive_data_removed",
"sasl.username": "myuserid",
"security.protocol": "SASL_SSL"
},
"topic": "mykafka-topic"
},
"inputs": [ "persistent://testcreate/ns0/mytopic3" ]
}
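A malformed configuration file causes the create request to fail, so it can be worth checking that the file parses as JSON before uploading it. The sketch below reuses python3 -m json.tool, which exits with a non-zero status on invalid JSON. The validate_json_file name is hypothetical, and the check covers JSON syntax only, not whether the connector accepts the settings.

```shell
# Hypothetical helper (an assumption for illustration): succeed only when the
# file at $1 exists and parses as JSON.
validate_json_file() {
  if [ ! -f "$1" ]; then
    echo "file not found: $1" >&2
    return 1
  fi
  python3 -m json.tool "$1" > /dev/null 2>&1
}

# Example (not run here):
# validate_json_file mykafka-sink-config.json || exit 1
```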
Delete a sink
curl --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.