Use the C# Pulsar client on Astra Streaming

You can produce and consume messages with the C# Pulsar client and Astra Streaming.

Go to the examples repo for the complete source of this example.

Prerequisites

  • .NET 7 SDK

  • A Pulsar topic in Astra Streaming

  • A text editor or IDE

Create a console project

Create a new console project, and then add a reference to the dotpulsar client:

dotnet new console \
  --output SimpleProducerConsumer \
  --framework net6.0

cd SimpleProducerConsumer

dotnet add package DotPulsar --version 2.7.0

Write the script

  1. In your new project, open the Program.cs file, and then remove any existing content from the file.

  2. Enter the following code, starting at line 1:

    Program.cs
    using DotPulsar;
    using DotPulsar.Extensions;
    
    var serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
    var pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
    
    var tenantName = "<REPLACE_WITH_TENANT_NAME>";
    var nmspace = "<REPLACE_WITH_NAMESPACE>";
    var topicName = "<REPLACE_WITH_TOPIC>";
    
    var topic = $"persistent://{tenantName}/{nmspace}/{topicName}";
    
    await using var client = PulsarClient.Builder()
                                          .ServiceUrl(new Uri(serviceUrl))
                                          .Authentication(
                                            AuthenticationFactory.Token(pulsarToken)
                                          )
                                          .Build();

    This code creates an instance of PulsarClient. Notice that different configurations are chained together to choose how connections are handled.

  3. Provide values for the following variables:

    Parameter Definition Where to find the value

    serviceUrl

    The URL to connect to the Pulsar cluster

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Connect tab. In the Details section, get the Broker Service URL.

    pulsarToken

    The token for Pulsar cluster authentication

    For information about creating Pulsar tokens, see Manage tokens.

    tenantName

    The name of your streaming tenant

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant. In the Details section, get the Name.

    namespace

    The segmented area for certain topics in your streaming tenant

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Choose the target namespace from the list of namespaces.

    topicName

    Topic name (not the full name)

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.

  4. Create a new producer instance, and then instruct it to produce a message in a string format. Messages can also be in formats like JSON, byte, and AVRO.

    Program.cs
    await using var producer = client.NewProducer(Schema.String)
                                     .Topic(topic)
                                     .Create();
  5. Asynchronously send a single message and wait for acknowledgment:

    Program.cs
    _ = await producer.Send("Hello World"); // Send a message and ignore the returned MessageId
    Console.WriteLine("Sent message");
  6. Create a new consumer instance and instruct it to expect messages in string format. The consumer uses broker subscriptions to gather messages.

    Program.cs
    await using var consumer = client.NewConsumer(Schema.String)
                                     .SubscriptionName("examples-subscription")
                                     .Topic(topic)
                                     .InitialPosition(SubscriptionInitialPosition.Earliest)
                                     .Create();

    A subscription is set with a subscription name and an initial position. In this example, the subscription starts with the Earliest unacknowledged message, which is also the first message in the topic.

  7. Loop through the messages provided to the consumer’s subscription and write their contents:

    Program.cs
    var msgCount = 0;
    await foreach (var message in consumer.Messages())
    {
    	msgCount++;
    	Console.WriteLine($"Received: {message.Value()}");
    	await consumer.Acknowledge(message);
    	
    	if(msgCount > 0) break;
    }

Run the script

In your project directory, run the script:

dotnet run

Output such as the following indicates the script succeeded:

Sent message
Received: Hello World

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com