API operations

This Astra Streaming workbook is a comprehensive guide that provides detailed examples and practices for managing the Astra Streaming platform using the DevOps APIs. It provides details on the most commonly used APIs for managing Astra Streaming and Pulsar instances. These details include required parameters and the expected output from the API. The workbook is designed to fill the gap between detailed API reference docs and HowTo guides. The result is to help customers in operating and managing Astra Streaming and provide guidance on how to use DevOps API to automate many common tasks.

The workbook covers a wide range of topics, including provisioning of resources, monitoring, and troubleshooting. It provides instructions for various operations, such as creating a new tenant, namespace, topics, geo-replication, and access tokens, to setting up monitoring and alerting, and troubleshooting common issues.

Overall, this Astra Streaming Workbook is a valuable resource for customers who want to leverage the benefits of Astra Streaming and manage their streaming environment effectively with DevOps APIs. By following the best practices and guidelines outlined in the workbook, customers can ensure that their streaming applications are secure, performant, and reliable.

Prerequisites

  • An active Astra DB account with access to Astra Streaming

  • Credentials and values required to form HTTP requests

Set environment variables

Due to their frequency in Astra Streaming API calls, you might find it helpful to set the following environment variables:

export PULSAR_TOKEN="PULSAR_TOKEN"
export WEB_SERVICE_URL="PULSAR_WEB_SERVICE_URL"
export NAMESPACE="STREAMING_NAMESPACE_NAME"
export TENANT="STREAMING_TENANT_NAME"
export TOPIC="STREAMING_TOPIC_NAME"
export NUM_OF_PARTITIONS="PARTITIONS"
export SUBSCRIPTION="SUBSCRIPTION_NAME"
export INSTANCE="TENANT_INSTANCE_NAME"
export SOURCE="SOURCE_CONNECTOR_NAME"
export SINK="SINK_NAME"
export FUNCTION="FUNCTION_DISPLAY_NAME"
export TOKENID="PULSAR_TOKEN_UUID"
export ASTRA_TOKEN="ASTRA_DB_APPLICATION_TOKEN"

The examples in this guide use environment variables for these values.

Additionally, some examples in this guide use python3 -mjson.tool to format the JSON response. This is optional; it is not required to execute API requests.

Astra Streaming DevOps API tenant operations

The following examples demonstrate how to use the Astra Streaming DevOps API to manage Astra Streaming resources.

Get tenant information

curl --location --request GET  'https://api.astra.datastax.com/v2/streaming/tenants' --header "Authorization: Bearer $ASTRA_TOKEN" | python3 -mjson.tool
Result
[
    {
        "id": "14b77c47-bdfd-4ba1. . .",
        "tenantName": "mytenant",
        "clusterName": "pulsar-aws-useast2",
        "webServiceUrl": "https://pulsar-aws-useast2",
        "brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
        "websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
        "websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
        "pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpX…..",
        "plan": "payg",
        "planCode": "1",
        "astraOrgGUID": "b282a256-b129-......",
        "cloudProvider": "aws",
        "cloudProviderCode": "1",
        "cloudRegion": "useast2",
        "status": "active",
        "jvmVersion": "JDK11",
        "pulsarVersion": "2.10.2",
        "regionZone": "na",
        "Email": "",
        "userMetricsUrl": "https://prometheus-aws-useast2….",
        "pulsarInstance": "prod0"
    },
    {
        "id": "e8bf25d8-a6a1-4169-. . .",
        "tenantName": "mytenant2",
        "clusterName": "pulsar-gcp-useast1",
        "webServiceUrl": "https://pulsar-gcp-useast1",
        "brokerServiceUrl": "pulsar+ssl://pulsar-gcp-useast1:6651",
        "websocketUrl": "wss://pulsar-gcp-useast1m:8001/ws/v2",
        "websocketQueryParamUrl": "wss://pulsar-gcp-useast1:8964/ws/v2",
        "pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.ey. . .",
        "plan": "payg",
        "planCode": "1",
        "astraOrgGUID": "b282a256-b129-43e9. . .",
        "cloudProvider": "gcp",
        "cloudProviderCode": "2",
        "cloudRegion": "useast1",
        "status": "active",
        "jvmVersion": "JDK11",
        "pulsarVersion": "2.10.2",
        "regionZone": "na",
        "Email": "",
        "userMetricsUrl": "https://prometheus-gcp-useast1. . .",
        "pulsarInstance": "prod0"
    }
]

