Module astrapy.event_observers

Sub-modules

astrapy.event_observers.context_managers
astrapy.event_observers.events
astrapy.event_observers.observers

Functions

def event_collector(target: DB_OBJ,
*,
destination: list[ObservableEvent] | dict[ObservableEventType, list[ObservableEvent]],
event_types: Iterable[ObservableEventType] | None = None) ‑> Iterator[~DB_OBJ]
Expand source code
@contextmanager
def event_collector(
    target: DB_OBJ,
    *,
    destination: list[ObservableEvent]
    | dict[ObservableEventType, list[ObservableEvent]],
    event_types: Iterable[ObservableEventType] | None = None,
) -> Iterator[DB_OBJ]:
    """
    Create a context manager wrapping a list (of `ObservableEvent` objects) or
    a dict (with `ObservableEventType` keys and `list[ObservableEvent]` values),
    for quickly instrumenting a client, database, table, collection or admin object.

    This utility is meant to be used in a `with` statement, so that events emitted
    by the instrumented classes within the with block are captured into the provided
    lists/dictionaries. Events emitted by spawned classes (e.g. when a Database
    creates a Collection in the `with` block) are also received this way.

    Once outside the `with` block, collection of events stops, but the provided
    destination can still be accessed according to its ordinary scoping.

    Args:
        target: an object that issues Data API / DevOps API requests. The target
            must have a `with_options` suitable method: meaning, it can be any of
            the following: `DataAPIClient`, `AsyncDatabase`, `Database`,
            `AsyncCollection`, `Collection`, `AsyncTable`, `Table`, `AstraDBAdmin`,
            `AstraDBDatabaseAdmin`, `DataAPIDatabaseAdmin`.
        destination: a list or a dictionary where the collected events will be stored.
            For dictionaries, events are grouped into lists, one per each event type,
            stored under the corresponding `ObservableEventType` value as dict key.
        event_types: if provided, it's a list of event types so that only
            events matching this filter are processed.

    Returns:
        Yields an instrumented version of the input target, with an added observer
        set to accumulate the received events into the provided destination. Any
        pre-existing observer is untouched.

    Example:
        >>> ev_lst: list[ObservableEvent] = []
        >>> with event_collector(db, destination=ev_lst) as instrumented_db:
        ...     _ = instrumented_db.list_table_names()
        ...     table = instrumented_db.get_table("my_table")
        ...     _ = table.find_one({"k": 101})
        ...
        >>> print(len(ev_lst))
        5
        >>> print(ev_lst[0].event_type)
        ObservableEventType.REQUEST
    """
    observer_id_ = f"observer_{str(uuid7())}"
    observer_: Observer
    if isinstance(destination, list):
        observer_ = Observer.from_event_list(destination, event_types=event_types)
    else:
        observer_ = Observer.from_event_dict(destination, event_types=event_types)
    api_options = APIOptions(event_observers={observer_id_: observer_})
    target_ = target.with_options(api_options=api_options)
    try:
        yield target_
    finally:
        observer_.enabled = False

Create a context manager wrapping a list (of ObservableEvent objects) or a dict (with ObservableEventType keys and list[ObservableEvent] values), for quickly instrumenting a client, database, table, collection or admin object.

This utility is meant to be used in a with statement, so that events emitted by the instrumented classes within the with block are captured into the provided lists/dictionaries. Events emitted by spawned classes (e.g. when a Database creates a Collection in the with block) are also received this way.

Once outside the with block, collection of events stops, but the provided destination can still be accessed according to its ordinary scoping.

Args

target
an object that issues Data API / DevOps API requests. The target must have a with_options suitable method: meaning, it can be any of the following: DataAPIClient, AsyncDatabase, Database, AsyncCollection, Collection, AsyncTable, Table, AstraDBAdmin, AstraDBDatabaseAdmin, DataAPIDatabaseAdmin.
destination
a list or a dictionary where the collected events will be stored. For dictionaries, events are grouped into lists, one per each event type, stored under the corresponding ObservableEventType value as dict key.
event_types
if provided, it's a list of event types so that only events matching this filter are processed.

Returns

Yields an instrumented version of the input target, with an added observer set to accumulate the received events into the provided destination. Any pre-existing observer is untouched.

Example

>>> ev_lst: list[ObservableEvent] = []
>>> with event_collector(db, destination=ev_lst) as instrumented_db:
...     _ = instrumented_db.list_table_names()
...     table = instrumented_db.get_table("my_table")
...     _ = table.find_one({"k": 101})
...
>>> print(len(ev_lst))
5
>>> print(ev_lst[0].event_type)
ObservableEventType.REQUEST

