Module astrapy.event_observers.context_managers

Functions

def event_collector(target: DB_OBJ,
*,
destination: list[ObservableEvent] | dict[ObservableEventType, list[ObservableEvent]],
event_types: Iterable[ObservableEventType] | None = None) ‑> Iterator[~DB_OBJ]
Expand source code
@contextmanager
def event_collector(
    target: DB_OBJ,
    *,
    destination: list[ObservableEvent]
    | dict[ObservableEventType, list[ObservableEvent]],
    event_types: Iterable[ObservableEventType] | None = None,
) -> Iterator[DB_OBJ]:
    """
    Create a context manager wrapping a list (of `ObservableEvent` objects) or
    a dict (with `ObservableEventType` keys and `list[ObservableEvent]` values),
    for quickly instrumenting a client, database, table, collection or admin object.

    This utility is meant to be used in a `with` statement, so that events emitted
    by the instrumented classes within the with block are captured into the provided
    lists/dictionaries. Events emitted by spawned classes (e.g. when a Database
    creates a Collection in the `with` block) are also received this way.

    Once outside the `with` block, collection of events stops, but the provided
    destination can still be accessed according to its ordinary scoping.

    Args:
        target: an object that issues Data API / DevOps API requests. The target
            must have a `with_options` suitable method: meaning, it can be any of
            the following: `DataAPIClient`, `AsyncDatabase`, `Database`,
            `AsyncCollection`, `Collection`, `AsyncTable`, `Table`, `AstraDBAdmin`,
            `AstraDBDatabaseAdmin`, `DataAPIDatabaseAdmin`.
        destination: a list or a dictionary where the collected events will be stored.
            For dictionaries, events are grouped into lists, one per each event type,
            stored under the corresponding `ObservableEventType` value as dict key.
        event_types: if provided, it's a list of event types so that only
            events matching this filter are processed.

    Returns:
        Yields an instrumented version of the input target, with an added observer
        set to accumulate the received events into the provided destination. Any
        pre-existing observer is untouched.

    Example:
        >>> ev_lst: list[ObservableEvent] = []
        >>> with event_collector(db, destination=ev_lst) as instrumented_db:
        ...     _ = instrumented_db.list_table_names()
        ...     table = instrumented_db.get_table("my_table")
        ...     _ = table.find_one({"k": 101})
        ...
        >>> print(len(ev_lst))
        5
        >>> print(ev_lst[0].event_type)
        ObservableEventType.REQUEST
    """
    observer_id_ = f"observer_{str(uuid7())}"
    observer_: Observer
    if isinstance(destination, list):
        observer_ = Observer.from_event_list(destination, event_types=event_types)
    else:
        observer_ = Observer.from_event_dict(destination, event_types=event_types)
    api_options = APIOptions(event_observers={observer_id_: observer_})
    target_ = target.with_options(api_options=api_options)
    try:
        yield target_
    finally:
        observer_.enabled = False

Create a context manager wrapping a list (of ObservableEvent objects) or a dict (with ObservableEventType keys and list[ObservableEvent] values), for quickly instrumenting a client, database, table, collection or admin object.

This utility is meant to be used in a with statement, so that events emitted by the instrumented classes within the with block are captured into the provided lists/dictionaries. Events emitted by spawned classes (e.g. when a Database creates a Collection in the with block) are also received this way.

Once outside the with block, collection of events stops, but the provided destination can still be accessed according to its ordinary scoping.

Args

target
an object that issues Data API / DevOps API requests. The target must have a with_options suitable method: meaning, it can be any of the following: DataAPIClient, AsyncDatabase, Database, AsyncCollection, Collection, AsyncTable, Table, AstraDBAdmin, AstraDBDatabaseAdmin, DataAPIDatabaseAdmin.
destination
a list or a dictionary where the collected events will be stored. For dictionaries, events are grouped into lists, one per each event type, stored under the corresponding ObservableEventType value as dict key.
event_types
if provided, it's a list of event types so that only events matching this filter are processed.

Returns

Yields an instrumented version of the input target, with an added observer set to accumulate the received events into the provided destination. Any pre-existing observer is untouched.

Example

>>> ev_lst: list[ObservableEvent] = []
>>> with event_collector(db, destination=ev_lst) as instrumented_db:
...     _ = instrumented_db.list_table_names()
...     table = instrumented_db.get_table("my_table")
...     _ = table.find_one({"k": 101})
...
>>> print(len(ev_lst))
5
>>> print(ev_lst[0].event_type)
ObservableEventType.REQUEST

Classes

class OptionAwareDatabaseObject (*args, **kwargs)
Expand source code
class OptionAwareDatabaseObject(Protocol):
    def with_options(
        self,
        *,
        api_options: APIOptions | UnsetType = _UNSET,
    ) -> Self: ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...

Ancestors

  • typing.Protocol
  • typing.Generic

Methods

def with_options(self, *, api_options: APIOptions | UnsetType = (unset)) ‑> Self
Expand source code
def with_options(
    self,
    *,
    api_options: APIOptions | UnsetType = _UNSET,
) -> Self: ...