Get Astra Streaming cloud providers and regions

curl --location --request GET  'https://api.astra.datastax.com/v2/streaming/providers' --header "Authorization: Bearer $ASTRA_TOKEN" | python3 -mjson.tool
Result
{
    "aws": [
        "useast1",
        "uswest2",
        "useast2"
    ],
    "azure": [
        "westus2",
        "eastus",
        "australiaeast"
    ],
    "gcp": [
        "useast1",
        "uscentral1",
        "australiase1",
        "europewest1",
        "useast4"
    ]
}

Create a tenant

Create a tenant:

curl --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_TOKEN" --data-raw '{
 	"cloudProvider": "aws",
 	"cloudRegion": "useast2",
 	"tenantName": "mytenant",
 	"userEmail": "joshua@example.com"
 }' | python3 -mjson.tool

Create a tenant with file input:

curl --fail --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_TOKEN" --data "@mytenant-config.json" | python3 -mjson.tool
Result

The output includes the "pulsarToken" which is the JWT for this Pulsar instance.

{
    "namespace": "default",
    "topic": "",
    "id": "",
    "tenantName": "mytenant",
    "clusterName": "pulsar-aws-useast2",
    "webServiceUrl": "https://pulsar-aws-useast2",
    "brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
    "websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
    "websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
    "pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9. . .",
    "plan": "payg",
    "planCode": "",
    "astraOrgGUID": "b282a256-b129-43e9. . .",
    "cloudProvider": "aws",
    "cloudProviderCode": "",
    "cloudRegion": "useast2",
    "status": "active",
    "jvmVersion": "JDK11",
    "pulsarVersion": "2.10.2",
    "regionZone": "",
    "Email": "",
    "userMetricsUrl": "",
    "pulsarInstance": ""
}

Delete a tenant

curl --location --request DELETE 'https://api.astra.datastax.com/v2/streaming/tenants/{tenant}/clusters/{cluster}' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_TOKEN"

No response indicates success.

Puslar API namespace operations

To manage Astra Streaming namespaces, use the Pulsar REST APIs.

Get existing namespaces

curl --location --request GET “https://$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
    "mytenant/default",
    "mytenant/mynamespace"
]

Create a namespace

curl -sS --fail --location --request PUT --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
Result
Output: No reply means successful.

Delete a namespace

curl -sS --fail --location --request DELETE --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"

No response indicates success.

Get namespace message retention

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "retentionTimeInMinutes": 0,
    "retentionSizeInMB": 0
}

Set namespace message retention

curl --location "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header 'Content-Type: application/json' --header "Authorization: Bearer $PULSAR_TOKEN" --data '{
    "retentionTimeInMinutes": 360,
    "retentionSizeInMB": 102
}'

No response indicates success.

Get namespace backlog quota

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuotaMap" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "destination_storage": {
        "limit": -1,
        "limitSize": 102400,
        "limitTime": 3600,
        "policy": "producer_exception"
    }
}

Set namespace backlog quota settings

curl -sS --fail --location --request POST  "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuota" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data '{
"limit": -1,
"limitSize": 102400,
"limitTime": 3600,
"policy": "producer_exception"
}'

No response indicates success.

Get namespace message TTL

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool

The response is a number, such as 3600.

Set namespace message TTL

curl -sS --fail --location --request POST  "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 3600

No response indicates success.

Set namespace AutoTopicCreation

Input parameter “topicType" should be either “non-partitioned" or “partitioned".

curl -sS --fail --location --request POST --header "Authorization: Bearer $PULSAR_TOKEN"  "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/autoTopicCreation" --header 'Content-Type: application/json' --data '{
  "allowAutoTopicCreation": false,
  "topicType": "non-partitioned"
}'

No response indicates success.

Get namespace MaxConsumersPerTopic

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool

The response is a number, such as 50.

Set namespace MaxConsumersPerTopic

curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 100

