Use the Golang Pulsar client with Astra Streaming
You can produce and consume messages with the Golang Pulsar client and Astra Streaming.
For a complete source code example, see the Astra Streaming Examples repository.
Prerequisites
- Golang version 1.15 or later
- An Apache Pulsar™ topic in Astra Streaming
- A text editor or IDE
Create a project
- Run the following script to install Go (the apt command assumes a Debian-based system), create a project folder, change directory into it, and then initialize a new Go project:

      sudo apt install -y golang-go
      mkdir SimpleProducerConsumer && cd SimpleProducerConsumer
      go mod init SimpleProducerConsumer
      touch main.go
      go get -u github.com/apache/pulsar-client-go

  The new project includes a main.go file and the retrieved Pulsar client package.
Write the script
- Open the main.go file, and then add the following code to import the required packages, establish an entry point for the application, and create a new instance of a Pulsar client:

  main.go

      package main

      import (
          "context"
          "fmt"
          "log"

          "github.com/apache/pulsar-client-go/pulsar"
      )

      func main() {
          log.Println("Pulsar Producer")

          serviceUrl := "<REPLACE_WITH_SERVICE_URL>"
          pulsarToken := "<REPLACE_WITH_PULSAR_TOKEN>"

          tenantName := "<REPLACE_WITH_TENANT_NAME>"
          namespace := "<REPLACE_WITH_NAMESPACE>"
          topicName := "<REPLACE_WITH_TOPIC>"

          // Build the full topic address from the tenant, namespace, and topic names
          topic := fmt.Sprintf("persistent://%v/%v/%v", tenantName, namespace, topicName)

          token := pulsar.NewAuthenticationToken(pulsarToken)

          client, err := pulsar.NewClient(pulsar.ClientOptions{
              URL:            serviceUrl,
              Authentication: token,
          })
          if err != nil {
              log.Fatalf("Could not instantiate Pulsar client: %v", err)
          }
          defer client.Close()

  This code is incomplete. Your editor might show errors until you complete the next steps.
- Provide values for the following variables:

  Parameter: serviceUrl
  Definition: The URL to connect to the Pulsar cluster
  Where to find the value:
    - In the Astra Portal header, click Applications, and then select Streaming.
    - Click the name of your tenant, and then click the Connect tab.
    - In the Tenant Details section, get the Broker Service URL.

  Parameter: pulsarToken
  Definition: The token for Pulsar cluster authentication
  Where to find the value: For information about creating Pulsar tokens, see Manage tokens.

  Parameter: tenantName
  Definition: The name of your streaming tenant
  Where to find the value:
    - In the Astra Portal header, click Applications, and then select Streaming.
    - Get the tenant name from the Astra Streaming dashboard.

  Parameter: namespace
  Definition: The segmented area for certain topics in your streaming tenant
  Where to find the value:
    - In the Astra Portal header, click Applications, and then select Streaming.
    - Click the name of your tenant, and then click the Namespace and Topics tab.
    - Choose the target namespace from the list of namespaces.

  Parameter: topicName
  Definition: Topic name (not the full name)
  Where to find the value:
    - In the Astra Portal header, click Applications, and then select Streaming.
    - Click the name of your tenant, and then click the Namespace and Topics tab.
    - Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.
- Use the client to create a producer:

  main.go

      log.Printf("creating producer...")

      // Use the client to instantiate a producer
      producer, err := client.CreateProducer(pulsar.ProducerOptions{
          Topic: topic,
      })

      log.Printf("checking error of producer creation...")
      if err != nil {
          log.Fatal(err)
      }
      defer producer.Close()

      ctx := context.Background()

  The client holds the connection URL and the authentication details, while the producer holds the full topic address where messages are published. The full topic address includes the topic persistence, followed by the tenant, namespace, and topic names.
- Create a Hello World message and send it asynchronously:

  main.go

      asyncMsg := pulsar.ProducerMessage{
          Payload: []byte("Hello World"),
      }

      // Attempt to send the message asynchronously and handle the response
      producer.SendAsync(ctx, &asyncMsg, func(msgID pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
          if err != nil {
              log.Fatal(err)
          }

          log.Printf("the %s successfully published with the message ID %v", string(msg.Payload), msgID)
      })

- Use the client to create a new subscription instance to receive the message:

  main.go

      consumer, err := client.Subscribe(pulsar.ConsumerOptions{
          Topic:                       topic,
          SubscriptionName:            "examples-subscription",
          SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
      })
      if err != nil {
          log.Fatal(err)
      }
      defer consumer.Close()

      ctx = context.Background()

  The consumer is configured with a topic, a subscription name, and an initial position. In this example, the subscription starts with the Earliest unacknowledged message, which for a new subscription is also the first message in the topic.

- Receive a message from the subscription, and then acknowledge (ack) it:

  main.go

      msg, err := consumer.Receive(ctx)
      if err != nil {
          log.Fatal(err)
      }
      log.Printf("Received message : %s", string(msg.Payload()))

      // Acknowledge the message so that it is not redelivered to this subscription
      if err = consumer.Ack(msg); err != nil {
          log.Fatal(err)
      }
      }
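Because the full topic address is assembled from three separate values, pasting a topic's Full Name into topicName produces an invalid address. The following is a minimal sketch of a guard against that mistake. The buildTopicAddress helper and the example values are hypothetical and are not part of the Pulsar client. The sketch uses only the Go standard library, so it runs without a Pulsar connection.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// buildTopicAddress joins the tenant, namespace, and short topic name into
// the address format used in main.go: persistent://tenant/namespace/topic.
// This helper is hypothetical; it is not part of the Pulsar client.
func buildTopicAddress(tenant, namespace, topic string) (string, error) {
	for _, part := range []string{tenant, namespace, topic} {
		if strings.TrimSpace(part) == "" {
			return "", errors.New("tenant, namespace, and topic must not be empty")
		}
	}
	// A topic containing "/" is most likely a full name pasted by mistake.
	if strings.Contains(topic, "/") {
		return "", fmt.Errorf("topic %q looks like a full name; use the short topic name", topic)
	}
	return fmt.Sprintf("persistent://%s/%s/%s", tenant, namespace, topic), nil
}

func main() {
	// Example values only; replace them with your own tenant, namespace, and topic.
	addr, err := buildTopicAddress("my-tenant", "default", "my-topic")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(addr)
}
```

If you adopt a helper like this, you could call it in place of the fmt.Sprintf line in main.go and pass any error to log.Fatal.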
Run the script
In your project directory, run your Go app:
go run main.go
Output such as the following indicates the script succeeded:
2023/01/04 13:43:00 Pulsar Producer
2023/01/04 13:43:00 creating producer...
2023/01/04 13:43:01 checking error of producer creation...
2023/01/04 13:43:01 the Hello World successfully published with the message ID 421788:1:0
2023/01/04 13:43:01 Received message : Hello World
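The script in this guide stores the service URL, token, tenant, namespace, and topic as placeholder strings in main.go. One possible alternative, shown here only as a sketch, is to read those five values from environment variables so that the token stays out of source control. The config type, the loadConfig function, and the ASTRA_STREAMING_* variable names are all assumptions made for this illustration; neither Astra Streaming nor the Pulsar client defines them.

```go
package main

import (
	"fmt"
	"os"
)

// config holds the five values that main.go needs.
type config struct {
	ServiceURL, Token, Tenant, Namespace, Topic string
}

// loadConfig reads each value through getenv and fails on the first one
// that is missing. The variable names are hypothetical.
func loadConfig(getenv func(string) string) (config, error) {
	var cfg config
	fields := []struct {
		name string
		dest *string
	}{
		{"ASTRA_STREAMING_SERVICE_URL", &cfg.ServiceURL},
		{"ASTRA_STREAMING_TOKEN", &cfg.Token},
		{"ASTRA_STREAMING_TENANT", &cfg.Tenant},
		{"ASTRA_STREAMING_NAMESPACE", &cfg.Namespace},
		{"ASTRA_STREAMING_TOPIC", &cfg.Topic},
	}
	for _, f := range fields {
		value := getenv(f.name)
		if value == "" {
			return config{}, fmt.Errorf("environment variable %s is not set", f.name)
		}
		*f.dest = value
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	// Print everything except the token, which should never be logged.
	fmt.Printf("service URL: %s\n", cfg.ServiceURL)
	fmt.Printf("topic: persistent://%s/%s/%s\n", cfg.Tenant, cfg.Namespace, cfg.Topic)
}
```

Passing the lookup function as a parameter keeps loadConfig easy to exercise with a map-backed function in place of os.Getenv.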