Produce and consume messages with the Pulsar CLIs and Astra Streaming
You can use the pulsar-client and pulsar-admin CLIs to interact with your Astra Streaming tenants.
Prerequisites
-
Optional: Set environment variables for common values, for example:
TOPIC="TOPIC_NAME"
NAMESPACE="NAMESPACE_NAME"
TENANT="TENANT_NAME"
SINK_NAME="SINK_CONNECTOR_NAME"
SOURCE_NAME="SOURCE_CONNECTOR_NAME"
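Because every later command expands these variables, an unset value silently produces a malformed topic path or an empty flag argument. The following is a minimal sketch of an early check, not part of the Pulsar CLIs: `require_vars` and `topic_path` are hypothetical helper names, and the values assigned at the end are placeholders.

```shell
#!/bin/sh
# Hypothetical helper: fail early if any named variable is unset or empty.
require_vars() {
  for name in "$@"; do
    # Indirect lookup; the names come from the fixed lists below, not from user input.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "Missing required variable: $name" >&2
      return 1
    fi
  done
}

# Hypothetical helper: build the tenant/namespace/topic path used by the commands in this guide.
topic_path() {
  require_vars TENANT NAMESPACE TOPIC || return 1
  printf '%s/%s/%s\n' "$TENANT" "$NAMESPACE" "$TOPIC"
}

# Placeholder values for illustration only.
TENANT="my-tenant"
NAMESPACE="default"
TOPIC="demo"

topic_path
```

With the placeholder values, `topic_path` prints `my-tenant/default/demo`. If any of the three variables is empty, it prints the missing name to standard error and returns a non-zero status.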
Use the pulsar-client CLI
You can use the pulsar-client CLI to produce and consume messages in your Astra Streaming tenants.
For example:
-
Produce a new message on the given topic:
./bin/pulsar-client produce \
  "$TENANT/$NAMESPACE/$TOPIC" \
  --messages "Hi there" \
  --num-produce 1
Your topic now has a new, unacknowledged message.
-
Create a consumer to retrieve and acknowledge the message:
./bin/pulsar-client consume \
  "$TENANT/$NAMESPACE/$TOPIC" \
  --subscription-name "examples-subscriber" \
  --num-messages 1
-
Make sure the output includes your message:
# consumer message output
----- got message -----
key:[null], properties:[], content:Hi there
2023-01-09T14:32:40,502-0500 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [new-tenant-from-cli/default/demo] [examples-subscriber] Closed consumer
2023-01-09T14:32:40,504-0500 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar+ssl://pulsar-aws-useast2.streaming.datastax.com:6651
2023-01-09T14:32:40,514-0500 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x08d05240, L:/192.168.50.167:52883 ! R:pulsar-aws-useast2.streaming.datastax.com/3.138.177.230:6651] Disconnected
2023-01-09T14:32:40,520-0500 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x934157ae, L:/192.168.50.167:52884 ! R:pulsar-aws-useast2.streaming.datastax.com/3.138.177.230:6651] Disconnected
2023-01-09T14:32:42,613-0500 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully consumed
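The consumer prints the payload among several log lines, so checking by eye is error-prone in scripts. The following sketch filters the output for the `content:` field shown in the sample above. `extract_content` and `output_has_message` are hypothetical helper names, and the sketch assumes the payload is a single line that follows the last `content:` marker on its line.

```shell
#!/bin/sh
# Hypothetical helper: print the payload of each line on stdin that has a "content:" field.
# Assumption: the payload is everything after the last "content:" on that line.
extract_content() {
  sed -n 's/^.*content:\(.*\)$/\1/p'
}

# Hypothetical helper: succeed if the consumer output on stdin contains exactly the expected payload.
output_has_message() {
  extract_content | grep -Fxq -- "$1"
}

# Sample line copied from the consumer output in this guide.
sample='key:[null], properties:[], content:Hi there'

if printf '%s\n' "$sample" | output_has_message "Hi there"; then
  echo "message found"
else
  echo "message missing"
fi
```

In practice, you might pipe the output of the `pulsar-client consume` command into `output_has_message` instead of using the sample line.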
Use the pulsar-admin CLI
You can use the pulsar-admin CLI to manage topics, namespaces, connectors, and functions in your Astra Streaming tenants.
pulsar-admin CLI sink connector operations
Although you use the same base commands to create and update all Astra Streaming Pulsar connectors, each connector has different configuration options. For configuration details, see the documentation for your preferred sink connectors.
- Get available sink connectors
-
Get a list of sink connectors that are available in your Astra Streaming Pulsar tenant:
./bin/pulsar-admin sinks available-sinks
- Get sink connector configuration data
-
Get the configuration for an existing sink connector:
# Get information about a connector
./bin/pulsar-admin sinks get \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
- Start a sink connector
-
# Start all instances of a connector
./bin/pulsar-admin sinks start \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
# optionally add --instance-id to only start an individual instance
- Stop a sink connector
-
# Stop all instances of a connector
./bin/pulsar-admin sinks stop \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
# optionally add --instance-id to only stop an individual instance
- Restart a sink connector
-
# Restart all instances of a connector
./bin/pulsar-admin sinks restart \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
# optionally add --instance-id to only restart an individual instance
- Get sink connector status
-
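# Check connector status
# SINK_INSTANCE_ID is not one of the variables set in the prerequisites; set it to the ID of the instance to check
./bin/pulsar-admin sinks status \
  --instance-id "$SINK_INSTANCE_ID" \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
- Delete a sink connector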
-
# Delete all instances of a connector
./bin/pulsar-admin sinks delete \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"
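The start, stop, restart, and status commands above share the same flags and differ only in the action and the optional `--instance-id`. The following sketch assembles such a command and prints it for review rather than running it. `sink_instance_cmd` is a hypothetical wrapper, not part of pulsar-admin, and the tenant, namespace, and connector names are placeholders.

```shell
#!/bin/sh
# Hypothetical wrapper: print (dry run) the pulsar-admin command for a sink action.
# Usage: sink_instance_cmd ACTION [INSTANCE_ID]
sink_instance_cmd() {
  action=$1
  instance_id=${2:-}

  # Only the actions shown in this guide with an optional or required instance ID are accepted.
  case "$action" in
    start|stop|restart|status) ;;
    *) echo "Unsupported action: $action" >&2; return 1 ;;
  esac

  # Rebuild the positional parameters as the command line.
  set -- ./bin/pulsar-admin sinks "$action" \
    --tenant "$TENANT" --namespace "$NAMESPACE" --name "$SINK_NAME"
  if [ -n "$instance_id" ]; then
    set -- "$@" --instance-id "$instance_id"
  fi

  # Print instead of executing so the command can be reviewed first.
  printf '%s\n' "$*"
}

# Placeholder values for illustration only.
TENANT="my-tenant"
NAMESPACE="default"
SINK_NAME="my-sink"

sink_instance_cmd restart
sink_instance_cmd stop 0
```

The first call prints the restart command for all instances, and the second prints the stop command with `--instance-id 0` appended. To run the command instead of printing it, one option is to replace the final `printf` line with `"$@"`.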
pulsar-admin CLI source connector operations
Although you use the same base commands to create and update all Astra Streaming Pulsar connectors, each connector has different configuration options. For configuration details, see the documentation for your preferred source connectors.
- Get available source connectors
-
Get a list of source connectors that are available in your Astra Streaming Pulsar tenant:
./bin/pulsar-admin sources available-sources
- Get source connector configuration data
-
Get the configuration for an existing source connector:
# Get information about a connector
./bin/pulsar-admin sources get \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"
- Start a source connector
-
# Start all instances of a connector
./bin/pulsar-admin sources start \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"
# optionally add --instance-id to only start an individual instance
- Stop a source connector
-
# Stop all instances of a connector
./bin/pulsar-admin sources stop \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"
# optionally add --instance-id to only stop an individual instance
- Restart a source connector
-
# Restart all instances of a connector
./bin/pulsar-admin sources restart \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"
# optionally add --instance-id to only restart an individual instance
- Get source connector status
-
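# Check connector status
# SOURCE_INSTANCE_ID is not one of the variables set in the prerequisites; set it to the ID of the instance to check
./bin/pulsar-admin sources status \
  --instance-id "$SOURCE_INSTANCE_ID" \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"
- Delete a source connector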
-
# Delete all instances of a connector
./bin/pulsar-admin sources delete \
  --namespace "$NAMESPACE" \
  --name "$SOURCE_NAME" \
  --tenant "$TENANT"
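Because the delete command removes all instances of a connector, a script might ask for confirmation before running it. The following sketch shows one way to do that. `confirm_delete` is a hypothetical guard, not a pulsar-admin feature, and `orders-source` is a placeholder connector name.

```shell
#!/bin/sh
# Hypothetical guard: succeed only if the line read from stdin matches the connector name.
confirm_delete() {
  # Prompt on stderr so stdout stays clean for scripting.
  printf 'Type "%s" to confirm deletion: ' "$1" >&2
  IFS= read -r answer || answer=""
  [ "$answer" = "$1" ]
}

# Placeholder connector name for illustration only.
SOURCE_NAME="orders-source"

# The answer is piped in here so that the example runs non-interactively.
if printf '%s\n' "orders-source" | confirm_delete "$SOURCE_NAME"; then
  echo "confirmed: safe to run the sources delete command for $SOURCE_NAME"
else
  echo "not confirmed: nothing deleted"
fi
```

In an interactive script, you would call `confirm_delete "$SOURCE_NAME"` without the pipe and place the `./bin/pulsar-admin sources delete` command inside the `then` branch.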