cassandra.cluster - Clusters and Sessions

class cassandra.cluster.Cluster([contact_points=('127.0.0.1',)][, port=9042][, executor_threads=2], **attr_kwargs)

The main class to use when interacting with a Cassandra cluster. Typically, one instance of this class will be created for each separate Cassandra cluster that your application interacts with.

Example usage:

>>> from cassandra.cluster import Cluster
>>> cluster = Cluster(['192.168.1.1', '192.168.1.2'])
>>> session = cluster.connect()
>>> session.execute("CREATE KEYSPACE ...")
>>> ...
>>> cluster.shutdown()

Any of the mutable Cluster attributes may be set as keyword arguments to the constructor.

cql_version = None

If a specific version of CQL should be used, this may be set to that string version. Otherwise, the highest CQL version supported by the server will be automatically used.

protocol_version = 2

The version of the native protocol to use. Protocol version 2 adds support for lightweight transactions, batch operations, and automatic query paging, but is only supported by Cassandra 2.0+. When working with Cassandra 1.2, this must be set to 1. You can also set this to 1 when working with Cassandra 2.0+, but features that require the version 2 protocol will not be enabled.

port = 9042

The server-side port to open connections to. Defaults to 9042.

compression = True

Controls compression for communications between the driver and Cassandra. If left as the default of True, either lz4 or snappy compression may be used, depending on what is supported by both the driver and Cassandra. If both are fully supported, lz4 will be preferred.

You may also set this to ‘snappy’ or ‘lz4’ to request that specific compression type.

Setting this to False disables compression.
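The selection rule described above can be illustrated with a small standalone sketch. The function `choose_compression` and its arguments are hypothetical and not part of the driver; the sketch also assumes that when a specific type is requested but either side lacks it, no compression is used.

```python
from typing import Optional, Set, Union

# Preference order when compression=True: lz4 first, then snappy.
PREFERENCE = ("lz4", "snappy")


def choose_compression(setting: Union[bool, str],
                       driver_supported: Set[str],
                       server_supported: Set[str]) -> Optional[str]:
    """Hypothetical sketch: pick a compression type, or None for no compression."""
    if setting is False:
        return None  # compression explicitly disabled
    usable = driver_supported & server_supported
    if setting is True:
        # Take the first preferred type that both sides support.
        for name in PREFERENCE:
            if name in usable:
                return name
        return None
    # A specific type was requested ('lz4' or 'snappy').
    # Assumption: fall back to no compression if it is not usable.
    return setting if setting in usable else None


print(choose_compression(True, {"lz4", "snappy"}, {"lz4", "snappy"}))  # lz4
print(choose_compression(True, {"snappy"}, {"lz4", "snappy"}))         # snappy
print(choose_compression("lz4", {"snappy"}, {"lz4", "snappy"}))        # None
```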

auth_provider = None

When protocol_version is 2 or higher, this should be an instance of a subclass of AuthProvider, such as PlainTextAuthProvider.

When protocol_version is 1, this should be a function that accepts one argument, the IP address of a node, and returns a dict of credentials for that node.

When not using authentication, this should be left as None.
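For protocol version 1, the callable receives a node's IP address and returns that node's credentials. A minimal sketch, assuming the credentials use `username`/`password` keys as expected by Cassandra's password authenticator; the `CREDENTIALS` table, the default entry, and the addresses are hypothetical.

```python
from typing import Dict

# Hypothetical per-host credentials; in practice load these from a secure store.
CREDENTIALS: Dict[str, Dict[str, str]] = {
    "192.168.1.1": {"username": "app_user", "password": "secret-a"},
}
DEFAULT_CREDENTIALS = {"username": "app_user", "password": "secret-default"}


def get_credentials(host: str) -> Dict[str, str]:
    """Return the credentials dict for the node at IP address `host`."""
    # Return a copy so callers cannot mutate the shared table.
    return dict(CREDENTIALS.get(host, DEFAULT_CREDENTIALS))
```

With protocol_version set to 1, the function itself would then be passed as the auth_provider, for example `Cluster(protocol_version=1, auth_provider=get_credentials)`.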

load_balancing_policy = None

An instance of policies.LoadBalancingPolicy or one of its subclasses. Defaults to RoundRobinPolicy.

reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0)

An instance of policies.ReconnectionPolicy. Defaults to an instance of ExponentialReconnectionPolicy with a base delay of one second and a max delay of ten minutes.
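The delays such a policy produces can be sketched as a generator that doubles the base delay until it reaches the cap. This illustrates the documented defaults (one second base, ten minute cap) rather than reproducing the driver's code; the function name and the `max_attempts` bound are assumptions.

```python
from itertools import islice
from typing import Iterator


def exponential_schedule(base_delay: float = 1.0,
                         max_delay: float = 600.0,
                         max_attempts: int = 64) -> Iterator[float]:
    """Yield reconnection delays in seconds: base, 2*base, 4*base, ... capped at max_delay."""
    for attempt in range(max_attempts):
        yield min(base_delay * (2 ** attempt), max_delay)


# First twelve delays with the default settings.
print(list(islice(exponential_schedule(), 12)))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 600.0, 600.0]
```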

default_retry_policy = RetryPolicy()

A default policies.RetryPolicy instance to use for all Statement objects which do not have a retry_policy explicitly set.

conviction_policy_factory = cassandra.policies.SimpleConvictionPolicy

A factory function which creates instances of policies.ConvictionPolicy. Defaults to policies.SimpleConvictionPolicy.

connection_class = cassandra.io.asyncorereactor.AsyncoreConnection

This determines which event loop system is used to manage I/O with Cassandra.

By default, AsyncoreConnection is used, which relies on the asyncore module from the Python standard library. Its performance is slightly worse than that of libev, but it is supported on a wider range of systems.

If libev is installed, LibevConnection is used instead.

metrics_enabled = False

Whether or not metric collection is enabled. If enabled, metrics will be an instance of Metrics.

metrics = None

An instance of cassandra.metrics.Metrics if metrics_enabled is True, else None.

metadata = None

An instance of cassandra.metadata.Metadata.

ssl_options = None

An optional dict which will be used as kwargs for ssl.wrap_socket() when new sockets are created. This should be used when client encryption is enabled in Cassandra.

Typically, a ca_certs value should be supplied (a string path to the CA certificates file), and you probably want to set ssl_version to ssl.PROTOCOL_TLSv1 to match Cassandra’s default protocol.

sockopts = None

An optional list of tuples which will be used as arguments to socket.setsockopt() for all created sockets.
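Each tuple is unpacked into one `socket.setsockopt()` call. The sketch below applies a list in that form to a plain, unconnected socket using only the standard library; the particular options (disabling Nagle's algorithm and enabling TCP keepalive) are illustrative choices, not driver defaults.

```python
import socket

# Each entry is (level, option, value), i.e. the positional arguments to setsockopt().
sockopts = [
    (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),   # disable Nagle's algorithm
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),   # enable TCP keepalive probes
]


def apply_sockopts(sock: socket.socket, opts) -> None:
    """Apply every (level, option, value) tuple to `sock`."""
    for args in opts:
        sock.setsockopt(*args)


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    apply_sockopts(sock, sockopts)
    nodelay = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
    keepalive = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
finally:
    sock.close()
```

The same list could then be handed to the cluster, for example `Cluster(sockopts=sockopts)`.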

max_schema_agreement_wait = 10

The maximum duration (in seconds) that the driver will wait for schema agreement across the cluster. Defaults to ten seconds.
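Waiting for schema agreement amounts to polling until every node reports the same schema version or the deadline passes. A minimal sketch, assuming a hypothetical `get_schema_versions()` callable that returns one schema version per reachable node; the real driver gathers this information through its control connection, and the polling interval here is an arbitrary choice.

```python
import time
from typing import Callable, Iterable


def wait_for_schema_agreement(get_schema_versions: Callable[[], Iterable[str]],
                              max_wait: float = 10.0,
                              poll_interval: float = 0.2) -> bool:
    """Poll until all nodes report one schema version; return False on timeout."""
    deadline = time.monotonic() + max_wait
    while True:
        if len(set(get_schema_versions())) == 1:
            return True  # exactly one distinct version: the cluster agrees
        if time.monotonic() >= deadline:
            return False  # gave up after max_wait seconds
        time.sleep(poll_interval)
```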

control_connection_timeout = 2.0

A timeout, in seconds, for queries made by the control connection, such as querying the current schema and information about nodes in the cluster. If set to None, there will be no timeout for these queries.

connect(keyspace=None)

Creates and returns a new Session object. If keyspace is specified, that keyspace will be the default keyspace for operations on the Session.

shutdown()

Closes all sessions and connections associated with this Cluster. To ensure all connections are properly closed, you should always call shutdown() on a Cluster instance when you are done with it.

Once shutdown, a Cluster should not be used for any purpose.

register_listener(listener)

Adds a cassandra.policies.HostStateListener subclass instance to the list of listeners to be notified when a host is added, removed, marked up, or marked down.

unregister_listener(listener)

Removes a registered listener.

get_core_connections_per_host(host_distance)

Gets the minimum number of connections per Session that will be opened for each host with HostDistance equal to host_distance. The default is 2 for LOCAL and 1 for REMOTE.

set_core_connections_per_host(host_distance, core_connections)

Sets the minimum number of connections per Session that will be opened for each host with HostDistance equal to host_distance. The default is 2 for LOCAL and 1 for REMOTE.

get_max_connections_per_host(host_distance)

Gets the maximum number of connections per Session that will be opened for each host with HostDistance equal to host_distance. The default is 8 for LOCAL and 2 for REMOTE.

set_max_connections_per_host(host_distance, max_connections)

Sets the maximum number of connections per Session that will be opened for each host with HostDistance equal to host_distance. The default is 8 for LOCAL and 2 for REMOTE.
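The four accessors above read and write per-distance connection limits. A standalone sketch of that bookkeeping with the documented defaults; the `HostDistance` enum and `PoolSettings` class are hypothetical stand-ins, and the checks that keep the core count from exceeding the maximum are assumptions of this sketch.

```python
from enum import Enum
from typing import Dict


class HostDistance(Enum):
    # Hypothetical stand-in for the driver's host distance values.
    LOCAL = "local"
    REMOTE = "remote"


class PoolSettings:
    """Per-distance core (minimum) and max connection counts per Session."""

    def __init__(self) -> None:
        # Documented defaults: core 2/1 and max 8/2 for LOCAL/REMOTE.
        self._core: Dict[HostDistance, int] = {HostDistance.LOCAL: 2, HostDistance.REMOTE: 1}
        self._max: Dict[HostDistance, int] = {HostDistance.LOCAL: 8, HostDistance.REMOTE: 2}

    def get_core_connections_per_host(self, distance: HostDistance) -> int:
        return self._core[distance]

    def set_core_connections_per_host(self, distance: HostDistance, core: int) -> None:
        # Assumption: reject a minimum larger than the current maximum.
        if core > self._max[distance]:
            raise ValueError("core connections cannot exceed max connections")
        self._core[distance] = core

    def get_max_connections_per_host(self, distance: HostDistance) -> int:
        return self._max[distance]

    def set_max_connections_per_host(self, distance: HostDistance, max_conns: int) -> None:
        # Assumption: reject a maximum smaller than the current minimum.
        if max_conns < self._core[distance]:
            raise ValueError("max connections cannot be below core connections")
        self._max[distance] = max_conns
```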

class cassandra.cluster.Session

A collection of connection pools for each host in the cluster. Instances of this class should not be created directly; use Cluster.connect() instead.

Queries and statements can be executed through Session instances using the execute() and execute_async() methods.

Example usage:

>>> session = cluster.connect()
>>> session.set_keyspace("mykeyspace")
>>> session.execute("SELECT * FROM mycf")

default_timeout = 10.0

A default timeout, measured in seconds, for queries executed through execute() or execute_async(). This default may be overridden with the timeout parameter for either of those methods or the timeout parameter for ResponseFuture.result().

Setting this to None will cause no timeouts to be set by default.

Important: This timeout currently has no effect on callbacks registered on a ResponseFuture through ResponseFuture.add_callback() or ResponseFuture.add_errback(); even if a query exceeds this default timeout, neither the registered callback nor errback will be called.

New in version 2.0.0.

row_factory = cassandra.query.named_tuple_factory

The format to return row results in. By default, each returned row will be a named tuple. The row factories available in cassandra.query are tuple_factory, named_tuple_factory, dict_factory, and ordered_dict_factory.
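A row factory receives the column names and the raw row tuples and returns the rows in its chosen shape. The functions below are standalone re-creations written for illustration, assuming that `(colnames, rows)` calling convention; they are hypothetical and not imported from the driver.

```python
from collections import OrderedDict, namedtuple
from typing import Any, List, Sequence, Tuple

Rows = List[Tuple[Any, ...]]


def tuple_rows(colnames: Sequence[str], rows: Rows) -> Rows:
    # Rows are returned unchanged, as plain tuples.
    return rows


def named_tuple_rows(colnames: Sequence[str], rows: Rows) -> list:
    # One namedtuple class per result set; rename=True replaces column
    # names that are not valid Python identifiers with positional names.
    Row = namedtuple("Row", colnames, rename=True)
    return [Row(*row) for row in rows]


def dict_rows(colnames: Sequence[str], rows: Rows) -> List[dict]:
    return [dict(zip(colnames, row)) for row in rows]


def ordered_dict_rows(colnames: Sequence[str], rows: Rows) -> List[OrderedDict]:
    return [OrderedDict(zip(colnames, row)) for row in rows]


colnames = ["id", "name", "age"]
rows = [(1, "alice", 30), (2, "bob", 25)]
print(named_tuple_rows(colnames, rows)[0].name)  # alice
print(dict_rows(colnames, rows)[1]["age"])       # 25
```

With the real driver, a factory is chosen by assignment, for example `session.row_factory = dict_factory` after importing dict_factory from cassandra.query.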

default_fetch_size = 5000

By default, this many rows will be fetched at a time. This can be specified per-query through Statement.fetch_size.

This only takes effect when protocol version 2 or higher is used. See Cluster.protocol_version for details.

New in version 2.0.0.

execute(statement[, parameters][, timeout][, trace])

Execute the given query and synchronously wait for the response.

If an error is encountered while executing the query, an Exception will be raised.

statement may be a query string or an instance of cassandra.query.Statement.

parameters may be a sequence or dict of parameters to bind. If a sequence is used, %s should be used as the placeholder for each argument. If a dict is used, %(name)s style placeholders must be used.
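To show how the two placeholder styles line up with sequence and dict parameters, here is a toy binder built on Python's `%` operator. It is an illustration only: the helper names are hypothetical, it handles just a few value types, and the driver performs its own value encoding rather than running this code.

```python
from collections.abc import Mapping
from typing import Any


def cql_literal(value: Any) -> str:
    """Render a few Python values as CQL literals (toy subset)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return repr(value)
    if isinstance(value, str):
        # CQL escapes a single quote inside a string by doubling it.
        return "'" + value.replace("'", "''") + "'"
    raise TypeError("unsupported type in this sketch: %r" % type(value))


def bind(query: str, params: Any) -> str:
    if isinstance(params, Mapping):
        # %(name)s placeholders are filled from a dict.
        return query % {k: cql_literal(v) for k, v in params.items()}
    # %s placeholders are filled positionally from a sequence.
    return query % tuple(cql_literal(v) for v in params)


print(bind("SELECT * FROM users WHERE id = %s AND name = %s", (1, "O'Brien")))
# SELECT * FROM users WHERE id = 1 AND name = 'O''Brien'
print(bind("SELECT * FROM users WHERE id = %(id)s", {"id": 7}))
# SELECT * FROM users WHERE id = 7
```

With the real driver, the query and parameters are passed separately, for example `session.execute(query, (1, "O'Brien"))`, and the driver handles the encoding.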

timeout should specify a floating-point timeout (in seconds) after which an OperationTimedOut exception will be raised if the query has not completed. If not set, the timeout defaults to default_timeout. If set to None, there is no timeout.

If trace is set to True, an attempt will be made to fetch the trace details and attach them to the statement's trace attribute in the form of a QueryTrace instance. This requires that statement be a Statement subclass instance and not just a string. If there is an error fetching the trace details, the trace attribute will be left as None.

execute_async(statement[, parameters][, trace])

Execute the given query and return a ResponseFuture object which callbacks may be attached to for asynchronous response delivery. You may also call result() on the ResponseFuture to synchronously block for results at any time.

If trace is set to True, you may call ResponseFuture.get_query_trace() after the request completes to retrieve a QueryTrace instance.

Example usage:

>>> import logging
>>> log = logging.getLogger(__name__)
>>> session = cluster.connect()
>>> future = session.execute_async("SELECT * FROM mycf")

>>> def log_results(results):
...     for row in results:
...         log.info("Results: %s", row)

>>> def log_error(exc):
...     log.error("Operation failed: %s", exc)

>>> future.add_callbacks(log_results, log_error)

Async execution with blocking wait for results:

>>> future = session.execute_async("SELECT * FROM mycf")
>>> # do other stuff...

>>> try:
...     results = future.result()
... except Exception:
...     log.exception("Operation failed:")

prepare(statement)

Prepares a query string, returning a PreparedStatement instance which can be used as follows:

>>> session = cluster.connect("mykeyspace")
>>> query = "INSERT INTO users (id, name, age) VALUES (?, ?, ?)"
>>> prepared = session.prepare(query)
>>> session.execute(prepared, (user.id, user.name, user.age))

Or you may bind values to the prepared statement ahead of time:

>>> prepared = session.prepare(query)
>>> bound_stmt = prepared.bind((user.id, user.name, user.age))
>>> session.execute(bound_stmt)

Of course, prepared statements may (and should) be reused:

>>> prepared = session.prepare(query)
>>> for user in users:
...     bound = prepared.bind((user.id, user.name, user.age))
...     session.execute(bound)

Important: PreparedStatements should be prepared only once. Preparing the same query more than once is likely to degrade performance.
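One common way to make sure each query string is prepared only once is to memoize prepared statements by query text. A minimal sketch, assuming a `prepare` callable such as a bound `session.prepare`; the `PreparedCache` class is hypothetical, not part of the driver, and is not thread-safe as written.

```python
from typing import Any, Callable, Dict


class PreparedCache:
    """Memoize prepared statements so each query string is prepared once."""

    def __init__(self, prepare: Callable[[str], Any]) -> None:
        self._prepare = prepare          # e.g. session.prepare
        self._cache: Dict[str, Any] = {}

    def get(self, query: str) -> Any:
        stmt = self._cache.get(query)
        if stmt is None:
            stmt = self._prepare(query)  # prepared on first use only
            self._cache[query] = stmt
        return stmt
```

Usage would look like `cache = PreparedCache(session.prepare)` followed by `session.execute(cache.get(query), params)` wherever the statement is needed.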

shutdown()

Close all connections. Session instances should not be used for any purpose after being shut down.

set_keyspace(keyspace)

Set the default keyspace for all queries made through this Session. This operation blocks until complete.

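class cassandra.cluster.ResponseFuture

An asynchronous response delivery mechanism that is returned from calls to Session.execute_async().

There are two ways for results to be delivered: synchronously, by calling result(), or asynchronously, by attaching callback and errback functions with add_callback(), add_errback(), or add_callbacks().

query = None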

The Statement instance that is being executed through this ResponseFuture.

result([timeout])

Return the final result or raise an Exception if errors were encountered. If the final result or error has not been set yet, this method will block until that time.

You may set a timeout (in seconds) with the timeout parameter. By default, the default_timeout of the Session this future was created through is used. If the timeout is exceeded, a cassandra.OperationTimedOut will be raised.

Example usage:

>>> future = session.execute_async("SELECT * FROM mycf")
>>> # do other stuff...

>>> try:
...     rows = future.result()
...     for row in rows:
...         ... # process results
... except Exception:
...     log.exception("Operation failed:")

get_query_trace(max_wait=None)

Returns the QueryTrace instance representing a trace of the last attempt for this operation, or None if tracing was not enabled for this query. Note that this may raise an exception if there are problems retrieving the trace details from Cassandra. If the trace is not available after max_wait seconds, cassandra.query.TraceUnavailable will be raised.

has_more_pages

Returns True if there are more pages left in the query results, False otherwise. This should only be checked after the first page has been returned.

New in version 2.0.0.

start_fetching_next_page()

If there are more pages left in the query result, this asynchronously starts fetching the next page. If there are no pages left, QueryExhausted is raised. Also see has_more_pages.

This should only be called after the first page has been returned.

New in version 2.0.0.
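The two members above support a manual, page-at-a-time loop. The class below is a synchronous stand-in written only to illustrate that control flow, where checking the flag before requesting another page avoids the exception. `ToyPager`, its `current_rows` attribute, and its immediate (rather than asynchronous) fetching are assumptions of this sketch.

```python
from typing import Any, List


class QueryExhausted(Exception):
    """Stand-in for cassandra.cluster.QueryExhausted."""


class ToyPager:
    def __init__(self, all_rows: List[Any], fetch_size: int) -> None:
        self._rows = all_rows
        self._size = fetch_size
        self._offset = 0
        self.current_rows: List[Any] = []
        self._load_page()  # the first page is available immediately

    def _load_page(self) -> None:
        end = self._offset + self._size
        self.current_rows = self._rows[self._offset:end]
        self._offset = end

    @property
    def has_more_pages(self) -> bool:
        return self._offset < len(self._rows)

    def start_fetching_next_page(self) -> None:
        if not self.has_more_pages:
            raise QueryExhausted()
        self._load_page()  # the real driver fetches asynchronously


pager = ToyPager(list(range(25)), fetch_size=10)
pages = [pager.current_rows]
while pager.has_more_pages:
    pager.start_fetching_next_page()
    pages.append(pager.current_rows)
print([len(p) for p in pages])  # [10, 10, 5]
```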

add_callback(fn, *args, **kwargs)

Attaches a callback function to be called when the final results arrive.

By default, fn will be called with the results as the first and only argument. If *args or **kwargs are supplied, they will be passed through as additional positional or keyword arguments to fn.

If an error is hit while executing the operation, a callback attached here will not be called. Use add_errback() or add_callbacks() if you wish to handle that case.

If the final result has already been seen when this method is called, the callback will be called immediately (before this method returns).

Important: if the callback you attach results in an exception being raised, the exception will be ignored, so please ensure your callback handles all error cases that you care about.

Usage example:

>>> import logging
>>> import time
>>> log = logging.getLogger(__name__)
>>> session = cluster.connect("mykeyspace")

>>> def handle_results(rows, start_time, should_log=False):
...     if should_log:
...         log.info("Total time: %f", time.time() - start_time)
...     ...

>>> future = session.execute_async("SELECT * FROM users")
>>> future.add_callback(handle_results, time.time(), should_log=True)

add_errback(fn, *args, **kwargs)

Like add_callback(), but handles error cases. An Exception instance will be passed as the first positional argument to fn.
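The callback rules described above (results go to callbacks, exceptions to errbacks, late registrations fire immediately, and exceptions raised inside a callback are swallowed) can be modelled with a small thread-safe toy. `ToyFuture` is hypothetical and far simpler than ResponseFuture; its `result()` raises the built-in TimeoutError as a stand-in for cassandra.OperationTimedOut.

```python
import threading
from typing import Any, Callable, List, Optional, Tuple


class ToyFuture:
    """Hypothetical stand-in modelling ResponseFuture's documented callback rules."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._done = threading.Event()
        self._result: Any = None
        self._error: Optional[BaseException] = None
        self._callbacks: List[Tuple[Callable, tuple, dict]] = []
        self._errbacks: List[Tuple[Callable, tuple, dict]] = []

    @staticmethod
    def _safe_call(fn: Callable, first: Any, args: tuple, kwargs: dict) -> None:
        try:
            fn(first, *args, **kwargs)
        except Exception:
            pass  # exceptions raised by callbacks are ignored, as documented

    def add_callback(self, fn: Callable, *args: Any, **kwargs: Any) -> None:
        with self._lock:
            if not self._done.is_set():
                self._callbacks.append((fn, args, kwargs))
                return
        if self._error is None:
            # The result is already known: call back before returning.
            self._safe_call(fn, self._result, args, kwargs)

    def add_errback(self, fn: Callable, *args: Any, **kwargs: Any) -> None:
        with self._lock:
            if not self._done.is_set():
                self._errbacks.append((fn, args, kwargs))
                return
        if self._error is not None:
            self._safe_call(fn, self._error, args, kwargs)

    def set_result(self, result: Any) -> None:
        with self._lock:
            self._result = result
            self._done.set()
            pending = list(self._callbacks)
        for fn, args, kwargs in pending:
            self._safe_call(fn, result, args, kwargs)

    def set_exception(self, exc: BaseException) -> None:
        with self._lock:
            self._error = exc
            self._done.set()
            pending = list(self._errbacks)
        for fn, args, kwargs in pending:
            self._safe_call(fn, exc, args, kwargs)

    def result(self, timeout: Optional[float] = None) -> Any:
        # TimeoutError stands in for cassandra.OperationTimedOut.
        if not self._done.wait(timeout):
            raise TimeoutError("operation timed out")
        if self._error is not None:
            raise self._error
        return self._result
```

In this toy, callbacks registered before completion run in the thread that sets the result, while late registrations run in the caller's thread; the real driver's threading details may differ.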

add_callbacks(callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_kwargs=None)

A convenient combination of add_callback() and add_errback().

Example usage:

>>> import logging
>>> log = logging.getLogger(__name__)
>>> session = cluster.connect()
>>> query = "SELECT * FROM mycf"
>>> future = session.execute_async(query)

>>> def log_results(results, level=logging.DEBUG):
...     for row in results:
...         log.log(level, "Result: %s", row)

>>> def log_error(exc, query):
...     log.error("Query '%s' failed: %s", query, exc)

>>> future.add_callbacks(
...     callback=log_results, callback_kwargs={'level': logging.INFO},
...     errback=log_error, errback_args=(query,))

class cassandra.cluster.PagedResult

An iterator over the rows from a paged query result. Whenever the number of result rows for a query exceeds the fetch_size (or default_fetch_size, if not set), an instance of this class will be returned.

You can treat this as a normal iterator over rows:

>>> from cassandra.query import SimpleStatement
>>> statement = SimpleStatement("SELECT * FROM users", fetch_size=10)
>>> for user_row in session.execute(statement):
...     process_user(user_row)

Whenever there are no more rows in the current page, the next page will be fetched transparently. However, note that it is possible for an Exception to be raised while fetching the next page, just like you might see on a normal call to session.execute().
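Transparent paging can be sketched as a generator that requests the next page only when the current one is used up. The `fetch_page(paging_state)` callable, returning a page of rows plus the state for the next page (or None at the end), is a hypothetical stand-in for the driver's internal requests; as with the real class, an error raised by `fetch_page` surfaces in the middle of iteration.

```python
from typing import Any, Callable, Iterator, List, Optional, Tuple

FetchPage = Callable[[Optional[int]], Tuple[List[Any], Optional[int]]]


def iterate_rows(fetch_page: FetchPage) -> Iterator[Any]:
    """Yield rows across pages, fetching each page lazily."""
    rows, state = fetch_page(None)        # first page
    while True:
        yield from rows
        if state is None:
            return                        # no more pages
        rows, state = fetch_page(state)   # may raise, like session.execute()


# Fake backend: 25 rows served 10 at a time; the state is the next offset.
DATA = list(range(25))
requests = []


def fake_fetch(offset: Optional[int]) -> Tuple[List[int], Optional[int]]:
    start = offset or 0
    requests.append(start)
    end = start + 10
    return DATA[start:end], (end if end < len(DATA) else None)


print(sum(1 for _ in iterate_rows(fake_fetch)), requests)  # 25 [0, 10, 20]
```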

exception cassandra.cluster.QueryExhausted

Raised when ResponseFuture.start_fetching_next_page() is called and there are no more pages. You can check ResponseFuture.has_more_pages before calling to avoid this.

New in version 2.0.0.

exception cassandra.cluster.NoHostAvailable

Raised when an operation is attempted but all connections are busy, defunct, closed, or resulted in errors when used.

errors = None

A map of the form {ip: exception} which details the particular Exception that was caught for each host the operation was attempted against.
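The errors map is the natural by-product of trying each host in turn and recording why it failed. A minimal sketch of that pattern; `ToyNoHostAvailable`, `try_hosts`, and the `send` callable are hypothetical names, and in the real driver the host order comes from the load balancing policy.

```python
from typing import Any, Callable, Dict, Iterable


class ToyNoHostAvailable(Exception):
    """Stand-in for cassandra.cluster.NoHostAvailable."""

    def __init__(self, message: str, errors: Dict[str, Exception]) -> None:
        super().__init__(message, errors)
        self.errors = errors  # {ip: exception}


def try_hosts(hosts: Iterable[str], send: Callable[[str], Any]) -> Any:
    """Return the first successful result, or raise with every host's error."""
    errors: Dict[str, Exception] = {}
    for ip in hosts:
        try:
            return send(ip)
        except Exception as exc:
            errors[ip] = exc  # remember why this host failed, then try the next
    raise ToyNoHostAvailable("Unable to complete the operation against any hosts", errors)
```

When handling the real exception, the same map can be inspected with a loop such as `for ip, err in exc.errors.items(): ...`.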