Get namespace MaxTopicPerNamespace

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxTopicsPerNamespace" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool

The response is a number.

Set namespace MaxTopicPerNamespace

curl -sS --fail --location  "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxTopicsPerNamespace" --header 'Content-Type: application/json' --header "Authorization: Bearer $PULSAR_TOKEN" --data 1000

The response is a number.

Pulsar Admin API topic operations

Get topics in a namespace

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
    "persistent://testtenant/ns0/mytopic-partition-0",
    "persistent://testtenant/ns0/mytopic-partition-1",
    "persistent://testtenant/ns0/topic1",
    "persistent://testtenant/ns0/topic2",
    "persistent://testtenant/ns0/tp1-partition-0",
    "persistent://testtenant/ns0/tp1-partition-1",
    "persistent://testtenant/ns0/tp1-partition-2",
    "persistent://testtenant/ns0/tp1-partition-3"
]

Create non-partitioned topic

curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE
/$TOPIC" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Create partitioned topic

curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN" --header "Content-Type: application/json" --data $NUM_OF_PARTITIONS

No response indicates success.

Delete a persistent topic

curl -sS --fail --location --request DELETE"$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Get InternalStats of non-partitioned topic

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/internalStats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "entriesAddedCounter": 0,
    "numberOfEntries": 0,
    "totalSize": 0,
    "currentLedgerEntries": 0,
    "currentLedgerSize": 0,
    "lastLedgerCreatedTimestamp": "2023-04-25T15:35:45.136Z",
    "waitingCursorsCount": 0,
    "pendingAddEntriesCount": 0,
    "lastConfirmedEntry": "275812:-1",
    "state": "LedgerOpened",
    "ledgers": [
        {
            "ledgerId": 275812,
            "entries": 0,
            "size": 0,
            "offloaded": false,
            "underReplicated": false
        }
    ],
    "cursors": {},
    "schemaLedgers": [],
    "compactedLedger": {
        "ledgerId": -1,
        "entries": -1,
        "size": -1,
        "offloaded": false,
        "underReplicated": false
    }
}

Get stats of a non-partitioned topic

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "msgRateIn": 0.0,
    "msgThroughputIn": 0.0,
    "msgRateOut": 0.0,
    "msgThroughputOut": 0.0,
    "bytesInCounter": 0,
    "msgInCounter": 0,
    "bytesOutCounter": 0,
    "msgOutCounter": 0,
    "averageMsgSize": 0.0,
    "msgChunkPublished": false,
    "storageSize": 0,
    "backlogSize": 0,
    "publishRateLimitedTimes": 0,
    "earliestMsgPublishTimeInBacklogs": 0,
    "offloadedStorageSize": 0,
    "lastOffloadLedgerId": 0,
    "lastOffloadSuccessTimeStamp": 0,
    "lastOffloadFailureTimeStamp": 0,
    "publishers": [],
    "waitingPublishers": 0,
    "subscriptions": {},
    "replication": {},
    "deduplicationStatus": "Disabled",
    "nonContiguousDeletedMessagesRanges": 0,
    "nonContiguousDeletedMessagesRangesSerializedSize": 0,
    "compaction": {
        "lastCompactionRemovedEventCount": 0,
        "lastCompactionSucceedTimestamp": 0,
        "lastCompactionFailedTimestamp": 0,
        "lastCompactionDurationTimeInMills": 0
    }
    ...TRUNCATED FOR READABILITY...
}

