Node.js code examples

Astra Streaming is currently in private beta. Join the Astra Streaming Waitlist.

Astra Streaming is powered by Apache Pulsar. To connect to your service, use the open-source client APIs provided by the Apache Pulsar project.

The Node.js client APIs are distributed through NPM. On Linux, we recommend Node version 10+ and npm 6+. You can find document and source code for the Node.js client library here.

Astra Streaming is running Pulsar version 2.6.3. You should use this API version or higher. Since the current Node.js client library depends on the C+\+ client library, please install the C+\+ library version 2.6.3.

In the code examples, you’ll need to make the following substitutions:

  • <service-url>: The Pulsar service URL from the Astra Streaming Console Connect tab.

  • <streaming-token>: Your access token from the Astra Streaming Console Connect tab.

  • <topic-url>: A topic URL of your choice from the Astra Streaming Console Topics tab.

  • <subscription-name>: A subscription name of your choice, or one that you’ve configured in the Details tab of an existing topic.

  • npm

  • Producer

  • Consumer

  • Reader

Since the current Node.js client library depends on the C+\+ client library, please install the C+\+ library version 2.6.3. For an Ubuntu based deb package:

wget https://archive.apache.org/dist/pulsar/pulsar-2.6.3/DEB/apache-pulsar-client.deb
wget https://archive.apache.org/dist/pulsar/pulsar-2.6.3/DEB/apache-pulsar-client-dev.deb

sudo apt install -y ./apache-pulsar-client.deb
sudo apt install -y ./apache-pulsar-client-dev.deb

sudo ldconfig

This sample creates a producer on a topic, sends 10 messages.

const Pulsar = require('pulsar-client');
(async () => {
  // Token based authentication
  const tokenStr = '<streaming-token>';
  const pulsarUri = '<service-url>';
  const topicName = '<topic-url>';

  const trustStore = '/etc/ssl/certs/ca-bundle.crt';
  // Debian Ubuntu:
  // const trustStore = '/etc/ssl/certs/ca-certificates.crt'
  // OSX:
  // Export the default certificates to a file, then use that file:
  // security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
  // trust_certs='./ca-certificates.crt'

  const auth = new Pulsar.AuthenticationToken({token: tokenStr});

  // Create Pulsar Client
  const client = new Pulsar.Client({
    serviceUrl: pulsarUri,
    authentication: auth,
    tlsTrustCertsFilePath: trustStore,
    operationTimeoutSeconds: 30,
  });

  // Create a producer
  const producer = await client.createProducer({
    topic: topicName,
  });

  // Send messages
  for (let i = 0; i < 10; i += 1) {
    producer.send({
      data: Buffer.from('nodejs-message-'+i)
    });
    console.log('send message ' + i);
  }
  await producer.flush();

  await producer.close();
  await client.close();
})();

This sample creates a consumer on a topic, waits for a message, acknowledges it in a loop.

const Pulsar = require('pulsar-client');
(async () => {
  // Token based on authentication
  const tokenStr = '<streaming-token>';
  const pulsarUri = '<service-url>';
  const topicName = '<topic-url>';
  const subscriptionName = '<topic-name>'

  const trustStore = '/etc/ssl/certs/ca-bundle.crt';
  // Debian Ubuntu:
  // const trustStore = '/etc/ssl/certs/ca-certificates.crt'
  // OSX:
  // Export the default certificates to a file, then use that file:
  // security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
  // trust_certs='./ca-certificates.crt'

  const auth = new Pulsar.AuthenticationToken({token: tokenStr});

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: pulsarUri,
    authentication: auth,
    tlsTrustCertsFilePath: trustStore,
    operationTimeoutSeconds: 30,
  });

  // Create consumer
  const consumer = await client.subscribe({
    topic: topicName,
    subscription: subscriptionName,
    subscriptionType: 'Exclusive',
    ackTimeoutMs: 10000,
  });

  // Receive and acknowledge messages
  for (let i = 0; i < 100; i += 1) {
    const msg = await consumer.receive();
    console.log(msg.getData().toString());
    consumer.acknowledge(msg);
  }

  await consumer.close();
  await client.close();
})();

This sample creates a reader on a topic and reads the earliest or latest message from the topic up to 1000 messages.

const Pulsar = require('pulsar-client');

(async () => {
  // Token based authentication
  const tokenStr = '<streaming-token>';
  const pulsarUri = '<service-url>';
  const topicName = '<topic-url>';

  const trustStore = '/etc/ssl/certs/ca-bundle.crt';
  // Debian Ubuntu:
  // const trustStore = '/etc/ssl/certs/ca-certificates.crt'
  // OSX:
  // Export the default certificates to a file, then use that file:
  // security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
  // trust_certs='./ca-certificates.crt'

  const auth = new Pulsar.AuthenticationToken({token: tokenStr});

  // Create a client
   const client = new Pulsar.Client({
    serviceUrl: pulsarUri,
    authentication: auth,
    tlsTrustCertsFilePath: trustStore,
    operationTimeoutSeconds: 30,
  });

  // Create a reader
  const reader = await client.createReader({
    topic: topicName,
    startMessageId: Pulsar.MessageId.earliest(),
  });

  for (let i = 0; i < 1000; i += 1) {
    console.log((await reader.readNext()).getData().toString());
  }

  await reader.close();
  await client.close();
})();