Classes

class ObservableError (error: DataAPIErrorDescriptor)
Expand source code
@dataclass
class ObservableError(ObservableEvent):
    """
    An event representing an error returned from the Data API in a response.

    These are dispatched unconditionally to the attached observers as the
    response is parsed. The actual raising of an exception does not always
    follow; moreover, further operations may take place before that occurs.

    Note:
        Only errors returned within the Data API response in the
        "errors" field are dispatched this way. The most general exception that
        can occur during a method call are not necessarily of this form.

    Attributes:
        event_type: it has value ObservableEventType.ERROR in this case.
        error: a descriptor of the error, as found in the Data API response.
    """

    error: DataAPIErrorDescriptor

    def __init__(self, error: DataAPIErrorDescriptor) -> None:
        self.event_type = ObservableEventType.ERROR
        self.error = error

An event representing an error returned from the Data API in a response.

These are dispatched unconditionally to the attached observers as the response is parsed. The actual raising of an exception does not always follow; moreover, further operations may take place before that occurs.

Note

Only errors returned within the Data API response in the "errors" field are dispatched this way. The most general exception that can occur during a method call are not necessarily of this form.

Attributes

event_type
it has value ObservableEventType.ERROR in this case.
error
a descriptor of the error, as found in the Data API response.

Ancestors

Instance variables

var errorDataAPIErrorDescriptor

The type of the None singleton.

Inherited members

class ObservableEvent (event_type: ObservableEventType)
Expand source code
@dataclass
class ObservableEvent(ABC):
    """
    Class that represents the most general 'event' that is sent to observers.

    Attributes:
        event_type: the type of the event, such as "log", "error", or "warning".
    """

    event_type: ObservableEventType

Class that represents the most general 'event' that is sent to observers.

Attributes

event_type
the type of the event, such as "log", "error", or "warning".

Ancestors

  • abc.ABC

Subclasses

Instance variables

var event_typeObservableEventType

The type of the None singleton.

class ObservableEventType (*args, **kwds)
Expand source code
class ObservableEventType(StrEnum):
    """
    Enum for the possible values of the event type for observable events
    """

    WARNING = "warning"
    ERROR = "error"
    REQUEST = "request"
    RESPONSE = "response"

Enum for the possible values of the event type for observable events

Ancestors

Class variables

var ERROR

The type of the None singleton.

var REQUEST

The type of the None singleton.

var RESPONSE

The type of the None singleton.

var WARNING

The type of the None singleton.

Inherited members

class ObservableRequest (payload: str | None,
http_method: str,
url: str,
query_parameters: dict[str, Any] | None,
redacted_headers: dict[str, Any] | None,
dev_ops_api: bool)
Expand source code
@dataclass
class ObservableRequest(ObservableEvent):
    """
    An event representing a request being sent, captured with its
    payload exactly as will be sent to the API.

    Attributes:
        event_type: it has value ObservableEventType.REQUEST in this case.
        payload: the payload as a string.
        http_method: one of `astrapy.utils.request_tools.HttpMethod`, e.g. "POST".
        url: the complete URL the request is targeted at.
        query_parameters: if present, all query parameters in dict form.
        redacted_headers: a dictionary of the non-sensitive headers being used
            for the request. Authentication credentials and API Keys are removed.
        dev_ops_api: true if and only if the request is aimed at the DevOps API.
    """

    payload: str | None
    http_method: str
    url: str
    query_parameters: dict[str, Any] | None
    redacted_headers: dict[str, Any] | None
    dev_ops_api: bool

    def __init__(
        self,
        payload: str | None,
        http_method: str,
        url: str,
        query_parameters: dict[str, Any] | None,
        redacted_headers: dict[str, Any] | None,
        dev_ops_api: bool,
    ) -> None:
        self.event_type = ObservableEventType.REQUEST
        self.payload = payload
        self.http_method = http_method
        self.url = url
        self.query_parameters = query_parameters
        self.redacted_headers = redacted_headers
        self.dev_ops_api = dev_ops_api

An event representing a request being sent, captured with its payload exactly as will be sent to the API.

Attributes

event_type
it has value ObservableEventType.REQUEST in this case.
payload
the payload as a string.
http_method
one of HttpMethod, e.g. "POST".
url
the complete URL the request is targeted at.
query_parameters
if present, all query parameters in dict form.
redacted_headers
a dictionary of the non-sensitive headers being used for the request. Authentication credentials and API Keys are removed.
dev_ops_api
true if and only if the request is aimed at the DevOps API.