Get stats of a partitioned topic

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitioned-stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "msgRateIn": 0.0,
    "msgThroughputIn": 0.0,
    "msgRateOut": 0.0,
    "msgThroughputOut": 0.0,
    "bytesInCounter": 0,
    "msgInCounter": 0,
    "bytesOutCounter": 0,
    "msgOutCounter": 0,
    "averageMsgSize": 0.0,
    "msgChunkPublished": false,
    "storageSize": 0,
    "backlogSize": 0,
    "publishRateLimitedTimes": 0,
    "earliestMsgPublishTimeInBacklogs": 0,
    "offloadedStorageSize": 0,
    "lastOffloadLedgerId": 0,
    "lastOffloadSuccessTimeStamp": 0,
    "lastOffloadFailureTimeStamp": 0,
    "publishers": [],
    "waitingPublishers": 0,
    "subscriptions": {},
    "replication": {},
    "nonContiguousDeletedMessagesRanges": 0,
    "nonContiguousDeletedMessagesRangesSerializedSize": 0,
    "compaction": {
        "lastCompactionRemovedEventCount": 0,
        "lastCompactionSucceedTimestamp": 0,
        "lastCompactionFailedTimestamp": 0,
        "lastCompactionDurationTimeInMills": 0
    },
    "metadata": {
        "partitions": 2,
        "deleted": false
    },
    "partitions": {
        "persistent://testcreate/ns0/mytopic-partition-1": {
            "msgRateIn": 0.0,
            "msgThroughputIn": 0.0,
            "msgRateOut": 0.0,
            "msgThroughputOut": 0.0,
            "bytesInCounter": 0,
            "msgInCounter": 0,
            "bytesOutCounter": 0,
            "msgOutCounter": 0,
            "averageMsgSize": 0.0,
            "msgChunkPublished": false,
            "storageSize": 0,
            "backlogSize": 0,
            "publishRateLimitedTimes": 0,
            "earliestMsgPublishTimeInBacklogs": 0,
            "offloadedStorageSize": 0,
            "lastOffloadLedgerId": 0,
            "lastOffloadSuccessTimeStamp": 0,
            "lastOffloadFailureTimeStamp": 0,
            "publishers": [],
            "waitingPublishers": 0,
            "subscriptions": {},
            "replication": {},
            "deduplicationStatus": "Disabled",
            "nonContiguousDeletedMessagesRanges": 0,
            "nonContiguousDeletedMessagesRangesSerializedSize": 0,
            "compaction": {
                "lastCompactionRemovedEventCount": 0,
                "lastCompactionSucceedTimestamp": 0,
                "lastCompactionFailedTimestamp": 0,
                "lastCompactionDurationTimeInMills": 0
            }
        },
        "persistent://testcreate/ns0/mytopic-partition-0": {
            "msgRateIn": 0.0,
            "msgThroughputIn": 0.0,
            "msgRateOut": 0.0,
            "msgThroughputOut": 0.0,
            "bytesInCounter": 0,
            "msgInCounter": 0,
            "bytesOutCounter": 0,
            "msgOutCounter": 0,
            "averageMsgSize": 0.0,
            "msgChunkPublished": false,
            "storageSize": 0,
            "backlogSize": 0,
            "publishRateLimitedTimes": 0,
            "earliestMsgPublishTimeInBacklogs": 0,
            "offloadedStorageSize": 0,
            "lastOffloadLedgerId": 0,
            "lastOffloadSuccessTimeStamp": 0,
            "lastOffloadFailureTimeStamp": 0,
            "publishers": [],
            "waitingPublishers": 0,
            "subscriptions": {},
            "replication": {},
            "deduplicationStatus": "Disabled",
            "nonContiguousDeletedMessagesRanges": 0,
            "nonContiguousDeletedMessagesRangesSerializedSize": 0,
            "compaction": {
                "lastCompactionRemovedEventCount": 0,
                "lastCompactionSucceedTimestamp": 0,
                "lastCompactionFailedTimestamp": 0,
                "lastCompactionDurationTimeInMills": 0
            }
        }
    }
    ...TRUNCATED FOR READABILITY...
}

