Use the Node.js Pulsar client with Astra Streaming
You can produce and consume messages with the Node.js Pulsar client and Astra Streaming.
For a complete source code example, see the Astra Streaming Examples repository.
Prerequisites
-
sudopermission to install dependencies -
Node.js version 10 or later
-
npm 6 or later
-
An Apache Pulsar™ topic in Astra Streaming
-
A text editor or IDE
Set up the environment
Install the C++ Pulsar library dependency required by the Node.js Pulsar client npm package.
Pulsar Node client versions 1.8 and later do not require installation of the C++ Pulsar library dependency.
-
Ubuntu-based Debian
-
Centos/RHEL-based rpm
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/DEB/apache-pulsar-client.deb
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/DEB/apache-pulsar-client-dev.deb
sudo apt install -y ./apache-pulsar-client.deb
sudo apt install -y ./apache-pulsar-client*.deb
sudo ldconfig
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/RPMS/apache-pulsar-client-2.10.2-1.x86_64.rpm
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/RPMS/apache-pulsar-client-devel-2.10.2-1.x86_64.rpm
sudo rpm -i ./apache-pulsar-client-2.10.2-1.x86_64.rpm
sudo rpm -i ./apache-pulsar-client-devel-2.10.2-1.x86_64.rpm
sudo ldconfig
Create a project
In a terminal, run the following script to create a new Node.js project. If NPM asks for project values, use the automatically suggested defaults.
mkdir SimpleProducerConsumer && cd SimpleProducerConsumer
touch index.js
npm init -y
npm install pulsar-client
Write the script
-
In your new project, open the
index.jsfile, and then add the following code.index.jsconst Pulsar = require("pulsar-client"); (async () => { const serviceUrl = "<REPLACE_WITH_SERVICE_URL>"; const pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>"; const tenantName = "<REPLACE_WITH_TENANT_NAME>"; const namespace = "<REPLACE_WITH_NAMESPACE>"; const topicName = "<REPLACE_WITH_TOPIC>"; const topic = `persistent://${tenantName}/${namespace}/${topicName}`; // Debian Ubuntu: const trustStore = '/etc/ssl/certs/ca-certificates.crt' // CentOS RHEL: // const trustStore = "/etc/ssl/certs/ca-bundle.crt"; const auth = new Pulsar.AuthenticationToken({ token: pulsarToken }); const client = new Pulsar.Client({ serviceUrl: serviceUrl, authentication: auth, tlsTrustCertsFilePath: trustStore, operationTimeoutSeconds: 30, });This script is intentionally incomplete. Your IDE might show errors until you complete the next steps.
-
Provide values for the following variables:
Parameter Definition Where to find the value serviceUrlThe URL to connect to the Pulsar cluster
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Connect tab.
-
In the Tenant Details section, get the Broker Service URL.
pulsarTokenThe token for Pulsar cluster authentication
For information about creating Pulsar tokens, see Manage tokens.
tenantNameThe name of your streaming tenant
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Get the tenant name from the Astra Streaming dashboard.
namespaceThe segmented area for certain topics in your streaming tenant
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Choose the target namespace from the list of namespaces.
topicNameTopic name (not the full name)
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Click the name of your tenant, and then click the Namespace and Topics tab.
-
Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.
-
-
Use the client to create a producer.
There are many configuration options for producers. For this example, declare the topic where messages should go.
index.jsconst producer = await client.createProducer({ topic: topic, }); -
Send a message and receive acknowledgment:
index.jsproducer.send({ data: Buffer.from("Hello World"), }); console.log("sent message"); -
Clean up:
index.jsawait producer.flush(); await producer.close();At this point, the script produces a message that waits to be consumed and acknowledged.
-
Create a new consumer subscription, name the subscription, and declare the topic to watch:
index.jsconst consumer = await client.subscribe({ topic: topic, subscription: "examples-subscription", subscriptionType: "Exclusive", ackTimeoutMs: 10000, }); -
Receive messages, write them to the console, and acknowledge receipt with the broker:
index.jsconst msg = await consumer.receive(); console.log(msg.getData().toString()); consumer.acknowledge(msg); -
Clean up:
index.jsawait consumer.close(); await client.close(); })();
Run the script
In your Node.js project, run the script:
node index.js
Output such as the following confirms that the script succeeded:
sent message
Hello World