Golang code examples

Astra Streaming is currently in private beta. Join the Astra Streaming Waitlist.

Astra Streaming is powered by Apache Pulsar. To connect to your service, use the open-source client APIs provided by the Apache Pulsar project.

The Golang client APIs are distributed through Golang Package Dependency. On Linux and MacOS, the Golang version 1.12 and above are supported. You can find the documentation for the Golang client here.

In the code examples, you’ll need to make the following substitutions:

  • <service-url>: The Pulsar service URL from the Astra Streaming Console Connect tab.

  • <streaming-token>: Your access token from the Astra Streaming Console Connect tab.

  • <topic-url>: A topic URL of your choice from the Astra Streaming Console Topics tab.

  • <subscription-name>: A subscription name of your choice, or one that you’ve configured in the Details tab of an existing topic.

  • go get

  • Producer

  • Consumer

  • Reader

To do a native install using go get:

go get -u github.com/apache/pulsar-client-go

This sample creates a producer on a topic, sends 3 messages synchronously and 3 messages asynchronously.

package main
import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    fmt.Println("Pulsar Producer")

    // Configuration variables pertaining to this consumer
    tokenStr := "<streaming-token>"
    uri := "<service-url>"
    // RHEL CentOS:
    trustStore := "/etc/ssl/certs/ca-bundle.crt"
    // Debian Ubuntu:
    // trustStore := '/etc/ssl/certs/ca-certificates.crt'
    // OSX:
    // Export the default certificates to a file, then use that file:
    // security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
    // trust_certs='./ca-certificates.crt'
    topicName := "<topic-url>"

	token := pulsar.NewAuthenticationToken(tokenStr)

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:                     uri,
		Authentication:          token,
		TLSTrustCertsFilePath:   trustStore,
	})

	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}

	defer client.Close()

	log.Printf("creating producer...")

	// Use the client to instantiate a producer
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: topicName,
	})

	log.Printf("checking error of producer creation...")
	if err != nil {
		log.Fatal(err)
	}

	defer producer.Close()

	ctx := context.Background()

	// Send 3 messages synchronously and 3 messages asynchronously
	for i := 0; i < 3; i++ {
		// Create a message
		msg := pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("messageId-%d", i)),
		}

		// Attempt to send the message
        messageID, err := producer.Send(ctx, &msg)
		if err != nil {
			log.Fatal(err)
		}
        log.Printf("the %s successfully published with the message ID %v", string(msg.Payload), messageID)

		// Create a different message to send asynchronously
		asyncMsg := pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("asyncMessageId-%d", i)),
		}

		// Attempt to send the message asynchronously and handle the response
		producer.SendAsync(ctx, &asyncMsg, func(msgID pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
			if err != nil {
				log.Fatal(err)
			}

			log.Printf("the %s successfully published with the message ID %v", string(msg.Payload), msgID)
		})
	}
}

This sample creates a consumer on a topic, waits for a message, acknowledges it in an infinite loop.

package main
import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    fmt.Println("Pulsar Consumer")

    // Configuration variables pertaining to this consumer
    tokenStr := "<streaming-token>"
    uri := "pulsar+ssl://uswest2.aws.kafkaesque.io:6651"
    // RHEL CentOS:
    trustStore := "/etc/ssl/certs/ca-bundle.crt"
    // Debian Ubuntu:
    // trustStore := '/etc/ssl/certs/ca-certificates.crt'
    // OSX:
    // Export the default certificates to a file, then use that file:
    // security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
    // trust_certs='./ca-certificates.crt'
    topicName := "<topic-url>"
    subscriptionName := "<subscription-name>"

	token := pulsar.NewAuthenticationToken(tokenStr)

	// Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:                   uri,
		Authentication:        token,
		TLSTrustCertsFilePath: trustStore,
	})

	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            topicName,
		SubscriptionName: subscriptionName,
	})

	if err != nil {
		log.Fatal(err)
	}

	defer consumer.Close()

	ctx := context.Background()

	// infinite loop to receive messages
	for {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		} else {
			fmt.Println("Received message : ", string(msg.Payload()))
		}

		consumer.Ack(msg)
	}

}

This sample creates a reader on a topic and reads the earliest or latest messages.

package main
import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    fmt.Println("Pulsar Reader")

    // Configuration variables pertaining to this reader
    tokenStr := "<streaming-token>"
    uri := "<service-url>"
    // RHEL CentOS:
    trustStore := "/etc/ssl/certs/ca-bundle.crt"
    // Debian Ubuntu:
    // trustStore := '/etc/ssl/certs/ca-certificates.crt'
    // OSX:
    // Export the default certificates to a file, then use that file:
    // security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
    // trust_certs='./ca-certificates.crt'
    topicName := "<topic-url>"
	token := pulsar.NewAuthenticationToken(tokenStr)

	// Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:                   uri,
		Authentication:        token,
		TLSTrustCertsFilePath: trustStore,
	})

	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	reader, err := client.CreateReader(pulsar.ReaderOptions{
		Topic:          topicName,
		StartMessageID: pulsar.EarliestMessageID(),
	})

	if err != nil {
		log.Fatal(err)
	}

	defer reader.Close()

	ctx := context.Background()

	// infinite loop to receive messages
	for {
		msg, err := reader.Next(ctx)
		if err != nil {
			log.Fatal(err)
		} else {
			fmt.Println("Received message : ", string(msg.Payload()))
		}
	}

}