Concurrent Execution API

The DataStax Node.js driver provides a set of utilities for concurrent query execution, to facilitate executing multiple queries in parallel while controlling the concurrency level.

The concurrent execution API can be useful when, for example, you want to insert a large group of rows from an Array or a Stream and evaluate failures, if any, at the end.

Usage samples

Using a fixed query and an Array of arrays as parameters

When an Array of arrays is provided, one query is executed per item in the outer Array, using that item as the query parameters.

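const { executeConcurrent } = require('cassandra-driver').concurrent;

// client is a connected Client instance; each inner Array holds the parameters of one execution.
const query = 'INSERT INTO table1 (id, value) VALUES (?, ?)';
const parameters = [[1, 'a'], [2, 'b'], [3, 'c']]; // ...
const result = await executeConcurrent(client, query, parameters);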

See the code examples in the driver repository for a working example.
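If the source data is a list of objects rather than positional arrays, it has to be converted first. The following is a minimal sketch, assuming a hypothetical helper named toParameterRows (not part of the driver) that orders the values of each object to match the bind markers of the query:

```javascript
// Hypothetical helper, not part of the driver: converts objects into the
// Array of arrays that executeConcurrent() accepts as parameters.
function toParameterRows(items, columns) {
  return items.map((item) => columns.map((column) => {
    if (!(column in item)) {
      throw new TypeError(`Missing value for column "${column}"`);
    }
    return item[column];
  }));
}

const items = [{ id: 1, value: 'a' }, { value: 'b', id: 2 }];
// Column order must match the bind markers: VALUES (?, ?) -> (id, value)
const parameters = toParameterRows(items, ['id', 'value']);
```

The resulting parameters can then be passed as the third argument of executeConcurrent.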

Using a fixed query and a readable stream

When a Stream instance is provided, the driver reads from the input stream and executes one query per item emitted. The driver throttles reads of the input stream based on the configured concurrency level and the number of requests currently in flight.

The Stream instance should be readable, in object mode, and emit Array instances. One query is executed per item emitted.

const stream = csvStream.pipe(transformLineToArrayStream);
const result = await executeConcurrent(client, query, stream);
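The sample above leaves csvStream and transformLineToArrayStream undefined. As a rough sketch of what such a pipeline could look like, the following uses only the Node.js stream module. The function names createLineToArrayTransform and createParameterStream are hypothetical, the two-column "id,value" line format is an assumption, and an in-memory list of lines stands in for a real CSV source:

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical transform: turns one "id,value" text line into an Array of
// query parameters. Object mode is required so that Arrays can be emitted.
function createLineToArrayTransform() {
  return new Transform({
    objectMode: true,
    transform(line, _encoding, callback) {
      const text = String(line).trim();
      if (text === '') {
        // Skip blank lines without emitting anything.
        return callback();
      }
      const [id, value = ''] = text.split(',');
      callback(null, [Number(id), value.trim()]);
    },
  });
}

// Readable.from() creates an object-mode readable from any iterable; here an
// Array of lines stands in for a real CSV line source.
function createParameterStream(lines) {
  return Readable.from(lines).pipe(createLineToArrayTransform());
}
```

A stream built this way is readable, in object mode, and emits Array instances, so it has the shape described above for the third argument of executeConcurrent.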

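Using different queries

To execute a different query for each item, provide an Array of objects, each containing a query and its params, instead of a fixed query.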

const queryAndParameters = [
  { query: 'INSERT INTO videos (id, name, user_id) VALUES (?, ?, ?)',
    params: [ id, name, userId ] },
  { query: 'INSERT INTO user_videos (user_id, id, name) VALUES (?, ?, ?)',
    params: [ userId, id, name ] },
  { query: 'INSERT INTO latest_videos (id, name, user_id) VALUES (?, ?, ?)',
    params: [ id, name, userId ] },
];

const result = await executeConcurrent(client, queryAndParameters);

Execute all queries and deal with execution errors at the end

When raiseOnFirstError is set to false, the driver continues executing the remaining queries even when one or more of them fail. The returned Promise is resolved rather than rejected, and you can inspect the errors property of the result to see each individual error.

const result = await executeConcurrent(client, query, parameters, { raiseOnFirstError: false });

for (const err of result.errors) {
  // ...
}
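When many executions fail for the same reason, it can help to group the collected errors before reporting them. The following is a minimal sketch with a hypothetical helper named summarizeErrors (not part of the driver). Plain Error instances stand in for the contents of result.errors:

```javascript
// Hypothetical helper: groups the errors collected in result.errors by
// error name and message, so repeated failures are reported once.
function summarizeErrors(errors) {
  const counts = new Map();
  for (const err of errors) {
    const key = `${err.name}: ${err.message}`;
    counts.set(key, (counts.get(key) || 0) + 1);
  }
  return counts;
}

// Plain Error instances stand in for the errors reported by the driver.
const sampleErrors = [
  new Error('Query timed out'),
  new TypeError('Invalid parameter'),
  new Error('Query timed out'),
];

const summary = summarizeErrors(sampleErrors);
for (const [description, count] of summary) {
  console.log(`${count} x ${description}`);
}
```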

Defining concurrency level

Use the concurrencyLevel option to set the maximum number of requests that can be executed simultaneously. It defaults to 100.

const result = await executeConcurrent(client, query, parameters, { concurrencyLevel: 200 });

Note that increasing the number of simultaneous requests results in more queueing at both the driver level and the server node level. You should find the optimal value that yields high throughput and low latency for your cluster size and hardware specifications. Using a concurrency level higher than the optimal value might result in query timeouts.
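To illustrate what a concurrency level means in practice, here is a minimal sketch of the general bounded-concurrency technique. It is an illustration only and not the driver's implementation: runWithConcurrencyLimit and the worker callback are hypothetical names, and the worker stands in for a single query execution.

```javascript
// Illustration only, NOT the driver's implementation: runs worker(item) for
// every item while keeping at most `limit` calls in flight at any time.
async function runWithConcurrencyLimit(items, limit, worker) {
  let nextIndex = 0;
  let inFlight = 0;
  let maxInFlight = 0;

  // Each lane takes the next pending item as soon as its previous one ends.
  async function runLane() {
    while (nextIndex < items.length) {
      const item = items[nextIndex++];
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      try {
        await worker(item);
      } finally {
        inFlight--;
      }
    }
  }

  // Start `limit` lanes at most, so no more than `limit` workers overlap.
  const laneCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: laneCount }, () => runLane()));
  return { executed: items.length, maxInFlight };
}
```

With a limit of 3 and ten items, three workers start immediately and each remaining item starts only when an earlier one finishes. Raising the limit shortens the wait for a free lane but increases the number of operations that compete for resources at the same time.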

Collecting all the ResultSet instances of each individual execution

If you want the driver to collect each individual ResultSet instance, use the collectResults flag.

const result = await executeConcurrent(client, query, parameters, { collectResults: true });

for (const rs of result.resultItems) {
  // ...
}
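Once the result items are collected, a common next step is to gather the rows of all executions into a single list. The following is a minimal sketch with a hypothetical helper named collectRows (not part of the driver). It assumes each collected item exposes a rows Array, as a ResultSet does for queries that return rows, and it defensively skips any entry that is an Error or has no rows:

```javascript
// Hypothetical helper: flattens the rows of every collected result item.
function collectRows(resultItems) {
  const rows = [];
  for (const item of resultItems) {
    // Skip missing entries, failed executions and results without rows
    // (for example, results of write queries).
    if (!item || item instanceof Error || !Array.isArray(item.rows)) {
      continue;
    }
    for (const row of item.rows) {
      rows.push(row);
    }
  }
  return rows;
}
```

For example, collectRows(result.resultItems) would return one flat Array with the rows of every execution, in input order.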