Producing and consuming messages using the Node.js Pulsar client on Astra Streaming

Prerequisites

You will need the following prerequisites in place to complete this guide:

  • sudo permission to install dependencies

  • Node.js version 10+ and npm 6+

  • A working Pulsar topic (get started here if you don’t have a topic)

  • A basic text editor or IDE

Visit our examples repo to see the complete source of this example.

Set up the environment

Before we get started with the app, a little pre-work needs to be done.

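The Node.js Pulsar client npm package depends on the C++ Pulsar library.

Pulsar Node client versions 1.8 and greater do not require a separate installation of the C++ Pulsar library dependency. If you are using one of those versions, skip ahead to "Create a project".

For earlier client versions, install the C++ Pulsar library dependency using the commands for your distribution.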

  • Ubuntu-based (deb)

wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/DEB/apache-pulsar-client.deb
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/DEB/apache-pulsar-client-dev.deb

sudo apt install -y ./apache-pulsar-client.deb
sudo apt install -y ./apache-pulsar-client-dev.deb

sudo ldconfig

  • CentOS/RHEL-based (rpm)

wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/RPMS/apache-pulsar-client-2.10.2-1.x86_64.rpm
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/RPMS/apache-pulsar-client-devel-2.10.2-1.x86_64.rpm

sudo rpm -i ./apache-pulsar-client-2.10.2-1.x86_64.rpm
sudo rpm -i ./apache-pulsar-client-devel-2.10.2-1.x86_64.rpm

sudo ldconfig

Create a project

With the environment dependencies set up, let’s create a new Node.js project.

Run the following commands in a terminal. The -y flag tells npm to accept its default project values.

mkdir SimpleProducerConsumer && cd SimpleProducerConsumer

touch index.js

npm init -y

npm install pulsar-client

Add the client

The commands above created an empty "index.js" file in the project folder. Open that file in your editor of choice and add the following code.

const Pulsar = require("pulsar-client");

(async () => {
  const serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
  const pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";

  const tenantName = "<REPLACE_WITH_TENANT_NAME>";
  const namespace = "<REPLACE_WITH_NAMESPACE>";
  const topicName = "<REPLACE_WITH_TOPIC>";

  const topic = `persistent://${tenantName}/${namespace}/${topicName}`;

  // Debian/Ubuntu:
  const trustStore = "/etc/ssl/certs/ca-certificates.crt";
  // CentOS/RHEL:
  // const trustStore = "/etc/ssl/certs/ca-bundle.crt";

  const auth = new Pulsar.AuthenticationToken({ token: pulsarToken });

  const client = new Pulsar.Client({
    serviceUrl: serviceUrl,
    authentication: auth,
    tlsTrustCertsFilePath: trustStore,
    operationTimeoutSeconds: 30,
  });

This isn’t complete code (yet), so don’t be alarmed if your editor shows errors.
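The trust store path in the code above depends on the Linux distribution. As a minimal sketch (not part of "index.js" as written), the path could be picked at runtime by checking which file exists. The helper name findTrustStore is hypothetical, and the candidate list contains only the two paths mentioned in the code comments.

```javascript
const fs = require("fs");

// Candidate CA bundle locations, taken from the comments in index.js.
const CANDIDATES = [
  "/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu
  "/etc/ssl/certs/ca-bundle.crt", // CentOS/RHEL
];

// Hypothetical helper: returns the first candidate path that exists on disk,
// or undefined if none of them do.
function findTrustStore(candidates = CANDIDATES) {
  return candidates.find((path) => fs.existsSync(path));
}
```

The result could then be assigned to trustStore in place of the hard-coded string, with a check for undefined before creating the client.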

Notice there are a few variables waiting for replacement values. You can find those values here:

serviceUrl

This is the URL for connecting to the Pulsar cluster.

In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".

pulsarToken

This is the token used to authenticate with the Pulsar cluster.

In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.

tenantName

The name of your streaming tenant.

In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".

namespace

A namespace is a logical grouping of topics within your streaming tenant.

In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.

topicName

In the same tab, expand the namespace you chose above to list its topics. Use only the topic name here, not the "Full Name".
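Hard-coding the token in "index.js" is fine for a first run, but you may prefer to keep it out of source control. The following is a minimal sketch of one way to do that, assuming the five values are supplied as environment variables. The helper loadConfig and the variable names (PULSAR_SERVICE_URL and so on) are hypothetical and not part of the original example.

```javascript
// Hypothetical helper: reads connection settings from environment variables
// instead of hard-coding them. The variable names are assumptions for this sketch.
function loadConfig(env = process.env) {
  const required = [
    "PULSAR_SERVICE_URL",
    "PULSAR_TOKEN",
    "PULSAR_TENANT",
    "PULSAR_NAMESPACE",
    "PULSAR_TOPIC",
  ];

  // Fail early with a clear message if any value is missing or blank.
  const missing = required.filter((name) => !env[name] || !env[name].trim());
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }

  return {
    serviceUrl: env.PULSAR_SERVICE_URL,
    pulsarToken: env.PULSAR_TOKEN,
    // Same full topic format that index.js builds.
    topic: `persistent://${env.PULSAR_TENANT}/${env.PULSAR_NAMESPACE}/${env.PULSAR_TOPIC}`,
  };
}
```

The returned serviceUrl, pulsarToken, and topic could replace the placeholder constants at the top of the async function.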

Create a producer

Use the client to create a producer.

While there are quite a few configuration options available for the producer, for now we’ll just declare the topic where messages should go.

Add the following code to "index.js".

  const producer = await client.createProducer({
    topic: topic,
  });

After the code above creates a producer, add this code to actually send the message and receive acknowledgment.

  await producer.send({
    data: Buffer.from("Hello World"),
  });
  console.log("sent message");

Finally, add a little clean-up.

  await producer.flush();
  await producer.close();
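The example sends a single message. As a rough sketch of how the same send call could handle several messages, the helper below (hypothetical name sendAll, not part of "index.js") starts all sends and waits for every one to settle. It relies only on send returning a promise, so it works with any object that has that shape.

```javascript
// Hypothetical helper: sends each string as one message and waits for all of them.
// `producer` is anything with a send({ data }) method that returns a promise,
// such as the producer created in index.js.
async function sendAll(producer, texts) {
  const pending = texts.map((text) =>
    producer.send({ data: Buffer.from(text) })
  );
  // Resolves once every send has completed; rejects on the first failure.
  const results = await Promise.all(pending);
  return results.length;
}
```

With the real producer this might be called as await sendAll(producer, ["Hello", "World"]) before the flush and close calls.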

Create a consumer

If we ran the above example, we’d have a produced message waiting to be consumed and acknowledged.

The code below creates a new consumer subscription, names the subscription, and declares what topic to watch.

Add this code to "index.js".

  const consumer = await client.subscribe({
    topic: topic,
    subscription: "examples-subscription",
    subscriptionType: "Exclusive",
    ackTimeoutMs: 10000,
  });

We want this consumer to receive messages, write them to console, and acknowledge receipt with the broker.

Add this code to "index.js".

  const msg = await consumer.receive();
  console.log(msg.getData().toString());
  await consumer.acknowledge(msg);
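The code above receives exactly one message and waits indefinitely for it. A possible variation, shown here as a standalone sketch rather than as part of "index.js", reads several messages and stops when none arrives in time. The helper name receiveUpTo is hypothetical, and the sketch assumes that receive accepts a timeout in milliseconds and rejects when that timeout elapses.

```javascript
// Hypothetical helper: receives up to `max` messages, acknowledging each one.
// Assumes consumer.receive(timeoutMs) rejects when no message arrives in time.
async function receiveUpTo(consumer, max, timeoutMs = 5000) {
  const received = [];
  while (received.length < max) {
    let msg;
    try {
      msg = await consumer.receive(timeoutMs);
    } catch (err) {
      // This sketch treats any rejected receive as "no more messages for now".
      break;
    }
    received.push(msg.getData().toString());
    await consumer.acknowledge(msg);
  }
  return received;
}
```

A production version would likely distinguish a timeout from other errors instead of stopping silently on both.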

Finally, add a little clean-up. No one likes loose ends, right?!

  await consumer.close();
  await client.close();
})();
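As written, the clean-up calls run only if every earlier step succeeds. One way to make the client close even when something throws is a try/finally wrapper, sketched below. The helper name withClient is hypothetical, and it assumes only that the client has a close method returning a promise.

```javascript
// Hypothetical helper: runs `work(client)` and always closes the client afterwards,
// even if `work` throws. The original error, if any, still propagates to the caller.
async function withClient(client, work) {
  try {
    return await work(client);
  } finally {
    await client.close();
  }
}
```

With the real client, the producer and consumer steps from "index.js" could move into the work callback.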

Run the example

Alright, it’s that time! Let’s see if all that hard work will pay off.

Return to the terminal and run the following command.

node index.js

You should see output similar to the following.

sent message
Hello World

Next steps

You did it🎉! You’re on your way to messaging glory. Let’s continue learning.


© 2024 DataStax | Privacy policy | Terms of use
