Use the Golang Pulsar client with Astra Streaming

You can produce and consume messages with the Golang Pulsar client and Astra Streaming.

Go to the examples repo for the complete source of this example.

Prerequisites

  • Golang version 1.15 or later

  • A Pulsar topic in Astra Streaming

  • A text editor or IDE

Create a project

  1. Run the following script to create a project folder, change directory into it, and then initialize a new Go project:

    sudo apt install -y golang-go
    mkdir SimpleProducerConsumer && cd SimpleProducerConsumer
    
    go mod init SimpleProducerConsumer
    
    touch main.go
    
    go get -u github.com/apache/pulsar-client-go

    The new project includes a main file and the retrieved Pulsar client package.

Write the script

  1. Open the main.go file, and then add the following code to import the required packages, establish an entry point for the application, and create a new instance of a Pulsar client:

    main.go
    package main
    
    import (
    	"context"
    	"fmt"
    	"github.com/apache/pulsar-client-go/pulsar"
    	"log"
    )
    
    func main() {
    	log.Println("Pulsar Producer")
    
    	serviceUrl := "<REPLACE_WITH_SERVICE_URL>"
    	pulsarToken := "<REPLACE_WITH_PULSAR_TOKEN>"
    
    	tenantName := "<REPLACE_WITH_TENANT_NAME>"
    	namespace := "<REPLACE_WITH_NAMESPACE>"
    	topicName := "<REPLACE_WITH_TOPIC>"
    
    	topic := fmt.Sprintf("persistent://%v/%v/%v", tenantName, namespace, topicName)
    
    	token := pulsar.NewAuthenticationToken(pulsarToken)
    
    	client, err := pulsar.NewClient(pulsar.ClientOptions{
    		URL:            serviceUrl,
    		Authentication: token,
    	})
    
    	if err != nil {
    		log.Fatalf("Could not instantiate Pulsar client: %v", err)
    	}
    
    	defer client.Close()

    This code is incomplete. Your editor might show errors until you complete the next steps.

  2. Provide values for the following variables:

    Parameter Definition Where to find the value

    serviceUrl

    The URL to connect to the Pulsar cluster

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Connect tab. In the Details section, get the Broker Service URL.

    pulsarToken

    The token for Pulsar cluster authentication

    For information about creating Pulsar tokens, see Manage tokens.

    tenantName

    The name of your streaming tenant

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant. In the Details section, get the Name.

    namespace

    The segmented area for certain topics in your streaming tenant

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Choose the target namespace from the list of namespaces.

    topicName

    Topic name (not the full name)

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.

  3. Use the client to create a producer.

    main.go
    	log.Printf("creating producer...")
    
    	// Use the client to instantiate a producer
    	producer, err := client.CreateProducer(pulsar.ProducerOptions{
    		Topic: topic,
    	})
    
    	log.Printf("checking error of producer creation...")
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	defer producer.Close()
    
    	ctx := context.Background()

    While the client contains directions for where to connect and how to authenticate, the producer has the full topic address where a message is published. The full topic address includes the tenant, namespace, and topic names, as well as topic persistence.

  4. Create a Hello World message and send it asynchronously:

    main.go
    	asyncMsg := pulsar.ProducerMessage{
    		Payload: []byte(fmt.Sprintf("Hello World")),
    	}
    
    	// Attempt to send the message asynchronously and handle the response
    	producer.SendAsync(ctx, &asyncMsg, func(msgID pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
    		if err != nil {
    			log.Fatal(err)
    		}
    
    		log.Printf("the %s successfully published with the message ID %v", string(msg.Payload), msgID)
    	})
  5. Use the client to create a new subscription instance to receive the message:

    main.go
    	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    		Topic:                       topic,
    		SubscriptionName:            "examples-subscription",
    		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
    	})
    
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	defer consumer.Close()
    
    	ctx = context.Background()

    A topic is set with a subscription name and an initial position. In this example, the subscription starts with the Earliest unacknowledged message, which is also the first message in the topic.

  6. Make the consumer receive and acknowledge (ack) messages provided to the subscription:

    main.go
    	msg, err := consumer.Receive(ctx)
    	if err != nil {
    		log.Fatal(err)
    	} else {
    		log.Printf("Received message : %s", string(msg.Payload()))
    	}
    
    	err = consumer.Ack(msg)
    	if err != nil {
    		log.Fatal(err)
    		return
    	}
    }

Run the script

In your project directory, run your Go app:

go run main.go

Output such as the following indicates the script succeeded:

2023/01/04 13:43:00 Pulsar Producer
2023/01/04 13:43:00 creating producer...
2023/01/04 13:43:01 checking error of producer creation...
2023/01/04 13:43:01 the Hello World successfully published with the message ID 421788:1:0
2023/01/04 13:43:01 Received message : Hello World

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com