Get stats of all topics

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/stats/topics/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "persistent://testcreate/ns0/mytopic3": {
        "name": "persistent://testcreate/ns0/mytopic3",
        "totalMessagesIn": 0,
        "totalMessagesOut": 0,
        "totalBytesIn": 0,
        "totalBytesOut": 0,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 0,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 0,
        "backlogStorageByteSize": 0,
        "msgBacklogNumber": 0,
        "updatedAt": "2023-04-25T16:00:24.252397617Z"
    },
    "persistent://testcreate/ns0/t1": {
        "name": "persistent://testcreate/ns0/t1",
        "totalMessagesIn": 0,
        "totalMessagesOut": 0,
        "totalBytesIn": 0,
        "totalBytesOut": 0,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 0,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 0,
        "backlogStorageByteSize": 0,
        "msgBacklogNumber": 0,
        "updatedAt": "2023-04-25T16:00:24.252466612Z"
    },
    "persistent://testcreate/ns0/t1-partition-0": {
        "name": "persistent://testcreate/ns0/t1-partition-0",
        "totalMessagesIn": 516,
        "totalMessagesOut": 514,
        "totalBytesIn": 637776,
        "totalBytesOut": 637674,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 1,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 1899200,
        "backlogStorageByteSize": 0,
        "msgBacklogNumber": 0,
        "updatedAt": "2023-04-25T16:00:24.252410963Z"
    },
    "persistent://testcreate/ns0/t1-partition-1": {
        "name": "persistent://testcreate/ns0/t1-partition-1",
        "totalMessagesIn": 534,
        "totalMessagesOut": 531,
        "totalBytesIn": 696340,
        "totalBytesOut": 692347,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 1,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 2020678,
        "backlogStorageByteSize": 2151,
        "msgBacklogNumber": 3,
        "updatedAt": "2023-04-25T16:00:24.252425482Z"
    },
    "persistent://testcreate/ns0/t1-partition-2": {
        "name": "persistent://testcreate/ns0/t1-partition-2",
        "totalMessagesIn": 522,
        "totalMessagesOut": 519,
        "totalBytesIn": 653487,
        "totalBytesOut": 649286,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 1,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 1916574,
        "backlogStorageByteSize": 0,
        "msgBacklogNumber": 0,
        "updatedAt": "2023-04-25T16:00:24.252438306Z"
    },
    "persistent://testcreate/ns0/t1-partition-3": {
        "name": "persistent://testcreate/ns0/t1-partition-3",
        "totalMessagesIn": 516,
        "totalMessagesOut": 514,
        "totalBytesIn": 631638,
        "totalBytesOut": 631536,
        "msgRateIn": 0,
        "msgRateOut": 0,
        "throughputIn": 0,
        "throughputOut": 0,
        "subscriptionCount": 1,
        "producerCount": 0,
        "consumerCount": 0,
        "subscriptionDelayed": 0,
        "storageSize": 1890920,
        "backlogStorageByteSize": 1586,
        "msgBacklogNumber": 4,
        "updatedAt": "2023-04-25T16:00:24.252452735Z"
        ...TRUNCATED FOR READABILITY...
    }
    ...TRUNCATED FOR READABILITY...
}

Get topic subscriptions

curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscriptions" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
    "mysub",
    "subscript2"
]

Create a subscription for a topic

Create a replicated or non-replicated subscription. "Replicated=true" can be set to “false" for non-replicated subscriptions.

curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION?replicated=true" --header "Authorization: Bearer $PULSAR_TOKEN"  --header "Content-Type: application/json"

No response indicates success.

Delete a subscription for a topic

curl -sS --fail --location --request DELETE"$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Clear a subscription for a topic

curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION/skip_all" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Astra Streaming DevOps API geo-replication operations

Get status of geo-replication

curl --location --fail --request GET "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE"  --header "Authorization: Bearer $ASTRA_TOKEN" | python3 -mjson.tool
Result
{
    "pulsarInstance": "prod0",
    "tenant": "mytenant",
    "namespace": "mynamespace",
    "replications": {
        "pulsar-aws-useast2": [
            "pulsar-aws-uswest2",
            "pulsar-aws-useast2"
        ],
        "pulsar-aws-uswest2": [
            "pulsar-aws-uswest2",
            "pulsar-aws-useast2"
        ]
    },
    "clusters": {
        "pulsar-aws-useast2": {
            "clusterName": "pulsar-aws-useast2",
            "cloudProvider": "aws",
            "cloudRegion": "useast2",
            "clusterType": "cloud",
            "webServiceUrl": "https://pvt-pulsar-aws-useast2:8443",
            "brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
            "websocketUrl": "",
            "pulsarInstance": "prod0",
            "regionZone": ""
        },
        "pulsar-aws-uswest2": {
            "clusterName": "pulsar-aws-uswest2",
            "cloudProvider": "aws",
            "cloudRegion": "uswest2",
            "clusterType": "cloud",
            "webServiceUrl": "https://pvt-pulsar-aws-uswest2:8443",
            "brokerServiceUrl": "pulsar+ssl://pulsar-aws-uswest2:6651",
            "websocketUrl": "",
            "pulsarInstance": "prod0",
            "regionZone": ""
        }
        ...TRUNCATED FOR READABILITY...
    }
}

