API Operations
This Astra Streaming workbook provides detailed examples and practices for managing the Astra Streaming platform with the DevOps APIs. It covers the most commonly used APIs for managing Astra Streaming and Pulsar instances, including the required parameters and the expected output of each call. The workbook is designed to fill the gap between the detailed API reference docs and the how-to guides, helping customers operate Astra Streaming and use the DevOps API to automate many common tasks.
The workbook covers a wide range of topics, including resource provisioning, monitoring, and troubleshooting. It provides instructions for operations ranging from creating tenants, namespaces, topics, geo-replication, and access tokens to setting up monitoring and alerting and troubleshooting common issues.
By following the practices and guidelines outlined in this workbook, customers can manage their streaming environment effectively with the DevOps APIs and keep their streaming applications secure, performant, and reliable.
Prerequisites
To get started with the Astra Streaming DevOps APIs, you need an active Astra Streaming account. You can set up this account by following the guide below: Create an Astra Streaming Account
Once the account is established, set up a Role and Access Token for Streaming. See the guide below on how to set up these items: Generate Astra Streaming Admin Token
As noted in the guide above, save or download the new Astra Streaming Token values. They are needed in nearly all DevOps API calls, especially the "Token" value.
Prerequisites
Export the following environment variables in your shell.
export PULSAR_TOKEN="<Pulsar JWT Token>"
export WEB_SERVICE_URL="<Pulsar Web Service URL>"
export NAMESPACE="<Namespace to create>"
export TENANT="<Tenant of new Namespace>"
export TOPIC="<name of topic>"
export NUM_OF_PARTITIONS="<number of partitions>"
export SUBSCRIPTION="<name of subscription>"
export INSTANCE="<tenant instance name>"
export SOURCE="<source name>"
export SINK="<sink name>"
export FUNCTION="<function name>"
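export TOKENID="<Token Id>"
export ASTRA_ORG_TOKEN="<Astra Streaming Token value>"
export CLUSTER="<clusterName from List tenants with details>"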
"python3 -mjson.tool" in the examples below is only used to pretty-print the JSON output. It is NOT required to execute the API requests.
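Because nearly every call below depends on these variables, it can help to check them before running anything. The following is a minimal sketch, not part of the official tooling; the helper name check_env is hypothetical, and it is written for a POSIX shell.

```shell
# check_env NAME...: report every named environment variable that is unset or empty.
# Returns 0 when all are set, 1 otherwise. (Hypothetical helper for illustration.)
check_env() {
  missing=0
  for name in "$@"; do
    # Look up the value of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example: verify the variables used by the namespace and topic calls.
# check_env PULSAR_TOKEN WEB_SERVICE_URL TENANT NAMESPACE || exit 1
```

The helper only checks that a value is present; it does not validate that the token or URL is correct.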
List tenants with details
-
Curl
-
Result
curl --location --request GET 'https://api.astra.datastax.com/v2/streaming/tenants' --header "Authorization: Bearer $ASTRA_ORG_TOKEN" | python3 -mjson.tool
[
{
"id": "14b77c47-bdfd-4ba1. . .",
"tenantName": "mytenant",
"clusterName": "pulsar-aws-useast2",
"webServiceUrl": "https://pulsar-aws-useast2",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
"websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
"websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
"pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpX…..",
"plan": "payg",
"planCode": "1",
"astraOrgGUID": "b282a256-b129-......",
"cloudProvider": "aws",
"cloudProviderCode": "1",
"cloudRegion": "useast2",
"status": "active",
"jvmVersion": "JDK11",
"pulsarVersion": "2.10.2",
"regionZone": "na",
"Email": "",
"userMetricsUrl": "https://prometheus-aws-useast2….",
"pulsarInstance": "prod0"
},
{
"id": "e8bf25d8-a6a1-4169-. . .",
"tenantName": "mytenant2",
"clusterName": "pulsar-gcp-useast1",
"webServiceUrl": "https://pulsar-gcp-useast1",
"brokerServiceUrl": "pulsar+ssl://pulsar-gcp-useast1:6651",
"websocketUrl": "wss://pulsar-gcp-useast1:8001/ws/v2",
"websocketQueryParamUrl": "wss://pulsar-gcp-useast1:8964/ws/v2",
"pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.ey. . .",
"plan": "payg",
"planCode": "1",
"astraOrgGUID": "b282a256-b129-43e9. . .",
"cloudProvider": "gcp",
"cloudProviderCode": "2",
"cloudRegion": "useast1",
"status": "active",
"jvmVersion": "JDK11",
"pulsarVersion": "2.10.2",
"regionZone": "na",
"Email": "",
"userMetricsUrl": "https://prometheus-gcp-useast1. . .",
"pulsarInstance": "prod0"
}
]
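Several later calls need values that appear in this response, such as "webServiceUrl", "clusterName", and "pulsarToken". As a rough sketch, a small filter can pull one field for one tenant out of the JSON array. The function name tenant_field is hypothetical, and python3 is assumed to be available, as in the examples above.

```shell
# tenant_field TENANT_NAME FIELD: read the list-tenants JSON array on stdin and
# print FIELD for the tenant whose "tenantName" matches. Exits 1 if no tenant matches.
tenant_field() {
  python3 -c '
import json, sys
name, field = sys.argv[1], sys.argv[2]
for tenant in json.load(sys.stdin):
    if tenant.get("tenantName") == name:
        print(tenant.get(field, ""))
        break
else:
    sys.exit(1)  # no tenant with that name
' "$1" "$2"
}

# Example (needs network access and a valid $ASTRA_ORG_TOKEN):
# CLUSTER="$(curl -sS --fail --location 'https://api.astra.datastax.com/v2/streaming/tenants' \
#   --header "Authorization: Bearer $ASTRA_ORG_TOKEN" | tenant_field "$TENANT" clusterName)"
```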
List Astra Streaming cloud providers
-
Curl
-
Result
curl --location --request GET 'https://api.astra.datastax.com/v2/streaming/providers' --header "Authorization: Bearer $ASTRA_ORG_TOKEN" | python3 -mjson.tool
{
"aws": [
"useast1",
"uswest2",
"useast2"
],
"azure": [
"westus2",
"eastus",
"australiaeast"
],
"gcp": [
"useast1",
"uscentral1",
"australiase1",
"europewest1",
"useast4"
]
}
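Before creating a tenant, a script might confirm that the intended provider and region appear in this list. The sketch below assumes the response shape shown above (an object mapping provider names to region arrays); the function name region_supported is hypothetical.

```shell
# region_supported PROVIDER REGION: read the providers JSON object on stdin and
# exit 0 only if REGION is listed under PROVIDER.
region_supported() {
  python3 -c '
import json, sys
providers = json.load(sys.stdin)
sys.exit(0 if sys.argv[2] in providers.get(sys.argv[1], []) else 1)
' "$1" "$2"
}

# Example (needs network access and a valid $ASTRA_ORG_TOKEN):
# curl -sS --fail --location 'https://api.astra.datastax.com/v2/streaming/providers' \
#   --header "Authorization: Bearer $ASTRA_ORG_TOKEN" | region_supported aws useast2 \
#   && echo "region available"
```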
Create a tenant
-
Curl
-
With file input
-
Result
curl --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_ORG_TOKEN" --data-raw '{
"cloudProvider": "aws",
"cloudRegion": "useast2",
"tenantName": "mytenant",
"userEmail": "joshua@example.com"
}' | python3 -mjson.tool
curl --fail --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_ORG_TOKEN" --data "@mytenant-config.json" | python3 -mjson.tool
{
"namespace": "default",
"topic": "",
"id": "",
"tenantName": "mytenant",
"clusterName": "pulsar-aws-useast2",
"webServiceUrl": "https://pulsar-aws-useast2",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
"websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
"websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
"pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9. . .",
"plan": "payg",
"planCode": "",
"astraOrgGUID": "b282a256-b129-43e9. . .",
"cloudProvider": "aws",
"cloudProviderCode": "",
"cloudRegion": "useast2",
"status": "active",
"jvmVersion": "JDK11",
"pulsarVersion": "2.10.2",
"regionZone": "",
"Email": "",
"userMetricsUrl": "",
"pulsarInstance": ""
}
The output includes the "pulsarToken", which is the JWT token for this Pulsar instance.
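The file-input variant reads its request body from mytenant-config.json, whose contents are not shown in this guide. A plausible sketch, assuming the file simply holds the same JSON body as the inline example, is:

```shell
# Write the request body to the file name used by the file-input example.
# The field values are the sample values from the inline request; replace them with your own.
cat > mytenant-config.json <<'EOF'
{
  "cloudProvider": "aws",
  "cloudRegion": "useast2",
  "tenantName": "mytenant",
  "userEmail": "joshua@example.com"
}
EOF

# Confirm the file parses as JSON before sending it.
python3 -mjson.tool mytenant-config.json > /dev/null && echo "mytenant-config.json is valid JSON"
```

Keeping the body in a file makes it easier to review and version the tenant settings separately from the curl command.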
Delete a tenant
-
Curl
-
Result
curl --location --request DELETE 'https://api.astra.datastax.com/v2/streaming/tenants/{tenant}/clusters/{cluster}' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_ORG_TOKEN"
Output: No reply means successful.
Namespace DevOps APIs
For managing Astra Streaming Namespaces, we use the native Pulsar REST APIs. These are documented on the Apache Pulsar Docs for REST API.
List Existing Namespaces
-
Curl
-
Result
curl --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
"mytenant/default",
"mytenant/mynamespace"
]
Create a Namespace
-
Curl
-
Result
curl -sS --fail --location --request PUT --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
Output: No reply means successful.
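Since a successful call returns an empty body, a script may prefer to look at the HTTP status code instead. The sketch below uses curl's --output and --write-out options to print only the status code; the function names is_success and create_namespace are hypothetical, and treating any 2xx code as success is an assumption rather than documented behavior.

```shell
# is_success STATUS: exit 0 for any 2xx HTTP status code, 1 otherwise.
is_success() {
  case "$1" in
    2[0-9][0-9]) return 0 ;;
    *) return 1 ;;
  esac
}

# create_namespace: issue the PUT shown above, discard the body, and print only
# the HTTP status code. Not called here because it needs network access and credentials.
create_namespace() {
  curl -sS --location --request PUT \
    --header "Authorization: Bearer $PULSAR_TOKEN" \
    --output /dev/null --write-out '%{http_code}' \
    "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
}

# Example:
# status="$(create_namespace)"
# if is_success "$status"; then echo "created"; else echo "failed: HTTP $status" >&2; fi
```

The --fail flag is left out of create_namespace on purpose so that the status code is still printed for error responses.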
Delete a Namespace
-
Curl
-
Result
curl -sS --fail --location --request DELETE --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
Output: No reply means successful.
Get Namespace Message Retention
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"retentionTimeInMinutes": 0,
"retentionSizeInMB": 0
}
Set Namespace Message Retention
-
Curl
-
Result
curl --location "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header 'Content-Type: application/json' --header "Authorization: Bearer $PULSAR_TOKEN" --data '{
"retentionTimeInMinutes": 360,
"retentionSizeInMB": 102
}'
Output: No reply means successful.
Get Namespace BacklogQuota
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuotaMap" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"destination_storage": {
"limit": -1,
"limitSize": 102400,
"limitTime": 3600,
"policy": "producer_exception"
}
}
Set Namespace BacklogQuota Settings
-
Curl
-
Result
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuota" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data '{
"limit": -1,
"limitSize": 102400,
"limitTime": 3600,
"policy": "producer_exception"
}'
Output: No reply means successful.
Get Namespace Message TTL
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Output - Return raw number, like:
3600
Set Namespace Message TTL
-
Curl
-
Result
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 3600
Output: No reply means successful.
Set AutoTopicCreation True/False on Namespace
The input parameter "topicType" should be either "non-partitioned" or "partitioned".
-
Curl
-
Result
curl -sS --fail --location --request POST --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/autoTopicCreation" --header 'Content-Type: application/json' --data '{
"allowAutoTopicCreation": false,
"topicType": "non-partitioned"
}'
Output: No reply means successful.
Get Namespace MaxConsumersPerTopic
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Output - Return raw number, like:
50
Set Namespace MaxConsumersPerTopic
-
Curl
-
Result
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 100
Output - 409 Forbidden (Contact Astra Streaming Support to increase Max)
Get Namespace MaxTopicsPerNamespace
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxTopicsPerNamespace" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Output - Return raw number, like:
50
Set Namespace MaxTopicsPerNamespace
-
Curl
-
Result
curl -sS --fail --location "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxTopicsPerNamespace" --header 'Content-Type: application/json' --header "Authorization: Bearer $PULSAR_TOKEN" --data 1000
Output - Return raw number, like:
50
Topics DevOps APIs
List Existing Topics in a Namespace
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
"persistent://testtenant/ns0/mytopic-partition-0",
"persistent://testtenant/ns0/mytopic-partition-1",
"persistent://testtenant/ns0/topic1",
"persistent://testtenant/ns0/topic2",
"persistent://testtenant/ns0/tp1-partition-0",
"persistent://testtenant/ns0/tp1-partition-1",
"persistent://testtenant/ns0/tp1-partition-2",
"persistent://testtenant/ns0/tp1-partition-3"
]
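As the sample output shows, a partitioned topic is listed once per partition with a "-partition-N" suffix. As an illustrative sketch, the list can be collapsed to one entry per topic; the function name base_topics is hypothetical. Note the assumption that no non-partitioned topic has a name that itself ends in "-partition-N", since such a name would be shortened too.

```shell
# base_topics: read the topic-list JSON array on stdin and print each distinct
# topic once, sorted, with any trailing "-partition-N" suffix removed.
base_topics() {
  python3 -c '
import json, re, sys
names = {re.sub(r"-partition-\d+$", "", topic) for topic in json.load(sys.stdin)}
print("\n".join(sorted(names)))
'
}

# Example (needs network access and a valid $PULSAR_TOKEN):
# curl -sS --fail --location "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE" \
#   --header "Authorization: Bearer $PULSAR_TOKEN" | base_topics
```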
Create Non-partitioned Topic
-
Curl
-
Result
curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.
Create Partitioned Topic
-
Curl
-
Result
curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN" --header "Content-Type: application/json" --data $NUM_OF_PARTITIONS
Output: No reply means successful.
Delete a Persistent Topic
-
Curl
-
Result
curl -sS --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.
Get InternalStats of Non-Partitioned Topic
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/internalStats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"entriesAddedCounter": 0,
"numberOfEntries": 0,
"totalSize": 0,
"currentLedgerEntries": 0,
"currentLedgerSize": 0,
"lastLedgerCreatedTimestamp": "2023-04-25T15:35:45.136Z",
"waitingCursorsCount": 0,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "275812:-1",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 275812,
"entries": 0,
"size": 0,
"offloaded": false,
"underReplicated": false
}
],
"cursors": {},
"schemaLedgers": [],
"compactedLedger": {
"ledgerId": -1,
"entries": -1,
"size": -1,
"offloaded": false,
"underReplicated": false
}
}
Get Stats of All Topics
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/stats/topics/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"persistent://testcreate/ns0/mytopic3": {
"name": "persistent://testcreate/ns0/mytopic3",
"totalMessagesIn": 0,
"totalMessagesOut": 0,
"totalBytesIn": 0,
"totalBytesOut": 0,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 0,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 0,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252397617Z"
},
"persistent://testcreate/ns0/t1": {
"name": "persistent://testcreate/ns0/t1",
"totalMessagesIn": 0,
"totalMessagesOut": 0,
"totalBytesIn": 0,
"totalBytesOut": 0,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 0,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 0,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252466612Z"
},
"persistent://testcreate/ns0/t1-partition-0": {
"name": "persistent://testcreate/ns0/t1-partition-0",
"totalMessagesIn": 516,
"totalMessagesOut": 514,
"totalBytesIn": 637776,
"totalBytesOut": 637674,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 1899200,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252410963Z"
},
"persistent://testcreate/ns0/t1-partition-1": {
"name": "persistent://testcreate/ns0/t1-partition-1",
"totalMessagesIn": 534,
"totalMessagesOut": 531,
"totalBytesIn": 696340,
"totalBytesOut": 692347,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 2020678,
"backlogStorageByteSize": 2151,
"msgBacklogNumber": 3,
"updatedAt": "2023-04-25T16:00:24.252425482Z"
},
"persistent://testcreate/ns0/t1-partition-2": {
"name": "persistent://testcreate/ns0/t1-partition-2",
"totalMessagesIn": 522,
"totalMessagesOut": 519,
"totalBytesIn": 653487,
"totalBytesOut": 649286,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 1916574,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252438306Z"
},
"persistent://testcreate/ns0/t1-partition-3": {
"name": "persistent://testcreate/ns0/t1-partition-3",
"totalMessagesIn": 516,
"totalMessagesOut": 514,
"totalBytesIn": 631638,
"totalBytesOut": 631536,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 1890920,
"backlogStorageByteSize": 1586,
"msgBacklogNumber": 4,
"updatedAt": "2023-04-25T16:00:24.252452735Z"
}
}
Get Stats of Partitioned Topic
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitioned-stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
},
"metadata": {
"partitions": 2,
"deleted": false
},
"partitions": {
"persistent://testcreate/ns0/mytopic-partition-1": {
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"deduplicationStatus": "Disabled",
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
}
},
"persistent://testcreate/ns0/mytopic-partition-0": {
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"deduplicationStatus": "Disabled",
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
}
}
}
}
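For monitoring, the per-partition entries under "partitions" are often more useful than the full document. The following sketch prints the "backlogSize" value of each partition; it assumes the response shape shown above, and the function name partition_backlog is hypothetical.

```shell
# partition_backlog: read partitioned-stats JSON on stdin and print one
# "<backlogSize> <partition>" line per partition, sorted by partition name.
partition_backlog() {
  python3 -c '
import json, sys
stats = json.load(sys.stdin)
for name, part in sorted(stats.get("partitions", {}).items()):
    print(part.get("backlogSize", 0), name)
'
}

# Example (needs network access and a valid $PULSAR_TOKEN):
# curl -sS --fail --location "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitioned-stats" \
#   --header "Authorization: Bearer $PULSAR_TOKEN" | partition_backlog
```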
Get Stats of Non-partitioned Topic
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"deduplicationStatus": "Disabled",
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
}
}
Get List of Subscriptions for a Topic
-
Curl
-
Result
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscriptions" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
"mysub",
"subscript2"
]
Create a Subscription for a Topic (Replicated)
The "replicated=true" query parameter can be set to "false" for non-replicated subscriptions.
-
Curl
-
Result
curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION?replicated=true" --header "Authorization: Bearer $PULSAR_TOKEN" --header "Content-Type: application/json"
Output: No reply means successful.
Delete a Subscription for a Topic
-
Curl
-
Result
curl -sS --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.
Clear a Subscription for a Topic
-
Curl
-
Result
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION/skip_all" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.
Geo-Replication DevOps APIs
Get Status of Geo-Replication
-
Curl
-
Result
curl --location --fail --request GET "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" --header "Authorization: Bearer $ASTRA_ORG_TOKEN" | python3 -mjson.tool
{
"pulsarInstance": "prod0",
"tenant": "mytenant",
"namespace": "mynamespace",
"replications": {
"pulsar-aws-useast2": [
"pulsar-aws-uswest2",
"pulsar-aws-useast2"
],
"pulsar-aws-uswest2": [
"pulsar-aws-uswest2",
"pulsar-aws-useast2"
]
},
"clusters": {
"pulsar-aws-useast2": {
"clusterName": "pulsar-aws-useast2",
"cloudProvider": "aws",
"cloudRegion": "useast2",
"clusterType": "cloud",
"webServiceUrl": "https://pvt-pulsar-aws-useast2:8443",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
"websocketUrl": "",
"pulsarInstance": "prod0",
"regionZone": ""
},
"pulsar-aws-uswest2": {
"clusterName": "pulsar-aws-uswest2",
"cloudProvider": "aws",
"cloudRegion": "uswest2",
"clusterType": "cloud",
"webServiceUrl": "https://pvt-pulsar-aws-uswest2:8443",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-uswest2:6651",
"websocketUrl": "",
"pulsarInstance": "prod0",
"regionZone": ""
}
}
}
Create Geo-Replication between Namespaces
The JSON input parameters can be obtained from the "List tenants with details" and "List Astra Streaming cloud providers" sections of this guide.
-
Curl
-
Result
curl --location --fail --request POST "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" --header "Content-Type: application/json" --header "Authorization: Bearer $ASTRA_ORG_TOKEN" --data-raw '{
"bidirection": true,
"destCluster": "pulsar-aws-uswest2",
"email": "joshua@example.com",
"namespace": "mynamespace",
"originCluster": "pulsar-aws-useast2"
}'
Output: No reply means successful.
Delete Geo-Replication between Namespaces
The JSON input parameters can be obtained from the "List tenants with details" and "List Astra Streaming cloud providers" sections of this guide.
-
Curl
-
Result
curl --location --fail --request DELETE "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $ASTRA_ORG_TOKEN" \
--data-raw '{
"bidirection": true,
"destCluster": "pulsar-aws-uswest2",
"email": "joshua@example.com",
"namespace": "ns0",
"originCluster": "pulsar-aws-useast2"
}'
Output: No reply means successful.
Functions DevOps APIs
List Existing Functions in a Namespace
-
Curl
-
Result
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
"testfunction1"
]
Get Status of a Function
-
Curl
-
Result
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"numInstances": 1,
"numRunning": 0,
"instances": [
{
"instanceId": 0,
"status": {
"running": false,
"error": "",
"numRestarts": 0,
"numReceived": 0,
"numSuccessfullyProcessed": 0,
"numUserExceptions": 0,
"latestUserExceptions": null,
"numSystemExceptions": 0,
"latestSystemExceptions": null,
"averageLatency": 0.0,
"lastInvocationTime": 0,
"workerId": "pulsar-aws-useast2-function-0"
}
}
]
}
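A health check can compare "numRunning" with "numInstances" in this response. The sketch below is one way to do that; the function name all_running is hypothetical. Because the source and sink status responses shown later in this guide carry the same two fields, the same filter should apply to them as well.

```shell
# all_running: read a status JSON document on stdin and exit 0 only when there
# is at least one instance and every instance is reported as running.
all_running() {
  python3 -c '
import json, sys
status = json.load(sys.stdin)
total = status.get("numInstances", 0)
ok = total > 0 and status.get("numRunning") == total
sys.exit(0 if ok else 1)
'
}

# Example (needs network access and a valid $PULSAR_TOKEN):
# curl -sS --fail --location "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/status" \
#   --header "Authorization: Bearer $PULSAR_TOKEN" | all_running || echo "function not fully running" >&2
```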
Get Stats of a Function
-
Curl
-
Result
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null,
"1min": {
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null
},
"lastInvocation": null,
"instances": [
{
"instanceId": 0,
"metrics": {
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null,
"1min": {
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null
},
"lastInvocation": null,
"userMetrics": {}
}
}
]
}
Get Function Details
-
Curl
-
Result
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"runtimeFlags": null,
"tenant": "mytenant",
"namespace": "mynamespace",
"name": "testfunction1",
"className": "TransformFunction",
"inputs": null,
"customSerdeInputs": null,
"topicsPattern": null,
"customSchemaInputs": null,
"customSchemaOutputs": null,
"inputSpecs": {
"testcreate/ns0/tp1": {
"schemaType": null,
"serdeClassName": null,
"schemaProperties": {},
"consumerProperties": {},
"receiverQueueSize": null,
"cryptoConfig": null,
"poolMessages": false,
"regexPattern": false
}
},
"output": "mytenant/mynamespace/tp2",
"producerConfig": {
"maxPendingMessages": null,
"maxPendingMessagesAcrossPartitions": null,
"useThreadLocalProducers": false,
"cryptoConfig": null,
"batchBuilder": ""
},
"outputSchemaType": null,
"outputSerdeClassName": null,
"logTopic": null,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"retainKeyOrdering": false,
"batchBuilder": null,
"forwardSourceMessageProperty": true,
"userConfig": {
"steps": [
{
"schema-type": "STRING",
"type": "cast"
}
]
},
"secrets": null,
"runtime": "JAVA",
"autoAck": true,
"maxMessageRetries": null,
"deadLetterTopic": null,
"subName": null,
"parallelism": 1,
"resources": {
"cpu": 0.25,
"ram": 1000000000,
"disk": 1000000000
},
"fqfn": null,
"windowConfig": null,
"timeoutMs": 11000,
"jar": null,
"py": null,
"go": null,
"functionType": null,
"cleanupSubscription": false,
"customRuntimeOptions": "",
"maxPendingAsyncRequests": null,
"exposePulsarAdminClientEnabled": null,
"subscriptionPosition": "Latest"
}
Start a Function
-
Curl
-
Result
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/start" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.
Stop a Function
-
Curl
-
Result
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.
Restart a Function
-
Curl
-
Result
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/restart" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.
Astra Streaming JWT Token DevOps APIs
List Existing Token IDs
Get a list of Token IDs for your Cluster. With the TokenID, you can then look up and obtain the Pulsar JWT Token string. The TokenIDs are also listed in the Astra UI for that Tenant and Cluster.
Note - The required parameter "CLUSTER" is obtained from the "List tenants with details" API command.
-
Curl
-
Result
curl --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "Authorization: Bearer $ASTRA_ORG_TOKEN" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" | python3 -mjson.tool
[
{
"iat": 1679335276,
"iss": "datastax",
"sub": "client;b282a256-b129-43e9-b870. . .",
"tokenid": "cdb87797. . ."
}
]
List Token String by ID
-
Curl
-
Result
curl --fail --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens/$TOKENID" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_ORG_TOKEN"
Output: Raw string JWT token
eyJhbGciOiJSUzI1NiIsI . . .
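The returned string is a JWT, which consists of three base64url-encoded segments separated by dots, so its claims can be inspected locally. The sketch below decodes only the payload segment and does not verify the signature; the function name jwt_claims is hypothetical, and which claims a given Astra Streaming token carries is not stated in this guide.

```shell
# jwt_claims: read a JWT on stdin and pretty-print its payload claims as JSON.
# This only decodes the token; it does NOT verify the signature.
jwt_claims() {
  python3 -c '
import base64, json, sys
token = sys.stdin.read().strip()
payload = token.split(".")[1]
payload += "=" * (-len(payload) % 4)  # restore the padding that JWTs omit
claims = json.loads(base64.urlsafe_b64decode(payload))
print(json.dumps(claims, indent=2, sort_keys=True))
'
}

# Example:
# printf '%s' "$PULSAR_TOKEN" | jwt_claims
```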
Create a JWT Token
Create a new Pulsar JWT Token. The new JWT Token will also be visible in the Astra UI for that Tenant and Cluster.
The required parameter "CLUSTER" is obtained from the "List tenants with details" API command.
-
Curl
-
Result
curl --fail --location --request POST "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_ORG_TOKEN"
Output: new raw string JWT token
eyJhbGciOiJSUzI1NiIsI . . .
Delete a JWT Token
The required parameter "CLUSTER" is obtained from the "List tenants with details" API command. A "TOKENID" value can be obtained from the token ID list API command above.
-
Curl
-
Result
curl --fail --location --request DELETE "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens/$TOKENID" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_ORG_TOKEN"
Output: No reply means successful.
Pulsar IO Connectors DevOps APIs
Pulsar Sources and Sinks share a similar API structure for most methods. As such, this guide shows the Source and Sink curl examples together.
List Existing Sources in a Namespace
-
Curl
-
Result
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
"mysource1"
]
List Existing Sinks in a Namespace
-
Curl
-
Result
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
"mysink1"
]
Get Status of a Source
-
Curl
-
Result
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"numInstances": 1,
"numRunning": 1,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 0,
"lastReceivedTime": 0,
"workerId": "pulsar-aws-useast2-function-0"
}
}
]
}
Get Status of a Sink
-
Curl
-
Result
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
"numInstances": 1,
"numRunning": 0,
"instances": [
{
"instanceId": 0,
"status": {
"running": false,
"error": "",
"numRestarts": 0,
"numReadFromPulsar": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": null,
"numSinkExceptions": 0,
"latestSinkExceptions": null,
"numWrittenToSink": 0,
"lastReceivedTime": 0,
"workerId": "pulsar-useast-function-1"
}
}
]
}
Get Source Connector Details
-
Curl
-
Result
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
  "archive": "builtin://netty",
  "batchBuilder": null,
  "batchSourceConfig": null,
  "className": "org.apache.pulsar.io.netty.NettySource",
  "configs": {
    "host": "127.0.0.1",
    "numberOfThreads": "1",
    "port": "10999",
    "type": "tcp"
  },
  "customRuntimeOptions": "internal_data",
  "name": "mysource",
  "namespace": "ns0",
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "producerConfig": {
    "batchBuilder": "",
    "cryptoConfig": null,
    "maxPendingMessages": null,
    "maxPendingMessagesAcrossPartitions": null,
    "useThreadLocalProducers": false
  },
  "resources": {
    "cpu": 0.25,
    "disk": 1000000000,
    "ram": 1000000000
  },
  "runtimeFlags": null,
  "schemaType": null,
  "secrets": null,
  "serdeClassName": null,
  "tenant": "testcreate",
  "topicName": "persistent://testcreate/ns0/t1"
}
Get Sink Details
Curl
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
  "archive": "builtin://data-generator",
  "autoAck": true,
  "className": "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink",
  "cleanupSubscription": false,
  "configs": {},
  "customRuntimeOptions": "internal_data",
  "deadLetterTopic": null,
  "inputSpecs": {
    "persistent://testcreate/ns0/tp1": {
      "consumerProperties": {},
      "cryptoConfig": null,
      "poolMessages": false,
      "receiverQueueSize": null,
      "regexPattern": false,
      "schemaProperties": {},
      "schemaType": null,
      "serdeClassName": null
    }
  },
  "inputs": [
    "persistent://testcreate/ns0/tp1"
  ],
  "maxMessageRetries": null,
  "name": "mysink1",
  "namespace": "ns0",
  "negativeAckRedeliveryDelayMs": null,
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "resources": {
    "cpu": 0.15,
    "disk": 500000000,
    "ram": 400000000
  },
  "retainKeyOrdering": false,
  "retainOrdering": false,
  "runtimeFlags": null,
  "secrets": null,
  "sourceSubscriptionName": null,
  "sourceSubscriptionPosition": "Latest",
  "tenant": "testcreate",
  "timeoutMs": 5000,
  "topicToSchemaProperties": null,
  "topicToSchemaType": null,
  "topicToSerdeClassName": null,
  "topicsPattern": null,
  "transformFunction": null,
  "transformFunctionClassName": null,
  "transformFunctionConfig": null
}
Start a Source Connector
Curl
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/start" --header "Authorization: Bearer $PULSAR_TOKEN"
Result
No output. An empty reply means the request succeeded.
Start a Sink
Curl
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/start" --header "Authorization: Bearer $PULSAR_TOKEN"
Result
No output. An empty reply means the request succeeded.
Stop a Source Connector
Curl
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
Result
No output. An empty reply means the request succeeded.
Stop a Sink Connector
Curl
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
Result
No output. An empty reply means the request succeeded.
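Because the start and stop calls above return no body, a script may want to poll the status endpoint afterwards to confirm that the change took effect. The sketch below is one way to do this and is not an official tool: the function names fetch_status and wait_for_running, the attempt count, and the delay are all assumptions made for illustration. It reuses the status endpoints and environment variables shown earlier.

```shell
# Hypothetical helpers, not part of Astra Streaming or Pulsar tooling.
# Variables are global because POSIX sh has no "local".

# fetch_status KIND NAME: prints the status JSON; KIND is "sources" or "sinks".
fetch_status() {
  curl --fail --silent --location --request GET \
    "$WEB_SERVICE_URL/admin/v3/$1/$TENANT/$NAMESPACE/$2/status" \
    --header "Authorization: Bearer $PULSAR_TOKEN"
}

# wait_for_running KIND NAME WANT [ATTEMPTS] [DELAY_SECONDS]
# Polls until numRunning equals WANT (for example 0 after a stop).
# Returns 0 on a match and 1 if the attempts run out.
wait_for_running() {
  kind=$1; name=$2; want=$3; attempts=${4:-10}; delay=${5:-3}
  i=0
  while [ "$i" -lt "$attempts" ]; do
    running=$(fetch_status "$kind" "$name" |
      python3 -c 'import json, sys; print(json.load(sys.stdin)["numRunning"])' 2>/dev/null) || running=""
    [ "$running" = "$want" ] && return 0
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Usage: after stopping a sink, wait until no instance is running.
# wait_for_running sinks "$SINK" 0 && echo "sink stopped"
```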
Create a Source Connector
Curl
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sourceConfig=@mynetty-source-config.json;type=application/json"
Result
No output. An empty reply means the request succeeded.
In the example above, a configuration file is provided as input to curl. The file, named mynetty-source-config.json, holds the configuration for the built-in "netty" source connector in Astra Streaming.
The "@" prefix tells curl to read the value from a file. When executing the curl command, make sure the file is readable and that its path is correct relative to the directory the command is run from.
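The contents of mynetty-source-config.json are not reproduced in this section. As a rough illustration only, the sketch below writes a file whose field names and values are assumed from the "Get Source Connector Details" output shown earlier. Treat every value as a placeholder to adapt, not as the original file.

```shell
# Assumed contents, modeled on the "Get Source Connector Details" output above.
# Adjust the tenant, namespace, topic, and netty settings for your environment.
cat > mynetty-source-config.json <<'EOF'
{
  "tenant": "testcreate",
  "namespace": "ns0",
  "name": "mysource",
  "archive": "builtin://netty",
  "topicName": "persistent://testcreate/ns0/t1",
  "parallelism": 1,
  "configs": {
    "type": "tcp",
    "host": "127.0.0.1",
    "port": "10999",
    "numberOfThreads": "1"
  }
}
EOF
```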
Delete a Source Connector
Curl
curl --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN"
Result
No output. An empty reply means the request succeeded.
Create a Sink Connector
Curl
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sinkConfig=@mykafka-sink-config.json;type=application/json"
Result
No output. An empty reply means the request succeeded.
In the example above, a configuration file is provided as input to curl. The file is named mykafka-sink-config.json and has the following contents for the built-in "kafka" sink connector in Astra Streaming.
{
  "tenant": "testcreate",
  "namespace": "ns0",
  "name": "mykafkaconnector",
  "archive": "builtin://kafka",
  "parallelism": 1,
  "autoAck": true,
  "cleanupSubscription": false,
  "configs": {
    "acks": "1",
    "batchSize": "16384",
    "bootstrapServers": "localhost:55200,localhost:55201",
    "maxRequestSize": "1048576",
    "producerConfigProperties": {
      "client.id": "astra-streaming-client",
      "sasl.jaas.config": "sensitive_data_removed",
      "sasl.mechanism": "PLAIN",
      "sasl.password": "sensitive_data_removed",
      "sasl.username": "myuserid",
      "security.protocol": "SASL_SSL"
    },
    "topic": "mykafka-topic"
  },
  "inputs": [ "persistent://testcreate/ns0/mytopic3" ]
}
The "@" prefix tells curl to read the value from a file. When executing the curl command, make sure the file is readable and that its path is correct relative to the directory the command is run from.
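Since a missing or malformed file only surfaces as a failed request, a quick local check before calling curl can save a round trip. The following is a minimal sketch, assuming python3 is installed. The function name check_config_file is hypothetical.

```shell
# Hypothetical helper: fails early if the file passed to --form "...=@FILE"
# is missing, unreadable, or not valid JSON.
check_config_file() {
  if [ ! -r "$1" ]; then
    echo "cannot read $1 from $(pwd)" >&2
    return 1
  fi
  # json.tool exits non-zero when the file is not valid JSON.
  python3 -m json.tool "$1" > /dev/null
}

# Usage:
# check_config_file mykafka-sink-config.json && \
#   curl --fail --location --request POST \
#     "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" \
#     --header "Authorization: Bearer $PULSAR_TOKEN" \
#     --form "sinkConfig=@mykafka-sink-config.json;type=application/json"
```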
Delete a Sink Connector
Curl
curl --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN"
Result
No output. An empty reply means the request succeeded.
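Because a successful delete returns nothing, one way to confirm it is to list the sinks in the namespace again and check that the name is gone. The sketch below assumes the list endpoint returns a JSON array of names, as in the list result shown earlier. The function name connector_listed is hypothetical.

```shell
# Hypothetical helper: reads a JSON array of connector names on stdin and
# exits 0 if NAME is in the list, 1 otherwise.
connector_listed() {
  python3 -c '
import json, sys
sys.exit(0 if sys.argv[1] in json.load(sys.stdin) else 1)
' "$1"
}

# Usage (assumes the environment variables exported earlier in this guide):
# curl --fail --silent --location --request GET \
#   "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE" \
#   --header "Authorization: Bearer $PULSAR_TOKEN" | connector_listed "$SINK" \
#   || echo "$SINK is no longer listed"
```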