Module astrapy.event_observers
Sub-modules
astrapy.event_observers.context_managers
astrapy.event_observers.events
astrapy.event_observers.observers
Functions
def event_collector(target: DB_OBJ,
*,
destination: list[ObservableEvent] | dict[ObservableEventType, list[ObservableEvent]],
event_types: Iterable[ObservableEventType] | None = None) ‑> Iterator[~DB_OBJ]-
    @contextmanager
    def event_collector(
        target: DB_OBJ,
        *,
        destination: list[ObservableEvent]
        | dict[ObservableEventType, list[ObservableEvent]],
        event_types: Iterable[ObservableEventType] | None = None,
    ) -> Iterator[DB_OBJ]:
        """
        Create a context manager wrapping a list (of `ObservableEvent` objects)
        or a dict (with `ObservableEventType` keys and `list[ObservableEvent]`
        values), for quickly instrumenting a client, database, table, collection
        or admin object.

        This utility is meant to be used in a `with` statement, so that events
        emitted by the instrumented classes within the with block are captured
        into the provided lists/dictionaries. Events emitted by spawned classes
        (e.g. when a Database creates a Collection in the `with` block) are
        also received this way.

        Once outside the `with` block, collection of events stops, but the
        provided destination can still be accessed according to its ordinary
        scoping.

        Args:
            target: an object that issues Data API / DevOps API requests.
                The target must have a suitable `with_options` method, meaning
                it can be any of the following: `DataAPIClient`,
                `AsyncDatabase`, `Database`, `AsyncCollection`, `Collection`,
                `AsyncTable`, `Table`, `AstraDBAdmin`, `AstraDBDatabaseAdmin`,
                `DataAPIDatabaseAdmin`.
            destination: a list or a dictionary where the collected events
                will be stored. For dictionaries, events are grouped into
                lists, one per event type, stored under the corresponding
                `ObservableEventType` value as dict key.
            event_types: if provided, an iterable of event types; only events
                of these types are processed.

        Returns:
            Yields an instrumented version of the input target, with an added
            observer set to accumulate the received events into the provided
            destination. Any pre-existing observer is untouched.

        Example:
            >>> ev_lst: list[ObservableEvent] = []
            >>> with event_collector(db, destination=ev_lst) as instrumented_db:
            ...     _ = instrumented_db.list_table_names()
            ...     table = instrumented_db.get_table("my_table")
            ...     _ = table.find_one({"k": 101})
            ...
            >>> print(len(ev_lst))
            5
            >>> print(ev_lst[0].event_type)
            ObservableEventType.REQUEST
        """

        observer_id_ = f"observer_{str(uuid7())}"
        observer_: Observer
        if isinstance(destination, list):
            observer_ = Observer.from_event_list(destination, event_types=event_types)
        else:
            observer_ = Observer.from_event_dict(destination, event_types=event_types)
        api_options = APIOptions(event_observers={observer_id_: observer_})
        target_ = target.with_options(api_options=api_options)
        try:
            yield target_
        finally:
            observer_.enabled = False
Create a context manager wrapping a list (of ObservableEvent objects) or a dict (with ObservableEventType keys and list[ObservableEvent] values), for quickly instrumenting a client, database, table, collection or admin object.
This utility is meant to be used in a with statement, so that events emitted by the instrumented classes within the with block are captured into the provided lists/dictionaries. Events emitted by spawned classes (e.g. when a Database creates a Collection in the with block) are also received this way.
Once outside the with block, collection of events stops, but the provided destination can still be accessed according to its ordinary scoping.
Args
target- an object that issues Data API / DevOps API requests. The target must have a suitable with_options method, meaning it can be any of the following: DataAPIClient, AsyncDatabase, Database, AsyncCollection, Collection, AsyncTable, Table, AstraDBAdmin, AstraDBDatabaseAdmin, DataAPIDatabaseAdmin.
destination- a list or a dictionary where the collected events will be stored. For dictionaries, events are grouped into lists, one per event type, stored under the corresponding ObservableEventType value as dict key.
event_types- if provided, an iterable of event types; only events of these types are processed.
Returns
Yields an instrumented version of the input target, with an added observer set to accumulate the received events into the provided destination. Any pre-existing observer is untouched.
Example
    >>> ev_lst: list[ObservableEvent] = []
    >>> with event_collector(db, destination=ev_lst) as instrumented_db:
    ...     _ = instrumented_db.list_table_names()
    ...     table = instrumented_db.get_table("my_table")
    ...     _ = table.find_one({"k": 101})
    ...
    >>> print(len(ev_lst))
    5
    >>> print(ev_lst[0].event_type)
    ObservableEventType.REQUEST
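The lifecycle described for event_collector (observer attached on entry, disabled on exit, destination still readable afterwards) can be illustrated without a live database. What follows is only a minimal sketch built on stand-in classes: FakeTarget, ListObserver and collect_events are hypothetical names that mimic the shape of the astrapy objects and are not part of the library. It also assumes that the dispatcher skips observers whose enabled flag is false, which is what the finally clause in the source suggests.

```python
from __future__ import annotations

from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Iterator


@dataclass
class ListObserver:
    """Hypothetical stand-in for an observer that appends to a list."""

    sink: list[str]
    enabled: bool = True

    def receive(self, event: str) -> None:
        self.sink.append(event)


@dataclass
class FakeTarget:
    """Hypothetical stand-in for a request-issuing object such as a Database."""

    observers: dict[str, ListObserver] = field(default_factory=dict)

    def with_options(self, observers: dict[str, ListObserver]) -> FakeTarget:
        # Return a copy carrying the extra observers; existing ones are kept.
        return FakeTarget(observers={**self.observers, **observers})

    def do_request(self, name: str) -> None:
        # Assumption: only enabled observers receive the dispatched events.
        for observer in self.observers.values():
            if observer.enabled:
                observer.receive(f"request:{name}")
                observer.receive(f"response:{name}")


@contextmanager
def collect_events(target: FakeTarget, destination: list[str]) -> Iterator[FakeTarget]:
    observer = ListObserver(destination)
    instrumented = target.with_options({"collector": observer})
    try:
        yield instrumented
    finally:
        # Same idea as the `finally` clause in the source: stop collecting on exit.
        observer.enabled = False


events: list[str] = []
with collect_events(FakeTarget(), destination=events) as instrumented_target:
    instrumented_target.do_request("find_one")
    leaked = instrumented_target  # keep a reference past the block

leaked.do_request("ignored")  # the observer is disabled, nothing is recorded
print(events)
```

In this sketch the request issued after the with block leaves the destination unchanged, while the list itself stays accessible in the enclosing scope.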
Classes
class ObservableError (error: DataAPIErrorDescriptor)-
    @dataclass
    class ObservableError(ObservableEvent):
        """
        An event representing an error returned from the Data API in a response.

        These are dispatched unconditionally to the attached observers as the
        response is parsed. The actual raising of an exception does not always
        follow; moreover, further operations may take place before that occurs.

        Note:
            Only errors returned within the Data API response in the "errors"
            field are dispatched this way. More generally, the exceptions that
            can occur during a method call are not necessarily of this form.

        Attributes:
            event_type: it has value ObservableEventType.ERROR in this case.
            error: a descriptor of the error, as found in the Data API response.
        """

        error: DataAPIErrorDescriptor

        def __init__(self, error: DataAPIErrorDescriptor) -> None:
            self.event_type = ObservableEventType.ERROR
            self.error = error
An event representing an error returned from the Data API in a response.
These are dispatched unconditionally to the attached observers as the response is parsed. The actual raising of an exception does not always follow; moreover, further operations may take place before that occurs.
Note
Only errors returned within the Data API response in the "errors" field are dispatched this way. More generally, the exceptions that can occur during a method call are not necessarily of this form.
Attributes
event_type- it has value ObservableEventType.ERROR in this case.
error- a descriptor of the error, as found in the Data API response.
Ancestors
- ObservableEvent
- abc.ABC
Instance variables
var error : DataAPIErrorDescriptor
class ObservableEvent (event_type: ObservableEventType)-
    @dataclass
    class ObservableEvent(ABC):
        """
        Class that represents the most general 'event' that is sent to observers.

        Attributes:
            event_type: the type of the event, such as "request", "error",
                or "warning".
        """

        event_type: ObservableEventType
Class that represents the most general 'event' that is sent to observers.
Attributes
event_type- the type of the event, such as "request", "error", or "warning".
Ancestors
- abc.ABC
Subclasses
- ObservableError
- ObservableRequest
- ObservableResponse
- ObservableWarning
Instance variables
var event_type : ObservableEventType
class ObservableEventType (*args, **kwds)-
    class ObservableEventType(StrEnum):
        """
        Enum for the possible values of the event type for observable events
        """

        WARNING = "warning"
        ERROR = "error"
        REQUEST = "request"
        RESPONSE = "response"
Enum for the possible values of the event type for observable events.
Ancestors
- StrEnum
- enum.Enum
Class variables
var ERROR
var REQUEST
var RESPONSE
var WARNING
class ObservableRequest (payload: str | None,
http_method: str,
url: str,
query_parameters: dict[str, Any] | None,
redacted_headers: dict[str, Any] | None,
dev_ops_api: bool)-
    @dataclass
    class ObservableRequest(ObservableEvent):
        """
        An event representing a request being sent, captured with its payload
        exactly as it will be sent to the API.

        Attributes:
            event_type: it has value ObservableEventType.REQUEST in this case.
            payload: the payload as a string.
            http_method: one of `astrapy.utils.request_tools.HttpMethod`,
                e.g. "POST".
            url: the complete URL the request is targeted at.
            query_parameters: if present, all query parameters in dict form.
            redacted_headers: a dictionary of the non-sensitive headers being
                used for the request. Authentication credentials and API Keys
                are removed.
            dev_ops_api: true if and only if the request is aimed at the
                DevOps API.
        """

        payload: str | None
        http_method: str
        url: str
        query_parameters: dict[str, Any] | None
        redacted_headers: dict[str, Any] | None
        dev_ops_api: bool

        def __init__(
            self,
            payload: str | None,
            http_method: str,
            url: str,
            query_parameters: dict[str, Any] | None,
            redacted_headers: dict[str, Any] | None,
            dev_ops_api: bool,
        ) -> None:
            self.event_type = ObservableEventType.REQUEST
            self.payload = payload
            self.http_method = http_method
            self.url = url
            self.query_parameters = query_parameters
            self.redacted_headers = redacted_headers
            self.dev_ops_api = dev_ops_api
An event representing a request being sent, captured with its payload exactly as it will be sent to the API.
Attributes
event_type- it has value ObservableEventType.REQUEST in this case.
payload- the payload as a string.
http_method- one of HttpMethod, e.g. "POST".
url- the complete URL the request is targeted at.
query_parameters- if present, all query parameters in dict form.
redacted_headers- a dictionary of the non-sensitive headers being used for the request. Authentication credentials and API Keys are removed.
dev_ops_api- true if and only if the request is aimed at the DevOps API.
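As an illustration of how the attributes listed above might be consumed, here is a small sketch that turns a request event into a one-line summary, for instance for logging. RequestEvent is a hypothetical stand-in carrying the same fields as ObservableRequest, describe_request is an invented helper, and the URL and payload in the sample are made up.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class RequestEvent:
    """Hypothetical stand-in exposing the same fields as ObservableRequest."""

    payload: str | None
    http_method: str
    url: str
    query_parameters: dict[str, Any] | None
    redacted_headers: dict[str, Any] | None
    dev_ops_api: bool


def describe_request(event: RequestEvent, max_payload: int = 40) -> str:
    """Build a one-line summary of a request event."""
    api = "DevOps API" if event.dev_ops_api else "Data API"
    # The payload is typed `str | None`, so guard against None.
    payload = event.payload or ""
    if len(payload) > max_payload:
        payload = payload[:max_payload] + "..."
    return f"[{api}] {event.http_method} {event.url} payload={payload!r}"


# Illustrative values only: this is not a real endpoint.
sample = RequestEvent(
    payload='{"findOne": {"filter": {"k": 101}}}',
    http_method="POST",
    url="https://example.invalid/api/json/v1/my_keyspace/my_table",
    query_parameters=None,
    redacted_headers={"Content-Type": "application/json"},
    dev_ops_api=False,
)
summary = describe_request(sample)
print(summary)
```

Since redacted_headers already has credentials removed according to the description above, a summary of this kind could include the headers as well; the sketch leaves them out for brevity.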
Ancestors
- ObservableEvent
- abc.ABC
Instance variables
var dev_ops_api : bool
var http_method : str
var payload : str | None
var query_parameters : dict[str, typing.Any] | None
var redacted_headers : dict[str, typing.Any] | None
var url : str
class ObservableResponse (body: str | None, *, status_code: int)-
    @dataclass
    class ObservableResponse(ObservableEvent):
        """
        An event representing a response received from the Data API, whose body
        is captured exactly as sent by the Data API.

        Attributes:
            event_type: it has value ObservableEventType.RESPONSE in this case.
            body: a string expressing the response body.
            status_code: the response HTTP status code.
        """

        body: str | None
        status_code: int

        def __init__(self, body: str | None, *, status_code: int) -> None:
            self.event_type = ObservableEventType.RESPONSE
            self.body = body
            self.status_code = status_code
An event representing a response received from the Data API, whose body is captured exactly as sent by the Data API.
Attributes
event_type- it has value ObservableEventType.RESPONSE in this case.
body- a string expressing the response body.
status_code- the response HTTP status code.
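Because body is a raw string (or None), a consumer has to parse it before looking inside. The sketch below shows one cautious way to pull out the top-level "errors" field mentioned in the ObservableError note. errors_in_body is a hypothetical helper, not an astrapy function, and the keys inside the sample error entry are illustrative only.

```python
from __future__ import annotations

import json
from typing import Any


def errors_in_body(body: str | None) -> list[Any]:
    """Return the entries of the top-level "errors" field, or [] if absent."""
    if not body:
        return []
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        # Assumption: a body that is not valid JSON carries no errors to report.
        return []
    if not isinstance(parsed, dict):
        return []
    errors = parsed.get("errors")
    return errors if isinstance(errors, list) else []


# Hypothetical body: the structure of each error entry is an assumption.
sample_body = '{"errors": [{"message": "illustrative error"}]}'
found = errors_in_body(sample_body)
print(len(found))
```

For most uses, receiving ObservableError events directly is likely simpler than re-parsing response bodies; the helper only shows what the raw body allows.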
Ancestors
- ObservableEvent
- abc.ABC
Instance variables
var body : str | None
var status_code : int
class ObservableWarning (warning: DataAPIWarningDescriptor)-
    @dataclass
    class ObservableWarning(ObservableEvent):
        """
        An event representing a warning returned by a Data API command.

        These are dispatched to the attached observers as the response is parsed.

        Attributes:
            event_type: it has value ObservableEventType.WARNING in this case.
            warning: a descriptor of the warning, as found in the Data API response.
        """

        warning: DataAPIWarningDescriptor

        def __init__(self, warning: DataAPIWarningDescriptor) -> None:
            self.event_type = ObservableEventType.WARNING
            self.warning = warning
An event representing a warning returned by a Data API command.
These are dispatched to the attached observers as the response is parsed.
Attributes
event_type- it has value ObservableEventType.WARNING in this case.
warning- a descriptor of the warning, as found in the Data API response.
Ancestors
- ObservableEvent
- abc.ABC
Instance variables
var warning : DataAPIWarningDescriptor
class Observer-
    class Observer(ABC):
        """
        An observer that can be attached to astrapy events through the API options.

        Users can subclass Observer and provide their implementation
        of the `receive` method. Request-issuing classes (such as Database or Table)
        will dispatch events to the observers registered in their API options.

        This class offers two static factory methods for common use-cases:
        `from_event_list` and `from_event_dict`.
        """

        enabled: bool = True

        @abstractmethod
        def receive(
            self,
            event: ObservableEvent,
            sender: Any = None,
            function_name: str | None = None,
            request_id: str | None = None,
        ) -> None:
            """Receive an event.

            Args:
                event: the event that astrapy is dispatching to the observer.
                sender: the object directly responsible for generating the event.
                function_name: when applicable, the name of the function/method
                    that triggered the event.
                request_id: an optional ID used to group different received events
                    as occurring within the lifecycle of a single HTTP request.
            """
            ...

        @staticmethod
        def from_event_list(
            event_list: list[ObservableEvent],
            *,
            event_types: Iterable[ObservableEventType] | None = None,
        ) -> Observer:
            """
            Create an Observer object wrapping a caller-provided list.

            The resulting observer will simply append the events it receives into
            the list.

            Args:
                event_list: the list where the caller will find the received events.
                event_types: if provided, an iterable of event types; only events
                    of these types are processed.

            Example:
                >>> from astrapy.api_options import APIOptions
                >>> from astrapy.event_observers import ObservableEvent, Observer
                >>> my_ev_list: list[ObservableEvent] = []
                >>> my_observer = Observer.from_event_list(my_ev_list)
                >>> instrumented_table = my_table.with_options(
                ...     api_options=APIOptions(
                ...         event_observers={"obs000": my_observer},
                ...     ),
                ... )
                >>> # start using 'instrumented_table' ...
                >>> # ... and then inspect 'my_ev_list'
            """

            class _ObserverFromList(Observer):
                def __init__(
                    self,
                    _event_list: list[ObservableEvent],
                    _event_types: Iterable[ObservableEventType] | None,
                ) -> None:
                    self.event_list = _event_list
                    self.event_types = (
                        set(ObservableEventType.__members__.values())
                        if _event_types is None
                        else set(_event_types)
                    )

                def receive(
                    self,
                    event: ObservableEvent,
                    sender: Any = None,
                    function_name: str | None = None,
                    request_id: str | None = None,
                ) -> None:
                    if event.event_type in self.event_types:
                        self.event_list.append(event)

            return _ObserverFromList(event_list, event_types)

        @staticmethod
        def from_event_dict(
            event_dict: dict[ObservableEventType, list[ObservableEvent]],
            *,
            event_types: Iterable[ObservableEventType] | None = None,
        ) -> Observer:
            """
            Create an Observer object wrapping a caller-provided dictionary.

            The resulting observer will simply append the events it receives into
            the dictionary, grouped by event type. Dict values are lists of events.

            Args:
                event_dict: the dict where the caller will find the received events.
                event_types: if provided, an iterable of event types; only events
                    of these types are processed.

            Example:
                >>> from astrapy.api_options import APIOptions
                >>> from astrapy.event_observers import (
                ...     ObservableEvent,
                ...     ObservableEventType,
                ...     Observer,
                ... )
                >>> my_ev_map: dict[ObservableEventType, list[ObservableEvent]] = {}
                >>> my_observer = Observer.from_event_dict(my_ev_map)
                >>> instrumented_table = my_table.with_options(
                ...     api_options=APIOptions(
                ...         event_observers={"obs000": my_observer},
                ...     ),
                ... )
                >>> # start using 'instrumented_table' ...
                >>> # ... and then inspect 'my_ev_map'
            """

            class _ObserverFromDict(Observer):
                def __init__(
                    self,
                    _event_dict: dict[ObservableEventType, list[ObservableEvent]],
                    _event_types: Iterable[ObservableEventType] | None,
                ) -> None:
                    self.event_dict = _event_dict
                    self.event_types = (
                        set(ObservableEventType.__members__.values())
                        if _event_types is None
                        else set(_event_types)
                    )

                def receive(
                    self,
                    event: ObservableEvent,
                    sender: Any = None,
                    function_name: str | None = None,
                    request_id: str | None = None,
                ) -> None:
                    if event.event_type in self.event_types:
                        self.event_dict[event.event_type] = self.event_dict.get(
                            event.event_type, []
                        ) + [event]

            return _ObserverFromDict(event_dict, event_types)
An observer that can be attached to astrapy events through the API options.
Users can subclass Observer and provide their implementation of the receive method. Request-issuing classes (such as Database or Table) will dispatch events to the observers registered in their API options.
This class offers two static factory methods for common use-cases: from_event_list and from_event_dict.
Ancestors
- abc.ABC
Class variables
var enabled : bool
Static methods
def from_event_dict(event_dict: dict[ObservableEventType, list[ObservableEvent]],
*,
event_types: Iterable[ObservableEventType] | None = None) ‑> Observer-
    @staticmethod
    def from_event_dict(
        event_dict: dict[ObservableEventType, list[ObservableEvent]],
        *,
        event_types: Iterable[ObservableEventType] | None = None,
    ) -> Observer:
        """
        Create an Observer object wrapping a caller-provided dictionary.

        The resulting observer will simply append the events it receives into
        the dictionary, grouped by event type. Dict values are lists of events.

        Args:
            event_dict: the dict where the caller will find the received events.
            event_types: if provided, an iterable of event types; only events
                of these types are processed.

        Example:
            >>> from astrapy.api_options import APIOptions
            >>> from astrapy.event_observers import (
            ...     ObservableEvent,
            ...     ObservableEventType,
            ...     Observer,
            ... )
            >>> my_ev_map: dict[ObservableEventType, list[ObservableEvent]] = {}
            >>> my_observer = Observer.from_event_dict(my_ev_map)
            >>> instrumented_table = my_table.with_options(
            ...     api_options=APIOptions(
            ...         event_observers={"obs000": my_observer},
            ...     ),
            ... )
            >>> # start using 'instrumented_table' ...
            >>> # ... and then inspect 'my_ev_map'
        """

        class _ObserverFromDict(Observer):
            def __init__(
                self,
                _event_dict: dict[ObservableEventType, list[ObservableEvent]],
                _event_types: Iterable[ObservableEventType] | None,
            ) -> None:
                self.event_dict = _event_dict
                self.event_types = (
                    set(ObservableEventType.__members__.values())
                    if _event_types is None
                    else set(_event_types)
                )

            def receive(
                self,
                event: ObservableEvent,
                sender: Any = None,
                function_name: str | None = None,
                request_id: str | None = None,
            ) -> None:
                if event.event_type in self.event_types:
                    self.event_dict[event.event_type] = self.event_dict.get(
                        event.event_type, []
                    ) + [event]

        return _ObserverFromDict(event_dict, event_types)
Create an Observer object wrapping a caller-provided dictionary.
The resulting observer will simply append the events it receives into the dictionary, grouped by event type. Dict values are lists of events.
Args
event_dict- the dict where the caller will find the received events.
event_types- if provided, an iterable of event types; only events of these types are processed.
Example
    >>> from astrapy.api_options import APIOptions
    >>> from astrapy.event_observers import (
    ...     ObservableEvent,
    ...     ObservableEventType,
    ...     Observer,
    ... )
    >>> my_ev_map: dict[ObservableEventType, list[ObservableEvent]] = {}
    >>> my_observer = Observer.from_event_dict(my_ev_map)
    >>> instrumented_table = my_table.with_options(
    ...     api_options=APIOptions(
    ...         event_observers={"obs000": my_observer},
    ...     ),
    ... )
    >>> # start using 'instrumented_table' ...
    >>> # ... and then inspect 'my_ev_map'
def from_event_list(event_list: list[ObservableEvent],
*,
event_types: Iterable[ObservableEventType] | None = None) ‑> Observer-
    @staticmethod
    def from_event_list(
        event_list: list[ObservableEvent],
        *,
        event_types: Iterable[ObservableEventType] | None = None,
    ) -> Observer:
        """
        Create an Observer object wrapping a caller-provided list.

        The resulting observer will simply append the events it receives into
        the list.

        Args:
            event_list: the list where the caller will find the received events.
            event_types: if provided, an iterable of event types; only events
                of these types are processed.

        Example:
            >>> from astrapy.api_options import APIOptions
            >>> from astrapy.event_observers import ObservableEvent, Observer
            >>> my_ev_list: list[ObservableEvent] = []
            >>> my_observer = Observer.from_event_list(my_ev_list)
            >>> instrumented_table = my_table.with_options(
            ...     api_options=APIOptions(
            ...         event_observers={"obs000": my_observer},
            ...     ),
            ... )
            >>> # start using 'instrumented_table' ...
            >>> # ... and then inspect 'my_ev_list'
        """

        class _ObserverFromList(Observer):
            def __init__(
                self,
                _event_list: list[ObservableEvent],
                _event_types: Iterable[ObservableEventType] | None,
            ) -> None:
                self.event_list = _event_list
                self.event_types = (
                    set(ObservableEventType.__members__.values())
                    if _event_types is None
                    else set(_event_types)
                )

            def receive(
                self,
                event: ObservableEvent,
                sender: Any = None,
                function_name: str | None = None,
                request_id: str | None = None,
            ) -> None:
                if event.event_type in self.event_types:
                    self.event_list.append(event)

        return _ObserverFromList(event_list, event_types)
Create an Observer object wrapping a caller-provided list.
The resulting observer will simply append the events it receives into the list.
Args
event_list- the list where the caller will find the received events.
event_types- if provided, an iterable of event types; only events of these types are processed.
Example
    >>> from astrapy.api_options import APIOptions
    >>> from astrapy.event_observers import ObservableEvent, Observer
    >>> my_ev_list: list[ObservableEvent] = []
    >>> my_observer = Observer.from_event_list(my_ev_list)
    >>> instrumented_table = my_table.with_options(
    ...     api_options=APIOptions(
    ...         event_observers={"obs000": my_observer},
    ...     ),
    ... )
    >>> # start using 'instrumented_table' ...
    >>> # ... and then inspect 'my_ev_list'
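To make the effect of event_types on the two destinations concrete, the following sketch replays a made-up stream of events through simplified equivalents of the list-backed and dict-backed observers. EventType, into_list and into_dict are hypothetical stand-ins written for this illustration; they are not astrapy code. One difference is worth noting: the sketch appends to the per-type lists in place, whereas the source of from_event_dict assigns a new list to the key on every event, so a reference to an inner list taken earlier would not see later events there.

```python
from __future__ import annotations

from enum import Enum
from typing import Iterable


class EventType(str, Enum):
    """Hypothetical stand-in for ObservableEventType."""

    WARNING = "warning"
    ERROR = "error"
    REQUEST = "request"
    RESPONSE = "response"


def allowed_types(event_types: Iterable[EventType] | None) -> set[EventType]:
    # None means "no filter": every event type is accepted.
    return set(EventType) if event_types is None else set(event_types)


def into_list(stream, event_types=None):
    """Mimic the list-backed observer: keep arrival order, drop filtered types."""
    allowed = allowed_types(event_types)
    return [item for item in stream if item[0] in allowed]


def into_dict(stream, event_types=None):
    """Mimic the dict-backed observer: one list per event type actually seen."""
    allowed = allowed_types(event_types)
    grouped: dict[EventType, list] = {}
    for item in stream:
        if item[0] in allowed:
            grouped.setdefault(item[0], []).append(item)
    return grouped


# Each item is (event type, label); the labels are made up for illustration.
stream = [
    (EventType.REQUEST, "find_one"),
    (EventType.RESPONSE, "find_one"),
    (EventType.WARNING, "deprecated option"),
    (EventType.REQUEST, "insert_one"),
    (EventType.RESPONSE, "insert_one"),
]
wanted = [EventType.REQUEST, EventType.WARNING]

as_list = into_list(stream, wanted)
as_dict = into_dict(stream, wanted)
print(len(as_list))
print(sorted(key.value for key in as_dict))
```

With the filter applied, the responses are dropped from both destinations, and the dictionary only gains keys for event types that were actually received.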
Methods
def receive(self,
event: ObservableEvent,
sender: Any = None,
function_name: str | None = None,
request_id: str | None = None) ‑> None-
    @abstractmethod
    def receive(
        self,
        event: ObservableEvent,
        sender: Any = None,
        function_name: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Receive an event.

        Args:
            event: the event that astrapy is dispatching to the observer.
            sender: the object directly responsible for generating the event.
            function_name: when applicable, the name of the function/method
                that triggered the event.
            request_id: an optional ID used to group different received events
                as occurring within the lifecycle of a single HTTP request.
        """
        ...
Receive an event.
Args
event- the event that astrapy is dispatching to the observer.
sender- the object directly responsible for generating the event.
function_name- when applicable, the name of the function/method that triggered the event.
request_id- an optional ID used to group different received events as occurring within the lifecycle of a single HTTP request.
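The subclassing route (implementing receive in a custom observer) might look like the sketch below, which uses request_id to bucket events belonging to the same HTTP request. To keep the snippet runnable without astrapy installed, ObserverBase is a hypothetical stand-in declaring the same receive signature as Observer; GroupByRequestObserver and the request IDs are likewise invented, and the events are plain strings dispatched by hand.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any


class ObserverBase(ABC):
    """Hypothetical stand-in with the same `receive` signature as Observer."""

    enabled: bool = True

    @abstractmethod
    def receive(
        self,
        event: Any,
        sender: Any = None,
        function_name: str | None = None,
        request_id: str | None = None,
    ) -> None: ...


class GroupByRequestObserver(ObserverBase):
    """Collect events into buckets keyed by request_id."""

    def __init__(self) -> None:
        self.buckets: dict = defaultdict(list)

    def receive(
        self,
        event: Any,
        sender: Any = None,
        function_name: str | None = None,
        request_id: str | None = None,
    ) -> None:
        # Events lacking a request_id all land in the bucket keyed by None.
        self.buckets[request_id].append((function_name, event))


observer = GroupByRequestObserver()
# Simulate the dispatching normally done by the library; the IDs are made up.
observer.receive("request-event", function_name="find_one", request_id="r1")
observer.receive("response-event", function_name="find_one", request_id="r1")
observer.receive("request-event", function_name="insert_one", request_id="r2")

for request_id, items in observer.buckets.items():
    print(request_id, [event for _, event in items])
```

With astrapy itself, such a class would presumably derive from Observer and be registered through the event_observers entry of APIOptions, as in the from_event_list example.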