• Glossary
  • Support
  • Downloads
  • DataStax Home
Get Live Help
Expand All
Collapse All

DataStax Streaming Home

Astra Streaming Documentation

    • Getting Started
    • Developing
      • Using Pulsar binaries with Astra Streaming
      • Using curl with Astra Streaming
      • Astra CLI
      • Astra Streaming Functions
      • Starlight for Kafka
      • Starlight for RabbitMQ
      • Producing and consuming messages
        • Astra Portal
        • Pulsar Cli
        • Client Applications
          • Java
          • Python
          • C#
          • Golang
          • Node.js
      • Change data capture (CDC)
        • CDC for Astra DB
    • Operations
      • Geo-replication
      • Astra Streaming Limits
      • Astra Streaming Pricing
      • Astra Streaming Regions
      • Scrape Astra Streaming metrics with Prometheus
      • Manage Tokens
      • Enrollment FAQ
    • Guides and Examples
      • FAQs
      • Manage permissions
        • Use custom roles
      • Pulsar subscriptions
        • Exclusive
        • Shared
        • Failover
        • Key_shared
    • API Docs
      • Using the DevOps v2 API
      • API References
    • IO Connectors
    • Astra Streaming release notes
  • Astra Streaming Documentation
  • Developing
  • Producing and consuming messages
  • Client Applications
  • C#
Edit this Page

C# pulsar client on Astra Streaming

Prerequisites

You will need the following prerequisites in place to complete this guide:

  • .NET 7 SDK installed (install now↗)

  • A working Pulsar topic (get started here if you don’t have a topic)

  • A basic text editor or IDE

Visit our examples repo↗ to see the complete source of this example.

Create a console project

Create a new console project and add a reference to the dotpulsar client↗

dotnet new console \
  --output SimpleProducerConsumer \
  --framework net6.0

cd SimpleProducerConsumer

dotnet add package DotPulsar --version 2.7.0

Create a client instance

Create an instance of PulsarClient.

Open the Program.cs file in your favorite text editor and clear its contents.

Notice how we chain different configurations together to choose how connections are handled.

Add the following to the file, starting at line 1:

using DotPulsar;
using DotPulsar.Extensions;

var serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
var pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";

var tenantName = "<REPLACE_WITH_TENANT_NAME>";
var nmspace = "<REPLACE_WITH_NAMESPACE>";
var topicName = "<REPLACE_WITH_TOPIC>";

var topic = $"persistent://{tenantName}/{nmspace}/{topicName}";

await using var client = PulsarClient.Builder()
                                      .ServiceUrl(new Uri(serviceUrl))
                                      .Authentication(
                                        AuthenticationFactory.Token(pulsarToken)
                                      )
                                      .Build();

There are a few values that you’ll need to fill in:

serviceUrl

This is the URL for connecting to the Pulsar cluster.

In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".

pulsarToken

This is the token used for Pulsar cluster authentications.

In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.

tenantName

The name of your streaming tenant.

In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".

namespace

Within your streaming tenant, this is the segmented area for certain topics.

In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.

topicName

Expanding the above chosen namespace will list the topics within. This value is just the topic name (not the "Full Name").

The producer

We’re making progress! Now that we have a Pulsar client connected, we will use that connection to produce a single message.

We will create a new producer instance and instruct it to produce a message in a string format (other examples are JSON, byte[], AVRO, etc).

In the Program.cs file, append the following code just below the client code.

await using var producer = client.NewProducer(Schema.String)
                                 .Topic(topic)
                                 .Create();

Add the following code to asynchronously send a single (string) message and wait for acknowledgment.

_ = await producer.Send("Hello World"); // Send a message and ignore the returned MessageId
Console.WriteLine("Sent message");

The consumer

The final piece to the puzzle - a consumer. What value is a message that’s never consumed!?

We will create a new consumer instance and instruct it to expect messages in a string format. The consumer uses broker subscriptions to gather messages.

The initial position is one of Pulsar’s superpowers: here we choose to start at the "earliest" message, which is the first message added to the topic.

Return to the Program.cs file and append the following code below the producer code.

await using var consumer = client.NewConsumer(Schema.String)
                                 .SubscriptionName("examples-subscription")
                                 .Topic(topic)
                                 .InitialPosition(SubscriptionInitialPosition.Earliest)
                                 .Create();

This code will loop through the messages provided to the consumer’s subscription and write its contents.

var msgCount = 0;
await foreach (var message in consumer.Messages())
{
	msgCount++;
	Console.WriteLine($"Received: {message.Value()}");
	await consumer.Acknowledge(message);
	
	if(msgCount > 0) break;
}

Run the example

The moment of truth…​ running the app. Return to the terminal where you created the dotnet project and run the following command.

dotnet run

You should see output similar to the following.

~/SimpleProducerConsumer$ dotnet run

Sent message
Received: Hello World

Woo-hoo🎉! You did it! You’re on your way to messaging glory.

Next steps

  • A guide to configuring Pulsar binaries with Astra Streaming

  • Astra Streaming Functions

  • Connectors

Python Golang

General Inquiries: +1 (650) 389-6000 info@datastax.com

© DataStax | Privacy policy | Terms of use

DataStax, Titan, and TitanDB are registered trademarks of DataStax, Inc. and its subsidiaries in the United States and/or other countries.

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries.

Kubernetes is the registered trademark of the Linux Foundation.

landing_page landingpage