Create geo-replication between namespaces

The JSON input parameters can be obtained from List Tenants with Details and Get a list cloud providers of Astra Streaming sections of this guide.

curl --location --fail --request POST "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE"  --header "Content-Type: application/json"  --header "Authorization: Bearer $ASTRA_TOKEN"  --data-raw '{
  "bidirection": true,
  "destCluster": "pulsar-aws-uswest2",
  "email": "joshua@example.com",
  "namespace": "mynamespace",
  "originCluster": "pulsar-aws-useast2"
}'

No response indicates success.

Delete geo-replication between namespaces

The JSON input parameters can be obtained from List Tenants with Details and Get a list cloud providers of Astra Streaming sections of this guide.

curl --location --fail --request DELETE "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" \
 --header "Content-Type: application/json" \
 --header "Authorization: Bearer $ASTRA_TOKEN" \
 --data-raw '{
  "bidirection": true,
  "destCluster": "pulsar-aws-uswest2",
  "email": "joshua@example.com",
  "namespace": "ns0",
  "originCluster": "pulsar-aws-useast2"
}'

No response indicates success.

Pulsar Admin API functions operations

List existing functions in a namespace

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
    "testfunction1"
]

Get status of a function

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "numInstances": 1,
    "numRunning": 0,
    "instances": [
        {
            "instanceId": 0,
            "status": {
                "running": false,
                "error": "",
                "numRestarts": 0,
                "numReceived": 0,
                "numSuccessfullyProcessed": 0,
                "numUserExceptions": 0,
                "latestUserExceptions": null,
                "numSystemExceptions": 0,
                "latestSystemExceptions": null,
                "averageLatency": 0.0,
                "lastInvocationTime": 0,
                "workerId": "pulsar-aws-useast2-function-0"
            }
        }
    ]
}

Get stats of a function

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "receivedTotal": 0,
    "processedSuccessfullyTotal": 0,
    "systemExceptionsTotal": 0,
    "userExceptionsTotal": 0,
    "avgProcessLatency": null,
    "1min": {
        "receivedTotal": 0,
        "processedSuccessfullyTotal": 0,
        "systemExceptionsTotal": 0,
        "userExceptionsTotal": 0,
        "avgProcessLatency": null
    },
    "lastInvocation": null,
    "instances": [
        {
            "instanceId": 0,
            "metrics": {
                "receivedTotal": 0,
                "processedSuccessfullyTotal": 0,
                "systemExceptionsTotal": 0,
                "userExceptionsTotal": 0,
                "avgProcessLatency": null,
                "1min": {
                    "receivedTotal": 0,
                    "processedSuccessfullyTotal": 0,
                    "systemExceptionsTotal": 0,
                    "userExceptionsTotal": 0,
                    "avgProcessLatency": null
                },
                "lastInvocation": null,
                "userMetrics": {}
            }
        }
    ]
}

Get function details

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "runtimeFlags": null,
    "tenant": "mytenant",
    "namespace": "mynamespace",
    "name": "testfunction1",
    "className": "TransformFunction",
    "inputs": null,
    "customSerdeInputs": null,
    "topicsPattern": null,
    "customSchemaInputs": null,
    "customSchemaOutputs": null,
    "inputSpecs": {
        "testcreate/ns0/tp1": {
            "schemaType": null,
            "serdeClassName": null,
            "schemaProperties": {},
            "consumerProperties": {},
            "receiverQueueSize": null,
            "cryptoConfig": null,
            "poolMessages": false,
            "regexPattern": false
        }
    },
    "output": "mytenant/mynamespace/tp2",
    "producerConfig": {
        "maxPendingMessages": null,
        "maxPendingMessagesAcrossPartitions": null,
        "useThreadLocalProducers": false,
        "cryptoConfig": null,
        "batchBuilder": ""
    },
    "outputSchemaType": null,
    "outputSerdeClassName": null,
    "logTopic": null,
    "processingGuarantees": "ATLEAST_ONCE",
    "retainOrdering": false,
    "retainKeyOrdering": false,
    "batchBuilder": null,
    "forwardSourceMessageProperty": true,
    "userConfig": {
        "steps": [
            {
                "schema-type": "STRING",
                "type": "cast"
            }
        ]
    },
    "secrets": null,
    "runtime": "JAVA",
    "autoAck": true,
    "maxMessageRetries": null,
    "deadLetterTopic": null,
    "subName": null,
    "parallelism": 1,
    "resources": {
        "cpu": 0.25,
        "ram": 1000000000,
        "disk": 1000000000
    },
    "fqfn": null,
    "windowConfig": null,
    "timeoutMs": 11000,
    "jar": null,
    "py": null,
    "go": null,
    "functionType": null,
    "cleanupSubscription": false,
    "customRuntimeOptions": "",
    "maxPendingAsyncRequests": null,
    "exposePulsarAdminClientEnabled": null,
    "subscriptionPosition": "Latest"
}

