Producing and consuming messages using the C# Pulsar client on Astra Streaming

Prerequisites

You will need the following prerequisites in place to complete this guide:

  • .NET 7 SDK installed (install now)

  • A working Pulsar topic (get started here if you don’t have a topic)

  • A basic text editor or IDE

Visit our examples repo to see the complete source of this example.

Create a console project

Create a new console project and add a reference to the DotPulsar client:

dotnet new console \
  --output SimpleProducerConsumer \
  --framework net7.0

cd SimpleProducerConsumer

dotnet add package DotPulsar --version 2.7.0

Create a client instance

Create an instance of PulsarClient.

Open the Program.cs file in your favorite text editor and clear its contents.

Notice how the builder chains configuration calls together (the service URL, then token authentication) before building the client.

Add the following to the file, starting at line 1:

using DotPulsar;
using DotPulsar.Extensions;

var serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
var pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";

var tenantName = "<REPLACE_WITH_TENANT_NAME>";
var nmspace = "<REPLACE_WITH_NAMESPACE>";
var topicName = "<REPLACE_WITH_TOPIC>";

var topic = $"persistent://{tenantName}/{nmspace}/{topicName}";

await using var client = PulsarClient.Builder()
                                      .ServiceUrl(new Uri(serviceUrl))
                                      .Authentication(
                                        AuthenticationFactory.Token(pulsarToken)
                                      )
                                      .Build();

There are a few values that you’ll need to fill in:

serviceUrl

This is the URL for connecting to the Pulsar cluster.

In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".

pulsarToken

This is the token used to authenticate with the Pulsar cluster.

In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.

tenantName

The name of your streaming tenant.

In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".

nmspace

A namespace is a logical grouping of topics within your streaming tenant. The variable is named nmspace because namespace is a reserved word in C#.

In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.

topicName

Expand the namespace you chose above to list its topics. This value is just the topic name (not the "Full Name").
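If you would rather not hard-code the token and other values in Program.cs, one option is to read them from environment variables. The following is a minimal sketch of that idea, not part of the original example: the environment variable names and the helper functions (ReadSetting, IsPlaceholder, BuildTopic) are hypothetical and exist only for illustration. It reports any value that is still a placeholder and otherwise prints the full topic name.

```csharp
using System;

// Returns the trimmed value of an environment variable, or the given
// placeholder when the variable is unset or blank.
static string ReadSetting(string variableName, string placeholder)
{
    var raw = Environment.GetEnvironmentVariable(variableName);
    return string.IsNullOrWhiteSpace(raw) ? placeholder : raw.Trim();
}

// True when a value still looks like one of the guide's "<REPLACE_WITH_...>" placeholders.
static bool IsPlaceholder(string candidate) =>
    candidate.StartsWith("<REPLACE_WITH_", StringComparison.Ordinal);

// Builds the full topic name in the same form the guide uses.
static string BuildTopic(string tenant, string ns, string topicPart) =>
    $"persistent://{tenant}/{ns}/{topicPart}";

// The environment variable names below are hypothetical; choose your own.
var serviceUrl = ReadSetting("ASTRA_STREAMING_SERVICE_URL", "<REPLACE_WITH_SERVICE_URL>");
var pulsarToken = ReadSetting("ASTRA_STREAMING_TOKEN", "<REPLACE_WITH_PULSAR_TOKEN>");
var tenantName = ReadSetting("ASTRA_STREAMING_TENANT", "<REPLACE_WITH_TENANT_NAME>");
var nmspace = ReadSetting("ASTRA_STREAMING_NAMESPACE", "<REPLACE_WITH_NAMESPACE>");
var topicName = ReadSetting("ASTRA_STREAMING_TOPIC", "<REPLACE_WITH_TOPIC>");

var settings = new (string Label, string Current)[]
{
    ("serviceUrl", serviceUrl),
    ("pulsarToken", pulsarToken),
    ("tenantName", tenantName),
    ("nmspace", nmspace),
    ("topicName", topicName),
};

// Report every setting that was not supplied. The token itself is never printed.
var missing = 0;
foreach (var (label, current) in settings)
{
    if (IsPlaceholder(current))
    {
        Console.WriteLine($"Missing value for {label}");
        missing++;
    }
}

if (missing == 0)
{
    Console.WriteLine($"Topic: {BuildTopic(tenantName, nmspace, topicName)}");
}
```

Checking for leftover placeholders before connecting tends to give a clearer error than a failed connection attempt would.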

The producer

We’re making progress! Now that we have a Pulsar client connected, we will use that connection to produce a single message.

We will create a new producer instance and instruct it to produce messages in a string format (other message formats include JSON, byte[], and Avro).

In the Program.cs file, append the following code just below the client code.

await using var producer = client.NewProducer(Schema.String)
                                 .Topic(topic)
                                 .Create();

Add the following code to asynchronously send a single (string) message and wait for acknowledgment.

_ = await producer.Send("Hello World"); // Send a message and ignore the returned MessageId
Console.WriteLine("Sent message");
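The example sends a plain string, but the same string format can carry structured data if you serialize it first. The sketch below shows one possible approach using System.Text.Json from the .NET base library. It is an assumption for illustration, not the schema support built into the client, and the ToJsonPayload and FromJsonPayload helpers and the order fields are hypothetical.

```csharp
using System;
using System.Text.Json;

// Serializes two illustrative fields into a compact JSON string.
static string ToJsonPayload(int orderId, string status) =>
    JsonSerializer.Serialize(new { orderId, status });

// Parses the JSON string back into its two fields.
static (int OrderId, string Status) FromJsonPayload(string json)
{
    using var document = JsonDocument.Parse(json);
    var root = document.RootElement;
    return (root.GetProperty("orderId").GetInt32(),
            root.GetProperty("status").GetString() ?? string.Empty);
}

var payload = ToJsonPayload(42, "shipped");
Console.WriteLine(payload); // {"orderId":42,"status":"shipped"}

var (id, state) = FromJsonPayload(payload);
Console.WriteLine($"Order {id} is {state}"); // Order 42 is shipped
```

With the producer from this guide, the resulting payload string could be passed to producer.Send in place of "Hello World", and the consumer side would parse message.Value() the same way.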

The consumer

The final piece of the puzzle: a consumer. What value is a message that’s never consumed?

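We will create a new consumer instance and instruct it to expect messages in a string format. The consumer receives messages through a named subscription on the broker.

The initial position is one of Pulsar’s superpowers: it decides where a new subscription starts reading. Here we choose "earliest", so the subscription begins at the oldest message still available on the topic instead of only seeing messages published after the consumer connects. That matters in this example because the message is sent before the consumer is created.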

Return to the Program.cs file and append the following code below the producer code.

await using var consumer = client.NewConsumer(Schema.String)
                                 .SubscriptionName("examples-subscription")
                                 .Topic(topic)
                                 .InitialPosition(SubscriptionInitialPosition.Earliest)
                                 .Create();

This code loops through the messages delivered to the consumer’s subscription, writes each message’s contents, acknowledges it, and stops after the first message.

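var msgCount = 0;
await foreach (var message in consumer.Messages())
{
    msgCount++;
    Console.WriteLine($"Received: {message.Value()}");

    // Acknowledge the message so the broker does not redeliver it to this subscription
    await consumer.Acknowledge(message);

    // This example expects a single message, so stop after the first one
    if (msgCount > 0) break;
}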
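To see how the break condition generalizes beyond a single message, here is a small sketch of the same await foreach pattern with a configurable limit. It runs without a Pulsar connection: FakeMessages is a hypothetical stand-in for consumer.Messages(), and TakeMessages and maxMessages are names invented for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for consumer.Messages(): an async stream of string payloads.
// It exists only so this sketch runs without a broker.
static async IAsyncEnumerable<string> FakeMessages()
{
    foreach (var payload in new[] { "first", "second", "third" })
    {
        await Task.Yield(); // simulate waiting for the next message
        yield return payload;
    }
}

// Reads from the stream until maxMessages have been handled, then stops.
static async Task<List<string>> TakeMessages(IAsyncEnumerable<string> messages, int maxMessages)
{
    var received = new List<string>();
    if (maxMessages <= 0) return received;

    await foreach (var payload in messages)
    {
        received.Add(payload);
        // In the real loop, this is where consumer.Acknowledge(message) is awaited.
        if (received.Count >= maxMessages) break;
    }
    return received;
}

var handled = await TakeMessages(FakeMessages(), maxMessages: 2);
Console.WriteLine($"Handled {handled.Count} message(s): {string.Join(", ", handled)}");
// Handled 2 message(s): first, second
```

Breaking out of an await foreach disposes the underlying enumerator, so the loop ends cleanly once the limit is reached.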

Run the example

The moment of truth: running the app. Return to the terminal where you created the dotnet project and run the following command.

dotnet run

You should see output similar to the following.

~/SimpleProducerConsumer$ dotnet run

Sent message
Received: Hello World

Woo-hoo🎉! You did it! You’re on your way to messaging glory.


© 2024 DataStax | Privacy policy | Terms of use