Ancestors

Instance variables

var dev_ops_api : bool

The type of the None singleton.

var http_method : str

The type of the None singleton.

var payload : str | None

The type of the None singleton.

var query_parameters : dict[str, typing.Any] | None

The type of the None singleton.

var redacted_headers : dict[str, typing.Any] | None

The type of the None singleton.

var url : str

The type of the None singleton.

Inherited members

class ObservableResponse (body: str | None, *, status_code: int)
Expand source code
@dataclass
class ObservableResponse(ObservableEvent):
    """
    An event representing a response received by the Data API, whose body
    is captured exactly as is sent by the Data API.

    Attributes:
        event_type: it has value ObservableEventType.RESPONSE in this case.
        body: a string expressing the response body.
        status_code: the response HTTP status code.
    """

    body: str | None
    status_code: int

    def __init__(self, body: str | None, *, status_code: int) -> None:
        self.event_type = ObservableEventType.RESPONSE
        self.body = body
        self.status_code = status_code

An event representing a response received by the Data API, whose body is captured exactly as is sent by the Data API.

Attributes

event_type
it has value ObservableEventType.RESPONSE in this case.
body
a string expressing the response body.
status_code
the response HTTP status code.

Ancestors

Instance variables

var body : str | None

The type of the None singleton.

var status_code : int

The type of the None singleton.

Inherited members

class ObservableWarning (warning: DataAPIWarningDescriptor)
Expand source code
@dataclass
class ObservableWarning(ObservableEvent):
    """
    An event representing a warning returned by a Data API command.

    These are dispatched to the attached observers as the response is parsed.

    Attributes:
        event_type: it has value ObservableEventType.WARNING in this case.
        warning: a descriptor of the warning, as found in the Data API response.
    """

    warning: DataAPIWarningDescriptor

    def __init__(self, warning: DataAPIWarningDescriptor) -> None:
        self.event_type = ObservableEventType.WARNING
        self.warning = warning

An event representing a warning returned by a Data API command.

These are dispatched to the attached observers as the response is parsed.

Attributes

event_type
it has value ObservableEventType.WARNING in this case.
warning
a descriptor of the warning, as found in the Data API response.

Ancestors

Instance variables

var warningDataAPIWarningDescriptor

The type of the None singleton.

Inherited members

