cassandra.concurrent - Utilities for Concurrent Statement Execution
cassandra.concurrent.execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False)

Executes a sequence of (statement, parameters) tuples concurrently. In each tuple, parameters must be a sequence or None.

The concurrency parameter controls how many statements will be executed concurrently. When Cluster.protocol_version is set to 1 or 2, it is recommended that this be kept below 100 times the number of core connections per host times the number of connected hosts (see Cluster.set_core_connections_per_host()); protocol versions 1 and 2 allow at most 128 simultaneous requests per connection. For example, with 2 core connections per host and 3 connected hosts, keep concurrency below 100 × 2 × 3 = 600. If that limit is exceeded, the event loop thread may block while waiting for new connections to be created, substantially reducing throughput. If protocol_version is 3 or higher, which raises the per-connection limit to 32768 requests, you can safely experiment with higher levels of concurrency.

If raise_on_first_error is left as True, execution will stop after the first failed statement and the corresponding exception will be raised.

results_generator controls how the results are returned.
If False, the results are returned only after all requests have completed. If True, a generator is returned instead. Using a generator keeps the memory footprint constrained when the result set is large, because results are yielded as they become available instead of being materialized as one list. The cost of the lower memory footprint is a small CPU overhead (more thread coordination, plus reordering out-of-order results on the fly).

A sequence of (success, result_or_exc) tuples is returned in the same order in which the statements were passed in. If success is False, there was an error executing the statement, and result_or_exc will be an Exception. If success is True, result_or_exc will be the query result.

Example usage:
```python
from cassandra.concurrent import execute_concurrent

select_statement = session.prepare("SELECT * FROM users WHERE id=?")

statements_and_params = []
for user_id in user_ids:
    params = (user_id, )
    statements_and_params.append((select_statement, params))

results = execute_concurrent(
    session, statements_and_params, raise_on_first_error=False)

for (success, result) in results:
    if not success:
        handle_error(result)  # result will be an Exception
    else:
        process_user(result[0])  # result will be a list of rows
```
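To make the concurrency window, result ordering, and error handling described above more concrete, here is a minimal standard-library model. It is a sketch under stated assumptions, not the driver's implementation: a plain callable (the hypothetical execute argument) stands in for running one statement, worker threads stand in for the driver's asynchronous requests, and the names run_concurrent_model and fake_execute are illustrative only.

```python
import threading
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def _iter_concurrent(execute, statements_and_parameters, concurrency, raise_on_first_error):
    """Yield (success, result_or_exc) tuples in input order (illustrative model)."""
    items = enumerate(statements_and_parameters)
    buffered = {}   # results that finished ahead of their turn, keyed by index
    next_index = 0  # index of the next tuple to hand back
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # Prime the window with up to `concurrency` in-flight requests.
        pending = {pool.submit(execute, stmt, params): i
                   for i, (stmt, params) in islice(items, concurrency)}
        while pending:
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                i = pending.pop(fut)
                exc = fut.exception()
                if exc is not None and raise_on_first_error:
                    raise exc  # stop issuing new requests and surface the error
                buffered[i] = (False, exc) if exc is not None else (True, fut.result())
                # Refill the window: one new request for each one that finished.
                for j, (stmt, params) in islice(items, 1):
                    pending[pool.submit(execute, stmt, params)] = j
            # Hand back every result now contiguous with those already yielded.
            while next_index in buffered:
                yield buffered.pop(next_index)
                next_index += 1


def run_concurrent_model(execute, statements_and_parameters, concurrency=100,
                         raise_on_first_error=True, results_generator=False):
    """Hypothetical model of the documented semantics; not the driver's code."""
    gen = _iter_concurrent(execute, statements_and_parameters,
                           concurrency, raise_on_first_error)
    return gen if results_generator else list(gen)


# Usage with a hypothetical stand-in for executing one statement.
_lock = threading.Lock()
stats = {"active": 0, "peak": 0}


def fake_execute(statement, params):
    """Pretend to run `statement`; fails for negative ids."""
    with _lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    try:
        time.sleep(0.01)
        if params[0] < 0:
            raise ValueError("bad id %d" % params[0])
        return "row %d" % params[0]
    finally:
        with _lock:
            stats["active"] -= 1


pairs = [("SELECT * FROM users WHERE id=?", (i,)) for i in range(20)]
rows = run_concurrent_model(fake_execute, pairs, concurrency=4)
print(rows[:2])  # [(True, 'row 0'), (True, 'row 1')]
```

The model keeps at most concurrency requests in flight by submitting one new request each time one finishes, and it buffers results that complete early so they can be handed back in statement order. That buffering is the kind of on-the-fly reordering the results_generator=True trade-off refers to.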
cassandra.concurrent.execute_concurrent_with_args(session, statement, parameters, *args, **kwargs)

Like execute_concurrent(), but takes a single statement and a sequence of parameters. Each item in parameters should be a sequence or None.

Example usage:
```python
from cassandra.concurrent import execute_concurrent_with_args

statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
parameters = [(x,) for x in range(1000)]
execute_concurrent_with_args(session, statement, parameters, concurrency=50)
```
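Conceptually, execute_concurrent_with_args() saves you from building the (statement, parameters) list by hand: it pairs the one statement with every item in parameters. The standard-library sketch below shows that pairing; pair_statement_with_args is a hypothetical helper, and a plain string stands in for the prepared statement so the example runs without a cluster.

```python
from itertools import repeat


def pair_statement_with_args(statement, parameters):
    """Hypothetical helper: pair one statement with every parameter set,
    producing the (statement, params) tuples that execute_concurrent() takes."""
    # zip() stops when parameters is exhausted; repeat() supplies the same
    # statement object for every item without building a second list.
    return zip(repeat(statement), parameters)


# A plain string stands in for the prepared statement in this sketch.
statement = "INSERT INTO mytable (a, b) VALUES (1, ?)"
parameters = [(x,) for x in range(1000)]
pairs = list(pair_statement_with_args(statement, parameters))
print(pairs[0])  # ('INSERT INTO mytable (a, b) VALUES (1, ?)', (0,))
```

Under these assumptions, passing such pairs to execute_concurrent(session, pairs, concurrency=50) should behave like the execute_concurrent_with_args() call in the example above. Because zip() and repeat() are lazy, the helper itself produces pairs only as they are consumed.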