dse.cluster - Clusters and Sessions

class Cluster

The main class to use when interacting with a Cassandra cluster. Typically, one instance of this class will be created for each separate Cassandra cluster that your application interacts with.

Example usage:

>>> from dse.cluster import Cluster
>>> cluster = Cluster(['192.168.1.1', '192.168.1.2'])
>>> session = cluster.connect()
>>> session.execute("CREATE KEYSPACE ...")
>>> ...
>>> cluster.shutdown()

Cluster and Session also provide context management functions which implicitly handle shutdown when leaving scope.

executor_threads defines the number of threads in a pool for handling asynchronous tasks such as establishing connection pools or refreshing metadata.

Any of the mutable Cluster attributes may be set as keyword arguments to the constructor.

Attributes

contact_points

= ['127.0.0.1']

The list of contact points to try connecting for cluster discovery. A contact point can be a string (ip, hostname) or a connection.EndPoint instance.

Defaults to loopback interface.

Note: When using DCAwareLoadBalancingPolicy with no explicit local_dc set (as is the default), the DC is chosen from an arbitrary host in contact_points. In this case, contact_points should contain only nodes from a single, local DC.

Note: In the next major version, if you specify contact points, you will also be required to explicitly specify a load-balancing policy. This change will help prevent cases where users had hard-to-debug issues surrounding unintuitive default load-balancing policy behavior.

port

= 9042

The server-side port to open connections to. Defaults to 9042.

cql_version

= None

If a specific version of CQL should be used, this may be set to that string version. Otherwise, the highest CQL version supported by the server will be automatically used.

protocol_version

= 66

The maximum version of the native protocol to use.

See ProtocolVersion for more information about versions.

If not set in the constructor, the driver will automatically downgrade version based on a negotiation with the server, but it is most efficient to set this to the maximum supported by your version of Cassandra. Setting this will also prevent conflicting versions negotiated if your cluster is upgraded.

compression

= True

Controls compression for communications between the driver and Cassandra. If left as the default of True, either lz4 or snappy compression may be used, depending on what is supported by both the driver and Cassandra. If both are fully supported, lz4 will be preferred.

You may also set this to 'snappy' or 'lz4' to request that specific compression type.

Setting this to False disables compression.

auth_provider

This should be an instance of a subclass of AuthProvider, such as PlainTextAuthProvider.

When not using authentication, this should be left as None.

reconnection_policy

= <dse.policies.ExponentialReconnectionPolicy object>

An instance of policies.ReconnectionPolicy. Defaults to an instance of ExponentialReconnectionPolicy with a base delay of one second and a max delay of ten minutes.
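
To give a feel for those defaults, the following is a minimal sketch of a capped exponential schedule. It is an illustration of the general technique only, not the driver's implementation: the function name exponential_delays is hypothetical, and the real policy may differ in details such as jitter.

```python
import itertools


def exponential_delays(base_delay=1.0, max_delay=600.0):
    """Yield delays that double on each attempt, capped at max_delay.

    Hypothetical helper for illustration; not part of the dse package.
    """
    delay = base_delay
    while True:
        yield min(delay, max_delay)
        delay = min(delay * 2, max_delay)


# One-second base delay, ten-minute (600 second) cap.
first_delays = list(itertools.islice(exponential_delays(), 12))
print(first_delays)
```

With these values the delay reaches the ten-minute cap on the eleventh attempt and stays there.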

conviction_policy_factory

= <class 'dse.policies.SimpleConvictionPolicy'>

A factory function which creates instances of policies.ConvictionPolicy. Defaults to policies.SimpleConvictionPolicy.

address_translator

= <dse.policies.IdentityTranslator object>

policies.AddressTranslator instance to be used in translating server node addresses to driver connection addresses.

metrics_enabled

= False

Whether or not metric collection is enabled. If enabled, metrics will be an instance of Metrics.

metrics

= None

An instance of dse.metrics.Metrics if metrics_enabled is True, else None.

ssl_context

= None

An optional ssl.SSLContext instance which will be used when new sockets are created. This should be used when client encryption is enabled in Cassandra.

wrap_socket options can be set using ssl_options. ssl_options will be used as kwargs for ssl.SSLContext.wrap_socket.

New in version 2.8.0.
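
As a rough sketch of how such a context might be prepared, the block below uses only the standard library ssl module. The CA file path and the contact point are placeholders, so the lines that depend on them are left as comments.

```python
import ssl

# PROTOCOL_TLS_CLIENT turns on hostname checking and certificate
# verification by default.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

# In a real deployment, load the CA certificate that signed the node
# certificates. The path is a placeholder, so the call stays commented out.
# ssl_context.load_verify_locations("/path/to/ca.pem")

# The context would then be handed to the constructor, for example:
# cluster = Cluster(["192.168.1.1"], ssl_context=ssl_context)

print(ssl_context.check_hostname, ssl_context.verify_mode)
```

Whether extra wrap_socket arguments (such as the standard server_hostname parameter) need to be supplied through ssl_options depends on your setup; treat this as a starting point rather than a complete configuration.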

ssl_options

= None

Using ssl_options without ssl_context is deprecated and will be removed in the next major release.

An optional dict which will be used as kwargs for ssl.SSLContext.wrap_socket (or ssl.wrap_socket() if used without ssl_context) when new sockets are created. This should be used when client encryption is enabled in Cassandra.

The following documentation only applies when ssl_options is used without ssl_context.

By default, a ca_certs value should be supplied (the value should be a string pointing to the location of the CA certs file), and you probably want to specify ssl_version as ssl.PROTOCOL_TLSv1 to match Cassandra’s default protocol.

Changed in version 3.3.0.

In addition to wrap_socket kwargs, clients may also specify 'check_hostname': True to verify the cert hostname as outlined in RFC 2818 and RFC 6125. Note that this requires the certificate to be transferred, so should almost always require the option 'cert_reqs': ssl.CERT_REQUIRED. Note also that this functionality was not built into Python standard library until (2.7.9, 3.2). To enable this mechanism in earlier versions, patch ssl.match_hostname with a custom or back-ported function.

sockopts

= None

An optional list of tuples which will be used as arguments to socket.setsockopt() for all created sockets.

Note: some drivers find setting TCP_NODELAY beneficial in the context of their execution model. It was not found generally beneficial for this driver. To try it with your own workload, set sockopts = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)].
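
The shape of the sockopts list can be checked with the standard library alone. The sketch below applies each tuple to a throwaway socket the way socket.setsockopt() expects; how the driver itself applies the options internally is not shown here.

```python
import socket

# Each tuple holds the (level, option, value) arguments for socket.setsockopt().
sockopts = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]

# Apply the options to a local, unconnected socket and read one back.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    for args in sockopts:
        sock.setsockopt(*args)
    nodelay = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
finally:
    sock.close()

print("TCP_NODELAY enabled:", bool(nodelay))
```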

max_schema_agreement_wait

= 10

The maximum duration (in seconds) that the driver will wait for schema agreement across the cluster. Defaults to ten seconds. If set <= 0, the driver will bypass schema agreement waits altogether.

metadata

= None

An instance of dse.metadata.Metadata.

connection_class

= <class 'dse.io.libevreactor.LibevConnection'>

This determines what event loop system will be used for managing I/O with Cassandra. These are the current options:

By default, AsyncoreConnection will be used, which uses the asyncore module in the Python standard library.

If libev is installed, LibevConnection will be used instead.

If gevent or eventlet monkey-patching is detected, the corresponding connection class will be used automatically.

AsyncioConnection, which uses the asyncio module in the Python standard library, is also available, but currently experimental. Note that it requires asyncio features that were only introduced in the 3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.

control_connection_timeout

= 2.0

A timeout, in seconds, for queries made by the control connection, such as querying the current schema and information about nodes in the cluster. If set to None, there will be no timeout for these queries.

idle_heartbeat_interval

= 30

Interval, in seconds, on which to heartbeat idle connections. This helps keep connections open through network devices that expire idle connections. It also helps discover bad connections early in low-traffic scenarios. Setting to zero disables heartbeats.

idle_heartbeat_timeout

= 30

Timeout, in seconds, that the heartbeat waits for responses on idle connections. Lowering this value can help to discover bad connections earlier.

schema_event_refresh_window

= 2

Window, in seconds, within which a schema component will be refreshed after receiving a schema_change event.

The driver delays a random amount of time in the range [0.0, window) before executing the refresh. This serves two purposes:

1.) Spread the refresh for deployments with large fanout from C* to client tier, preventing a ‘thundering herd’ problem with many clients refreshing simultaneously.

2.) Remove redundant refreshes. Redundant events arriving within the delay period are discarded, and only one refresh is executed.

Setting this to zero will execute refreshes immediately.

Setting this negative will disable schema refreshes in response to push events (refreshes will still occur in response to schema change responses to DDL statements executed by Sessions of this Cluster).
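
The delay-and-discard behavior described above can be modeled with a small toy class. This is a sketch of the idea under simplified assumptions (a single pending refresh, time passed in explicitly); the class and method names are hypothetical and this is not driver code.

```python
import random


class CoalescingRefresher:
    """Toy model of a delayed, de-duplicated refresh (not driver code)."""

    def __init__(self, window, rng=None):
        self.window = window
        self.rng = rng or random.Random()
        self.pending_at = None   # time the pending refresh is scheduled for
        self.refreshes = []      # times at which refreshes actually ran

    def on_event(self, now):
        if self.window < 0:
            return               # negative window: push events are ignored
        if self.pending_at is not None:
            return               # a refresh is already pending: drop the duplicate
        # random() is in [0.0, 1.0), so the delay falls in [0.0, window);
        # a zero window therefore schedules the refresh immediately.
        self.pending_at = now + self.rng.random() * self.window

    def advance(self, now):
        if self.pending_at is not None and now >= self.pending_at:
            self.refreshes.append(self.pending_at)
            self.pending_at = None


refresher = CoalescingRefresher(window=2, rng=random.Random(42))
for t in (0.0, 0.0, 0.0):       # three events arrive before the delay expires
    refresher.on_event(t)
refresher.advance(2.0)           # the whole window has now elapsed
print(len(refresher.refreshes))
```

Three events produce a single refresh, scheduled at a random point inside the two-second window.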

topology_event_refresh_window

= 10

Window, in seconds, within which the node and token list will be refreshed after receiving a topology_change event.

Setting this to zero will execute refreshes immediately.

Setting this negative will disable node refreshes in response to push events.

See schema_event_refresh_window for discussion of the rationale.

status_event_refresh_window

= 2

Window, in seconds, within which the driver will start the reconnect after receiving a status_change event.

Setting this to zero will connect immediately.

This is primarily used to avoid ‘thundering herd’ in deployments with large fanout from cluster to clients. When nodes come up, clients attempt to reprepare prepared statements (depending on reprepare_on_up), and establish connection pools. This can cause a rush of connections and queries if not mitigated with this factor.

prepare_on_all_hosts

= True

Specifies whether statements should be prepared on all hosts, or just one.

This can reasonably be disabled on long-running applications with numerous clients preparing statements on startup, where a randomized initial condition of the load balancing policy can be expected to distribute prepares from different clients across the cluster.

reprepare_on_up

= True

Specifies whether all known prepared statements should be prepared on a node when it comes up.

May be used to avoid overwhelming a node on return, or if it is supposed that the node was only marked down due to network. If statements are not reprepared, they are prepared on the first execution, causing an extra roundtrip for one or more client requests.

connect_timeout

= 5

Timeout, in seconds, for creating new connections.

This timeout covers the entire connection negotiation, including TCP establishment, options passing, and authentication.

schema_metadata_enabled

= True

Flag indicating whether internal schema metadata is updated.

When disabled, the driver does not populate Cluster.metadata.keyspaces on connect, or on schema change events. This can be used to speed initial connection, and reduce load on client and server during operation. Turning this off gives up token-aware request routing and programmatic inspection of the metadata model.

token_metadata_enabled

= True

Flag indicating whether internal token metadata is updated.

When disabled, the driver does not query node token information on connect, or on topology change events. This can be used to speed initial connection, and reduce load on client and server during operation. It is most useful in large clusters using vnodes, where the token map can be expensive to compute. Turning this off gives up token-aware request routing and programmatic inspection of the token ring.

timestamp_generator

= None

An object, shared between all sessions created by this cluster instance, that generates timestamps when client-side timestamp generation is enabled. By default, each Cluster uses a new MonotonicTimestampGenerator.

Applications can set this value for custom timestamp behavior. See the documentation for Session.timestamp_generator.

endpoint_factory

= None

An EndPointFactory instance to use internally when creating a socket connection to a node. You can ignore this unless you need a special connection mechanism.

application_name

= ''

A string identifying this application to Insights.

application_version

= ''

A string identifying this application’s version to Insights.

monitor_reporting_enabled

= True

A boolean indicating whether monitor reporting, which sends gathered data to Insights when running against DSE 6.8 and higher, is enabled.

monitor_reporting_interval

= 30

The interval, in seconds, at which monitor reporting sends gathered data to Insights when running against DSE 6.8 and higher.

client_id

= None

A UUID that uniquely identifies this Cluster object to Insights. This will be generated automatically unless the user provides one.

cloud

= None

A dict of the cloud configuration. Example:

{
    # path to the secure connect bundle
    'secure_connect_bundle': '/path/to/secure-connect-dbname.zip'
}

The zip file will be temporarily extracted in the same directory to load the configuration and certificates.

Methods

connect

(keyspace=None, wait_for_all_pools=False)

Creates and returns a new Session object.

If keyspace is specified, that keyspace will be the default keyspace for operations on the Session.

wait_for_all_pools specifies whether this call should wait for all connection pools to be established or attempted. Default is False, which means it will return when the first successful connection is established. Remaining pools are added asynchronously.

shutdown

()

Closes all sessions and connections associated with this Cluster. To ensure all connections are properly closed, you should always call shutdown() on a Cluster instance when you are done with it.

Once shutdown, a Cluster should not be used for any purpose.

register_user_type

(keyspace, user_type, klass)

Registers a class to use to represent a particular user-defined type. Query parameters for this user-defined type will be assumed to be instances of klass. Result sets for this user-defined type will be instances of klass. If no class is registered for a user-defined type, a namedtuple will be used for result sets, and non-prepared statements may not encode parameters for this type correctly.

keyspace is the name of the keyspace that the UDT is defined in.

user_type is the string name of the UDT to register the mapping for.

klass should be a class with attributes whose names match the fields of the user-defined type. The constructor must accept kwargs for each of the fields in the UDT.

This method should only be called after the type has been created within Cassandra.

Example:

cluster = Cluster(protocol_version=3)
session = cluster.connect()
session.set_keyspace('mykeyspace')
session.execute("CREATE TYPE address (street text, zipcode int)")
session.execute("CREATE TABLE users (id int PRIMARY KEY, location address)")

# create a class to map to the "address" UDT
class Address(object):

    def __init__(self, street, zipcode):
        self.street = street
        self.zipcode = zipcode

cluster.register_user_type('mykeyspace', 'address', Address)

# insert a row using an instance of Address
session.execute("INSERT INTO users (id, location) VALUES (%s, %s)",
                (0, Address("123 Main St.", 78723)))

# results will include Address instances
results = session.execute("SELECT * FROM users")
row = results[0]
print(row.id, row.location.street, row.location.zipcode)

register_listener

(listener)

Adds a dse.policies.HostStateListener subclass instance to the list of listeners to be notified when a host is added, removed, marked up, or marked down.

unregister_listener

(listener)

Removes a registered listener.

add_execution_profile

(name, profile, pool_wait_timeout=5)

Adds an ExecutionProfile to the cluster. This makes it available for use by name in Session.execute() and Session.execute_async(). This method will raise if the profile already exists.

Normally profiles will be injected at cluster initialization via Cluster(execution_profiles). This method provides a way of adding them dynamically.

Adding a new profile updates the connection pools according to the specified load_balancing_policy. By default, this method will wait up to five seconds for the pool creation to complete, so the profile can be used immediately upon return. This behavior can be controlled using pool_wait_timeout (see concurrent.futures.wait for timeout semantics).
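
Since the text points to concurrent.futures.wait for timeout semantics, the standard-library sketch below shows what those semantics are: a timeout does not raise, and any unfinished work is simply reported as not done. The tasks here are stand-ins and have nothing to do with real connection pools.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

release = threading.Event()

with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(lambda: "ready")
    slow = executor.submit(release.wait)   # blocks until the event is set
    fast.result()                          # make sure the fast task has finished

    # wait() does not raise on timeout; unfinished futures come back in not_done.
    done, not_done = wait([fast, slow], timeout=0.1)
    release.set()                          # let the slow task finish so the pool can exit

print(len(done), len(not_done))
```

By analogy, a profile whose pools are still being created when pool_wait_timeout expires may not be fully usable on return, so it is worth choosing the timeout with your cluster size in mind.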

get_control_connection_host

()

Returns the control connection host metadata.

refresh_schema_metadata

(max_schema_agreement_wait=None)

Synchronously refresh all schema metadata.

By default, the timeout for this operation is governed by max_schema_agreement_wait and control_connection_timeout.

Passing max_schema_agreement_wait here overrides max_schema_agreement_wait.

Setting max_schema_agreement_wait <= 0 will bypass schema agreement and refresh schema immediately.

An Exception is raised if schema refresh fails for any reason.

refresh_keyspace_metadata

(keyspace, max_schema_agreement_wait=None)

Synchronously refresh keyspace metadata. This applies to keyspace-level information such as replication and durability settings. It does not refresh tables, types, etc. contained in the keyspace.

See refresh_schema_metadata() for a description of max_schema_agreement_wait behavior.

refresh_table_metadata

(keyspace, table, max_schema_agreement_wait=None)

Synchronously refresh table metadata. This applies to a table, and any triggers or indexes attached to the table.

See refresh_schema_metadata() for a description of max_schema_agreement_wait behavior.

refresh_user_type_metadata

(keyspace, user_type, max_schema_agreement_wait=None)

Synchronously refresh user defined type metadata.

See refresh_schema_metadata() for a description of max_schema_agreement_wait behavior.

refresh_user_function_metadata

(keyspace, function, max_schema_agreement_wait=None)

Synchronously refresh user defined function metadata.

function is a dse.UserFunctionDescriptor.

See refresh_schema_metadata() for a description of max_schema_agreement_wait behavior.

refresh_user_aggregate_metadata

(keyspace, aggregate, max_schema_agreement_wait=None)

Synchronously refresh user defined aggregate metadata.

aggregate is a dse.UserAggregateDescriptor.

See refresh_schema_metadata() for a description of max_schema_agreement_wait behavior.

refresh_nodes

(force_token_rebuild=False)

Synchronously refresh the node list and token metadata.

force_token_rebuild can be used to rebuild the token map metadata, even if no new nodes are discovered.

An Exception is raised if node refresh fails for any reason.

set_meta_refresh_enabled

(enabled)

Deprecated: set schema_metadata_enabled and token_metadata_enabled instead.

Sets a flag to enable (True) or disable (False) all metadata refresh queries. This applies to both schema and node topology.

Disabling this is useful to minimize refreshes during multiple changes.

Meta refresh must be enabled for the driver to become aware of any cluster topology changes or schema updates.

class ExecutionProfile

Attributes

consistency_level

= LOCAL_ONE

ConsistencyLevel used when not specified on a Statement.

load_balancing_policy

= None

An instance of policies.LoadBalancingPolicy or one of its subclasses.

Used in determining host distance for establishing connections, and routing requests.

Defaults to TokenAwarePolicy(DCAwareRoundRobinPolicy()) if not specified.

retry_policy

= None

An instance of policies.RetryPolicy used when Statement objects do not have a retry_policy explicitly set.

Defaults to RetryPolicy if not specified.

serial_consistency_level

= None

Serial ConsistencyLevel used when not specified on a Statement (for LWT conditional statements).

request_timeout

= 10.0

Request timeout used when not overridden in Session.execute()

Static Methods

static

row_factory

(colnames, rows)

A callable to format results, accepting (colnames, rows) where colnames is a list of column names, and rows is a list of tuples, with each tuple representing a row of parsed values.

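Some example implementations: tuple_factory(), named_tuple_factory(), dict_factory(), and ordered_dict_factory(), all in dse.query.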
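
For illustration, a custom row factory can be a plain function with the (colnames, rows) signature described above. The sketch below is a hypothetical example, not one shipped with the driver, and it is exercised here with stand-in data rather than a live result set.

```python
def lowercase_dict_factory(colnames, rows):
    """Return each row as a dict keyed by lower-cased column name.

    Hypothetical example; the name is not part of the dse package.
    """
    keys = [name.lower() for name in colnames]
    return [dict(zip(keys, row)) for row in rows]


# Stand-in data in the shape described above: column names plus row tuples.
colnames = ["id", "Name"]
rows = [(1, "alice"), (2, "bob")]
formatted = lowercase_dict_factory(colnames, rows)
print(formatted)
```

A callable like this could then be supplied as the profile's row_factory.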

Attributes

speculative_execution_policy

= None

An instance of policies.SpeculativeExecutionPolicy

Defaults to NoSpeculativeExecutionPolicy if not specified

continuous_paging_options

= None

Note: This feature is implemented to facilitate server integration testing. It is not intended for general use in the Python driver. See Statement.fetch_size or Session.default_fetch_size for configuring normal paging.

When set, requests will use DSE’s continuous paging, which streams multiple pages without intermediate requests.

This has the potential to materialize all results in memory at once if the consumer cannot keep up. Use options to constrain page size and rate.

Module Data

dse.cluster.EXEC_PROFILE_DEFAULT

Key for the Cluster default execution profile, used when no other profile is selected in Session.execute(execution_profile).

Use this as the key in Cluster(execution_profiles) to override the default profile.

class GraphExecutionProfile

Default execution profile for graph execution.

See ExecutionProfile for base attributes.

In addition to default parameters shown in the signature, this profile also defaults retry_policy to dse.policies.NeverRetryPolicy.

Attributes

graph_options

= None

GraphOptions to use with this execution.

Default options for graph queries, initialized as follows by default:

GraphOptions(graph_language=b'gremlin-groovy')

See dse.graph.GraphOptions

class GraphAnalyticsExecutionProfile

Execution profile with timeout and load balancing appropriate for graph analytics queries.

See also GraphExecutionProfile.

In addition to default parameters shown in the signature, this profile also defaults retry_policy to dse.policies.NeverRetryPolicy, and load_balancing_policy to one that targets the current Spark master.

Note: The graph_options.graph_source is set automatically to b'a' (analytics) when using GraphAnalyticsExecutionProfile. This is mandatory to target analytics nodes.

dse.cluster.EXEC_PROFILE_GRAPH_DEFAULT

Key for the default graph execution profile, used when no other profile is selected in Session.execute_graph(execution_profile).

Use this as the key in Cluster(execution_profiles) to override the default graph profile.

dse.cluster.EXEC_PROFILE_GRAPH_SYSTEM_DEFAULT

Key for the default graph system execution profile. This can be used for graph statements using the DSE graph system API.

Selected using Session.execute_graph(execution_profile=EXEC_PROFILE_GRAPH_SYSTEM_DEFAULT).

dse.cluster.EXEC_PROFILE_GRAPH_ANALYTICS_DEFAULT

Key for the default graph analytics execution profile. This can be used for graph statements intended to use Spark/analytics as the traversal source.

Selected using Session.execute_graph(execution_profile=EXEC_PROFILE_GRAPH_ANALYTICS_DEFAULT).

class Session

A collection of connection pools for each host in the cluster. Instances of this class should not be created directly, only using Cluster.connect().

Queries and statements can be executed through Session instances using the execute() and execute_async() methods.

Example usage:

>>> session = cluster.connect()
>>> session.set_keyspace("mykeyspace")
>>> session.execute("SELECT * FROM mycf")

Attributes

default_fetch_size

= 5000

By default, this many rows will be fetched at a time. Setting this to None will disable automatic paging for large query results. The fetch size can also be specified per-query through Statement.fetch_size.

New in version 2.0.0.

use_client_timestamp

= True

Clients provide write timestamps by default. Note that timestamps specified within a CQL query will override this timestamp.

If client timestamps are disabled, the coordinator node will provide the timestamp server-side.

New in version 2.1.0.

timestamp_generator

= None

When use_client_timestamp is set, sessions call this object and use the result as the timestamp. (Note that timestamps specified within a CQL query will override this timestamp.) By default, a new MonotonicTimestampGenerator is created for each Cluster instance.

Applications can set this value for custom timestamp behavior. For example, an application could share a timestamp generator across Cluster objects to guarantee that the application will use unique, increasing timestamps across clusters, or set it to lambda: int(time.time() * 1e6) if losing records over clock inconsistencies is acceptable for the application. Custom timestamp_generators should be callable, and calling them should return an integer representing microseconds since some point in time, typically UNIX epoch.

New in version 3.8.0.
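
A custom generator only needs to be a callable that returns integer microseconds. The sketch below is one possible shape, assuming you want strictly increasing values even if the system clock stalls or steps backwards. It is an illustrative stand-in, not the driver's MonotonicTimestampGenerator, and the class name is hypothetical.

```python
import threading
import time


class StrictlyIncreasingTimestamps:
    """Callable returning microseconds since the UNIX epoch, never repeating.

    Illustrative stand-in; not the driver's MonotonicTimestampGenerator.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._last = 0

    def __call__(self):
        with self._lock:
            now = int(time.time() * 1e6)
            # If the clock stalls or steps backwards, move one microsecond forward.
            self._last = max(now, self._last + 1)
            return self._last


generator = StrictlyIncreasingTimestamps()
stamps = [generator() for _ in range(1000)]
print(stamps[0] < stamps[-1])
```

Because mutable Cluster attributes may be set as constructor keyword arguments, an instance like this could be passed as timestamp_generator when building the Cluster.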

encoder

= None

An Encoder instance that will be used when formatting query parameters for non-prepared statements. This is not used for prepared statements (because prepared statements give the driver more information about what CQL types are expected, allowing it to accept a wider range of python types).

The encoder uses a mapping from python types to encoder methods (for specific CQL types). This mapping can be modified by users as they see fit. Methods of Encoder should be used for mapping values if possible, because they take precautions to avoid injections and properly sanitize data.

Example:

cluster = Cluster()
session = cluster.connect("mykeyspace")
session.encoder.mapping[tuple] = session.encoder.cql_encode_tuple

session.execute("CREATE TABLE mytable (k int PRIMARY KEY, col tuple<int, ascii>)")
session.execute("INSERT INTO mytable (k, col) VALUES (%s, %s)", [0, (123, 'abc')])

New in version 2.1.0.

client_protocol_handler

= <class 'dse.protocol._ProtocolHandler'>

Specifies a protocol handler that will be used for client-initiated requests (i.e. no internal driver requests). This can be used to override or extend features such as message or type ser/des.

The default pure python implementation is dse.protocol.ProtocolHandler.

When compiled with Cython, there are also built-in faster alternatives. See Faster Deserialization.

Methods

execute

(statement[, parameters][, timeout][, trace][, custom_payload][, execute_as][, paging_state][, host])

Execute the given query and synchronously wait for the response.

If an error is encountered while executing the query, an Exception will be raised.

query may be a query string or an instance of dse.query.Statement.

parameters may be a sequence or dict of parameters to bind. If a sequence is used, %s should be used as the placeholder for each argument. If a dict is used, %(name)s style placeholders must be used.

timeout should specify a floating-point timeout (in seconds) after which an OperationTimedOut exception will be raised if the query has not completed. If not set, the timeout defaults to the request_timeout of the selected execution_profile. If set to None, there is no timeout. Please see ResponseFuture.result() for details on the scope and effect of this timeout.

If trace is set to True, the query will be sent with tracing enabled. The trace details can be obtained using the returned ResultSet object.

custom_payload is a Custom Payloads dict to be passed to the server. If query is a Statement with its own custom_payload, the message payload will be a union of the two, with the values specified here taking precedence.
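
The precedence rule can be pictured as an ordinary dict merge. The keys and byte values below are stand-ins chosen for the illustration; this shows the described outcome, not the driver's internal code.

```python
# Payload attached to the statement itself (stand-in keys and values).
statement_payload = {"trace-id": b"abc", "tenant": b"default"}
# Payload passed to execute(); these values win on key collisions.
call_payload = {"tenant": b"acme"}

# Union of the two, with the call-level values taking precedence.
merged = {**statement_payload, **call_payload}
print(merged)
```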

execution_profile is the execution profile to use for this request. It can be a key to a profile configured via Cluster.add_execution_profile() or an instance (from Session.execution_profile_clone_update(), for example).

paging_state is an optional paging state, reused from a previous ResultSet.

execute_as is the user that will be used on the server to execute the request.

host is the dse.pool.Host that should handle the query. If the host specified is down or not yet connected, the query will fail with NoHostAvailable. Using this is discouraged except in a few cases, e.g., querying node-local tables and applying schema changes.

execute_async

(statement[, parameters][, trace][, custom_payload][, execute_as][, paging_state][, host])

Execute the given query and return a ResponseFuture object which callbacks may be attached to for asynchronous response delivery. You may also call result() on the ResponseFuture to synchronously block for results at any time.

See Session.execute() for parameter definitions.

Example usage:

>>> session = cluster.connect()
>>> future = session.execute_async("SELECT * FROM mycf")

>>> def log_results(results):
...     for row in results:
...         log.info("Results: %s", row)

>>> def log_error(exc):
...     log.error("Operation failed: %s", exc)

>>> future.add_callbacks(log_results, log_error)

Async execution with blocking wait for results:

>>> future = session.execute_async("SELECT * FROM mycf")
>>> # do other stuff...

>>> try:
...     results = future.result()
... except Exception:
...     log.exception("Operation failed:")

execute_graph

(statement[, parameters][, trace][, execution_profile][, execute_as])

Executes a Gremlin query string or GraphStatement synchronously, and returns a ResultSet from this execution.

parameters is a dict of named parameters to bind. The values must be JSON-serializable.
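
One way to catch unsuitable values early is to run the parameters through the standard json module before sending them. The helper below is a hypothetical pre-flight check, not part of the dse package, and the driver's own encoder may accept more types than the standard module does.

```python
import json


def check_json_serializable(parameters):
    """Raise TypeError early if any value cannot be encoded as JSON.

    Hypothetical pre-flight helper; not part of the dse package.
    """
    json.dumps(parameters)
    return parameters


good = check_json_serializable({"name": "alice", "ages": [30, 31]})

try:
    check_json_serializable({"tags": {"a", "b"}})   # sets are not JSON-serializable
    rejected = False
except TypeError:
    rejected = True

print(rejected)
```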

execution_profile: Selects an execution profile for the request.

execute_as is the user that will be used on the server to execute the request.

execute_graph_async

(statement[, parameters][, trace][, execution_profile][, execute_as])

Execute the graph query and return a ResponseFuture object which callbacks may be attached to for asynchronous response delivery. You may also call ResponseFuture.result() to synchronously block for results at any time.

prepare

(statement)

Prepares a query string, returning a PreparedStatement instance which can be used as follows:

>>> session = cluster.connect("mykeyspace")
>>> query = "INSERT INTO users (id, name, age) VALUES (?, ?, ?)"
>>> prepared = session.prepare(query)
>>> session.execute(prepared, (user.id, user.name, user.age))

Or you may bind values to the prepared statement ahead of time:

>>> prepared = session.prepare(query)
>>> bound_stmt = prepared.bind((user.id, user.name, user.age))
>>> session.execute(bound_stmt)

Of course, prepared statements may (and should) be reused:

>>> prepared = session.prepare(query)
>>> for user in users:
...     bound = prepared.bind((user.id, user.name, user.age))
...     session.execute(bound)

Alternatively, if protocol_version is 5 or higher (requires Cassandra 4.0+), the keyspace can be specified as a parameter. This will allow you to avoid specifying the keyspace in the query without specifying a keyspace in connect(). It even will let you prepare and use statements against a keyspace other than the one originally specified on connection:

>>> analyticskeyspace_prepared = session.prepare(
...     "INSERT INTO user_activity (id, last_activity) VALUES (?, ?)",
...     keyspace="analyticskeyspace")  # note the different keyspace

Important: PreparedStatements should be prepared only once. Preparing the same query more than once will likely affect performance.

custom_payload is a key value map to be passed along with the prepare message. See Custom Payloads.

shutdown

()

Close all connections. Session instances should not be used for any purpose after being shut down.

set_keyspace

(keyspace)

Set the default keyspace for all queries made through this Session. This operation blocks until complete.

get_execution_profile

(name)

Returns the execution profile associated with the provided name.

Parameters

name – The name (or key) of the execution profile.

execution_profile_clone_update

(ep, **kwargs)

Returns a clone of the ep profile. kwargs can be specified to update attributes of the returned profile.

This is a shallow clone, so any objects referenced by the profile are shared. This means Load Balancing Policy is maintained by inclusion in the active profiles. It also means updating any other rich objects will be seen by the active profile. In cases where this is not desirable, be sure to replace the instance instead of manipulating the shared object.
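
The consequences of a shallow clone are easy to see with stand-in classes. The sketch below uses copy.copy on hypothetical classes to mimic the sharing described above; it does not use or reproduce the driver's own profile class.

```python
import copy


class PolicyStandIn:
    """Stand-in for a rich object such as a load balancing policy."""


class ProfileStandIn:
    """Stand-in for an execution profile; not the dse class."""

    def __init__(self, load_balancing_policy, request_timeout):
        self.load_balancing_policy = load_balancing_policy
        self.request_timeout = request_timeout


def clone_update(profile, **kwargs):
    """Shallow-copy the profile, then overwrite the given attributes."""
    clone = copy.copy(profile)          # shallow: attribute objects are shared
    for name, value in kwargs.items():
        setattr(clone, name, value)
    return clone


original = ProfileStandIn(PolicyStandIn(), request_timeout=10.0)
clone = clone_update(original, request_timeout=30.0)

# The policy object is shared; the replaced timeout is not.
print(clone.load_balancing_policy is original.load_balancing_policy)
print(original.request_timeout, clone.request_timeout)
```

Replacing an attribute on the clone leaves the original untouched, while mutating a shared object in place would be visible through both.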

add_request_init_listener

(fn, *args, **kwargs)

Adds a callback with arguments to be called when any request is created.

It will be invoked as fn(response_future, *args, **kwargs) after each client request is created, and before the request is sent. This can be used to create extensions by adding result callbacks to the response future.

response_future is the ResponseFuture for the request.

Note that the init callback is done on the client thread creating the request, so you may need to consider synchronization if you have multiple threads. Any callbacks added to the response future will be executed on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking applies (see Note in ResponseFuture.add_callbacks()).

See the example in the source tree.
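
A minimal sketch of a listener-based extension, assuming only the two driver calls described here (add_request_init_listener() and ResponseFuture.add_callbacks()). The RequestCounter class and its attribute names are hypothetical and are not the example shipped with the driver.

```python
import threading


class RequestCounter:
    """Counts successful and failed requests issued through a session.

    Hypothetical class for illustration only.
    """

    def __init__(self, session):
        self._lock = threading.Lock()
        self.successes = 0
        self.errors = 0
        # on_request will be invoked as on_request(response_future).
        session.add_request_init_listener(self.on_request)

    def on_request(self, response_future):
        # Runs on the client thread that created the request.
        response_future.add_callbacks(self._on_success, self._on_error)

    def _on_success(self, _rows):
        # Usually runs on the event loop thread: keep it short, never block.
        with self._lock:
            self.successes += 1

    def _on_error(self, _exc):
        with self._lock:
            self.errors += 1
```

The lock guards the counters because requests may be created from several client threads while results arrive on the event loop thread.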

remove_request_init_listener

(fn, *args, **kwargs)

Removes a callback and arguments from the list.

See Session.add_request_init_listener().

class ResponseFuture

An asynchronous response delivery mechanism that is returned from calls to Session.execute_async().

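There are two ways for results to be delivered: synchronously, by calling result(), or asynchronously, by attaching callback and errback functions via add_callback(), add_errback(), and add_callbacks().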

Attributes

query

= None

The Statement instance that is being executed through this ResponseFuture.

Methods

result

()

Return the final result or raise an Exception if errors were encountered. If the final result or error has not been set yet, this method will block until it is set, or the timeout set for the request expires.

Timeout is specified in the Session request execution functions. If the timeout is exceeded, a dse.OperationTimedOut will be raised. This is a client-side timeout. For more information about server-side coordinator timeouts, see policies.RetryPolicy.

Example usage:

>>> future = session.execute_async("SELECT * FROM mycf")
>>> # do other stuff...

>>> try:
...     rows = future.result()
...     for row in rows:
...         ... # process results
... except Exception:
...     log.exception("Operation failed:")

get_query_trace

()

Fetches and returns the query trace of the last response, or None if tracing was not enabled.

Note that this may raise an exception if there are problems retrieving the trace details from Cassandra. If the trace is not available after max_wait, dse.query.TraceUnavailable will be raised.

If the ResponseFuture is not done (async execution) and you try to retrieve the trace, dse.query.TraceUnavailable will be raised.

query_cl is the consistency level used to poll the trace tables.
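
A short sketch of reading a trace once the request has completed. The helper name is hypothetical, and it assumes the request was started with tracing enabled (for example session.execute_async(query, trace=True)) and that the returned trace exposes duration and events, with each event carrying source and description.

```python
def summarize_trace(future):
    """Return (duration, [(source, description), ...]) for a traced request.

    Hypothetical helper; assumes tracing was enabled when the request
    was issued.
    """
    future.result()  # block until done; the trace is unavailable before that
    trace = future.get_query_trace()
    if trace is None:  # tracing was not enabled for this request
        return None, []
    events = [(event.source, event.description) for event in trace.events]
    return trace.duration, events
```

Calling result() first avoids the TraceUnavailable error raised for futures that are not done, though fetching the trace itself can still fail as noted above.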

get_all_query_traces

()

Fetches and returns the query traces for all query pages, if tracing was enabled.

See note in get_query_trace() regarding possible exceptions.

Attributes

custom_payload

The custom payload returned from the server, if any. This will only be set by Cassandra servers implementing a custom QueryHandler, and only for protocol_version 4+.

Ensure the future is complete before trying to access this property (call result(), or access it after a callback is invoked). Otherwise it may raise an exception if the response has not been received.

Returns

Custom Payloads.

is_schema_agreed

= True

For DDL requests, this may be set False if the schema agreement poll after the response fails.

Always True for non-DDL requests.

has_more_pages

Returns True if there are more pages left in the query results, False otherwise. This should only be checked after the first page has been returned.

New in version 2.0.0.

warnings

Warnings returned from the server, if any. This will only be set for protocol_version 4+.

Warnings may be returned for such things as oversized batches, or too many tombstones in slice queries.

Ensure the future is complete before trying to access this property (call result(), or access it after a callback is invoked). Otherwise it may raise an exception if the response has not been received.

Methods

start_fetching_next_page

()

If there are more pages left in the query result, this asynchronously starts fetching the next page. If there are no pages left, QueryExhausted is raised. Also see has_more_pages.

This should only be called after the first page has been returned.

New in version 2.0.0.
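
One way to combine has_more_pages and start_fetching_next_page() with callbacks is sketched below. The PagedResultHandler class and its attribute names are hypothetical, and the sketch assumes that callbacks registered with add_callbacks() are invoked again each time a newly requested page arrives.

```python
import threading


class PagedResultHandler:
    """Collects every row of a paged query without blocking the caller.

    Hypothetical class for illustration only.
    """

    def __init__(self, future):
        self.future = future
        self.rows = []
        self.error = None
        self.finished = threading.Event()
        future.add_callbacks(callback=self._handle_page,
                             errback=self._handle_error)

    def _handle_page(self, rows):
        self.rows.extend(rows)
        if self.future.has_more_pages:
            # Request the next page; this callback is expected to run
            # again once that page has been received.
            self.future.start_fetching_next_page()
        else:
            self.finished.set()

    def _handle_error(self, exc):
        self.error = exc
        self.finished.set()
```

A caller would typically create the handler from session.execute_async(...), wait on handler.finished, and then check handler.error before using handler.rows.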

add_callback

(fn, *args, **kwargs)

Attaches a callback function to be called when the final results arrive.

By default, fn will be called with the results as the first and only argument. If *args or **kwargs are supplied, they will be passed through as additional positional or keyword arguments to fn.

If an error is hit while executing the operation, a callback attached here will not be called. Use add_errback() or add_callbacks() if you wish to handle that case.

If the final result has already been seen when this method is called, the callback will be called immediately (before this method returns).

Note: in the case that the result is not available when the callback is added, the callback is executed by the IO event thread. This means that the callback should not block or attempt further synchronous requests, because no further IO will be processed until the callback returns.

Important: if the callback you attach results in an exception being raised, the exception will be ignored, so please ensure your callback handles all error cases that you care about.

Usage example:

>>> session = cluster.connect("mykeyspace")

>>> def handle_results(rows, start_time, should_log=False):
...     if should_log:
...         log.info("Total time: %f", time.time() - start_time)
...     ...

>>> future = session.execute_async("SELECT * FROM users")
>>> future.add_callback(handle_results, time.time(), should_log=True)

add_errback

(fn, *args, **kwargs)

Like add_callback(), but handles error cases. An Exception instance will be passed as the first positional argument to fn.

add_callbacks

(callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_kwargs=None)

A convenient combination of add_callback() and add_errback().

Example usage:

>>> session = cluster.connect()
>>> query = "SELECT * FROM mycf"
>>> future = session.execute_async(query)

>>> import logging
>>> def log_results(results, level=logging.DEBUG):
...     # log.log() expects a numeric level such as logging.INFO
...     for row in results:
...         log.log(level, "Result: %s", row)

>>> def log_error(exc, query):
...     log.error("Query '%s' failed: %s", query, exc)

>>> future.add_callbacks(
...     callback=log_results, callback_kwargs={'level': logging.INFO},
...     errback=log_error, errback_args=(query,))

class ResultSet

An iterator over the rows from a query result. Also supplies basic equality and indexing methods for backward compatibility. These methods materialize the entire result set (loading all pages), and should only be used if the total result size is understood. Warnings are emitted when paged results are materialized in this fashion.

You can treat this as a normal iterator over rows:

>>> from dse.query import SimpleStatement
>>> statement = SimpleStatement("SELECT * FROM users", fetch_size=10)
>>> for user_row in session.execute(statement):
...     process_user(user_row)

Whenever there are no more rows in the current page, the next page will be fetched transparently. However, note that it is possible for an Exception to be raised while fetching the next page, just like you might see on a normal call to session.execute().

Attributes

has_more_pages

True if the last response indicated more pages; False otherwise.

current_rows

The list of current page rows. May be empty if the result was empty, or this is the last page.

Methods

one

()

Return a single row of the results or None if empty. This is basically a shortcut to result_set.current_rows[0] and should only be used when you know a query returns a single row. Consider using an iterator if the ResultSet contains more than one row.

fetch_next_page

()

Manually, synchronously fetch the next page. Supplied for manually retrieving pages and inspecting current_rows. It is not necessary to call this when iterating through results; paging happens implicitly in iteration.
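
Manual paging can be sketched as a small generator that yields one page at a time. The helper name iter_pages is hypothetical; it relies only on current_rows, has_more_pages and fetch_next_page() as described in this section, and it should not be mixed with direct iteration over the same ResultSet.

```python
def iter_pages(result_set):
    """Yield each page of a ResultSet as a list of rows.

    Hypothetical helper for illustration only.
    """
    while True:
        yield list(result_set.current_rows)
        if not result_set.has_more_pages:
            break
        result_set.fetch_next_page()  # blocks until the next page arrives
```

This can be useful when each page should be handled as a unit, for example to write one batch per page.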

get_query_trace

(max_wait_sec=None)

Gets the last query trace from the associated future. See ResponseFuture.get_query_trace() for details.

get_all_query_traces

(max_wait_sec_per=None)

Gets all query traces from the associated future. See ResponseFuture.get_all_query_traces() for details.

Attributes

was_applied

For LWT results, returns whether the transaction was applied.

Result is indeterminate if called on a result that was not an LWT request or on a query.BatchStatement containing LWT. In the latter case, either the entire batch succeeds or it fails.

Only valid when one of the internal row factories is in use.
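
For illustration, a lightweight transaction could be checked as in the sketch below. The helper name, the users table and its columns are all assumptions; session is assumed to be a connected Session using one of the internal row factories.

```python
def insert_if_absent(session, user_id, name):
    """Try to create a user row; return True if the LWT was applied.

    Hypothetical helper: the users table and its columns are assumptions.
    """
    result = session.execute(
        "INSERT INTO users (id, name) VALUES (%s, %s) IF NOT EXISTS",
        (user_id, name))
    return result.was_applied
```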

paging_state

Server paging state of the query. Can be None if the query was not paged.

The driver treats paging state as opaque, but it may contain primary key data, so applications may want to avoid sending this to untrusted parties.
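
A sketch of stateless paging built on this attribute: fetch one page, hand back the paging state, and pass it in again later to resume. The helper name is hypothetical, and it assumes the statement was created with a fetch_size and that Session.execute() accepts a paging_state keyword argument.

```python
def fetch_page(session, statement, paging_state=None):
    """Fetch one page and return (rows, next_paging_state).

    Hypothetical helper; next_paging_state is None when no pages remain.
    """
    result = session.execute(statement, paging_state=paging_state)
    # current_rows holds only this page; iterating `result` directly
    # would transparently fetch the following pages as well.
    rows = list(result.current_rows)
    return rows, result.paging_state
```

Because the state may contain primary key data, an application that returns it to a client may want to encrypt it or keep it server-side and hand out an opaque token instead.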

exception QueryExhausted

Raised when ResponseFuture.start_fetching_next_page() is called and there are no more pages. You can check ResponseFuture.has_more_pages before calling to avoid this.

New in version 2.0.0.

exception NoHostAvailable

Raised when an operation is attempted but all connections are busy, defunct, closed, or resulted in errors when used.

Attributes

errors

= None

A map of the form {ip: exception} which details the particular Exception that was caught for each host the operation was attempted against.
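
When logging a NoHostAvailable, the errors map can be flattened into one line per host. The helper below is a hypothetical sketch that uses only the errors attribute described above.

```python
def describe_host_errors(exc):
    """Return one 'host: ErrorType: message' line per host, sorted by host.

    Hypothetical helper for illustration only.
    """
    errors = exc.errors or {}  # errors defaults to None
    return ["%s: %s: %s" % (host, type(err).__name__, err)
            for host, err in sorted(errors.items(),
                                    key=lambda item: str(item[0]))]
```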

exception UserTypeDoesNotExist

An attempt was made to use a user-defined type that does not exist.

New in version 2.1.0.