Connect to Astra Streaming using NodeJS
Astra Streaming is powered by Apache Pulsar. To connect to your service, use the open-source client APIs provided by the Apache Pulsar project.
Astra Streaming is running Pulsar version 2.8.1. You should use this API version or higher.
Choose the language that you would like to use:
Node.js
The Node.js client APIs are distributed through NPM. On Linux, we recommend Node version 10+ and npm 6+. You can find the documentation and source code for the Node.js client library here.
Since the current Node.js client library depends on the C++ client library, please install the C++ library version 2.7.2.
In the code examples, you’ll need to make the following substitutions:
-
<service-url>: The Pulsar Broker Service URL from the Astra Streaming Console Connect tab.
-
<streaming-token>: Your access token from the Astra Streaming Console Connect tab.
-
<topic-url>: A topic URL of your choice from the Astra Streaming Console Topics tab.
-
<subscription-name>: A subscription name of your choice, or one that you’ve configured in the Details tab of an existing topic.
Since the current Node.js client library depends on the C++ client library, please install the C++ library version 2.7.2. For an Ubuntu-based deb package:
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.2/DEB/apache-pulsar-client.deb
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.2/DEB/apache-pulsar-client-dev.deb
sudo apt install -y ./apache-pulsar-client.deb
sudo apt install -y ./apache-pulsar-client-dev.deb
sudo ldconfig
This sample creates a producer on a topic and sends 10 messages.
const Pulsar = require('pulsar-client');

/**
 * Producer sample: connects with token authentication, publishes 10 messages
 * to the configured topic, then closes the producer and the client.
 */
(async () => {
  // Token based authentication
  const tokenStr = '<streaming-token>';
  const pulsarUri = '<service-url>';
  const topicName = '<topic-url>';

  // CA certificate bundle passed to the client as tlsTrustCertsFilePath.
  const trustStore = '/etc/ssl/certs/ca-bundle.crt';
  // Debian Ubuntu:
  // const trustStore = '/etc/ssl/certs/ca-certificates.crt'
  // OSX:
  // Export the default certificates to a file, then use that file:
  // security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
  // const trustStore = './ca-certificates.crt'

  const auth = new Pulsar.AuthenticationToken({ token: tokenStr });

  // Create Pulsar Client
  const client = new Pulsar.Client({
    serviceUrl: pulsarUri,
    authentication: auth,
    tlsTrustCertsFilePath: trustStore,
    operationTimeoutSeconds: 30,
  });

  try {
    // Create a producer
    const producer = await client.createProducer({
      topic: topicName,
    });

    // Send messages. Each send is awaited so that a failed publish surfaces
    // here instead of becoming an unhandled promise rejection.
    for (let i = 0; i < 10; i += 1) {
      await producer.send({
        data: Buffer.from(`nodejs-message-${i}`),
      });
      console.log(`send message ${i}`);
    }

    await producer.flush();
    await producer.close();
  } finally {
    // Always release the client connection, even when something above failed.
    await client.close();
  }
})().catch((err) => {
  // Report the failure and signal it through the process exit code.
  console.error('Producer sample failed:', err);
  process.exitCode = 1;
});
This sample creates a consumer on a topic, then receives and acknowledges messages in a loop (100 messages).
const Pulsar = require('pulsar-client');

/**
 * Consumer sample: connects with token authentication, subscribes to the
 * configured topic, receives and acknowledges 100 messages, then closes the
 * consumer and the client.
 */
(async () => {
  // Token based authentication
  const tokenStr = '<streaming-token>';
  const pulsarUri = '<service-url>';
  const topicName = '<topic-url>';
  // Placeholder matches the <subscription-name> substitution described for
  // these examples.
  const subscriptionName = '<subscription-name>';

  // CA certificate bundle passed to the client as tlsTrustCertsFilePath.
  const trustStore = '/etc/ssl/certs/ca-bundle.crt';
  // Debian Ubuntu:
  // const trustStore = '/etc/ssl/certs/ca-certificates.crt'
  // OSX:
  // Export the default certificates to a file, then use that file:
  // security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
  // const trustStore = './ca-certificates.crt'

  const auth = new Pulsar.AuthenticationToken({ token: tokenStr });

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: pulsarUri,
    authentication: auth,
    tlsTrustCertsFilePath: trustStore,
    operationTimeoutSeconds: 30,
  });

  try {
    // Create consumer
    const consumer = await client.subscribe({
      topic: topicName,
      subscription: subscriptionName,
      subscriptionType: 'Exclusive',
      ackTimeoutMs: 10000,
    });

    // Receive and acknowledge messages
    for (let i = 0; i < 100; i += 1) {
      const msg = await consumer.receive();
      console.log(msg.getData().toString());
      // Awaiting is harmless if acknowledge() is synchronous and surfaces
      // the failure if it returns a promise that rejects.
      await consumer.acknowledge(msg);
    }

    await consumer.close();
  } finally {
    // Always release the client connection, even when something above failed.
    await client.close();
  }
})().catch((err) => {
  // Report the failure and signal it through the process exit code.
  console.error('Consumer sample failed:', err);
  process.exitCode = 1;
});
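This sample creates a reader on a topic and reads up to 1000 messages, starting from the earliest message available on the topic. To start from the latest message instead, pass Pulsar.MessageId.latest() as the startMessageId.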
const Pulsar = require('pulsar-client');

/**
 * Reader sample: connects with token authentication, creates a reader that
 * starts at the earliest message of the configured topic, prints 1000
 * messages, then closes the reader and the client.
 */
(async () => {
  // Token based authentication
  const tokenStr = '<streaming-token>';
  const pulsarUri = '<service-url>';
  const topicName = '<topic-url>';

  // CA certificate bundle passed to the client as tlsTrustCertsFilePath.
  const trustStore = '/etc/ssl/certs/ca-bundle.crt';
  // Debian Ubuntu:
  // const trustStore = '/etc/ssl/certs/ca-certificates.crt'
  // OSX:
  // Export the default certificates to a file, then use that file:
  // security find-certificate -a -p /System/Library/Keychains/SystemCACertificates.keychain > ./ca-certificates.crt
  // const trustStore = './ca-certificates.crt'

  const auth = new Pulsar.AuthenticationToken({ token: tokenStr });

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: pulsarUri,
    authentication: auth,
    tlsTrustCertsFilePath: trustStore,
    operationTimeoutSeconds: 30,
  });

  try {
    // Create a reader
    const reader = await client.createReader({
      topic: topicName,
      startMessageId: Pulsar.MessageId.earliest(),
    });

    // Read and print messages one at a time.
    for (let i = 0; i < 1000; i += 1) {
      const msg = await reader.readNext();
      console.log(msg.getData().toString());
    }

    await reader.close();
  } finally {
    // Always release the client connection, even when something above failed.
    await client.close();
  }
})().catch((err) => {
  // Report the failure and signal it through the process exit code.
  console.error('Reader sample failed:', err);
  process.exitCode = 1;
});
Next
-
Browse the Astra API References