class Observer
Expand source code
class Observer(ABC):
    """
    An observer that can be attached to astrapy events through the API options.

    Users can subclass Observer and provide their implementation
    of the `receive` method. Request-issuing classes (such as Database or Table)
    will dispatch events to the observers registered in their API options.

    This class offers two static factory methods for common use-cases:
    `from_event_list` and `from_event_dict`.
    """

    enabled: bool = True

    @abstractmethod
    def receive(
        self,
        event: ObservableEvent,
        sender: Any = None,
        function_name: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Receive and event.

        Args:
            event: the event that astrapy is dispatching to the observer.
            sender: the object directly responsible for generating the event.
            function_name: when applicable, the name of the function/method
                that triggered the event.
            request_id: an optional ID used to group different received events
                as occurring within the lifecycle of a single HTTP request.
        """
        ...

    @staticmethod
    def from_event_list(
        event_list: list[ObservableEvent],
        *,
        event_types: Iterable[ObservableEventType] | None = None,
    ) -> Observer:
        """
        Create an Observer object wrapping a caller-provided list.

        The resulting observer will simply append the events it receives into
        the list.

        Args:
            event_list: the list where the caller will find the received events.
            event_types: if provided, it's a list of event types so that only
                events matching this filter are processed.

        Example:
            >>> from astrapy.api_options import APIOptions
            >>> from astrapy.event_observers import ObservableEvent, Observer
            >>> my_ev_list: list[ObservableEvent] = []
            >>> my_observer = Observer.from_event_list(my_ev_list)
            >>> instrumented_table = my_table.with_options(
            ...     api_options=APIOptions(
            ...         event_observers={"obs000": my_observer},
            ...     ),
            ... )
            >>> # start using 'instrumented_table' ...
            >>> # ... and then inspect 'my_ev_list'
        """

        class _ObserverFromList(Observer):
            def __init__(
                self,
                _event_list: list[ObservableEvent],
                _event_types: Iterable[ObservableEventType] | None,
            ) -> None:
                self.event_list = _event_list
                self.event_types = (
                    set(ObservableEventType.__members__.values())
                    if _event_types is None
                    else set(_event_types)
                )

            def receive(
                self,
                event: ObservableEvent,
                sender: Any = None,
                function_name: str | None = None,
                request_id: str | None = None,
            ) -> None:
                if event.event_type in self.event_types:
                    self.event_list.append(event)

        return _ObserverFromList(event_list, event_types)

    @staticmethod
    def from_event_dict(
        event_dict: dict[ObservableEventType, list[ObservableEvent]],
        *,
        event_types: Iterable[ObservableEventType] | None = None,
    ) -> Observer:
        """
        Create an Observer object wrapping a caller-provided dictionary.

        The resulting observer will simply append the events it receives into
        the dictionary, grouped by event type. Dict values are lists of events.

        Args:
            event_dict: the dict where the caller will find the received events.
            event_types: if provided, it's a list of event types so that only
                events matching this filter are processed.

        Example:
            >>> from astrapy.api_options import APIOptions
            >>> from astrapy.event_observers import (
            ...     ObservableEvent,
            ...     ObservableEventType,
            ...     Observer,
            ... )
            >>> my_ev_map: dict[ObservableEventType, list[ObservableEvent]] = {}
            >>> my_observer = Observer.from_event_dict(my_ev_map)
            >>> instrumented_table = my_table.with_options(
            ...     api_options=APIOptions(
            ...         event_observers={"obs000": my_observer},
            ...     ),
            ... )
            >>> # start using 'instrumented_table' ...
            >>> # ... and then inspect 'my_ev_map'
        """

        class _ObserverFromDict(Observer):
            def __init__(
                self,
                _event_dict: dict[ObservableEventType, list[ObservableEvent]],
                _event_types: Iterable[ObservableEventType] | None,
            ) -> None:
                self.event_dict = _event_dict
                self.event_types = (
                    set(ObservableEventType.__members__.values())
                    if _event_types is None
                    else set(_event_types)
                )

            def receive(
                self,
                event: ObservableEvent,
                sender: Any = None,
                function_name: str | None = None,
                request_id: str | None = None,
            ) -> None:
                if event.event_type in self.event_types:
                    self.event_dict[event.event_type] = self.event_dict.get(
                        event.event_type, []
                    ) + [event]

        return _ObserverFromDict(event_dict, event_types)

An observer that can be attached to astrapy events through the API options.

Users can subclass Observer and provide their implementation of the receive method. Request-issuing classes (such as Database or Table) will dispatch events to the observers registered in their API options.

This class offers two static factory methods for common use-cases: from_event_list and from_event_dict.

Ancestors

  • abc.ABC

Class variables

var enabled : bool

The type of the None singleton.

Static methods

def from_event_dict(event_dict: dict[ObservableEventType, list[ObservableEvent]],
*,
event_types: Iterable[ObservableEventType] | None = None) ‑> Observer
Expand source code
@staticmethod
def from_event_dict(
    event_dict: dict[ObservableEventType, list[ObservableEvent]],
    *,
    event_types: Iterable[ObservableEventType] | None = None,
) -> Observer:
    """
    Create an Observer object wrapping a caller-provided dictionary.

    The resulting observer will simply append the events it receives into
    the dictionary, grouped by event type. Dict values are lists of events.

    Args:
        event_dict: the dict where the caller will find the received events.
        event_types: if provided, it's a list of event types so that only
            events matching this filter are processed.

    Example:
        >>> from astrapy.api_options import APIOptions
        >>> from astrapy.event_observers import (
        ...     ObservableEvent,
        ...     ObservableEventType,
        ...     Observer,
        ... )
        >>> my_ev_map: dict[ObservableEventType, list[ObservableEvent]] = {}
        >>> my_observer = Observer.from_event_dict(my_ev_map)
        >>> instrumented_table = my_table.with_options(
        ...     api_options=APIOptions(
        ...         event_observers={"obs000": my_observer},
        ...     ),
        ... )
        >>> # start using 'instrumented_table' ...
        >>> # ... and then inspect 'my_ev_map'
    """

    class _ObserverFromDict(Observer):
        def __init__(
            self,
            _event_dict: dict[ObservableEventType, list[ObservableEvent]],
            _event_types: Iterable[ObservableEventType] | None,
        ) -> None:
            self.event_dict = _event_dict
            self.event_types = (
                set(ObservableEventType.__members__.values())
                if _event_types is None
                else set(_event_types)
            )

        def receive(
            self,
            event: ObservableEvent,
            sender: Any = None,
            function_name: str | None = None,
            request_id: str | None = None,
        ) -> None:
            if event.event_type in self.event_types:
                self.event_dict[event.event_type] = self.event_dict.get(
                    event.event_type, []
                ) + [event]

    return _ObserverFromDict(event_dict, event_types)

Create an Observer object wrapping a caller-provided dictionary.

The resulting observer will simply append the events it receives into the dictionary, grouped by event type. Dict values are lists of events.

Args

event_dict
the dict where the caller will find the received events.
event_types
if provided, it's a list of event types so that only events matching this filter are processed.

Example

>>> from astrapy.api_options import APIOptions
>>> from astrapy.event_observers import (
...     ObservableEvent,
...     ObservableEventType,
...     Observer,
... )
>>> my_ev_map: dict[ObservableEventType, list[ObservableEvent]] = {}
>>> my_observer = Observer.from_event_dict(my_ev_map)
>>> instrumented_table = my_table.with_options(
...     api_options=APIOptions(
...         event_observers={"obs000": my_observer},
...     ),
... )
>>> # start using 'instrumented_table' ...
>>> # ... and then inspect 'my_ev_map'
def from_event_list(event_list: list[ObservableEvent],
*,
event_types: Iterable[ObservableEventType] | None = None) ‑> Observer
Expand source code
@staticmethod
def from_event_list(
    event_list: list[ObservableEvent],
    *,
    event_types: Iterable[ObservableEventType] | None = None,
) -> Observer:
    """
    Create an Observer object wrapping a caller-provided list.

    The resulting observer will simply append the events it receives into
    the list.

    Args:
        event_list: the list where the caller will find the received events.
        event_types: if provided, it's a list of event types so that only
            events matching this filter are processed.

    Example:
        >>> from astrapy.api_options import APIOptions
        >>> from astrapy.event_observers import ObservableEvent, Observer
        >>> my_ev_list: list[ObservableEvent] = []
        >>> my_observer = Observer.from_event_list(my_ev_list)
        >>> instrumented_table = my_table.with_options(
        ...     api_options=APIOptions(
        ...         event_observers={"obs000": my_observer},
        ...     ),
        ... )
        >>> # start using 'instrumented_table' ...
        >>> # ... and then inspect 'my_ev_list'
    """

    class _ObserverFromList(Observer):
        def __init__(
            self,
            _event_list: list[ObservableEvent],
            _event_types: Iterable[ObservableEventType] | None,
        ) -> None:
            self.event_list = _event_list
            self.event_types = (
                set(ObservableEventType.__members__.values())
                if _event_types is None
                else set(_event_types)
            )

        def receive(
            self,
            event: ObservableEvent,
            sender: Any = None,
            function_name: str | None = None,
            request_id: str | None = None,
        ) -> None:
            if event.event_type in self.event_types:
                self.event_list.append(event)

    return _ObserverFromList(event_list, event_types)

Create an Observer object wrapping a caller-provided list.

The resulting observer will simply append the events it receives into the list.

Args

event_list
the list where the caller will find the received events.
event_types
if provided, it's a list of event types so that only events matching this filter are processed.

Example

>>> from astrapy.api_options import APIOptions
>>> from astrapy.event_observers import ObservableEvent, Observer
>>> my_ev_list: list[ObservableEvent] = []
>>> my_observer = Observer.from_event_list(my_ev_list)
>>> instrumented_table = my_table.with_options(
...     api_options=APIOptions(
...         event_observers={"obs000": my_observer},
...     ),
... )
>>> # start using 'instrumented_table' ...
>>> # ... and then inspect 'my_ev_list'

Methods

def receive(self,
event: ObservableEvent,
sender: Any = None,
function_name: str | None = None,
request_id: str | None = None) ‑> None
Expand source code
@abstractmethod
def receive(
    self,
    event: ObservableEvent,
    sender: Any = None,
    function_name: str | None = None,
    request_id: str | None = None,
) -> None:
    """Receive and event.

    Args:
        event: the event that astrapy is dispatching to the observer.
        sender: the object directly responsible for generating the event.
        function_name: when applicable, the name of the function/method
            that triggered the event.
        request_id: an optional ID used to group different received events
            as occurring within the lifecycle of a single HTTP request.
    """
    ...

Receive and event.

Args

event
the event that astrapy is dispatching to the observer.
sender
the object directly responsible for generating the event.
function_name
when applicable, the name of the function/method that triggered the event.
request_id
an optional ID used to group different received events as occurring within the lifecycle of a single HTTP request.