var events = require('events');
var util = require('util');
var async = require('async');
var utils = require('./utils.js');
var errors = require('./errors.js');
var types = require('./types.js');
var ControlConnection = require('./control-connection.js');
var RequestHandler = require('./request-handler.js');
var requests = require('./requests.js');
var clientOptions = require('./client-options.js');
var encoder = require('./encoder.js');
/**
* Client options
* @typedef {Object} ClientOptions
* @property {Array} contactPoints Array of addresses or host names of the nodes to add as contact point.
* @property {Object} policies
* @property {LoadBalancingPolicy} policies.loadBalancing The load balancing policy instance to be used to determine the coordinator per query.
* @property {RetryPolicy} policies.retry The retry policy.
* @property {ReconnectionPolicy} policies.reconnection The reconnection policy to be used.
* @property {QueryOptions} queryOptions Default options for all queries.
* @property {Object} pooling Pooling options.
* @property {Object} protocolOptions
* @property {Number} protocolOptions.port The port to use to connect to the Cassandra host. If not set through this method, the default port (9042) will be used instead.
* @property {Number} protocolOptions.maxSchemaAgreementWaitSeconds The maximum time in seconds to wait for schema agreement between nodes before returning from a DDL query (defaults to 10).
* @property {Object} socketOptions
* @property {Number} socketOptions.connectTimeout Connection timeout in milliseconds.
* @property {Boolean} socketOptions.keepAlive Whether to enable TCP keepalive on the socket. Default: true.
* @property {Number} socketOptions.keepAliveDelay TCP keepalive delay in milliseconds. Default: 10000.
* @property {AuthProvider} authProvider Provider to be used to authenticate to an auth-enabled host.
* @property {Object} sslOptions Client-to-node ssl options, when set the driver will use the secure layer. You can specify cert, ca, ... options named after the Node.js tls.connect options.
*/
/**
* Query options
* @typedef {Object} QueryOptions
* @property {Number} consistency Consistency level.
* @property {Number} fetchSize Amount of rows to retrieve per page.
* @property {Boolean} prepare Determines if the query must be executed as a prepared statement.
* @property {Boolean} autoPage Determines if the driver must retrieve the next pages.
* @property {Buffer|Array} routingKey Partition key(s) to determine which coordinator should be used for the query.
* @property {Array} routingIndexes Index of the parameters that are part of the partition key to determine the routing.
*/
/**
* A Client holds connections to a Cassandra cluster, allowing it to be queried.
* Each Client instance maintains multiple connections to the cluster nodes,
* provides [policies]{@link module:policies} to choose which node to use for each query,
* and handles [retries]{@link module:policies/retry} for failed query (when it makes sense), etc...
* <p>
* Client instances are designed to be long-lived and usually a single instance is enough
* per application. As a given Client can only be "logged" into one keyspace at
* a time (where the "logged" keyspace is the one used by query if the query doesn't
* explicitely use a fully qualified table name), it can make sense to create one
* client per keyspace used. This is however not necessary to query multiple keyspaces
* since it is always possible to use a single session with fully qualified table name
* in queries.
* </p>
* @param {ClientOptions} options
* @constructor
*/
function Client(options) {
events.EventEmitter.call(this);
//Unlimited amount of listeners for internal event queues by default
this.setMaxListeners(0);
this.options = clientOptions.extend({logEmitter: this.emit.bind(this)}, options);
this.controlConnection = new ControlConnection(this.options);
this.hosts = null;
this.connected = false;
this.keyspace = options.keyspace;
this.preparedQueries = {"__length": 0};
}
util.inherits(Client, events.EventEmitter);
/**
* Tries to connect to one of the [contactPoints]{@link ClientOptions} and discover the nodes of the cluster.
* <p>
* If the {@link Client} is already connected, it immediately invokes callback.
* </p>
* @param {function} callback The callback is invoked when the pool is connected
* (or at least 1 connected and the rest failed to connect) or it is not possible to connect
*/
Client.prototype.connect = function (callback) {
if (this.connected) return callback();
if (this.connecting) {
//add a listener and move on
return this.once('connected', callback);
}
this.connecting = true;
var self = this;
this.controlConnection.init(function (err) {
if (err) return connectCallback(err);
//we have all the data from the cluster
self.hosts = self.controlConnection.hosts;
self.metadata = self.controlConnection.metadata;
self.options.policies.loadBalancing.init(self, self.hosts, function (err) {
if (err) return connectCallback(err);
if (self.keyspace) {
return self._setKeyspaceFirst(connectCallback);
}
connectCallback();
});
});
function connectCallback(err) {
self.connected = !err;
self.connecting = false;
try{
callback(err);
}
finally {
self.emit('connected', err);
}
}
};
//noinspection JSValidateJSDoc,JSCommentMatchesSignature
/**
* Executes a query on an available connection.
* <p>
* The query can be prepared (recommended) or not depending on {@link QueryOptions}.prepare flag. Retries on multiple hosts if needed.
* </p>
* @param {String} query The query to execute
* @param {Array} [params] Array of params to replace
* @param {QueryOptions} [options]
* @param {ResultCallback} callback Executes callback(err, result) when finished
*/
Client.prototype.execute = function () {
var args = utils.parseCommonArgs.apply(null, arguments);
args.options = utils.extend({}, this.options.queryOptions, args.options);
args.callback = utils.bindDomain(args.callback);
this._innerExecute(args.query, args.params, args.options, args.callback);
};
//noinspection JSValidateJSDoc,JSCommentMatchesSignature
/**
* Executes the query and calls rowCallback for each row as soon as they are received.
* Calls final callback after all rows have been sent, or when there is an error.
* <p>
* The query can be prepared (recommended) or not depending on {@link QueryOptions}.prepare flag. Retries on multiple hosts if needed.
* </p>
* @param {String} query The query to prepare and execute
* @param {Array} [param] Array of params
* @param {QueryOptions} [options]
* @param {function} rowCallback Executes rowCallback(n, row) per each row received, where n is the row index and row is the current Row.
* @param {function} [callback] Executes callback(err, totalCount) after all rows have been received.
*/
Client.prototype.eachRow = function () {
var args = Array.prototype.slice.call(arguments);
var rowCallback;
//accepts an extra callback
if(typeof args[args.length-1] === 'function' && typeof args[args.length-2] === 'function') {
//pass it through the options parameter
rowCallback = args.splice(args.length-2, 1)[0];
}
args = utils.parseCommonArgs.apply(null, args);
if (!rowCallback) {
//only one callback has been defined
rowCallback = args.callback;
args.callback = function () {};
}
args.options = utils.extend({}, args.options, {
byRow: true,
rowCallback: utils.bindDomain(rowCallback)
});
args.callback = utils.bindDomain(args.callback);
var self = this;
function pageCallback (err, result) {
if (err) {
return args.callback(err);
}
if (args.options.autoPage) {
//Next requests for autopaging
args.options.rowLength = args.options.rowLength || 0;
args.options.rowLength += result.rowLength;
args.options.rowLengthArray = args.options.rowLengthArray || [];
args.options.rowLengthArray.push(result.rowLength);
if (result.meta && result.meta.pageState) {
//Use new page state as next request page state
args.options.pageState = result.meta.pageState;
//Issue next request for the next page
self._innerExecute(args.query, args.params, args.options, pageCallback);
return;
}
//finished auto-paging
result.rowLength = args.options.rowLength;
result.rowLengthArray = args.options.rowLengthArray;
}
args.callback(null, result);
}
this._innerExecute(args.query, args.params, args.options, pageCallback);
};
//noinspection JSValidateJSDoc,JSCommentMatchesSignature
/**
* Executes the query and pushes the rows to the result stream
* as soon as they received.
* Calls callback after all rows have been sent, or when there is an error.
* <p>
* The stream is a [Readable Streams2]{@link http://nodejs.org/api/stream.html#stream_class_stream_readable} object
* that contains the raw bytes of the field value.
* It can be piped downstream and provides automatic pause/resume logic (it buffers when not read).
* </p>
* <p>
* The query can be prepared (recommended) or not depending on {@link QueryOptions}.prepare flag. Retries on multiple hosts if needed.
* </p>
* @param {String} query The query to prepare and execute
* @param {Array} [param] Array of params
* @param {QueryOptions} [options]
* @param {function} [callback], executes callback(err) after all rows have been received or if there is an error
* @returns {exports.ResultStream}
*/
Client.prototype.stream = function () {
var args = Array.prototype.slice.call(arguments);
if (typeof args[args.length-1] !== 'function') {
//the callback is not required
args.push(function noop() {});
}
args = utils.parseCommonArgs.apply(null, args);
var resultStream = new types.ResultStream({objectMode: 1});
this.eachRow(args.query, args.params, args.options, function rowCallback(n, row) {
resultStream.add(row);
}, function (err) {
if (err) {
resultStream.emit('error', err);
}
resultStream.add(null);
args.callback(err);
});
return resultStream;
};
//noinspection JSValidateJSDoc,JSCommentMatchesSignature
/**
* Executes batch of queries on an available connection to a host.
* @param {Array.<string>|Array.<{query, params}>} queries The queries to execute as an Array of strings or as an array of object containing the query and params
* @param {QueryOptions} [options]
* @param {ResultCallback} callback Executes callback(err, result) when the batch was executed
*/
Client.prototype.batch = function () {
var self = this;
var args = this._parseBatchArgs.apply(null, arguments);
//logged batch by default
args.options = utils.extend({logged: true}, this.options.queryOptions, args.options);
args.callback = utils.bindDomain(args.callback);
this.connect(function afterConnect(err) {
if (err) {
return args.callback(err);
}
var request = new requests.BatchRequest(args.queries, args.options.consistency, args.options);
var handler = new RequestHandler(self, self.options);
handler.send(request, null, args.callback);
});
};
/**
* Gets the host list representing the replicas that contain such partition.
* @param {String} keyspace
* @param {Buffer} token
* @returns {Array}
*/
Client.prototype.getReplicas = function (keyspace, token) {
return this.metadata.getReplicas(keyspace, token);
};
Client.prototype.log = utils.log;
/**
* Closes all connections to all hosts
* @param {Function} [callback]
*/
Client.prototype.shutdown = function (callback) {
//Go through all the host and shut down their pools
this.log('info', 'Shutting down');
if (!callback) {
callback = function() {};
}
if (!this.hosts) {
return callback();
}
var hosts = this.hosts.slice(0);
async.each(hosts, function (h, next) {
h.shutdown(next);
}, callback);
};
/**
* Waits until that the schema version in all nodes is the same or the waiting time passed.
* @param {Function} callback
* @private
*/
Client.prototype.waitForSchemaAgreement = function (callback) {
if (this.hosts.length === 1) {
return setImmediate(callback);
}
var self = this;
var start = new Date();
var maxWaitTime = this.options.protocolOptions.maxSchemaAgreementWaitSeconds * 1000;
var versionCount = 0;
this.log('info', 'Waiting for schema agreement');
async.doWhilst(function (next) {
async.series([
self.controlConnection.getLocalSchemaVersion.bind(self.controlConnection),
self.controlConnection.getPeersSchemaVersions.bind(self.controlConnection)
], function (err, results) {
if (err) return next(err);
//check the different versions
var versions = {};
versions[results[0]] = true;
var peerVersions = results[1];
for (var i = 0; i < peerVersions.length; i++) {
versions[peerVersions[i]] = true;
}
versionCount = Object.keys(versions).length;
if (versionCount === 1) {
self.log('info', 'Schema versions match');
return next();
}
//let some time pass before the next check
return setTimeout(next, 500);
});
}, function () {
return versionCount !== 1 && (new Date() - start) < maxWaitTime;
}, callback);
};
/**
* Connects and handles the execution of prepared and simple statements. All parameters are mandatory.
* @param {string} query
* @param {Array} params
* @param {Object} options
* @param {Function} callback
* @private
*/
Client.prototype._innerExecute = function (query, params, options, callback) {
var self = this;
function innerCallback (err, result) {
if (err) {
//set query as an error property
err.query = query;
}
callback(err, result);
}
this.connect(function afterConnect(err) {
if (err) {
return innerCallback(err);
}
if (options.prepare) {
return self._executeAsPrepared(query, params, options, innerCallback);
}
try {
encoder.setRoutingKey(params, options);
}
catch (err) {
return innerCallback(err);
}
var request = new requests.QueryRequest(
query,
params,
options);
var handler = new RequestHandler(self, self.options);
handler.send(request, options, innerCallback);
});
};
/**
* Prepares (the first time) and executes the prepared query, retrying on multiple hosts if needed.
* @param {String} query The query to prepare and execute
* @param {Array} params Array of params
* @param {Object} options
* @param {ResultCallback} callback Executes callback(err, result) when finished
* @private
*/
Client.prototype._executeAsPrepared = function (query, params, options, callback) {
var self = this;
async.waterfall([
this.connect.bind(this),
function (next) {
self._getPrepared(query, next);
},
function (queryId, meta, next) {
options = utils.extend({}, options, {hints: utils.toHint(meta.columns)});
try {
encoder.setRoutingKey(params, options);
}
catch (err) {
return next(err);
}
var request = new requests.ExecuteRequest(
queryId,
params,
options);
request.query = query;
var handler = new RequestHandler(self, self.options);
handler.send(request, options, next);
}
], callback);
};
/**
* Parses and validates the arguments received by executeBatch
* @private
*/
Client.prototype._parseBatchArgs = function (queries, options, callback) {
var args = Array.prototype.slice.call(arguments);
if (args.length < 2 || typeof args[args.length-1] !== 'function') {
throw new Error('It should contain at least 2 arguments, with the callback as the last argument.');
}
if (!util.isArray(queries)) {
throw new Error('The first argument must be an Array of queries.');
}
if (args.length < 3) {
callback = args[args.length-1];
options = null;
}
args.queries = queries;
args.options = options;
args.callback = callback;
return args;
};
/**
* It returns the id of the prepared query.
* If its not prepared, it prepares the query.
* If its being prepared, it queues the callback
* @param {String} query Query to prepare with ? as placeholders
* @param {function} callback Executes callback(err, queryId) when there is a prepared statement on a connection or there is an error.
* @private
*/
Client.prototype._getPrepared = function (query, callback) {
//overflow protection
if (this.preparedQueries.__length >= this.options.maxPrepared) {
var toRemove;
this.log('warning', 'Prepared statements exceeded maximum. This could be caused by preparing queries that contain parameters');
for (var key in this.preparedQueries) {
if (this.preparedQueries.hasOwnProperty(key) && this.preparedQueries[key].queryId) {
toRemove = key;
break;
}
}
if (toRemove) {
delete this.preparedQueries[toRemove];
this.preparedQueries.__length--;
}
}
var name = ( this.keyspace || '' ) + query;
var info = this.preparedQueries[name];
if (!info) {
info = new events.EventEmitter();
info.setMaxListeners(0);
this.preparedQueries[name] = info;
this.preparedQueries.__length++;
}
if (info.queryId) {
return callback(null, info.queryId, info.meta);
}
info.once('prepared', callback);
if (info.preparing) {
//it is already being prepared
return;
}
info.preparing = true;
var request = new requests.PrepareRequest(query);
var handler = new RequestHandler(this, this.options);
handler.send(request, null, function (err, response) {
info.preparing = false;
if (err) {
return info.emit('prepared', err);
}
info.queryId = response.id;
info.meta = response.meta;
info.emit('prepared', null, info.queryId, info.meta);
});
};
/**
* Sets the keyspace to the first connection available.
* @param {Function} callback
* @private
*/
Client.prototype._setKeyspaceFirst = function (callback) {
var handler = new RequestHandler(this, this.options);
//Until now the pool is in mint condition
//When getting the next connection, it issues a USE keyspace if necessary
handler.getNextConnection(null, function (err) {
//In case there is an error it probably means that the keyspace could not be switched.
callback(err);
});
};
/**
* Callback used by execution methods.
* @callback ResultCallback
* @param {Error} err Error occurred in the execution of the query.
* @param {Object} result Result of the execution of the query.
* @param {Array} result.rows Array of rows.
*/
module.exports = Client;