Use the C# Pulsar client on Astra Streaming
You can produce and consume messages with the C# Pulsar client and Astra Streaming.
For a complete source code example, see the Astra Streaming Examples repository.
Prerequisites
-
An Apache Pulsar™ topic in Astra Streaming
-
A text editor or IDE
Create a console project
Create a new console project, and then add a reference to the DotPulsar client package:
dotnet new console \
--output SimpleProducerConsumer \
--framework net6.0
cd SimpleProducerConsumer
dotnet add package DotPulsar --version 2.7.0
Write the script
-
In your new project, open the Program.cs file, and then remove any existing content from the file.
-
Enter the following code, starting at line 1:
Program.cs
using DotPulsar;
using DotPulsar.Extensions;

var serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
var pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
var tenantName = "<REPLACE_WITH_TENANT_NAME>";
var nmspace = "<REPLACE_WITH_NAMESPACE>";
var topicName = "<REPLACE_WITH_TOPIC>";
var topic = $"persistent://{tenantName}/{nmspace}/{topicName}";

await using var client = PulsarClient.Builder()
    .ServiceUrl(new Uri(serviceUrl))
    .Authentication(AuthenticationFactory.Token(pulsarToken))
    .Build();
This code creates an instance of PulsarClient. The builder methods are chained together to set the service URL and the token authentication that the client uses for its connections.
-
Provide values for the following variables:
Parameter: serviceUrl
Definition: The URL to connect to the Pulsar cluster
Where to find the value:
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Connect tab.
-
In the Tenant Details section, get the Broker Service URL.

Parameter: pulsarToken
Definition: The token for Pulsar cluster authentication
Where to find the value:
For information about creating Pulsar tokens, see Manage tokens.

Parameter: tenantName
Definition: The name of your streaming tenant
Where to find the value:
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Get the tenant name from the Astra Streaming dashboard.

Parameter: nmspace
Definition: The namespace, which is the segmented area for certain topics in your streaming tenant
Where to find the value:
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Choose the target namespace from the list of namespaces.

Parameter: topicName
Definition: The topic name (not the full name)
Where to find the value:
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.
-
Create a new producer instance, and then instruct it to produce messages in string format. Messages can also use other formats, such as JSON, bytes, and Avro.
Program.cs
await using var producer = client.NewProducer(Schema.String)
    .Topic(topic)
    .Create();
-
Asynchronously send a single message and wait for acknowledgment:
Program.cs
// Send a message and ignore the returned MessageId
await producer.Send("Hello World");
Console.WriteLine("Sent message");
-
Create a new consumer instance, and then instruct it to expect messages in string format. The consumer uses broker subscriptions to gather messages.
Program.cs
await using var consumer = client.NewConsumer(Schema.String)
    .SubscriptionName("examples-subscription")
    .Topic(topic)
    .InitialPosition(SubscriptionInitialPosition.Earliest)
    .Create();
A subscription is set with a subscription name and an initial position. In this example, the subscription starts at the Earliest position, so a new subscription begins with the oldest message available in the topic.
-
Loop through the messages provided to the consumer’s subscription and write their contents:
Program.cs
var msgCount = 0;
await foreach (var message in consumer.Messages())
{
    msgCount++;
    Console.WriteLine($"Received: {message.Value()}");
    await consumer.Acknowledge(message);

    // Stop after the first message
    if (msgCount > 0) break;
}
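The script above hardcodes its connection values, including the Pulsar token. As one possible alternative, the following minimal sketch reads a value from an environment variable and builds the full topic name in the same persistent://tenant/namespace/topic format. The helper names RequireEnv and BuildTopic and the variable name ASTRA_STREAMING_TENANT are hypothetical and are not part of DotPulsar or Astra Streaming.

```csharp
using System;

// Hypothetical helper: reads a required setting from an environment variable
// and fails early with a clear message if it is missing.
static string RequireEnv(string name)
{
    var value = Environment.GetEnvironmentVariable(name);
    if (string.IsNullOrWhiteSpace(value))
        throw new InvalidOperationException($"Environment variable {name} is not set");
    return value;
}

// Hypothetical helper: builds the full topic name in the format the script uses.
static string BuildTopic(string tenant, string ns, string topicName) =>
    $"persistent://{tenant}/{ns}/{topicName}";

// Demo only: set the (assumed) variable in-process so the sketch runs on its own.
Environment.SetEnvironmentVariable("ASTRA_STREAMING_TENANT", "my-tenant");

var demoTenant = RequireEnv("ASTRA_STREAMING_TENANT");
var demoTopic = BuildTopic(demoTenant, "default", "my-topic");
Console.WriteLine(demoTopic); // prints "persistent://my-tenant/default/my-topic"
```

In a real project, the same pattern could supply serviceUrl and pulsarToken, which keeps the token out of source control.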
Run the script
In your project directory, run the script:
dotnet run
Output such as the following indicates the script succeeded:
Sent message
Received: Hello World
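If the script prints Sent message and then appears to hang, the consumer loop is likely still waiting for a message, because the await foreach loop has no timeout. One way to bound the wait is a CancellationTokenSource with a timeout. The sketch below shows the pattern on a stand-in message stream so that it runs without a broker. FakeMessages is hypothetical and only simulates a topic that delivers one message and then goes idle. To the best of my understanding, the DotPulsar Messages extension method accepts a CancellationToken in the same position, but verify that against the version you install.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for consumer.Messages(): yields one message,
// then waits until the token is canceled (simulating an idle topic).
static async IAsyncEnumerable<string> FakeMessages(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    yield return "Hello World";
    await Task.Delay(Timeout.Infinite, cancellationToken);
}

// Cancel the wait automatically after 200 milliseconds.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
var received = new List<string>();

try
{
    await foreach (var value in FakeMessages(cts.Token))
    {
        received.Add(value);
        Console.WriteLine($"Received: {value}");
    }
}
catch (OperationCanceledException)
{
    // Reached when the timeout elapses while waiting for the next message.
    Console.WriteLine($"Timed out after receiving {received.Count} message(s)");
}
```

The timeout value is arbitrary here. A real consumer would typically choose a value that matches how long it is reasonable to wait for the producer.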