Module astrapy.event_observers.context_managers
Functions
def event_collector(target: DB_OBJ,
                    *,
                    destination: list[ObservableEvent] | dict[ObservableEventType, list[ObservableEvent]],
                    event_types: Iterable[ObservableEventType] | None = None) -> Iterator[DB_OBJ]
Source code
@contextmanager
def event_collector(
    target: DB_OBJ,
    *,
    destination: list[ObservableEvent]
    | dict[ObservableEventType, list[ObservableEvent]],
    event_types: Iterable[ObservableEventType] | None = None,
) -> Iterator[DB_OBJ]:
    """
    Create a context manager wrapping a list (of `ObservableEvent` objects)
    or a dict (with `ObservableEventType` keys and `list[ObservableEvent]`
    values), for quickly instrumenting a client, database, table, collection
    or admin object.

    This utility is meant to be used in a `with` statement, so that events
    emitted by the instrumented classes within the with block are captured
    into the provided lists/dictionaries. Events emitted by spawned classes
    (e.g. when a Database creates a Collection in the `with` block) are also
    received this way.

    Once outside the `with` block, collection of events stops, but the
    provided destination can still be accessed according to its ordinary
    scoping.

    Args:
        target: an object that issues Data API / DevOps API requests. The
            target must have a `with_options` suitable method: meaning, it
            can be any of the following: `DataAPIClient`, `AsyncDatabase`,
            `Database`, `AsyncCollection`, `Collection`, `AsyncTable`,
            `Table`, `AstraDBAdmin`, `AstraDBDatabaseAdmin`,
            `DataAPIDatabaseAdmin`.
        destination: a list or a dictionary where the collected events will
            be stored. For dictionaries, events are grouped into lists, one
            per each event type, stored under the corresponding
            `ObservableEventType` value as dict key.
        event_types: if provided, it's a list of event types so that only
            events matching this filter are processed.

    Returns:
        Yields an instrumented version of the input target, with an added
        observer set to accumulate the received events into the provided
        destination. Any pre-existing observer is untouched.

    Example:
        >>> ev_lst: list[ObservableEvent] = []
        >>> with event_collector(db, destination=ev_lst) as instrumented_db:
        ...     _ = instrumented_db.list_table_names()
        ...     table = instrumented_db.get_table("my_table")
        ...     _ = table.find_one({"k": 101})
        ...
        >>> print(len(ev_lst))
        5
        >>> print(ev_lst[0].event_type)
        ObservableEventType.REQUEST
    """

    observer_id_ = f"observer_{str(uuid7())}"
    observer_: Observer
    if isinstance(destination, list):
        observer_ = Observer.from_event_list(destination, event_types=event_types)
    else:
        observer_ = Observer.from_event_dict(destination, event_types=event_types)
    api_options = APIOptions(event_observers={observer_id_: observer_})
    target_ = target.with_options(api_options=api_options)
    try:
        yield target_
    finally:
        observer_.enabled = False

Create a context manager wrapping a list (of ObservableEvent objects) or a dict (with ObservableEventType keys and list[ObservableEvent] values), for quickly instrumenting a client, database, table, collection or admin object.

This utility is meant to be used in a with statement, so that events emitted by the instrumented classes within the with block are captured into the provided lists/dictionaries. Events emitted by spawned classes (e.g. when a Database creates a Collection in the with block) are also received this way.

Once outside the with block, collection of events stops, but the provided destination can still be accessed according to its ordinary scoping.

Args
target: an object that issues Data API / DevOps API requests. The target must have a suitable with_options method, that is, it can be any of the following: DataAPIClient, AsyncDatabase, Database, AsyncCollection, Collection, AsyncTable, Table, AstraDBAdmin, AstraDBDatabaseAdmin, DataAPIDatabaseAdmin.
destination: a list or a dictionary where the collected events will be stored. For dictionaries, events are grouped into lists, one per event type, each stored under the corresponding ObservableEventType value as the dict key.
event_types: if provided, an iterable of event types; only events matching this filter are processed.
Returns
Yields an instrumented version of the input target, with an added observer set to accumulate the received events into the provided destination. Any pre-existing observer is untouched.
Example
>>> ev_lst: list[ObservableEvent] = []
>>> with event_collector(db, destination=ev_lst) as instrumented_db:
...     _ = instrumented_db.list_table_names()
...     table = instrumented_db.get_table("my_table")
...     _ = table.find_one({"k": 101})
...
>>> print(len(ev_lst))
5
>>> print(ev_lst[0].event_type)
ObservableEventType.REQUEST
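The lifecycle described above (attach an observer to a copy of the target, collect while inside the with block, stop on exit) can be illustrated without a live database. The following is a minimal sketch that uses only the standard library. EventType, Event, Sink, FakeDatabase and collect_events are hypothetical stand-ins invented for this illustration; they are not astrapy classes, and the real Observer and APIOptions may behave differently in detail.

```python
from __future__ import annotations

from contextlib import contextmanager
from dataclasses import dataclass, replace
from enum import Enum


class EventType(Enum):  # hypothetical stand-in for ObservableEventType
    REQUEST = "request"
    RESPONSE = "response"


@dataclass
class Event:  # hypothetical stand-in for ObservableEvent
    event_type: EventType
    payload: str


@dataclass
class Sink:  # hypothetical stand-in for Observer
    destination: list | dict
    event_types: frozenset | None = None
    enabled: bool = True

    def receive(self, event: Event) -> None:
        # Drop events once disabled or when they do not match the filter.
        if not self.enabled:
            return
        if self.event_types is not None and event.event_type not in self.event_types:
            return
        if isinstance(self.destination, list):
            self.destination.append(event)
        else:
            # Dict destinations group events into one list per event type.
            self.destination.setdefault(event.event_type, []).append(event)


@dataclass(frozen=True)
class FakeDatabase:  # hypothetical stand-in for a Database-like target
    observers: tuple = ()

    def with_options(self, *, observers: tuple) -> "FakeDatabase":
        # Return a copy with extra observers; the original is left untouched.
        return replace(self, observers=self.observers + observers)

    def command(self, name: str) -> None:
        # Each simulated API call emits one request and one response event.
        for event_type in (EventType.REQUEST, EventType.RESPONSE):
            for sink in self.observers:
                sink.receive(Event(event_type, name))


@contextmanager
def collect_events(target, *, destination, event_types=None):
    sink = Sink(destination, None if event_types is None else frozenset(event_types))
    try:
        yield target.with_options(observers=(sink,))
    finally:
        sink.enabled = False  # stop collecting once the block exits


db = FakeDatabase()

all_events: list = []
with collect_events(db, destination=all_events) as instrumented:
    instrumented.command("list_tables")
instrumented.command("ignored")  # after the block: nothing is collected

by_type: dict = {}
with collect_events(db, destination=by_type, event_types=[EventType.REQUEST]) as instrumented:
    instrumented.command("find_one")
    instrumented.command("insert_one")
```

In this sketch the destination stays readable after the block ends, the original db object never gains an observer, and the dict destination only receives the event types named in the filter.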
Classes
class OptionAwareDatabaseObject(*args, **kwargs)
Source code
class OptionAwareDatabaseObject(Protocol):
    def with_options(
        self,
        *,
        api_options: APIOptions | UnsetType = _UNSET,
    ) -> Self: ...

Base class for protocol classes.

Protocol classes are defined as::

    class Proto(Protocol):
        def meth(self) -> int:
            ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

    class C:
        def meth(self) -> int:
            return 0

    def func(x: Proto) -> int:
        return x.meth()

    func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

    class GenProto[T](Protocol):
        def meth(self) -> T:
            ...

Ancestors
- typing.Protocol
- typing.Generic
Methods
def with_options(self, *, api_options: APIOptions | UnsetType = (unset)) -> Self
Source code
def with_options(
    self,
    *,
    api_options: APIOptions | UnsetType = _UNSET,
) -> Self: ...
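Because the protocol only requires a with_options method, any class that provides one can be accepted by a helper typed against it, with no inheritance involved. The sketch below shows that structural-typing idea in isolation. SupportsWithOptions, FakeTable and instrument are hypothetical names made up for this example, and a plain dict stands in for APIOptions; none of this is astrapy code.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Protocol, TypeVar

# The bound is a forward reference to the protocol defined just below.
T = TypeVar("T", bound="SupportsWithOptions")


class SupportsWithOptions(Protocol):  # hypothetical analogue of the protocol above
    def with_options(self: T, *, api_options: dict) -> T: ...


def instrument(target: T, observer_name: str) -> T:
    # Only the presence of with_options matters, not the concrete class.
    return target.with_options(api_options={"event_observers": observer_name})


@dataclass(frozen=True)
class FakeTable:  # hypothetical; note it does not inherit from the protocol
    name: str
    api_options: dict | None = None

    def with_options(self, *, api_options: dict) -> "FakeTable":
        # Return a modified copy, leaving the original object unchanged.
        return replace(self, api_options=api_options)


original = FakeTable("my_table")
table = instrument(original, "obs_1")
```

Typing the helper with a TypeVar bound to the protocol lets a static checker infer that instrument returns the same concrete type it was given, which mirrors the DB_OBJ in, DB_OBJ out shape of the event_collector signature.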