Start a function

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/start" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Stop a function

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stop" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Restart a function

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/restart" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Astra Streaming DevOps API JWT operations

List existing token IDs

Get a list of Token IDs for your Cluster. With the TokenID, you can then lookup and obtain the Pulsar JWT string. The TokenIDs are also listed in the Astra Portal for that Tenant and Cluster.

Required parameters "CLUSTER" is obtained from the “List Tenants with Details" API command.

curl --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "Authorization: Bearer $ASTRA_TOKEN" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" | python3 -mjson.tool
Result
[
    {
        "iat": 1679335276,
        "iss": "datastax",
        "sub": "client;b282a256-b129-43e9-b870. . .",
        "tokenid": "cdb87797. . ."
    }
]

List token string by ID

curl --fail --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens/$TOKENID" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_TOKEN"
Result
Output: Raw string JWT
eyJhbGciOiJSUzI1NiIsI . . .

Create a JWT

Create a new Pulsar JWT. The new JWT will also be visible in the Astra Portal for that Tenant and Cluster.

Required parameters "CLUSTER" is obtained from the “List Tenants with Details" API command.

curl --fail --location --request POST "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_TOKEN"
Result
Output: new raw string JWT
eyJhbGciOiJSUzI1NiIsI . . .

Delete a JWT

Required parameters "CLUSTER" is obtained from the “List Tenants with Details" API command. List of "TOKENID" can be obtained from List Existing Tokens IDs.

curl --fail --location --request DELETE "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_TOKEN"

No response indicates success.

Pulsar Admin API IO connectors operations

Pulsar Sources and Sinks share a similar API structure for most methods.

List existing sources in a namespace

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
    "mysource1"
]

List existing sinks in a namespace

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
    "mysink1"
]

Get status of a source

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "numInstances": 1,
    "numRunning": 1,
    "instances": [
        {
            "instanceId": 0,
            "status": {
                "running": true,
                "error": "",
                "numRestarts": 0,
                "numReceivedFromSource": 0,
                "numSystemExceptions": 0,
                "latestSystemExceptions": [],
                "numSourceExceptions": 0,
                "latestSourceExceptions": [],
                "numWritten": 0,
                "lastReceivedTime": 0,
                "workerId": "pulsar-aws-useast2-function-0"
            }
        }
    ]
}

Get status of a sink

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SINK/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "numInstances": 1,
    "numRunning": 0,
    "instances": [
        {
            "instanceId": 0,
            "status": {
                "running": false,
                "error": "",
                "numRestarts": 0,
                "numReadFromPulsar": 0,
                "numSystemExceptions": 0,
                "latestSystemExceptions": null,
                "numSinkExceptions": 0,
                "latestSinkExceptions": null,
                "numWrittenToSink": 0,
                "lastReceivedTime": 0,
                "workerId": "pulsar-useast-function-1"
            }
        }
    ]
}

