Module astrapy.event_observers.observers

Classes

class Observer
Expand source code
class Observer(ABC):
    """
    An observer that can be attached to astrapy events through the API options.

    Users can subclass Observer and provide their implementation
    of the `receive` method. Request-issuing classes (such as Database or Table)
    will dispatch events to the observers registered in their API options.

    This class offers two static factory methods for common use-cases:
    `from_event_list` and `from_event_dict`.
    """

    # Class-level flag, True by default; subclasses/instances may override it.
    enabled: bool = True

    @abstractmethod
    def receive(
        self,
        event: ObservableEvent,
        sender: Any = None,
        function_name: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Receive an event.

        Concrete observers must override this method; the base
        implementation does nothing.

        Args:
            event: the event that astrapy is dispatching to the observer.
            sender: the object directly responsible for generating the event.
            function_name: when applicable, the name of the function/method
                that triggered the event.
            request_id: an optional ID used to group different received events
                as occurring within the lifecycle of a single HTTP request.
        """
        ...

    @staticmethod
    def from_event_list(
        event_list: list[ObservableEvent],
        *,
        event_types: Iterable[ObservableEventType] | None = None,
    ) -> Observer:
        """
        Create an Observer object wrapping a caller-provided list.

        The resulting observer will simply append the events it receives into
        the list.

        Args:
            event_list: the list where the caller will find the received events.
                It is mutated in place.
            event_types: if provided, an iterable of event types so that only
                events matching this filter are processed. If omitted, events
                of every `ObservableEventType` member are collected.

        Returns:
            an Observer that appends each accepted event to `event_list`.

        Example:
            >>> from astrapy.api_options import APIOptions
            >>> from astrapy.event_observers import ObservableEvent, Observer
            >>> my_ev_list: list[ObservableEvent] = []
            >>> my_observer = Observer.from_event_list(my_ev_list)
            >>> instrumented_table = my_table.with_options(
            ...     api_options=APIOptions(
            ...         event_observers={"obs000": my_observer},
            ...     ),
            ... )
            >>> # start using 'instrumented_table' ...
            >>> # ... and then inspect 'my_ev_list'
        """

        class _ObserverFromList(Observer):
            def __init__(
                self,
                _event_list: list[ObservableEvent],
                _event_types: Iterable[ObservableEventType] | None,
            ) -> None:
                self.event_list = _event_list
                # Materialize the filter as a set: the iterable is consumed
                # once here and membership tests in `receive` stay cheap.
                self.event_types = (
                    set(ObservableEventType.__members__.values())
                    if _event_types is None
                    else set(_event_types)
                )

            def receive(
                self,
                event: ObservableEvent,
                sender: Any = None,
                function_name: str | None = None,
                request_id: str | None = None,
            ) -> None:
                if event.event_type in self.event_types:
                    self.event_list.append(event)

        return _ObserverFromList(event_list, event_types)

    @staticmethod
    def from_event_dict(
        event_dict: dict[ObservableEventType, list[ObservableEvent]],
        *,
        event_types: Iterable[ObservableEventType] | None = None,
    ) -> Observer:
        """
        Create an Observer object wrapping a caller-provided dictionary.

        The resulting observer will simply append the events it receives into
        the dictionary, grouped by event type. Dict values are lists of events.

        Args:
            event_dict: the dict where the caller will find the received events.
                Events are appended in place to the list stored under their
                event type; a new list is inserted for types not yet present.
            event_types: if provided, an iterable of event types so that only
                events matching this filter are processed. If omitted, events
                of every `ObservableEventType` member are collected.

        Returns:
            an Observer that stores each accepted event in `event_dict`.

        Example:
            >>> from astrapy.api_options import APIOptions
            >>> from astrapy.event_observers import (
            ...     ObservableEvent,
            ...     ObservableEventType,
            ...     Observer,
            ... )
            >>> my_ev_map: dict[ObservableEventType, list[ObservableEvent]] = {}
            >>> my_observer = Observer.from_event_dict(my_ev_map)
            >>> instrumented_table = my_table.with_options(
            ...     api_options=APIOptions(
            ...         event_observers={"obs000": my_observer},
            ...     ),
            ... )
            >>> # start using 'instrumented_table' ...
            >>> # ... and then inspect 'my_ev_map'
        """

        class _ObserverFromDict(Observer):
            def __init__(
                self,
                _event_dict: dict[ObservableEventType, list[ObservableEvent]],
                _event_types: Iterable[ObservableEventType] | None,
            ) -> None:
                self.event_dict = _event_dict
                # Materialize the filter as a set: the iterable is consumed
                # once here and membership tests in `receive` stay cheap.
                self.event_types = (
                    set(ObservableEventType.__members__.values())
                    if _event_types is None
                    else set(_event_types)
                )

            def receive(
                self,
                event: ObservableEvent,
                sender: Any = None,
                function_name: str | None = None,
                request_id: str | None = None,
            ) -> None:
                if event.event_type in self.event_types:
                    # Append in place rather than rebuilding the list with
                    # `+ [event]`: avoids copying the whole list on every
                    # event and keeps caller-held references to the per-type
                    # lists up to date.
                    self.event_dict.setdefault(event.event_type, []).append(event)

        return _ObserverFromDict(event_dict, event_types)

An observer that can be attached to astrapy events through the API options.

Users can subclass Observer and provide their implementation of the receive method. Request-issuing classes (such as Database or Table) will dispatch events to the observers registered in their API options.

This class offers two static factory methods for common use-cases: from_event_list and from_event_dict.

Ancestors

  • abc.ABC

Class variables

var enabled : bool

Flag marking the observer as enabled; defaults to True.

Static methods

def from_event_dict(event_dict: dict[ObservableEventType, list[ObservableEvent]],
*,
event_types: Iterable[ObservableEventType] | None = None) ‑> Observer
Expand source code
@staticmethod
def from_event_dict(
    event_dict: dict[ObservableEventType, list[ObservableEvent]],
    *,
    event_types: Iterable[ObservableEventType] | None = None,
) -> Observer:
    """
    Create an Observer object wrapping a caller-provided dictionary.

    The resulting observer will simply append the events it receives into
    the dictionary, grouped by event type. Dict values are lists of events.

    Args:
        event_dict: the dict where the caller will find the received events.
            Events are appended in place to the list stored under their
            event type; a new list is inserted for types not yet present.
        event_types: if provided, an iterable of event types so that only
            events matching this filter are processed. If omitted, events
            of every `ObservableEventType` member are collected.

    Returns:
        an Observer that stores each accepted event in `event_dict`.

    Example:
        >>> from astrapy.api_options import APIOptions
        >>> from astrapy.event_observers import (
        ...     ObservableEvent,
        ...     ObservableEventType,
        ...     Observer,
        ... )
        >>> my_ev_map: dict[ObservableEventType, list[ObservableEvent]] = {}
        >>> my_observer = Observer.from_event_dict(my_ev_map)
        >>> instrumented_table = my_table.with_options(
        ...     api_options=APIOptions(
        ...         event_observers={"obs000": my_observer},
        ...     ),
        ... )
        >>> # start using 'instrumented_table' ...
        >>> # ... and then inspect 'my_ev_map'
    """

    class _ObserverFromDict(Observer):
        def __init__(
            self,
            _event_dict: dict[ObservableEventType, list[ObservableEvent]],
            _event_types: Iterable[ObservableEventType] | None,
        ) -> None:
            self.event_dict = _event_dict
            # Materialize the filter as a set: the iterable is consumed once
            # here and membership tests in `receive` stay cheap.
            self.event_types = (
                set(ObservableEventType.__members__.values())
                if _event_types is None
                else set(_event_types)
            )

        def receive(
            self,
            event: ObservableEvent,
            sender: Any = None,
            function_name: str | None = None,
            request_id: str | None = None,
        ) -> None:
            if event.event_type in self.event_types:
                # Append in place rather than rebuilding the list with
                # `+ [event]`: avoids copying the whole list on every event
                # and keeps caller-held references to the per-type lists
                # up to date.
                self.event_dict.setdefault(event.event_type, []).append(event)

    return _ObserverFromDict(event_dict, event_types)

Create an Observer object wrapping a caller-provided dictionary.

The resulting observer will simply append the events it receives into the dictionary, grouped by event type. Dict values are lists of events.

Args

event_dict
the dict where the caller will find the received events.
event_types
if provided, it's a list of event types so that only events matching this filter are processed.

Example

>>> from astrapy.api_options import APIOptions
>>> from astrapy.event_observers import (
...     ObservableEvent,
...     ObservableEventType,
...     Observer,
... )
>>> my_ev_map: dict[ObservableEventType, list[ObservableEvent]] = {}
>>> my_observer = Observer.from_event_dict(my_ev_map)
>>> instrumented_table = my_table.with_options(
...     api_options=APIOptions(
...         event_observers={"obs000": my_observer},
...     ),
... )
>>> # start using 'instrumented_table' ...
>>> # ... and then inspect 'my_ev_map'
def from_event_list(event_list: list[ObservableEvent],
*,
event_types: Iterable[ObservableEventType] | None = None) ‑> Observer
Expand source code
@staticmethod
def from_event_list(
    event_list: list[ObservableEvent],
    *,
    event_types: Iterable[ObservableEventType] | None = None,
) -> Observer:
    """
    Build an Observer that collects events into a caller-owned list.

    Every event accepted by the observer is appended, in arrival order,
    to the list supplied by the caller.

    Args:
        event_list: the list that will be filled with the received events.
        event_types: optional filter; when given, only events whose type
            is among these are collected. When omitted, all members of
            `ObservableEventType` are accepted.

    Example:
        >>> from astrapy.api_options import APIOptions
        >>> from astrapy.event_observers import ObservableEvent, Observer
        >>> my_ev_list: list[ObservableEvent] = []
        >>> my_observer = Observer.from_event_list(my_ev_list)
        >>> instrumented_table = my_table.with_options(
        ...     api_options=APIOptions(
        ...         event_observers={"obs000": my_observer},
        ...     ),
        ... )
        >>> # start using 'instrumented_table' ...
        >>> # ... and then inspect 'my_ev_list'
    """

    class _ObserverFromList(Observer):
        def __init__(
            self,
            _event_list: list[ObservableEvent],
            _event_types: Iterable[ObservableEventType] | None,
        ) -> None:
            self.event_list = _event_list
            # No filter supplied means "accept every known event type".
            if _event_types is None:
                accepted = set(ObservableEventType.__members__.values())
            else:
                accepted = set(_event_types)
            self.event_types = accepted

        def receive(
            self,
            event: ObservableEvent,
            sender: Any = None,
            function_name: str | None = None,
            request_id: str | None = None,
        ) -> None:
            # Guard clause: silently drop events outside the filter.
            if event.event_type not in self.event_types:
                return
            self.event_list.append(event)

    return _ObserverFromList(event_list, event_types)

Create an Observer object wrapping a caller-provided list.

The resulting observer will simply append the events it receives into the list.

Args

event_list
the list where the caller will find the received events.
event_types
if provided, it's a list of event types so that only events matching this filter are processed.

Example

>>> from astrapy.api_options import APIOptions
>>> from astrapy.event_observers import ObservableEvent, Observer
>>> my_ev_list: list[ObservableEvent] = []
>>> my_observer = Observer.from_event_list(my_ev_list)
>>> instrumented_table = my_table.with_options(
...     api_options=APIOptions(
...         event_observers={"obs000": my_observer},
...     ),
... )
>>> # start using 'instrumented_table' ...
>>> # ... and then inspect 'my_ev_list'

Methods

def receive(self,
event: ObservableEvent,
sender: Any = None,
function_name: str | None = None,
request_id: str | None = None) ‑> None
Expand source code
@abstractmethod
def receive(
    self,
    event: ObservableEvent,
    sender: Any = None,
    function_name: str | None = None,
    request_id: str | None = None,
) -> None:
    """Receive an event.

    Abstract method: concrete observers must override it. The base
    implementation does nothing.

    Args:
        event: the event that astrapy is dispatching to the observer.
        sender: the object directly responsible for generating the event.
        function_name: when applicable, the name of the function/method
            that triggered the event.
        request_id: an optional ID used to group different received events
            as occurring within the lifecycle of a single HTTP request.
    """
    ...

Receive an event.

Args

event
the event that astrapy is dispatching to the observer.
sender
the object directly responsible for generating the event.
function_name
when applicable, the name of the function/method that triggered the event.
request_id
an optional ID used to group different received events as occurring within the lifecycle of a single HTTP request.