Producing and consuming messages using the Golang pulsar client on Astra Streaming

Prerequisites

You will need the following prerequisites in place to complete this guide:

  • Golang version 1.15 and above

  • A working Pulsar topic (get started here if you don’t have a topic)

  • A basic text editor or IDE

Visit our examples repo to see the complete source of this example.

Create a project

Create a new GO project.

Open a new terminal and run the following script to create a project folder, change directory into it, and initialize a new GO project.

#sudo apt install -y golang-go
mkdir SimpleProducerConsumer && cd SimpleProducerConsumer

go mod init SimpleProducerConsumer

touch main.go

go get -u github.com/apache/pulsar-client-go

Within the new project, a "main" file is created and the Pulsar client package is retrieved.

Add the client

Open the new "main.go" file in your editor and paste the following code within.

package main

import (
	"context"
	"fmt"
	"github.com/apache/pulsar-client-go/pulsar"
	"log"
)

func main() {
	log.Println("Pulsar Producer")

	serviceUrl := "<REPLACE_WITH_SERVICE_URL>"
	pulsarToken := "<REPLACE_WITH_PULSAR_TOKEN>"

	tenantName := "<REPLACE_WITH_TENANT_NAME>"
	namespace := "<REPLACE_WITH_NAMESPACE>"
	topicName := "<REPLACE_WITH_TOPIC>"

	topic := fmt.Sprintf("persistent://%v/%v/%v", tenantName, namespace, topicName)

	token := pulsar.NewAuthenticationToken(pulsarToken)

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            serviceUrl,
		Authentication: token,
	})

	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}

	defer client.Close()

This is NOT a complete code snippet: it imports the required packages, establishes an entry point for the application, and creates a new instance of a Pulsar client.

Notice there are a few variables that are waiting for replacement values. You can find those values here:

serviceUrl

This is the URL for connecting to the Pulsar cluster.

In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".

pulsarToken

This is the token used for Pulsar cluster authentications.

In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.

tenantName

The name of your streaming tenant.

In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".

namespace

Within your streaming tenant, this is the segmented area for certain topics.

In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.

topicName

Expanding the above chosen namespace will list the topics within. This value is just the topic name (not the "Full Name").

Create a producer

Use the client to create a producer.

While the client contains directions for where to connect and how to authenticate, the producer has the full topic address where a message will be published. The full topic address includes the tenant, namespace, and topic names, as well as topic persistence.

Paste the following code in "main.go".

	log.Printf("creating producer...")

	// Use the client to instantiate a producer
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: topic,
	})

	log.Printf("checking error of producer creation...")
	if err != nil {
		log.Fatal(err)
	}

	defer producer.Close()

	ctx := context.Background()

Next step is to create a message to be sent. In our case we’ll set the payload to something super memorable - "Hello World". And with a producer and message ready to go, we can asynchronously send.

Paste the following code in "main.go".

	asyncMsg := pulsar.ProducerMessage{
		Payload: []byte(fmt.Sprintf("Hello World")),
	}

	// Attempt to send the message asynchronously and handle the response
	producer.SendAsync(ctx, &asyncMsg, func(msgID pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
		if err != nil {
			log.Fatal(err)
		}

		log.Printf("the %s successfully published with the message ID %v", string(msg.Payload), msgID)
	})

Create a consumer

What’s a message for if it never gets read, right? Let’s create a new subscription instance from the client.

Subscriptions are covered in more depth in their own topic😀, but for now, think of a consumer and a subscription as the same idea.

A topic is set with a subscription name and an initial position. In this case our subscription will start with the "Earliest" unacknowledged message, which happens to be the first message in the topic.

Add the following code to your "main.go" file.

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            "examples-subscription",
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
	})

	if err != nil {
		log.Fatal(err)
	}

	defer consumer.Close()

	ctx = context.Background()

Add this code to your file to have the consumer receive and then acknowledge (or "ack") messages provided to the subscription.

	msg, err := consumer.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	} else {
		log.Printf("Received message : %s", string(msg.Payload()))
	}

	err = consumer.Ack(msg)
	if err != nil {
		log.Fatal(err)
		return
	}
}

Run the example

Your example GO application is complete. Here it is, your big moment!

Return to the terminal in the same folder as the "main.go" file, and run the application.

go run main.go

If everything works as planned, you should see output similar to the following.

2023/01/04 13:43:00 Pulsar Producer
2023/01/04 13:43:00 creating producer...
2023/01/04 13:43:01 checking error of producer creation...
2023/01/04 13:43:01 the Hello World successfully published with the message ID 421788:1:0
2023/01/04 13:43:01 Received message : Hello World

Next steps

Your messaging skills are progressing well! Let’s continue learning.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com