Get source connector details

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "archive": "builtin://netty",
    "batchBuilder": null,
    "batchSourceConfig": null,
    "className": "org.apache.pulsar.io.netty.NettySource",
    "configs": {
        "host": "127.0.0.1",
        "numberOfThreads": "1",
        "port": "10999",
        "type": "tcp"
    },
    "customRuntimeOptions": "internal_data",
    "name": "mysource",
    "namespace": "ns0",
    "parallelism": 1,
    "processingGuarantees": "ATLEAST_ONCE",
    "producerConfig": {
        "batchBuilder": "",
        "cryptoConfig": null,
        "maxPendingMessages": null,
        "maxPendingMessagesAcrossPartitions": null,
        "useThreadLocalProducers": false
    },
    "resources": {
        "cpu": 0.25,
        "disk": 1000000000,
        "ram": 1000000000
    },
    "runtimeFlags": null,
    "schemaType": null,
    "secrets": null,
    "serdeClassName": null,
    "tenant": "testcreate",
    "topicName": "persistent://testcreate/ns0/t1"
}

Get sink details

curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
    "archive": "builtin://data-generator",
    "autoAck": true,
    "className": "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink",
    "cleanupSubscription": false,
    "configs": {},
    "customRuntimeOptions": "internal_data",
    "deadLetterTopic": null,
    "inputSpecs": {
        "persistent://testcreate/ns0/tp1": {
            "consumerProperties": {},
            "cryptoConfig": null,
            "poolMessages": false,
            "receiverQueueSize": null,
            "regexPattern": false,
            "schemaProperties": {},
            "schemaType": null,
            "serdeClassName": null
        }
    },
    "inputs": [
        "persistent://testcreate/ns0/tp1"
    ],
    "maxMessageRetries": null,
    "name": "mysink1",
    "namespace": "ns0",
    "negativeAckRedeliveryDelayMs": null,
    "parallelism": 1,
    "processingGuarantees": "ATLEAST_ONCE",
    "resources": {
        "cpu": 0.15,
        "disk": 500000000,
        "ram": 400000000
    },
    "retainKeyOrdering": false,
    "retainOrdering": false,
    "runtimeFlags": null,
    "secrets": null,
    "sourceSubscriptionName": null,
    "sourceSubscriptionPosition": "Latest",
    "tenant": "testcreate",
    "timeoutMs": 5000,
    "topicToSchemaProperties": null,
    "topicToSchemaType": null,
    "topicToSerdeClassName": null,
    "topicsPattern": null,
    "transformFunction": null,
    "transformFunctionClassName": null,
    "transformFunctionConfig": null
}

Start a source connector

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/start" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Start a sink

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/start" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Stop a source connector

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/stop" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Stop a sink

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/stop" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Create a source connector

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sourceConfig=@mynetty-source-config.json;type=application/json"

No response indicates success.

In the example above, a configuration file is provided as input to CURL. The file is named "mynetty-source-config.json", which has the following context for the built-in “netty" source connector in Astra Streaming.

The curl parameter @ indicates an input file. When executing the curl command, ensure the input file is accessible and in the proper directory for reading.

Delete a source connector

curl --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Create a sink

curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sinkConfig=@mykafka-sink-config.json;type=application/json"

No response indicates success.

In the example above, a configuration file is provided as input to CURL. The file is named mykafka-sink-config.json which has the following context for the built-in “kafka" source connector in Astra Streaming.

{
    "tenant": "testcreate",
    "namespace": "ns0",
    "name": "mykafkaconnector",
    "archive": "builtin://kafka",
    "parallelism": 1,
    "autoAck": true,
    "cleanupSubscription": false,
    "configs": {
      "acks": "1",
      "batchSize": "16384",
      "bootstrapServers": "localhost:55200,localhost:55201",
      "maxRequestSize": "1048576",
      "producerConfigProperties": {
        "client.id": "astra-streaming-client",
        "sasl.jaas.config": "sensitive_data_removed",
        "sasl.mechanism": "PLAIN",
        "sasl.password": "sensitive_data_removed",
        "sasl.username": "myuserid",
        "security.protocol": "SASL_SSL"
      },
      "topic": "mykafka-topic"
    },
    "inputs": [ "persistent://testcreate/ns0/mytopic3" ]
}

The curl parameter @ indicates an input file. When executing the curl command, ensure the input file is accessible and in the proper directory for reading.

Delete a sink

curl --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN"

No response indicates success.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2025 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com