Use the Node.js Pulsar client with Astra Streaming

You can produce and consume messages with the Node.js Pulsar client and Astra Streaming.

Go to the examples repo for the complete source of this example.

Prerequisites

  • sudo permission to install dependencies

  • Node.js version 10 or later

  • npm 6 or later

  • A Pulsar topic in Astra Streaming

  • A text editor or IDE

Set up the environment

Install the C++ Pulsar library dependency required by the Node.js Pulsar client npm package.

Pulsar Node client versions 1.8 and later do not require installation of the C++ Pulsar library dependency.

  • Ubuntu-based Debian

  • Centos/RHEL-based rpm

wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/DEB/apache-pulsar-client.deb
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/DEB/apache-pulsar-client-dev.deb

sudo apt install -y ./apache-pulsar-client.deb
sudo apt install -y ./apache-pulsar-client*.deb

sudo ldconfig
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/RPMS/apache-pulsar-client-2.10.2-1.x86_64.rpm
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/RPMS/apache-pulsar-client-devel-2.10.2-1.x86_64.rpm

sudo rpm -i ./apache-pulsar-client-2.10.2-1.x86_64.rpm
sudo rpm -i ./apache-pulsar-client-devel-2.10.2-1.x86_64.rpm

sudo ldconfig

Create a project

In a terminal, run the following script to create a new Node.js project. If NPM asks for project values, use the automatically suggested defaults.

mkdir SimpleProducerConsumer && cd SimpleProducerConsumer

touch index.js

npm init -y

npm install pulsar-client

Write the script

  1. In your new project, open the index.js file, and then add the following code.

    index.js
    const Pulsar = require("pulsar-client");
    
    (async () => {
      const serviceUrl = "<REPLACE_WITH_SERVICE_URL>";
      const pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
    
      const tenantName = "<REPLACE_WITH_TENANT_NAME>";
      const namespace = "<REPLACE_WITH_NAMESPACE>";
      const topicName = "<REPLACE_WITH_TOPIC>";
    
      const topic = `persistent://${tenantName}/${namespace}/${topicName}`;
    
      // Debian Ubuntu:
      const trustStore = '/etc/ssl/certs/ca-certificates.crt'
      // CentOS RHEL:
      // const trustStore = "/etc/ssl/certs/ca-bundle.crt";
    
      const auth = new Pulsar.AuthenticationToken({ token: pulsarToken });
    
      const client = new Pulsar.Client({
        serviceUrl: serviceUrl,
        authentication: auth,
        tlsTrustCertsFilePath: trustStore,
        operationTimeoutSeconds: 30,
      });

    This script is intentionally incomplete. Your IDE might show errors until you complete the next steps.

  2. Provide values for the following variables:

    Parameter Definition Where to find the value

    serviceUrl

    The URL to connect to the Pulsar cluster

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Connect tab. In the Details section, get the Broker Service URL.

    pulsarToken

    The token for Pulsar cluster authentication

    For information about creating Pulsar tokens, see Manage tokens.

    tenantName

    The name of your streaming tenant

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant. In the Details section, get the Name.

    namespace

    The segmented area for certain topics in your streaming tenant

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Choose the target namespace from the list of namespaces.

    topicName

    Topic name (not the full name)

    In the Astra Portal navigation menu, click Streaming, select your streaming tenant, and then click the Namespace and Topics tab. Expand the target namespace in the list of namespaces to view the names of the topics within. Do not use the Full Name.

  3. Use the client to create a producer.

    There are many configuration options for producers. For this example, declare the topic where messages should go.

    index.js
      const producer = await client.createProducer({
        topic: topic,
      });
  4. Send a message and receive acknowledgment:

    index.js
      producer.send({
        data: Buffer.from("Hello World"),
      });
      console.log("sent message");
  5. Clean up:

    index.js
      await producer.flush();
      await producer.close();

    At this point, the script produces a message that waits to be consumed and acknowledged.

  6. Create a new consumer subscription, name the subscription, and declare the topic to watch:

    index.js
      const consumer = await client.subscribe({
        topic: topic,
        subscription: "examples-subscription",
        subscriptionType: "Exclusive",
        ackTimeoutMs: 10000,
      });
  7. Receive messages, write them to the console, and acknowledge receipt with the broker:

    index.js
      const msg = await consumer.receive();
      console.log(msg.getData().toString());
      consumer.acknowledge(msg);
  8. Clean up:

    index.js
      await consumer.close();
      await client.close();
    })();

Run the script

In your Node.js project, run the script:

node index.js

Output such as the following confirms that the script succeeded:

sent message
Hello World

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com