# Copyright 2013-2015 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
import sys
from itertools import count, cycle
import logging
from six.moves import xrange
from threading import Event
from cassandra.cluster import PagedResult
log = logging.getLogger(__name__)
[docs]def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True):
"""
Executes a sequence of (statement, parameters) tuples concurrently. Each
``parameters`` item must be a sequence or :const:`None`.
A sequence of ``(success, result_or_exc)`` tuples is returned in the same
order that the statements were passed in. If ``success`` is :const:`False`,
there was an error executing the statement, and ``result_or_exc`` will be
an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
will be the query result.
If `raise_on_first_error` is left as :const:`True`, execution will stop
after the first failed statement and the corresponding exception will be
raised.
The `concurrency` parameter controls how many statements will be executed
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
it is recommended that this be kept below 100 times the number of
core connections per host times the number of connected hosts (see
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
the event loop thread may attempt to block on new connection creation,
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
is 3 or higher, you can safely experiment with higher levels of concurrency.
Example usage::
select_statement = session.prepare("SELECT * FROM users WHERE id=?")
statements_and_params = []
for user_id in user_ids:
params = (user_id, )
statements_and_params.append((select_statement, params))
results = execute_concurrent(
session, statements_and_params, raise_on_first_error=False)
for (success, result) in results:
if not success:
handle_error(result) # result will be an Exception
else:
process_user(result[0]) # result will be a list of rows
"""
if concurrency <= 0:
raise ValueError("concurrency must be greater than 0")
if not statements_and_parameters:
return []
# TODO handle iterators and generators naturally without converting the
# whole thing to a list. This would require not building a result
# list of Nones up front (we don't know how many results there will be),
# so a dict keyed by index should be used instead. The tricky part is
# knowing when you're the final statement to finish.
statements_and_parameters = list(statements_and_parameters)
event = Event()
first_error = [] if raise_on_first_error else None
to_execute = len(statements_and_parameters)
results = [None] * to_execute
num_finished = count(1)
statements = enumerate(iter(statements_and_parameters))
for i in xrange(min(concurrency, len(statements_and_parameters))):
_execute_next(_sentinel, i, event, session, statements, results, None, num_finished, to_execute, first_error)
event.wait()
if first_error:
exc = first_error[0]
if six.PY2 and isinstance(exc, tuple):
(exc_type, value, traceback) = exc
six.reraise(exc_type, value, traceback)
else:
raise exc
else:
return results
[docs]def execute_concurrent_with_args(session, statement, parameters, *args, **kwargs):
"""
Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single
statement and a sequence of parameters. Each item in ``parameters``
should be a sequence or :const:`None`.
Example usage::
statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
parameters = [(x,) for x in range(1000)]
execute_concurrent_with_args(session, statement, parameters, concurrency=50)
"""
return execute_concurrent(session, list(zip(cycle((statement,)), parameters)), *args, **kwargs)
_sentinel = object()
def _handle_error(error, result_index, event, session, statements, results,
future, num_finished, to_execute, first_error):
if first_error is not None:
first_error.append(error)
event.set()
return
else:
results[result_index] = (False, error)
if next(num_finished) >= to_execute:
event.set()
return
try:
(next_index, (statement, params)) = next(statements)
except StopIteration:
return
try:
future = session.execute_async(statement, params)
args = (next_index, event, session, statements, results, future, num_finished, to_execute, first_error)
future.add_callbacks(
callback=_execute_next, callback_args=args,
errback=_handle_error, errback_args=args)
except Exception as exc:
if first_error is not None:
if six.PY2:
first_error.append(sys.exc_info())
else:
first_error.append(exc)
event.set()
return
else:
results[next_index] = (False, exc)
if next(num_finished) >= to_execute:
event.set()
return
def _execute_next(result, result_index, event, session, statements, results,
future, num_finished, to_execute, first_error):
if result is not _sentinel:
if future.has_more_pages:
result = PagedResult(future, result)
future.clear_callbacks()
results[result_index] = (True, result)
finished = next(num_finished)
if finished >= to_execute:
event.set()
return
try:
(next_index, (statement, params)) = next(statements)
except StopIteration:
return
try:
future = session.execute_async(statement, params)
args = (next_index, event, session, statements, results, future, num_finished, to_execute, first_error)
future.add_callbacks(
callback=_execute_next, callback_args=args,
errback=_handle_error, errback_args=args)
except Exception as exc:
if first_error is not None:
if six.PY2:
first_error.append(sys.exc_info())
else:
first_error.append(exc)
event.set()
return
else:
results[next_index] = (False, exc)
if next(num_finished) >= to_execute:
event.set()
return