API Operations

This Astra Streaming workbook provides detailed examples and practices for managing the Astra Streaming platform with the DevOps APIs. It covers the most commonly used APIs for managing Astra Streaming and Pulsar instances, including the required parameters and the expected output of each call. The workbook is designed to fill the gap between the detailed API reference docs and the how-to guides: it helps customers operate and manage Astra Streaming and shows how to use the DevOps API to automate many common tasks.

The workbook covers a wide range of topics, including resource provisioning, monitoring, and troubleshooting. It provides instructions for operations ranging from creating tenants, namespaces, topics, geo-replication, and access tokens to setting up monitoring and alerting and troubleshooting common issues.

Overall, this Astra Streaming Workbook is a valuable resource for customers who want to leverage the benefits of Astra Streaming and manage their streaming environment effectively with DevOps APIs. By following the best practices and guidelines outlined in the workbook, customers can ensure that their streaming applications are secure, performant, and reliable.

Prerequisites

To get started with the Astra Streaming DevOps APIs, you need an active Astra Streaming account. You can set up this account by following this guide: Create an Astra Streaming Account

Once the account is established, set up a Role and Access Token for Streaming. This guide explains how: Generate Astra Streaming Admin Token

As noted in the guide above, save or download the new Astra Streaming Token values. They are needed in nearly all DevOps API calls, especially the "Token" value.

Export the following environment variables in your shell.

export ASTRA_ORG_TOKEN="<Astra Streaming Token>"
export CLUSTER="<cluster name of the tenant>"
export PULSAR_TOKEN="<Pulsar JWT Token>"
export WEB_SERVICE_URL="<Pulsar Web Service URL>"
export NAMESPACE="<Namespace to create>"
export TENANT="<Tenant of new Namespace>"
export TOPIC="<name of topic>"
export NUM_OF_PARTITIONS="<number of partitions>"
export SUBSCRIPTION="<name of subscription>"
export INSTANCE="<tenant instance name>"
export SOURCE="<source name>"
export SINK="<sink name>"
export FUNCTION="<function name>"
export TOKENID="<Token Id>"
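
Because nearly every call below depends on these variables, it can help to check them before sending a request. The following is a minimal sketch; the helper name missing_vars is hypothetical and not part of any Astra or Pulsar tooling.

```shell
# Hypothetical helper (not part of the Astra tooling): print the names of any
# variables from the argument list that are unset or empty.
missing_vars() {
    missing=""
    for name in "$@"; do
        # Indirect lookup of the variable called $name, defaulting to empty.
        eval "value=\${$name:-}"
        if [ -z "$value" ]; then
            missing="$missing $name"
        fi
    done
    # Drop the leading space before printing the list.
    printf '%s\n' "${missing# }"
}

# Example: check the variables used by the namespace calls.
unset_names="$(missing_vars PULSAR_TOKEN WEB_SERVICE_URL TENANT NAMESPACE)"
if [ -n "$unset_names" ]; then
    echo "Set these variables first: $unset_names" >&2
fi
```

Pass only the names a given call actually uses, so that an unrelated empty variable does not block the request.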

"python3 -mjson.tool" in the examples below is only used to pretty-print the JSON output. It is NOT required to execute the API requests.
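
As a small offline illustration of what the formatter does, the sketch below pipes a made-up compact JSON string (sample data, not real API output) through the same tool.

```shell
# Compact JSON, as an API might return it (sample data, not real output).
compact='{"tenantName":"mytenant","status":"active"}'

# json.tool re-indents the document; the data itself is unchanged.
formatted="$(printf '%s' "$compact" | python3 -m json.tool)"
printf '%s\n' "$formatted"
```

Note that json.tool exits with an error when its input is empty or not JSON, which is one reason to leave it off calls that return no body.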

List tenants with details

  • Curl

  • Result

curl --location --request GET  'https://api.astra.datastax.com/v2/streaming/tenants' --header "Authorization: Bearer $ASTRA_ORG_TOKEN" | python3 -mjson.tool
[
    {
        "id": "14b77c47-bdfd-4ba1. . .",
        "tenantName": "mytenant",
        "clusterName": "pulsar-aws-useast2",
        "webServiceUrl": "https://pulsar-aws-useast2",
        "brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
        "websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
        "websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
        "pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpX…..",
        "plan": "payg",
        "planCode": "1",
        "astraOrgGUID": "b282a256-b129-......",
        "cloudProvider": "aws",
        "cloudProviderCode": "1",
        "cloudRegion": "useast2",
        "status": "active",
        "jvmVersion": "JDK11",
        "pulsarVersion": "2.10.2",
        "regionZone": "na",
        "Email": "",
        "userMetricsUrl": "https://prometheus-aws-useast2….",
        "pulsarInstance": "prod0"
    },
    {
        "id": "e8bf25d8-a6a1-4169-. . .",
        "tenantName": "mytenant2",
        "clusterName": "pulsar-gcp-useast1",
        "webServiceUrl": "https://pulsar-gcp-useast1",
        "brokerServiceUrl": "pulsar+ssl://pulsar-gcp-useast1:6651",
        "websocketUrl": "wss://pulsar-gcp-useast1:8001/ws/v2",
        "websocketQueryParamUrl": "wss://pulsar-gcp-useast1:8964/ws/v2",
        "pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.ey. . .",
        "plan": "payg",
        "planCode": "1",
        "astraOrgGUID": "b282a256-b129-43e9. . .",
        "cloudProvider": "gcp",
        "cloudProviderCode": "2",
        "cloudRegion": "useast1",
        "status": "active",
        "jvmVersion": "JDK11",
        "pulsarVersion": "2.10.2",
        "regionZone": "na",
        "Email": "",
        "userMetricsUrl": "https://prometheus-gcp-useast1. . .",
        "pulsarInstance": "prod0"
    }
]
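
Several later calls need the cluster name and web service URL of a specific tenant, and both appear in the tenant list above. The sketch below shows one possible way to pull them out of a saved response; the helper tenant_field is hypothetical, and the sample data is trimmed from the result above.

```shell
# Hypothetical helper: read a tenant list (JSON array) on stdin and print
# one field of the tenant whose tenantName matches the first argument.
tenant_field() {
    python3 -c '
import json, sys
name, field = sys.argv[1], sys.argv[2]
for tenant in json.load(sys.stdin):
    if tenant.get("tenantName") == name:
        print(tenant.get(field, ""))
        break
' "$1" "$2"
}

# Sample data trimmed from the result above (host names are shortened there too).
tenants='[
  {"tenantName": "mytenant",  "clusterName": "pulsar-aws-useast2", "webServiceUrl": "https://pulsar-aws-useast2"},
  {"tenantName": "mytenant2", "clusterName": "pulsar-gcp-useast1", "webServiceUrl": "https://pulsar-gcp-useast1"}
]'
CLUSTER="$(printf '%s' "$tenants" | tenant_field mytenant2 clusterName)"
WEB_SERVICE_URL="$(printf '%s' "$tenants" | tenant_field mytenant2 webServiceUrl)"
echo "$CLUSTER $WEB_SERVICE_URL"
```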

List Astra Streaming cloud providers

  • Curl

  • Result

curl --location --request GET  'https://api.astra.datastax.com/v2/streaming/providers' --header "Authorization: Bearer $ASTRA_ORG_TOKEN" | python3 -mjson.tool
{
    "aws": [
        "useast1",
        "uswest2",
        "useast2"
    ],
    "azure": [
        "westus2",
        "eastus",
        "australiaeast"
    ],
    "gcp": [
        "useast1",
        "uscentral1",
        "australiase1",
        "europewest1",
        "useast4"
    ]
}
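
Before creating a tenant, it may be useful to confirm that the chosen provider and region pair appears in this list. The following sketch assumes the response has been saved to a shell variable; region_available is a hypothetical helper name, and the sample data is a subset of the result above.

```shell
# Hypothetical helper: read the providers document on stdin and exit 0 when
# the region (second argument) is listed under the provider (first argument).
region_available() {
    python3 -c '
import json, sys
providers = json.load(sys.stdin)
# Exit 0 when the region is listed under the provider, 1 otherwise.
sys.exit(0 if sys.argv[2] in providers.get(sys.argv[1], []) else 1)
' "$1" "$2"
}

# Subset of the result above, used as sample input.
providers='{"aws": ["useast1", "uswest2", "useast2"], "gcp": ["useast1", "uscentral1"]}'
if printf '%s' "$providers" | region_available aws useast2; then
    echo "aws/useast2 is offered"
fi
```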

Create a tenant

  • Curl

  • With file input

  • Result

curl --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_ORG_TOKEN" --data-raw '{
 	"cloudProvider": "aws",
 	"cloudRegion": "useast2",
 	"tenantName": "mytenant",
 	"userEmail": "joshua@example.com"
 }' | python3 -mjson.tool
curl --fail --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_ORG_TOKEN" --data "@mytenant-config.json" | python3 -mjson.tool
{
    "namespace": "default",
    "topic": "",
    "id": "",
    "tenantName": "mytenant",
    "clusterName": "pulsar-aws-useast2",
    "webServiceUrl": "https://pulsar-aws-useast2",
    "brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
    "websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
    "websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
    "pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9. . .",
    "plan": "payg",
    "planCode": "",
    "astraOrgGUID": "b282a256-b129-43e9. . .",
    "cloudProvider": "aws",
    "cloudProviderCode": "",
    "cloudRegion": "useast2",
    "status": "active",
    "jvmVersion": "JDK11",
    "pulsarVersion": "2.10.2",
    "regionZone": "",
    "Email": "",
    "userMetricsUrl": "",
    "pulsarInstance": ""
}

The output includes the "pulsarToken" which is the JWT token for this Pulsar instance.
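
The file-input variant of the create request reads its body from mytenant-config.json, whose contents are not shown in this guide. The sketch below assumes the file carries the same fields as the inline request body; it writes the file and checks that it parses as JSON.

```shell
# Write the request body to the file name used by the file-input example.
# Assumption: the file holds the same fields as the inline --data-raw body.
cat > mytenant-config.json <<'EOF'
{
    "cloudProvider": "aws",
    "cloudRegion": "useast2",
    "tenantName": "mytenant",
    "userEmail": "joshua@example.com"
}
EOF

# Confirm the file parses as JSON before sending it.
python3 -m json.tool mytenant-config.json > /dev/null && echo "mytenant-config.json is valid JSON"
```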

Delete a tenant

  • Curl

  • Result

curl --location --request DELETE "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/clusters/$CLUSTER" --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_ORG_TOKEN"
Output: No reply means successful.

Namespace DevOps APIs

To manage Astra Streaming Namespaces, we use the native Pulsar REST APIs. These are documented in the Apache Pulsar REST API documentation.

List Existing Namespaces

  • Curl

  • Result

curl --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
    "mytenant/default",
    "mytenant/mynamespace"
]

Create a Namespace

  • Curl

  • Result

curl -sS --fail --location --request PUT --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
Output: No reply means successful.
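
Since many calls in this guide return no body on success, a script may prefer to look at the HTTP status code instead of the output. The sketch below uses curl's --write-out '%{http_code}' option in a commented usage example; the helper is_success_status and the placeholder status value are illustrative assumptions, not values confirmed by the API.

```shell
# Hypothetical helper: succeed only for a 2xx HTTP status code.
is_success_status() {
    case "$1" in
        2[0-9][0-9]) return 0 ;;
        *) return 1 ;;
    esac
}

# Usage sketch (needs network access and the exported variables):
#   status="$(curl -sS --location --request PUT \
#       --header "Authorization: Bearer $PULSAR_TOKEN" \
#       --output /dev/null --write-out '%{http_code}' \
#       "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE")"
status="204"   # placeholder value standing in for the curl result
if is_success_status "$status"; then
    echo "request succeeded (HTTP $status)"
else
    echo "request failed (HTTP $status)" >&2
fi
```

The usage sketch omits --fail so that the status code is captured and inspected rather than turned into a curl exit error.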

Delete a Namespace

  • Curl

  • Result

curl -sS --fail --location --request DELETE --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
Output: No reply means successful.

Get Namespace Message Retention

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "retentionTimeInMinutes": 0,
    "retentionSizeInMB": 0
}

Set Namespace Message Retention

  • Curl

  • Result

curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header 'Content-Type: application/json' --header "Authorization: Bearer $PULSAR_TOKEN" --data '{
    "retentionTimeInMinutes": 360,
    "retentionSizeInMB": 102
}'
Output: No reply means successful.
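
The retention time is expressed in minutes, so the 360 in the request above corresponds to six hours. As a rough sketch, a body can be built from hours and megabytes like this; retention_body is a hypothetical helper name.

```shell
# Hypothetical helper: build a retention request body from a number of hours
# and a size in megabytes.
retention_body() {
    hours="$1"
    size_mb="$2"
    # The API field is in minutes, so convert from hours.
    minutes=$((hours * 60))
    printf '{"retentionTimeInMinutes": %d, "retentionSizeInMB": %d}\n' "$minutes" "$size_mb"
}

# Six hours and 102 MB, matching the values in the request above.
retention_body 6 102
```

The printed string can be passed to curl with --data "$(retention_body 6 102)".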

Get Namespace BacklogQuota

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuotaMap" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "destination_storage": {
        "limit": -1,
        "limitSize": 102400,
        "limitTime": 3600,
        "policy": "producer_exception"
    }
}

Set Namespace BacklogQuota Settings

  • Curl

  • Result

curl -sS --fail --location --request POST  "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuota" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data '{
"limit": -1,
"limitSize": 102400,
"limitTime": 3600,
"policy": "producer_exception"
}'
Output: No reply means successful.

Get Namespace Message TTL

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Output - Return raw number, like:
3600

Set Namespace Message TTL

  • Curl

  • Result

curl -sS --fail --location --request POST  "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 3600
Output: No reply means successful.

Set AutoTopicCreation True/False on Namespace

Input parameter "topicType" should be either "non-partitioned" or "partitioned".

  • Curl

  • Result

curl -sS --fail --location --request POST --header "Authorization: Bearer $PULSAR_TOKEN"  "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/autoTopicCreation" --header 'Content-Type: application/json' --data '{
  "allowAutoTopicCreation": false,
  "topicType": "non-partitioned"
}'
Output: No reply means successful.
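
Because topicType accepts only the two values named above, a script can validate its inputs before sending the request. This is a minimal sketch; auto_topic_body is a hypothetical helper, not part of the Pulsar API.

```shell
# Hypothetical helper: validate the two inputs and print the request body.
auto_topic_body() {
    allow="$1"
    topic_type="$2"
    case "$allow" in
        true|false) ;;
        *) echo "allowAutoTopicCreation must be true or false" >&2; return 1 ;;
    esac
    case "$topic_type" in
        partitioned|non-partitioned) ;;
        *) echo "topicType must be partitioned or non-partitioned" >&2; return 1 ;;
    esac
    printf '{"allowAutoTopicCreation": %s, "topicType": "%s"}\n' "$allow" "$topic_type"
}

# Same values as the request above.
auto_topic_body false non-partitioned
```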

Get Namespace MaxConsumersPerTopic

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Output - Return raw number, like:
50

Set Namespace MaxConsumersPerTopic

  • Curl

  • Result

curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 100
Output - 409 Forbidden (Contact Astra Streaming Support to increase Max)

Get Namespace MaxTopicsPerNamespace

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxTopicsPerNamespace" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Output - Return raw number, like:
50

Set Namespace MaxTopicsPerNamespace

  • Curl

  • Result

curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxTopicsPerNamespace" --header 'Content-Type: application/json' --header "Authorization: Bearer $PULSAR_TOKEN" --data 1000
Output - Return raw number, like:
50

Topics DevOps APIs

List Existing Topics in a Namespace

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
    "persistent://testtenant/ns0/mytopic-partition-0",
    "persistent://testtenant/ns0/mytopic-partition-1",
    "persistent://testtenant/ns0/topic1",
    "persistent://testtenant/ns0/topic2",
    "persistent://testtenant/ns0/tp1-partition-0",
    "persistent://testtenant/ns0/tp1-partition-1",
    "persistent://testtenant/ns0/tp1-partition-2",
    "persistent://testtenant/ns0/tp1-partition-3"
]
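
In the result above, each partition of a partitioned topic is listed separately with a "-partition-N" suffix. The sketch below collapses such a list to its distinct topic names; topic_base_names is a hypothetical helper, and it assumes no non-partitioned topic has a name that itself ends in "-partition-N".

```shell
# Hypothetical helper: read a JSON array of topic names on stdin and print
# the distinct names with any trailing "-partition-N" suffix removed.
topic_base_names() {
    python3 -c '
import json, re, sys
names = {re.sub(r"-partition-\d+$", "", t) for t in json.load(sys.stdin)}
print("\n".join(sorted(names)))
'
}

# Sample input taken from the result above.
topics='[
  "persistent://testtenant/ns0/mytopic-partition-0",
  "persistent://testtenant/ns0/mytopic-partition-1",
  "persistent://testtenant/ns0/topic1",
  "persistent://testtenant/ns0/topic2",
  "persistent://testtenant/ns0/tp1-partition-0",
  "persistent://testtenant/ns0/tp1-partition-1"
]'
base_names="$(printf '%s' "$topics" | topic_base_names)"
printf '%s\n' "$base_names"
```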

Create Non-partitioned Topic

  • Curl

  • Result

curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.

Create Partitioned Topic

  • Curl

  • Result

curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN" --header "Content-Type: application/json" --data $NUM_OF_PARTITIONS
Output: No reply means successful.

Delete a Persistent Topic

  • Curl

  • Result

curl -sS --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.

Get InternalStats of Non-Partitioned Topic

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/internalStats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "entriesAddedCounter": 0,
    "numberOfEntries": 0,
    "totalSize": 0,
    "currentLedgerEntries": 0,
    "currentLedgerSize": 0,
    "lastLedgerCreatedTimestamp": "2023-04-25T15:35:45.136Z",
    "waitingCursorsCount": 0,
    "pendingAddEntriesCount": 0,
    "lastConfirmedEntry": "275812:-1",
    "state": "LedgerOpened",
    "ledgers": [
        {
            "ledgerId": 275812,
            "entries": 0,
            "size": 0,
            "offloaded": false,
            "underReplicated": false
        }
    ],
    "cursors": {},
    "schemaLedgers": [],
    "compactedLedger": {
        "ledgerId": -1,
        "entries": -1,
        "size": -1,
        "offloaded": false,
        "underReplicated": false
    }
}

Get Stats of All Topics

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/stats/topics/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "persistent://testcreate/ns0/mytopic3": {
        "name": "persistent://testcreate/ns0/mytopic3",
        "totalMessagesIn": 0,
        "totalMessagesOut": 0,
        "totalBytesIn": 0,
        "totalBytesOut": 0,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 0,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 0,
        "backlogStorageByteSize": 0,
        "msgBacklogNumber": 0,
        "updatedAt": "2023-04-25T16:00:24.252397617Z"
    },
    "persistent://testcreate/ns0/t1": {
        "name": "persistent://testcreate/ns0/t1",
        "totalMessagesIn": 0,
        "totalMessagesOut": 0,
        "totalBytesIn": 0,
        "totalBytesOut": 0,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 0,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 0,
        "backlogStorageByteSize": 0,
        "msgBacklogNumber": 0,
        "updatedAt": "2023-04-25T16:00:24.252466612Z"
    },
    "persistent://testcreate/ns0/t1-partition-0": {
        "name": "persistent://testcreate/ns0/t1-partition-0",
        "totalMessagesIn": 516,
        "totalMessagesOut": 514,
        "totalBytesIn": 637776,
        "totalBytesOut": 637674,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 1,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 1899200,
        "backlogStorageByteSize": 0,
        "msgBacklogNumber": 0,
        "updatedAt": "2023-04-25T16:00:24.252410963Z"
    },
    "persistent://testcreate/ns0/t1-partition-1": {
        "name": "persistent://testcreate/ns0/t1-partition-1",
        "totalMessagesIn": 534,
        "totalMessagesOut": 531,
        "totalBytesIn": 696340,
        "totalBytesOut": 692347,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 1,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 2020678,
        "backlogStorageByteSize": 2151,
        "msgBacklogNumber": 3,
        "updatedAt": "2023-04-25T16:00:24.252425482Z"
    },
    "persistent://testcreate/ns0/t1-partition-2": {
        "name": "persistent://testcreate/ns0/t1-partition-2",
        "totalMessagesIn": 522,
        "totalMessagesOut": 519,
        "totalBytesIn": 653487,
        "totalBytesOut": 649286,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 1,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 1916574,
        "backlogStorageByteSize": 0,
        "msgBacklogNumber": 0,
        "updatedAt": "2023-04-25T16:00:24.252438306Z"
    },
    "persistent://testcreate/ns0/t1-partition-3": {
        "name": "persistent://testcreate/ns0/t1-partition-3",
        "totalMessagesIn": 516,
        "totalMessagesOut": 514,
        "totalBytesIn": 631638,
        "totalBytesOut": 631536,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 1,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 1890920,
        "backlogStorageByteSize": 1586,
        "msgBacklogNumber": 4,
        "updatedAt": "2023-04-25T16:00:24.252452735Z"
    }
}
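
For monitoring, the per-topic msgBacklogNumber values in this response can be added up to get a namespace-wide backlog. The following is a sketch under the assumption that the response has been saved to a variable; total_backlog is a hypothetical helper, and the sample input keeps only the backlog field of three topics from the result above.

```shell
# Hypothetical helper: read the all-topics stats document on stdin and print
# the sum of msgBacklogNumber across every topic.
total_backlog() {
    python3 -c '
import json, sys
stats = json.load(sys.stdin)
print(sum(topic.get("msgBacklogNumber", 0) for topic in stats.values()))
'
}

# Reduced sample: only the backlog field of three topics from the result above.
stats='{
  "persistent://testcreate/ns0/t1": {"msgBacklogNumber": 0},
  "persistent://testcreate/ns0/t1-partition-1": {"msgBacklogNumber": 3},
  "persistent://testcreate/ns0/t1-partition-3": {"msgBacklogNumber": 4}
}'
printf '%s' "$stats" | total_backlog
```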

Get Stats of Partitioned Topic

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitioned-stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "msgRateIn": 0.0,
    "msgThroughputIn": 0.0,
    "msgRateOut": 0.0,
    "msgThroughputOut": 0.0,
    "bytesInCounter": 0,
    "msgInCounter": 0,
    "bytesOutCounter": 0,
    "msgOutCounter": 0,
    "averageMsgSize": 0.0,
    "msgChunkPublished": false,
    "storageSize": 0,
    "backlogSize": 0,
    "publishRateLimitedTimes": 0,
    "earliestMsgPublishTimeInBacklogs": 0,
    "offloadedStorageSize": 0,
    "lastOffloadLedgerId": 0,
    "lastOffloadSuccessTimeStamp": 0,
    "lastOffloadFailureTimeStamp": 0,
    "publishers": [],
    "waitingPublishers": 0,
    "subscriptions": {},
    "replication": {},
    "nonContiguousDeletedMessagesRanges": 0,
    "nonContiguousDeletedMessagesRangesSerializedSize": 0,
    "compaction": {
        "lastCompactionRemovedEventCount": 0,
        "lastCompactionSucceedTimestamp": 0,
        "lastCompactionFailedTimestamp": 0,
        "lastCompactionDurationTimeInMills": 0
    },
    "metadata": {
        "partitions": 2,
        "deleted": false
    },
    "partitions": {
        "persistent://testcreate/ns0/mytopic-partition-1": {
            "msgRateIn": 0.0,
            "msgThroughputIn": 0.0,
            "msgRateOut": 0.0,
            "msgThroughputOut": 0.0,
            "bytesInCounter": 0,
            "msgInCounter": 0,
            "bytesOutCounter": 0,
            "msgOutCounter": 0,
            "averageMsgSize": 0.0,
            "msgChunkPublished": false,
            "storageSize": 0,
            "backlogSize": 0,
            "publishRateLimitedTimes": 0,
            "earliestMsgPublishTimeInBacklogs": 0,
            "offloadedStorageSize": 0,
            "lastOffloadLedgerId": 0,
            "lastOffloadSuccessTimeStamp": 0,
            "lastOffloadFailureTimeStamp": 0,
            "publishers": [],
            "waitingPublishers": 0,
            "subscriptions": {},
            "replication": {},
            "deduplicationStatus": "Disabled",
            "nonContiguousDeletedMessagesRanges": 0,
            "nonContiguousDeletedMessagesRangesSerializedSize": 0,
            "compaction": {
                "lastCompactionRemovedEventCount": 0,
                "lastCompactionSucceedTimestamp": 0,
                "lastCompactionFailedTimestamp": 0,
                "lastCompactionDurationTimeInMills": 0
            }
        },
        "persistent://testcreate/ns0/mytopic-partition-0": {
            "msgRateIn": 0.0,
            "msgThroughputIn": 0.0,
            "msgRateOut": 0.0,
            "msgThroughputOut": 0.0,
            "bytesInCounter": 0,
            "msgInCounter": 0,
            "bytesOutCounter": 0,
            "msgOutCounter": 0,
            "averageMsgSize": 0.0,
            "msgChunkPublished": false,
            "storageSize": 0,
            "backlogSize": 0,
            "publishRateLimitedTimes": 0,
            "earliestMsgPublishTimeInBacklogs": 0,
            "offloadedStorageSize": 0,
            "lastOffloadLedgerId": 0,
            "lastOffloadSuccessTimeStamp": 0,
            "lastOffloadFailureTimeStamp": 0,
            "publishers": [],
            "waitingPublishers": 0,
            "subscriptions": {},
            "replication": {},
            "deduplicationStatus": "Disabled",
            "nonContiguousDeletedMessagesRanges": 0,
            "nonContiguousDeletedMessagesRangesSerializedSize": 0,
            "compaction": {
                "lastCompactionRemovedEventCount": 0,
                "lastCompactionSucceedTimestamp": 0,
                "lastCompactionFailedTimestamp": 0,
                "lastCompactionDurationTimeInMills": 0
            }
        }
    }
}

Get Stats of Non-partition Topic

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "msgRateIn": 0.0,
    "msgThroughputIn": 0.0,
    "msgRateOut": 0.0,
    "msgThroughputOut": 0.0,
    "bytesInCounter": 0,
    "msgInCounter": 0,
    "bytesOutCounter": 0,
    "msgOutCounter": 0,
    "averageMsgSize": 0.0,
    "msgChunkPublished": false,
    "storageSize": 0,
    "backlogSize": 0,
    "publishRateLimitedTimes": 0,
    "earliestMsgPublishTimeInBacklogs": 0,
    "offloadedStorageSize": 0,
    "lastOffloadLedgerId": 0,
    "lastOffloadSuccessTimeStamp": 0,
    "lastOffloadFailureTimeStamp": 0,
    "publishers": [],
    "waitingPublishers": 0,
    "subscriptions": {},
    "replication": {},
    "deduplicationStatus": "Disabled",
    "nonContiguousDeletedMessagesRanges": 0,
    "nonContiguousDeletedMessagesRangesSerializedSize": 0,
    "compaction": {
        "lastCompactionRemovedEventCount": 0,
        "lastCompactionSucceedTimestamp": 0,
        "lastCompactionFailedTimestamp": 0,
        "lastCompactionDurationTimeInMills": 0
    }
}

Get List of Subscriptions for a Topic

  • Curl

  • Result

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscriptions" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
    "mysub",
    "subscript2"
]

Create a Subscription for a Topic (Replicated)

The "replicated=true" query parameter can be set to "false" for non-replicated subscriptions.

  • Curl

  • Result

curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION?replicated=true" --header "Authorization: Bearer $PULSAR_TOKEN"  --header "Content-Type: application/json"
Output: No reply means successful.

Delete a Subscription for a Topic

  • Curl

  • Result

curl -sS --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.

Clear a Subscription for a Topic

  • Curl

  • Result

curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION/skip_all" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.

Geo-Replication DevOps APIs

Get Status of Geo-Replication

  • Curl

  • Result

curl --location --fail --request GET "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE"  --header "Authorization: Bearer $ASTRA_ORG_TOKEN" | python3 -mjson.tool
{
    "pulsarInstance": "prod0",
    "tenant": "mytenant",
    "namespace": "mynamespace",
    "replications": {
        "pulsar-aws-useast2": [
            "pulsar-aws-uswest2",
            "pulsar-aws-useast2"
        ],
        "pulsar-aws-uswest2": [
            "pulsar-aws-uswest2",
            "pulsar-aws-useast2"
        ]
    },
    "clusters": {
        "pulsar-aws-useast2": {
            "clusterName": "pulsar-aws-useast2",
            "cloudProvider": "aws",
            "cloudRegion": "useast2",
            "clusterType": "cloud",
            "webServiceUrl": "https://pvt-pulsar-aws-useast2:8443",
            "brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
            "websocketUrl": "",
            "pulsarInstance": "prod0",
            "regionZone": ""
        },
        "pulsar-aws-uswest2": {
            "clusterName": "pulsar-aws-uswest2",
            "cloudProvider": "aws",
            "cloudRegion": "uswest2",
            "clusterType": "cloud",
            "webServiceUrl": "https://pvt-pulsar-aws-uswest2:8443",
            "brokerServiceUrl": "pulsar+ssl://pulsar-aws-uswest2:6651",
            "websocketUrl": "",
            "pulsarInstance": "prod0",
            "regionZone": ""
        }
    }
}

Create Geo-Replication between Namespaces

The JSON input parameters can be obtained from the "List tenants with details" and "List Astra Streaming cloud providers" sections of this guide.

  • Curl

  • Result

curl --location --fail --request POST "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE"  --header "Content-Type: application/json"  --header "Authorization: Bearer $ASTRA_ORG_TOKEN"  --data-raw '{
  "bidirection": true,
  "destCluster": "pulsar-aws-uswest2",
  "email": "joshua@example.com",
  "namespace": "mynamespace",
  "originCluster": "pulsar-aws-useast2"
}'
Output: No reply means successful.
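
When the cluster names come from variables rather than literals, the request body can be generated instead of typed by hand. The sketch below builds the same five fields as the request above; replication_body is a hypothetical helper, and it always sets "bidirection" to true as in the example.

```shell
# Hypothetical helper: print a geo-replication request body.
# Arguments: origin cluster, destination cluster, namespace, email.
replication_body() {
    python3 -c '
import json, sys
origin, dest, namespace, email = sys.argv[1:5]
body = {
    "bidirection": True,
    "destCluster": dest,
    "email": email,
    "namespace": namespace,
    "originCluster": origin,
}
# json.dumps takes care of quoting and escaping the values.
print(json.dumps(body, indent=2))
' "$1" "$2" "$3" "$4"
}

# Same values as the request above.
replication_body pulsar-aws-useast2 pulsar-aws-uswest2 mynamespace joshua@example.com
```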

Delete Geo-Replication between Namespaces

The JSON input parameters can be obtained from the "List tenants with details" and "List Astra Streaming cloud providers" sections of this guide.

  • Curl

  • Result

curl --location --fail --request DELETE "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" \
 --header "Content-Type: application/json" \
 --header "Authorization: Bearer $ASTRA_ORG_TOKEN" \
 --data-raw '{
  "bidirection": true,
  "destCluster": "pulsar-aws-uswest2",
  "email": "joshua@example.com",
  "namespace": "ns0",
  "originCluster": "pulsar-aws-useast2"
}'
Output: No reply means successful.

Functions DevOps APIs

List Existing Functions in a Namespace

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
    "testfunction1"
]

Get Status of a Function

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "numInstances": 1,
    "numRunning": 0,
    "instances": [
        {
            "instanceId": 0,
            "status": {
                "running": false,
                "error": "",
                "numRestarts": 0,
                "numReceived": 0,
                "numSuccessfullyProcessed": 0,
                "numUserExceptions": 0,
                "latestUserExceptions": null,
                "numSystemExceptions": 0,
                "latestSystemExceptions": null,
                "averageLatency": 0.0,
                "lastInvocationTime": 0,
                "workerId": "pulsar-aws-useast2-function-0"
            }
        }
    ]
}
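
In the status above, numRunning is 0 while numInstances is 1, meaning the function is deployed but not running. A script could test for that condition as sketched below; function_running is a hypothetical helper that treats a function as healthy only when every instance is running.

```shell
# Hypothetical helper: read a function status document on stdin and exit 0
# only when at least one instance exists and all instances are running.
function_running() {
    python3 -c '
import json, sys
status = json.load(sys.stdin)
instances = status.get("numInstances", 0)
ok = instances > 0 and status.get("numRunning") == instances
sys.exit(0 if ok else 1)
'
}

# Reduced sample based on the result above.
status='{"numInstances": 1, "numRunning": 0}'
if printf '%s' "$status" | function_running; then
    echo "all instances are running"
else
    echo "function is not fully running"
fi
```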

Get Stats of a Function

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "receivedTotal": 0,
    "processedSuccessfullyTotal": 0,
    "systemExceptionsTotal": 0,
    "userExceptionsTotal": 0,
    "avgProcessLatency": null,
    "1min": {
        "receivedTotal": 0,
        "processedSuccessfullyTotal": 0,
        "systemExceptionsTotal": 0,
        "userExceptionsTotal": 0,
        "avgProcessLatency": null
    },
    "lastInvocation": null,
    "instances": [
        {
            "instanceId": 0,
            "metrics": {
                "receivedTotal": 0,
                "processedSuccessfullyTotal": 0,
                "systemExceptionsTotal": 0,
                "userExceptionsTotal": 0,
                "avgProcessLatency": null,
                "1min": {
                    "receivedTotal": 0,
                    "processedSuccessfullyTotal": 0,
                    "systemExceptionsTotal": 0,
                    "userExceptionsTotal": 0,
                    "avgProcessLatency": null
                },
                "lastInvocation": null,
                "userMetrics": {}
            }
        }
    ]
}

Get Function Details

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "runtimeFlags": null,
    "tenant": "mytenant",
    "namespace": "mynamespace",
    "name": "testfunction1",
    "className": "TransformFunction",
    "inputs": null,
    "customSerdeInputs": null,
    "topicsPattern": null,
    "customSchemaInputs": null,
    "customSchemaOutputs": null,
    "inputSpecs": {
        "testcreate/ns0/tp1": {
            "schemaType": null,
            "serdeClassName": null,
            "schemaProperties": {},
            "consumerProperties": {},
            "receiverQueueSize": null,
            "cryptoConfig": null,
            "poolMessages": false,
            "regexPattern": false
        }
    },
    "output": "mytenant/mynamespace/tp2",
    "producerConfig": {
        "maxPendingMessages": null,
        "maxPendingMessagesAcrossPartitions": null,
        "useThreadLocalProducers": false,
        "cryptoConfig": null,
        "batchBuilder": ""
    },
    "outputSchemaType": null,
    "outputSerdeClassName": null,
    "logTopic": null,
    "processingGuarantees": "ATLEAST_ONCE",
    "retainOrdering": false,
    "retainKeyOrdering": false,
    "batchBuilder": null,
    "forwardSourceMessageProperty": true,
    "userConfig": {
        "steps": [
            {
                "schema-type": "STRING",
                "type": "cast"
            }
        ]
    },
    "secrets": null,
    "runtime": "JAVA",
    "autoAck": true,
    "maxMessageRetries": null,
    "deadLetterTopic": null,
    "subName": null,
    "parallelism": 1,
    "resources": {
        "cpu": 0.25,
        "ram": 1000000000,
        "disk": 1000000000
    },
    "fqfn": null,
    "windowConfig": null,
    "timeoutMs": 11000,
    "jar": null,
    "py": null,
    "go": null,
    "functionType": null,
    "cleanupSubscription": false,
    "customRuntimeOptions": "",
    "maxPendingAsyncRequests": null,
    "exposePulsarAdminClientEnabled": null,
    "subscriptionPosition": "Latest"
}

Start a Function

  • Curl

  • Result

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/start" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.

Stop a Function

  • Curl

  • Result

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: No reply means successful.

Restart a Function

  • Curl

  • Result

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/restart" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: An empty reply means success.
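The start, stop, and restart calls above differ only in the last path segment, so they can be wrapped in one small helper. The following is a minimal sketch, not part of the official tooling: the function names function_action_url and function_action are hypothetical, and the sketch assumes WEB_SERVICE_URL, TENANT, NAMESPACE, FUNCTION, and PULSAR_TOKEN are already exported.

```shell
# Hypothetical helper: prints the admin URL for a function lifecycle action.
# Assumes WEB_SERVICE_URL, TENANT, NAMESPACE and FUNCTION are exported.
function_action_url() {
    fn_action="$1"
    case "$fn_action" in
        start|stop|restart) ;;
        *) echo "unsupported action: $fn_action" >&2; return 1 ;;
    esac
    echo "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/$fn_action"
}

# Hypothetical wrapper around the curl calls shown above.
# Returns curl's exit status; with --fail, an HTTP error gives a non-zero status.
function_action() {
    fn_url="$(function_action_url "$1")" || return 1
    curl --fail --location --request POST "$fn_url" \
        --header "Authorization: Bearer $PULSAR_TOKEN"
}

# Example usage (performs a real API call, so it is left commented out):
# function_action restart
```

Rejecting unknown actions before calling curl keeps a typo such as "pause" from reaching the API at all.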

Astra Streaming JWT Token DevOps APIs

List Existing Token IDs

Get a list of Token IDs for your Cluster. With a Token ID, you can then look up and obtain the Pulsar JWT Token string. The Token IDs are also listed in the Astra UI for that Tenant and Cluster.

Note - The required parameter "CLUSTER" is obtained from the "List Tenants with Details" API command.

  • Curl

  • Result

curl --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "Authorization: Bearer $ASTRA_ORG_TOKEN" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" | python3 -mjson.tool
[
    {
        "iat": 1679335276,
        "iss": "datastax",
        "sub": "client;b282a256-b129-43e9-b870. . .",
        "tokenid": "cdb87797. . ."
    }
]
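When only the token IDs are needed, for example to feed them into a later call, the list response can be reduced to one ID per line. This is a minimal sketch that reuses python3, which the examples above already rely on for formatting; the function name list_token_ids is hypothetical.

```shell
# Hypothetical helper: reads the token-list JSON on stdin and prints one
# "tokenid" value per line.
list_token_ids() {
    python3 -c 'import json, sys
for entry in json.load(sys.stdin):
    print(entry["tokenid"])'
}

# Example usage (performs a real API call, so it is left commented out):
# curl --fail --location --request GET \
#     "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" \
#     --header "Authorization: Bearer $ASTRA_ORG_TOKEN" \
#     --header "X-DataStax-Pulsar-Cluster: $CLUSTER" | list_token_ids
```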

Get Token String by ID

  • Curl

  • Result

curl --fail --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens/$TOKENID" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_ORG_TOKEN"
Output: Raw string JWT token
eyJhbGciOiJSUzI1NiIsI . . .

Create a JWT Token

Create a new Pulsar JWT Token. The new JWT Token will also be visible in the Astra UI for that Tenant and Cluster.

The required parameter "CLUSTER" is obtained from the "List Tenants with Details" API command.

  • Curl

  • Result

curl --fail --location --request POST "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_ORG_TOKEN"
Output: new raw string JWT token
eyJhbGciOiJSUzI1NiIsI . . .

Delete a JWT Token

The required parameter "CLUSTER" is obtained from the "List Tenants with Details" API command. The "TOKENID" of the token to delete can be obtained from the "List Existing Token IDs" command.

  • Curl

  • Result

curl --fail --location --request DELETE "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens/$TOKENID" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_ORG_TOKEN"
Output: An empty reply means success.

Pulsar IO Connectors DevOps APIs

Pulsar Sources and Sinks share a similar API structure for most methods, so this guide shows the Source and Sink curl examples together.

List Existing Sources in a Namespace

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
    "mysource1"
]

List Existing Sinks in a Namespace

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
[
    "mysink1"
]

Get Status of a Source

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "numInstances": 1,
    "numRunning": 1,
    "instances": [
        {
            "instanceId": 0,
            "status": {
                "running": true,
                "error": "",
                "numRestarts": 0,
                "numReceivedFromSource": 0,
                "numSystemExceptions": 0,
                "latestSystemExceptions": [],
                "numSourceExceptions": 0,
                "latestSourceExceptions": [],
                "numWritten": 0,
                "lastReceivedTime": 0,
                "workerId": "pulsar-aws-useast2-function-0"
            }
        }
    ]
}

Get Status of a Sink

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "numInstances": 1,
    "numRunning": 0,
    "instances": [
        {
            "instanceId": 0,
            "status": {
                "running": false,
                "error": "",
                "numRestarts": 0,
                "numReadFromPulsar": 0,
                "numSystemExceptions": 0,
                "latestSystemExceptions": null,
                "numSinkExceptions": 0,
                "latestSinkExceptions": null,
                "numWrittenToSink": 0,
                "lastReceivedTime": 0,
                "workerId": "pulsar-useast-function-1"
            }
        }
    ]
}
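Both status responses report "numInstances" and "numRunning", so a script can compare the two to decide whether a connector needs attention. The following is a minimal sketch under that assumption; the function name connector_is_healthy is hypothetical, and treating "all instances running" as healthy is a simplification.

```shell
# Hypothetical helper: reads a source or sink status JSON document on stdin
# and exits 0 only when numRunning equals numInstances.
connector_is_healthy() {
    python3 -c 'import json, sys
status = json.load(sys.stdin)
sys.exit(0 if status["numRunning"] == status["numInstances"] else 1)'
}

# Example usage (performs a real API call, so it is left commented out):
# curl --fail --location --request GET \
#     "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/status" \
#     --header "Authorization: Bearer $PULSAR_TOKEN" \
#     | connector_is_healthy || echo "sink $SINK is not fully running"
```

Applied to the two sample responses above, the source (1 of 1 running) would pass the check and the sink (0 of 1 running) would fail it.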

Get Source Connector Details

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "archive": "builtin://netty",
    "batchBuilder": null,
    "batchSourceConfig": null,
    "className": "org.apache.pulsar.io.netty.NettySource",
    "configs": {
        "host": "127.0.0.1",
        "numberOfThreads": "1",
        "port": "10999",
        "type": "tcp"
    },
    "customRuntimeOptions": "internal_data",
    "name": "mysource",
    "namespace": "ns0",
    "parallelism": 1,
    "processingGuarantees": "ATLEAST_ONCE",
    "producerConfig": {
        "batchBuilder": "",
        "cryptoConfig": null,
        "maxPendingMessages": null,
        "maxPendingMessagesAcrossPartitions": null,
        "useThreadLocalProducers": false
    },
    "resources": {
        "cpu": 0.25,
        "disk": 1000000000,
        "ram": 1000000000
    },
    "runtimeFlags": null,
    "schemaType": null,
    "secrets": null,
    "serdeClassName": null,
    "tenant": "testcreate",
    "topicName": "persistent://testcreate/ns0/t1"
}

Get Sink Connector Details

  • Curl

  • Result

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
{
    "archive": "builtin://data-generator",
    "autoAck": true,
    "className": "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink",
    "cleanupSubscription": false,
    "configs": {},
    "customRuntimeOptions": "internal_data",
    "deadLetterTopic": null,
    "inputSpecs": {
        "persistent://testcreate/ns0/tp1": {
            "consumerProperties": {},
            "cryptoConfig": null,
            "poolMessages": false,
            "receiverQueueSize": null,
            "regexPattern": false,
            "schemaProperties": {},
            "schemaType": null,
            "serdeClassName": null
        }
    },
    "inputs": [
        "persistent://testcreate/ns0/tp1"
    ],
    "maxMessageRetries": null,
    "name": "mysink1",
    "namespace": "ns0",
    "negativeAckRedeliveryDelayMs": null,
    "parallelism": 1,
    "processingGuarantees": "ATLEAST_ONCE",
    "resources": {
        "cpu": 0.15,
        "disk": 500000000,
        "ram": 400000000
    },
    "retainKeyOrdering": false,
    "retainOrdering": false,
    "runtimeFlags": null,
    "secrets": null,
    "sourceSubscriptionName": null,
    "sourceSubscriptionPosition": "Latest",
    "tenant": "testcreate",
    "timeoutMs": 5000,
    "topicToSchemaProperties": null,
    "topicToSchemaType": null,
    "topicToSerdeClassName": null,
    "topicsPattern": null,
    "transformFunction": null,
    "transformFunctionClassName": null,
    "transformFunctionConfig": null
}

Start a Source Connector

  • Curl

  • Result

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/start" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: An empty reply means success.

Start a Sink Connector

  • Curl

  • Result

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/start" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: An empty reply means success.

Stop a Source Connector

  • Curl

  • Result

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: An empty reply means success.

Stop a Sink Connector

  • Curl

  • Result

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: An empty reply means success.

Create a Source Connector

  • Curl

  • Result

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sourceConfig=@mynetty-source-config.json;type=application/json"
Output: An empty reply means success.

In the example above, a configuration file named "mynetty-source-config.json" is provided as input to curl. It holds the configuration for the built-in "netty" source connector in Astra Streaming.

The "@" prefix in the curl --form value indicates an input file. When executing the curl command, ensure the input file is readable and that its path is correct relative to the current working directory.
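As a rough sketch of what such a file might contain, the snippet below writes a netty source configuration assembled from the fields in the "Get Source Connector Details" sample output shown earlier. The values (tenant, namespace, name, topic, host, port) are copied from that sample and are assumptions for illustration only; the exact set of fields required for your connector may differ.

```shell
# Writes a hypothetical mynetty-source-config.json. The values mirror the
# sample "Get Source Connector Details" output and should be replaced with
# your own tenant, namespace, topic, and connector settings.
cat > mynetty-source-config.json <<'EOF'
{
    "tenant": "testcreate",
    "namespace": "ns0",
    "name": "mysource",
    "archive": "builtin://netty",
    "parallelism": 1,
    "topicName": "persistent://testcreate/ns0/t1",
    "configs": {
        "type": "tcp",
        "host": "127.0.0.1",
        "port": "10999",
        "numberOfThreads": "1"
    }
}
EOF

# Confirm the file parses as JSON before passing it to curl.
python3 -mjson.tool mynetty-source-config.json > /dev/null && echo "config is valid JSON"
```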

Delete a Source Connector

  • Curl

  • Result

curl --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: An empty reply means success.

Create a Sink Connector

  • Curl

  • Result

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sinkConfig=@mykafka-sink-config.json;type=application/json"
Output: An empty reply means success.

In the example above, a configuration file is provided as input to curl. The file is named "mykafka-sink-config.json" and has the following content for the built-in "kafka" sink connector in Astra Streaming.

{
    "tenant": "testcreate",
    "namespace": "ns0",
    "name": "mykafkaconnector",
    "archive": "builtin://kafka",
    "parallelism": 1,
    "autoAck": true,
    "cleanupSubscription": false,
    "configs": {
      "acks": "1",
      "batchSize": "16384",
      "bootstrapServers": "localhost:55200,localhost:55201",
      "maxRequestSize": "1048576",
      "producerConfigProperties": {
        "client.id": "astra-streaming-client",
        "sasl.jaas.config": "sensitive_data_removed",
        "sasl.mechanism": "PLAIN",
        "sasl.password": "sensitive_data_removed",
        "sasl.username": "myuserid",
        "security.protocol": "SASL_SSL"
      },
      "topic": "mykafka-topic"
    },
    "inputs": [ "persistent://testcreate/ns0/mytopic3" ]
}

The "@" prefix in the curl --form value indicates an input file. When executing the curl command, ensure the input file is readable and that its path is correct relative to the current working directory.
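A quick local check can catch an unreadable or malformed configuration file before it is uploaded. This is a minimal sketch; the function name check_config_file is hypothetical, and the check only confirms that the file is readable and parses as JSON, not that the connector will accept its fields.

```shell
# Hypothetical pre-flight check for a connector configuration file.
# Returns 0 when the file is readable and contains valid JSON.
check_config_file() {
    cfg_file="$1"
    if [ ! -r "$cfg_file" ]; then
        echo "cannot read $cfg_file" >&2
        return 1
    fi
    if ! python3 -mjson.tool "$cfg_file" > /dev/null 2>&1; then
        echo "$cfg_file is not valid JSON" >&2
        return 1
    fi
}

# Example usage (the curl call is left commented out because it hits the API):
# check_config_file mykafka-sink-config.json && \
#     curl --fail --location --request POST \
#         "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" \
#         --header "Authorization: Bearer $PULSAR_TOKEN" \
#         --form "sinkConfig=@mykafka-sink-config.json;type=application/json"
```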

Delete a Sink Connector

  • Curl

  • Result

curl --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN"
Output: An empty reply means success.

© 2024 DataStax | Privacy policy | Terms of use
