Node.js Pulsar client on Astra Streaming
Prerequisites
You will need the following prerequisites in place to complete this guide:
- sudo permission to install dependencies
- Node.js version 10+ and npm 6+
- A working Pulsar topic (get started here if you don’t have a topic)
- A basic text editor or IDE
Visit our examples repo to see the complete source of this example.
Setup environment
Before we get started with the app, a little pre-work needs to be done.
The Node.js Pulsar client npm package depends on the C++ Pulsar library.
Note: Pulsar Node client versions 1.8 and greater do not require installation of the C++ Pulsar library dependency.
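As a rough illustration of the version rule above, the check below takes a client version string and reports whether the separate C++ library install applies. The helper name `requiresCppLibrary` is hypothetical and not part of the Pulsar client; the version string could come from a command such as `npm ls pulsar-client` once the package is installed.

```javascript
// Hypothetical helper: does this pulsar-client version need the separately
// installed C++ library? Per the note above, versions below 1.8 do.
function requiresCppLibrary(version) {
  const [major, minor] = version
    .split(".")
    .map((part) => parseInt(part, 10));
  if (Number.isNaN(major) || Number.isNaN(minor)) {
    throw new Error(`Unrecognized version string: ${version}`);
  }
  return major < 1 || (major === 1 && minor < 8);
}

console.log(requiresCppLibrary("1.7.0")); // true
console.log(requiresCppLibrary("1.8.1")); // false
```

Comparing the numeric parts rather than the raw strings keeps a version such as 1.10.0 on the correct side of the 1.8 boundary.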
If you are using an earlier client version, install the C++ Pulsar library dependency with the commands for your distribution.
Debian/Ubuntu (DEB packages):
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/DEB/apache-pulsar-client.deb
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/DEB/apache-pulsar-client-dev.deb
sudo apt install -y ./apache-pulsar-client.deb
sudo apt install -y ./apache-pulsar-client*.deb
sudo ldconfig
CentOS/RHEL (RPM packages):
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/RPMS/apache-pulsar-client-2.10.2-1.x86_64.rpm
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/RPMS/apache-pulsar-client-devel-2.10.2-1.x86_64.rpm
sudo rpm -i ./apache-pulsar-client-2.10.2-1.x86_64.rpm
sudo rpm -i ./apache-pulsar-client-devel-2.10.2-1.x86_64.rpm
sudo ldconfig
Create a project
With the environment dependencies set up, let’s create a new Node.js project.
Run the following commands in a terminal. The -y flag tells npm to accept its default project values without prompting.
mkdir SimpleProducerConsumer && cd SimpleProducerConsumer
touch index.js
npm init -y
npm install pulsar-client
Add the client
The commands above created an empty "index.js" file in the project folder. Open that file in your editor of choice and add the following code.
const Pulsar = require("pulsar-client");
(async () => {
// Connection values: replace each placeholder with your own.
const serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
const pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
const tenantName = "<REPLACE_WITH_TENANT_NAME>";
const namespace = "<REPLACE_WITH_NAMESPACE>";
const topicName = "<REPLACE_WITH_TOPIC>";
const topic = `persistent://${tenantName}/${namespace}/${topicName}`;
// CA bundle used to verify the broker's TLS certificate.
// Debian/Ubuntu:
const trustStore = "/etc/ssl/certs/ca-certificates.crt";
// CentOS/RHEL:
// const trustStore = "/etc/ssl/certs/ca-bundle.crt";
// Authenticate with the Pulsar token.
const auth = new Pulsar.AuthenticationToken({ token: pulsarToken });
const client = new Pulsar.Client({
  serviceUrl: serviceUrl,
  authentication: auth,
  tlsTrustCertsFilePath: trustStore,
  operationTimeoutSeconds: 30,
});
This isn’t complete code (yet), so don’t be alarmed if your editor shows errors.
Notice there are a few variables waiting for replacement values. You can find those values here:
- serviceUrl: This is the URL for connecting to the Pulsar cluster. In the Astra Portal, navigate to the "Connect" tab of your streaming tenant. In the "Details" area you will find the "Broker Service URL".
- pulsarToken: This is the token used for Pulsar cluster authentication. In the Astra Portal, navigate to the "Settings" tab of your streaming tenant. Select "Create Token". A new token will be generated and available to copy.
- tenantName: The name of your streaming tenant. In the Astra Portal, navigate to your streaming tenant. In the "Details" area you will find the "Name".
- namespace: Within your streaming tenant, this is the segmented area for certain topics. In the Astra Portal, navigate to the "Namespace And Topics" tab of your streaming tenant to see a list of namespaces.
- topicName: Expanding the namespace chosen above will list the topics within. This value is just the topic name (not the "Full Name").
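If you would rather not paste the token and other values directly into "index.js", one option is to read them from environment variables. The sketch below is a standalone illustration, not part of the example app: the `ASTRA_STREAMING_*` variable names and the helper functions `buildTopic`, `findTrustStore`, and `loadSettings` are all hypothetical. It builds the topic name in the same `persistent://tenant/namespace/topic` form used above and picks whichever of the two trust store paths exists on the machine.

```javascript
const fs = require("fs");

// Build the full topic name in the same form index.js uses.
function buildTopic(tenantName, namespace, topicName) {
  return `persistent://${tenantName}/${namespace}/${topicName}`;
}

// Return the first CA bundle path that exists, or undefined if none do.
// `exists` is injectable so the lookup can be exercised without touching disk.
function findTrustStore(candidates, exists = fs.existsSync) {
  return candidates.find((path) => exists(path));
}

// Read connection settings from environment variables.
// The variable names below are hypothetical; use whatever suits your setup.
function loadSettings(env = process.env) {
  const names = {
    serviceUrl: "ASTRA_STREAMING_SERVICE_URL",
    pulsarToken: "ASTRA_STREAMING_TOKEN",
    tenantName: "ASTRA_STREAMING_TENANT",
    namespace: "ASTRA_STREAMING_NAMESPACE",
    topicName: "ASTRA_STREAMING_TOPIC",
  };
  const settings = {};
  const missing = [];
  for (const [key, name] of Object.entries(names)) {
    if (env[name]) {
      settings[key] = env[name];
    } else {
      missing.push(name);
    }
  }
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  settings.topic = buildTopic(
    settings.tenantName,
    settings.namespace,
    settings.topicName
  );
  settings.trustStore = findTrustStore([
    "/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu
    "/etc/ssl/certs/ca-bundle.crt", // CentOS/RHEL
  ]);
  return settings;
}
```

Failing fast with the list of missing variables tends to be easier to debug than a connection error caused by an unreplaced placeholder.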
Create a producer
Use the client to create a producer.
While there are quite a few configuration options available for the producer, for now we’ll just declare the topic where messages should go.
Add the following code to "index.js".
const producer = await client.createProducer({
  topic: topic,
});
With the producer created, add this code to send a message and wait for the broker's acknowledgment.
await producer.send({
  data: Buffer.from("Hello World"),
});
console.log("sent message");
Finally, add a little clean-up.
await producer.flush();
await producer.close();
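The producer is not limited to a single message. As a standalone sketch (meant for a separate scratch file, not for pasting into "index.js"), the hypothetical helper `sendAll` below sends several strings one after another through any object that has an async `send({ data })` method, which is the shape of the call used above. The `fakeProducer` is a stand-in so the sketch runs without a broker; with a real producer the returned values would be whatever `send` resolves to.

```javascript
// Send each string as its own message and collect the results.
// `producer` is anything with an async send({ data }) method, such as the
// producer created in index.js.
async function sendAll(producer, texts) {
  const results = [];
  for (const text of texts) {
    // Awaiting each send keeps the messages in order and surfaces a
    // failure before the next message goes out.
    results.push(await producer.send({ data: Buffer.from(text) }));
  }
  return results;
}

// Stand-in producer (hypothetical) so the sketch runs without a broker.
const fakeProducer = {
  sent: [],
  async send(message) {
    this.sent.push(message.data.toString());
    return this.sent.length; // stand-in for a message ID
  },
};

sendAll(fakeProducer, ["Hello", "World"]).then((ids) => {
  console.log(`sent ${ids.length} messages`); // sent 2 messages
});
```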
Create a consumer
If we ran the above example, we’d have a produced message waiting to be consumed and acknowledged.
The code below creates a new consumer subscription, names the subscription, and declares what topic to watch.
Add this code to "index.js".
const consumer = await client.subscribe({
  topic: topic,
  subscription: "examples-subscription",
  subscriptionType: "Exclusive",
  ackTimeoutMs: 10000,
});
We want this consumer to receive a message, write it to the console, and acknowledge receipt with the broker.
Add this code to "index.js".
const msg = await consumer.receive();
console.log(msg.getData().toString());
await consumer.acknowledge(msg);
Finally, add a little clean-up. No one likes loose ends, right?!
await consumer.close();
await client.close();
})();
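The example receives exactly one message. If a topic holds several, a loop along the following lines could read them until none are left. This is a standalone sketch under two assumptions: that `receive` accepts a timeout in milliseconds, and that a timeout surfaces as a rejected promise. The helper name `drain` and the `fakeConsumer` are hypothetical; the stand-in exists only so the sketch runs without a broker.

```javascript
// Receive up to `max` messages, acknowledging each one, and stop early
// when a receive call fails (assumed here to be how a timeout is reported).
async function drain(consumer, max, timeoutMs) {
  const texts = [];
  while (texts.length < max) {
    let msg;
    try {
      msg = await consumer.receive(timeoutMs);
    } catch (err) {
      // Assumption: a timeout rejects the promise. Real code would likely
      // inspect `err` to tell a timeout apart from other failures.
      break;
    }
    texts.push(msg.getData().toString());
    await consumer.acknowledge(msg);
  }
  return texts;
}

// Stand-in consumer (hypothetical) holding two queued messages.
const fakeConsumer = {
  queue: ["Hello", "World"],
  acked: 0,
  async receive() {
    if (this.queue.length === 0) {
      throw new Error("timeout");
    }
    const text = this.queue.shift();
    return { getData: () => Buffer.from(text) };
  },
  async acknowledge() {
    this.acked += 1;
  },
};

drain(fakeConsumer, 10, 1000).then((texts) => {
  console.log(texts.join(" ")); // Hello World
});
```

Acknowledging inside the loop means a message is only marked as handled after it has been read successfully.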
Run the example
Alright, it’s that time! Let’s see if all that hard work will pay off.
Return to the terminal and run the following command.
node index.js
You should see output similar to the following.
sent message
Hello World
Next steps
You did it🎉! You’re on your way to messaging glory. Let’s continue learning.