Module astrapy.utils.api_commander

Classes

class APICommander (*,
api_endpoint: str,
path: str,
spawner: object | None,
headers: dict[str, str | None] = {},
callers: Sequence[CallerType] = [],
redacted_header_names: Iterable[str] | None = None,
dev_ops_api: bool = False,
event_observers: dict[str, Observer | None] = {},
handle_decimals_writes: bool = False,
handle_decimals_reads: bool = False)
Expand source code
class APICommander:
    client: httpx.Client
    async_client: httpx.AsyncClient

    def __init__(
        self,
        *,
        api_endpoint: str,
        path: str,
        spawner: object | None,
        headers: dict[str, str | None] = {},
        callers: Sequence[CallerType] = [],
        redacted_header_names: Iterable[str] | None = None,
        dev_ops_api: bool = False,
        event_observers: dict[str, Observer | None] = {},
        handle_decimals_writes: bool = False,
        handle_decimals_reads: bool = False,
    ) -> None:
        """Set up HTTP clients, headers and error classes for one API target.

        Args:
            api_endpoint: base URL of the API; trailing slashes are stripped.
            path: path joined to the endpoint; leading slashes are stripped.
            spawner: object on whose behalf requests are made. If truthy, only
                a weak reference to it is kept; it is handed to the event
                observers as the ``sender``.
            headers: extra request headers. They take precedence over the
                built-in ones, and any entry whose value is None is left out
                of the headers actually sent.
            callers: caller identities placed before the astrapy user agent
                when composing the ``User-Agent`` header.
            redacted_header_names: header names whose values must be masked in
                the loggable headers, in addition to
                ``DEFAULT_REDACTED_HEADER_NAMES`` (matched case-insensitively).
            dev_ops_api: True when targeting the DevOps API instead of the
                Data API; selects the exception classes that get raised.
            event_observers: observers notified of requests, responses,
                warnings and errors.
            handle_decimals_writes: whether payloads are encoded with the
                Decimal-aware encoder.
            handle_decimals_reads: whether responses are parsed turning every
                number into a ``Decimal``.
        """
        # HTTP clients: both share the same options. `disable_ssl_reuse` and
        # `no_pooling_limits` are names defined outside this class.
        client_options: dict[str, Any] = {"verify": CLIENT_SSL_CONTEXT}
        connection_headers: dict[str, str | None] = {}
        if disable_ssl_reuse:
            client_options["limits"] = no_pooling_limits
            connection_headers["Connection"] = "close"
        self.client = httpx.Client(**client_options)
        self.async_client = httpx.AsyncClient(**client_options)

        # Plain settings, normalized where needed.
        self.api_endpoint = api_endpoint.rstrip("/")
        self.path = path.lstrip("/")
        self.headers: dict[str, str | None] = {**connection_headers, **headers}
        self.callers = callers
        self.redacted_header_names = set(redacted_header_names or [])
        self.upper_full_redacted_header_names = {
            name.upper()
            for name in self.redacted_header_names | DEFAULT_REDACTED_HEADER_NAMES
        }
        self.dev_ops_api = dev_ops_api
        self.event_observers = event_observers
        # Only a weak reference is held, so the commander does not keep its
        # spawner alive.
        self.spawner_ref: weakref.ReferenceType[object] | None = (
            weakref.ref(spawner) if spawner else None
        )
        self.handle_decimals_writes = handle_decimals_writes
        self.handle_decimals_reads = handle_decimals_reads

        # Exception classes and API label depend on the targeted API.
        self._faulty_response_exc_class: (
            type[UnexpectedDevOpsAPIResponseException]
            | type[UnexpectedDataAPIResponseException]
        )
        self._response_exc_class: (
            type[DevOpsAPIResponseException] | type[DataAPIResponseException]
        )
        self._http_exc_class: type[DataAPIHttpException] | type[DevOpsAPIHttpException]
        if not self.dev_ops_api:
            self._faulty_response_exc_class = UnexpectedDataAPIResponseException
            self._response_exc_class = DataAPIResponseException
            self._http_exc_class = DataAPIHttpException
            self._api_description = "Data API"
        else:
            self._faulty_response_exc_class = UnexpectedDevOpsAPIResponseException
            self._response_exc_class = DevOpsAPIResponseException
            self._http_exc_class = DevOpsAPIHttpException
            self._api_description = "DevOps API"

        # Headers: defaults, then the user agent, then the caller-supplied
        # ones; None-valued entries are dropped from what is sent.
        user_agent = compose_full_user_agent([*self.callers, user_agent_astrapy])
        self.caller_header: dict[str, str] = (
            {"User-Agent": user_agent} if user_agent else {}
        )
        merged_headers: dict[str, str | None] = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            **self.caller_header,
            **self.headers,
        }
        self.full_headers: dict[str, str] = {
            name: value for name, value in merged_headers.items() if value is not None
        }
        # Same headers, with sensitive values masked, for logs and observers.
        self._loggable_headers = {
            name: (
                FIXED_SECRET_PLACEHOLDER
                if name.upper() in self.upper_full_redacted_header_names
                else value
            )
            for name, value in self.full_headers.items()
        }
        self.full_path = f"{self.api_endpoint}/{self.path}".rstrip("/")

    def __repr__(self) -> str:
        """Return a short description listing endpoint, path, callers and API kind."""
        shown_attributes = ("api_endpoint", "path", "callers", "dev_ops_api")
        inner_desc = ", ".join(
            f"{attr_name}={getattr(self, attr_name)}" for attr_name in shown_attributes
        )
        return f"{self.__class__.__name__}({inner_desc})"

    def __eq__(self, other: Any) -> bool:
        """Compare two commanders by endpoint, path, headers, callers,
        redacted header names and API kind; any other type is never equal.
        """
        if not isinstance(other, APICommander):
            return False
        compared_attributes = (
            "api_endpoint",
            "path",
            "headers",
            "callers",
            "redacted_header_names",
            "dev_ops_api",
        )
        return all(
            getattr(self, attr_name) == getattr(other, attr_name)
            for attr_name in compared_attributes
        )

    async def __aenter__(self) -> APICommander:
        """Enter an ``async with`` block, yielding this same commander."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Close the async HTTP client when leaving an ``async with`` block.

        The exception arguments are not inspected and nothing is returned, so
        an exception raised inside the block keeps propagating. Only
        ``async_client`` is closed here; the synchronous ``client`` is left
        untouched.
        """
        await self.async_client.aclose()

    def _get_spawner(self) -> object | None:
        """Return the spawner object, or None if there is none (or it is gone)."""
        spawner_ref = self.spawner_ref
        # Calling a weak reference yields the referent, or None once collected.
        return spawner_ref() if spawner_ref is not None else None

    def _copy(
        self,
        api_endpoint: str | None = None,
        path: str | None = None,
        headers: dict[str, str | None] | None = None,
        callers: Sequence[CallerType] | None = None,
        redacted_header_names: list[str] | None = None,
        dev_ops_api: bool | None = None,
    ) -> APICommander:
        """Create a new commander like this one, with selected settings replaced.

        Each argument left to None keeps this commander's current value; an
        explicitly passed empty value (e.g. ``{}``) does override it. The
        spawner, the event observers and the Decimal-handling flags cannot be
        overridden and are always carried over unchanged, so that the copy
        notifies the same observers and encodes/parses numbers the same way.

        Args:
            api_endpoint: replacement API endpoint.
            path: replacement path.
            headers: replacement headers.
            callers: replacement caller identities.
            redacted_header_names: replacement list of header names to mask.
            dev_ops_api: replacement for the DevOps-API flag.

        Returns:
            A newly-constructed ``APICommander`` (with its own HTTP clients).
        """
        # some care in allowing e.g. {} to override (but not None):
        return APICommander(
            api_endpoint=(
                api_endpoint if api_endpoint is not None else self.api_endpoint
            ),
            path=path if path is not None else self.path,
            spawner=self._get_spawner(),
            headers=headers if headers is not None else self.headers,
            callers=callers if callers is not None else self.callers,
            redacted_header_names=(
                redacted_header_names
                if redacted_header_names is not None
                else self.redacted_header_names
            ),
            dev_ops_api=dev_ops_api if dev_ops_api is not None else self.dev_ops_api,
            # settings with no override parameter: always inherited
            event_observers=self.event_observers,
            handle_decimals_writes=self.handle_decimals_writes,
            handle_decimals_reads=self.handle_decimals_reads,
        )

    def _compose_request_url(self, additional_path: str | None) -> str:
        """Return the request URL: the full path, plus ``additional_path`` if any.

        The two parts are joined by exactly one slash at the junction.
        """
        if not additional_path:
            return self.full_path
        base_url = self.full_path.rstrip("/")
        suffix = additional_path.lstrip("/")
        return f"{base_url}/{suffix}"

    def _raw_response_to_json(
        self,
        raw_response: httpx.Response,
        raise_api_errors: bool,
        payload: dict[str, Any] | None,
        caller_function_name: str | None,
        request_id: str,
    ) -> dict[str, Any]:
        """Parse a raw HTTP response into JSON, surfacing warnings and errors.

        Args:
            raw_response: the response whose text body is to be parsed.
            raise_api_errors: whether an "errors" field in the response must
                result in an exception being raised.
            payload: the request payload, used to describe the command in the
                exceptions raised here.
            caller_function_name: name forwarded to the event observers.
            request_id: identifier forwarded to the event observers.

        Returns:
            The parsed JSON response, exactly as returned by the parser.

        Raises:
            self._faulty_response_exc_class: if the body cannot be parsed.
            self._response_exc_class: if the response has an "errors" field
                and ``raise_api_errors`` is True.
        """
        # try to process the httpx raw response into a JSON or throw a failure
        raw_response_json: dict[str, Any]
        try:
            if self.handle_decimals_reads:
                # for decimal-aware contents (aka 'tables'), all number-looking things
                # are made into Decimal.
                # (for collections, this will be it. for Tables, schema-aware
                # proper post-processing will refine types, e.g. back to int, ...)
                raw_response_json = self._decimal_aware_parse_json_response(
                    raw_response.text,
                )
            else:
                raw_response_json = self._decimal_unaware_parse_json_response(
                    raw_response.text,
                )
        except ValueError:
            # json() parsing has failed (e.g., empty body)
            # The command is described by the sorted top-level payload keys.
            if payload is not None:
                command_desc = "/".join(sorted(payload.keys()))
            else:
                command_desc = "(none)"
            raise self._faulty_response_exc_class(
                text=f"Unparseable response from API '{command_desc}' command.",
                raw_response={
                    "raw_response": raw_response.text,
                },
            )

        # no warnings check for DevOps API (there, 'status' may contain a string)
        # A non-dict JSON body is inspected as if it were an empty dict (but
        # the value returned at the end is still the parsed body itself).
        dictforced_response: dict[str, Any] = (
            raw_response_json if isinstance(raw_response_json, dict) else {}
        )
        if not self.dev_ops_api:
            # Tolerate a missing/null "status" and a missing/null "warnings".
            warning_items: list[str | dict[str, Any]] = (
                dictforced_response.get("status") or {}
            ).get("warnings") or []
            if warning_items:
                warning_descriptors = [
                    DataAPIWarningDescriptor(warning_item)
                    for warning_item in warning_items
                ]
                # Warnings go first to the enabled observers, then to the log.
                if self.event_observers:
                    wrn_events = [
                        ObservableWarning(warning=warning_descriptor)
                        for warning_descriptor in warning_descriptors
                    ]
                    sender = self._get_spawner()
                    for ev_obs in self.event_observers.values():
                        if ev_obs is not None and ev_obs.enabled:
                            for wrn_event in wrn_events:
                                ev_obs.receive(
                                    wrn_event,
                                    sender=sender,
                                    function_name=caller_function_name,
                                    request_id=request_id,
                                )
                for warning_descriptor in warning_descriptors:
                    full_warning = (
                        f"The {self._api_description} returned "
                        f"a warning: {warning_descriptor}"
                    )
                    logger.warning(full_warning)

        if "errors" in dictforced_response:
            # Observers are told about errors whether or not an exception
            # is raised afterwards.
            if self.event_observers:
                err_events = [
                    ObservableError(error=DataAPIErrorDescriptor(err_dict))
                    for err_dict in dictforced_response["errors"]
                ]
                sender = self._get_spawner()
                for ev_obs in self.event_observers.values():
                    if ev_obs is not None and ev_obs.enabled:
                        for err_event in err_events:
                            ev_obs.receive(
                                err_event,
                                sender=sender,
                                function_name=caller_function_name,
                                request_id=request_id,
                            )
            if raise_api_errors:
                logger.warning(
                    f"APICommander about to raise from: {dictforced_response['errors']}"
                )
                raise self._response_exc_class.from_response(
                    command=payload,
                    raw_response=dictforced_response,
                )

        return raw_response_json

    @staticmethod
    def _decimal_unaware_parse_json_response(response_text: str) -> dict[str, Any]:
        """Parse a JSON response body with the default number handling."""
        parsed_response = json.loads(response_text)
        return cast(dict[str, Any], parsed_response)

    @staticmethod
    def _decimal_aware_parse_json_response(response_text: str) -> dict[str, Any]:
        """Parse a JSON response body, turning every int and float into a Decimal."""
        parsed_response = json.loads(
            response_text,
            parse_int=Decimal,
            parse_float=Decimal,
        )
        return cast(dict[str, Any], parsed_response)

    @staticmethod
    def _decimal_unaware_encode_payload(payload: dict[str, Any] | None) -> str | None:
        """Encode a payload as compact JSON, with no special Decimal treatment.

        NaN/Infinity values are rejected and non-ASCII characters are kept
        as they are. A None payload yields None.
        """
        # This is the JSON encoder in absence of the workaround to treat Decimals
        if payload is None:
            return None
        return json.dumps(
            payload,
            ensure_ascii=False,
            separators=(",", ":"),
            allow_nan=False,
        )

    @staticmethod
    def _decimal_aware_encode_payload(payload: dict[str, Any] | None) -> str | None:
        """Encode a payload as compact JSON through the Decimal-marking encoder.

        The payload is dumped with ``_MarkedDecimalEncoder`` and the result is
        post-processed by its ``_clean_encoded_string``. A None payload yields
        None.

        Raises:
            ValueError: if ``CHECK_DECIMAL_ESCAPING_CONSISTENCY`` is on and the
                marker pattern is found in a dump made with
                ``_MarkedDecimalDefuser``.
        """
        if payload is None:
            return None

        dump_options: dict[str, Any] = {
            "allow_nan": False,
            "separators": (",", ":"),
            "ensure_ascii": False,
        }
        if CHECK_DECIMAL_ESCAPING_CONSISTENCY:
            # check if escaping collision. This is expensive and 99.9999999% useless
            defused_dump = json.dumps(
                payload,
                cls=_MarkedDecimalDefuser,
                **dump_options,
            )
            if _MarkedDecimalEncoder._check_mark_match(defused_dump):
                raise ValueError(
                    "The pattern to work around Decimals was detected in a "
                    "user-provided item. This payload cannot be JSON-encoded."
                )
        marked_dump = json.dumps(
            payload,
            cls=_MarkedDecimalEncoder,
            **dump_options,
        )
        return _MarkedDecimalEncoder._clean_encoded_string(marked_dump)

    def raw_request(
        self,
        *,
        caller_function_name: str | None,
        request_id: str | None,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        timeout_context: _TimeoutContext | None = None,
    ) -> httpx.Response:
        """Send one synchronous HTTP request and return the raw response.

        Args:
            caller_function_name: name passed along to the request log and to
                the event observers.
            request_id: identifier attached to observer events; when None, a
                fresh ``uuid7`` string is generated.
            http_method: HTTP method for the request.
            payload: body to encode as JSON, or None to send no body.
            additional_path: path segment appended to the commander full path.
            request_params: query parameters for the request.
            timeout_context: timeout settings; when falsy, a
                ``_TimeoutContext(request_ms=None)`` is used.

        Returns:
            The ``httpx.Response`` of a request that passed ``raise_for_status``.

        Raises:
            The exception built by ``to_devopsapi_timeout_exception`` or
            ``to_dataapi_timeout_exception`` when httpx times out, and
            ``self._http_exc_class`` when ``raise_for_status`` fails. Each is
            explicitly chained to the originating httpx exception.
        """
        if request_id is None:
            request_id = str(uuid7())
        request_url = self._compose_request_url(additional_path)
        _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
        encoded_payload = (
            self._decimal_aware_encode_payload(payload)
            if self.handle_decimals_writes
            else self._decimal_unaware_encode_payload(payload)
        )
        log_httpx_request(
            http_method=http_method,
            full_url=request_url,
            request_params=request_params,
            redacted_request_headers=self._loggable_headers,
            encoded_payload=encoded_payload,
            timeout_context=_timeout_context,
            caller_function_name=caller_function_name,
        )
        httpx_timeout_s = to_httpx_timeout(_timeout_context)
        # Observers see the request (with redacted headers) before it is sent.
        if self.event_observers:
            req_event = ObservableRequest(
                payload=encoded_payload,
                http_method=http_method,
                url=request_url,
                query_parameters=request_params,
                redacted_headers=self._loggable_headers,
                dev_ops_api=self.dev_ops_api,
            )
            sender = self._get_spawner()
            for ev_obs in self.event_observers.values():
                if ev_obs is not None and ev_obs.enabled:
                    ev_obs.receive(
                        req_event,
                        sender=sender,
                        function_name=caller_function_name,
                        request_id=request_id,
                    )

        try:
            raw_response = self.client.request(
                method=http_method,
                url=request_url,
                content=encoded_payload.encode()
                if encoded_payload is not None
                else None,
                params=request_params,
                timeout=httpx_timeout_s,
                headers=self.full_headers,
            )
        except httpx.TimeoutException as timeout_exc:
            if self.dev_ops_api:
                raise to_devopsapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                ) from timeout_exc
            else:
                raise to_dataapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                ) from timeout_exc

        # Observers see every response, including those with an error status
        # (the status check comes afterwards).
        if self.event_observers:
            rsp_event = ObservableResponse(
                body=raw_response.text,
                status_code=raw_response.status_code,
            )
            # resolve the spawner once, as done for the request event
            sender = self._get_spawner()
            for ev_obs in self.event_observers.values():
                if ev_obs is not None and ev_obs.enabled:
                    ev_obs.receive(
                        rsp_event,
                        sender=sender,
                        function_name=caller_function_name,
                        request_id=request_id,
                    )
        try:
            raw_response.raise_for_status()
        except httpx.HTTPStatusError as http_exc:
            raise self._http_exc_class.from_httpx_error(http_exc) from http_exc
        log_httpx_response(response=raw_response)
        return raw_response

    async def async_raw_request(
        self,
        *,
        caller_function_name: str | None,
        request_id: str | None,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        timeout_context: _TimeoutContext | None = None,
    ) -> httpx.Response:
        """Send one HTTP request through the async client; return the raw response.

        Args:
            caller_function_name: name passed along to the request log and to
                the event observers.
            request_id: identifier attached to observer events; when None, a
                fresh ``uuid7`` string is generated.
            http_method: HTTP method for the request.
            payload: body to encode as JSON, or None to send no body.
            additional_path: path segment appended to the commander full path.
            request_params: query parameters for the request.
            timeout_context: timeout settings; when falsy, a
                ``_TimeoutContext(request_ms=None)`` is used.

        Returns:
            The ``httpx.Response`` of a request that passed ``raise_for_status``.

        Raises:
            The exception built by ``to_devopsapi_timeout_exception`` or
            ``to_dataapi_timeout_exception`` when httpx times out, and
            ``self._http_exc_class`` when ``raise_for_status`` fails. Each is
            explicitly chained to the originating httpx exception.
        """
        if request_id is None:
            request_id = str(uuid7())
        request_url = self._compose_request_url(additional_path)
        _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
        encoded_payload = (
            self._decimal_aware_encode_payload(payload)
            if self.handle_decimals_writes
            else self._decimal_unaware_encode_payload(payload)
        )
        log_httpx_request(
            http_method=http_method,
            full_url=request_url,
            request_params=request_params,
            redacted_request_headers=self._loggable_headers,
            encoded_payload=encoded_payload,
            timeout_context=_timeout_context,
            caller_function_name=caller_function_name,
        )
        httpx_timeout_s = to_httpx_timeout(_timeout_context)
        # Observers see the request (with redacted headers) before it is sent.
        if self.event_observers:
            req_event = ObservableRequest(
                payload=encoded_payload,
                http_method=http_method,
                url=request_url,
                query_parameters=request_params,
                redacted_headers=self._loggable_headers,
                dev_ops_api=self.dev_ops_api,
            )
            sender = self._get_spawner()
            for ev_obs in self.event_observers.values():
                if ev_obs is not None and ev_obs.enabled:
                    ev_obs.receive(
                        req_event,
                        sender=sender,
                        function_name=caller_function_name,
                        request_id=request_id,
                    )

        try:
            raw_response = await self.async_client.request(
                method=http_method,
                url=request_url,
                content=encoded_payload.encode()
                if encoded_payload is not None
                else None,
                params=request_params,
                timeout=httpx_timeout_s,
                headers=self.full_headers,
            )
        except httpx.TimeoutException as timeout_exc:
            if self.dev_ops_api:
                raise to_devopsapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                ) from timeout_exc
            else:
                raise to_dataapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                ) from timeout_exc

        # Observers see every response, including those with an error status
        # (the status check comes afterwards).
        if self.event_observers:
            rsp_event = ObservableResponse(
                body=raw_response.text,
                status_code=raw_response.status_code,
            )
            sender = self._get_spawner()
            for ev_obs in self.event_observers.values():
                if ev_obs is not None and ev_obs.enabled:
                    ev_obs.receive(
                        rsp_event,
                        sender=sender,
                        function_name=caller_function_name,
                        request_id=request_id,
                    )
        try:
            raw_response.raise_for_status()
        except httpx.HTTPStatusError as http_exc:
            raise self._http_exc_class.from_httpx_error(http_exc) from http_exc
        log_httpx_response(response=raw_response)
        return raw_response

    def request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
        caller_function_name: str | None = None,
    ) -> dict[str, Any]:
        """Send a synchronous request and return the parsed JSON response.

        A new ``uuid7``-based request id is generated and shared between the
        raw request and the response processing, so that all observer events
        for this call carry the same id.

        Args:
            http_method: HTTP method for the request.
            payload: body to encode as JSON, or None to send no body.
            additional_path: path segment appended to the commander full path.
            request_params: query parameters for the request.
            raise_api_errors: whether an "errors" field in the response
                results in an exception.
            timeout_context: timeout settings for the request.
            caller_function_name: name forwarded to logs and event observers.

        Returns:
            The parsed JSON response.
        """
        new_request_id = str(uuid7())
        response = self.raw_request(
            caller_function_name=caller_function_name,
            request_id=new_request_id,
            http_method=http_method,
            payload=payload,
            additional_path=additional_path,
            request_params=request_params,
            timeout_context=timeout_context,
        )
        return self._raw_response_to_json(
            response,
            raise_api_errors=raise_api_errors,
            payload=payload,
            caller_function_name=caller_function_name,
            request_id=new_request_id,
        )

    async def async_request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
        caller_function_name: str | None = None,
    ) -> dict[str, Any]:
        """Send a request through the async client; return the parsed JSON response.

        A new ``uuid7``-based request id is generated and shared between the
        raw request and the response processing, so that all observer events
        for this call carry the same id.

        Args:
            http_method: HTTP method for the request.
            payload: body to encode as JSON, or None to send no body.
            additional_path: path segment appended to the commander full path.
            request_params: query parameters for the request.
            raise_api_errors: whether an "errors" field in the response
                results in an exception.
            timeout_context: timeout settings for the request.
            caller_function_name: name forwarded to logs and event observers.

        Returns:
            The parsed JSON response.
        """
        new_request_id = str(uuid7())
        response = await self.async_raw_request(
            caller_function_name=caller_function_name,
            request_id=new_request_id,
            http_method=http_method,
            payload=payload,
            additional_path=additional_path,
            request_params=request_params,
            timeout_context=timeout_context,
        )
        return self._raw_response_to_json(
            response,
            raise_api_errors=raise_api_errors,
            payload=payload,
            caller_function_name=caller_function_name,
            request_id=new_request_id,
        )

Class variables

var async_client : httpx.AsyncClient

The asynchronous `httpx.AsyncClient` used by `async_raw_request` to send requests; it is closed by `__aexit__`.

var client : httpx.Client

The synchronous `httpx.Client` used by `raw_request` to send requests.

Methods

async def async_raw_request(self,
*,
caller_function_name: str | None,
request_id: str | None,
http_method: str = 'POST',
payload: dict[str, Any] | None = None,
additional_path: str | None = None,
request_params: dict[str, Any] = {},
timeout_context: _TimeoutContext | None = None) ‑> httpx.Response
Expand source code
async def async_raw_request(
    self,
    *,
    caller_function_name: str | None,
    request_id: str | None,
    http_method: str = HttpMethod.POST,
    payload: dict[str, Any] | None = None,
    additional_path: str | None = None,
    request_params: dict[str, Any] = {},
    timeout_context: _TimeoutContext | None = None,
) -> httpx.Response:
    """Send one HTTP request through the async client; return the raw response.

    Args:
        caller_function_name: name passed along to the request log and to
            the event observers.
        request_id: identifier attached to observer events; when None, a
            fresh ``uuid7`` string is generated.
        http_method: HTTP method for the request.
        payload: body to encode as JSON, or None to send no body.
        additional_path: path segment appended to the commander full path.
        request_params: query parameters for the request.
        timeout_context: timeout settings; when falsy, a
            ``_TimeoutContext(request_ms=None)`` is used.

    Returns:
        The ``httpx.Response`` of a request that passed ``raise_for_status``.

    Raises:
        The exception built by ``to_devopsapi_timeout_exception`` or
        ``to_dataapi_timeout_exception`` when httpx times out, and
        ``self._http_exc_class`` when ``raise_for_status`` fails. Each is
        explicitly chained to the originating httpx exception.
    """
    if request_id is None:
        request_id = str(uuid7())
    request_url = self._compose_request_url(additional_path)
    _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
    encoded_payload = (
        self._decimal_aware_encode_payload(payload)
        if self.handle_decimals_writes
        else self._decimal_unaware_encode_payload(payload)
    )
    log_httpx_request(
        http_method=http_method,
        full_url=request_url,
        request_params=request_params,
        redacted_request_headers=self._loggable_headers,
        encoded_payload=encoded_payload,
        timeout_context=_timeout_context,
        caller_function_name=caller_function_name,
    )
    httpx_timeout_s = to_httpx_timeout(_timeout_context)
    # Observers see the request (with redacted headers) before it is sent.
    if self.event_observers:
        req_event = ObservableRequest(
            payload=encoded_payload,
            http_method=http_method,
            url=request_url,
            query_parameters=request_params,
            redacted_headers=self._loggable_headers,
            dev_ops_api=self.dev_ops_api,
        )
        sender = self._get_spawner()
        for ev_obs in self.event_observers.values():
            if ev_obs is not None and ev_obs.enabled:
                ev_obs.receive(
                    req_event,
                    sender=sender,
                    function_name=caller_function_name,
                    request_id=request_id,
                )

    try:
        raw_response = await self.async_client.request(
            method=http_method,
            url=request_url,
            content=encoded_payload.encode()
            if encoded_payload is not None
            else None,
            params=request_params,
            timeout=httpx_timeout_s,
            headers=self.full_headers,
        )
    except httpx.TimeoutException as timeout_exc:
        if self.dev_ops_api:
            raise to_devopsapi_timeout_exception(
                timeout_exc, timeout_context=_timeout_context
            ) from timeout_exc
        else:
            raise to_dataapi_timeout_exception(
                timeout_exc, timeout_context=_timeout_context
            ) from timeout_exc

    # Observers see every response, including those with an error status
    # (the status check comes afterwards).
    if self.event_observers:
        rsp_event = ObservableResponse(
            body=raw_response.text,
            status_code=raw_response.status_code,
        )
        sender = self._get_spawner()
        for ev_obs in self.event_observers.values():
            if ev_obs is not None and ev_obs.enabled:
                ev_obs.receive(
                    rsp_event,
                    sender=sender,
                    function_name=caller_function_name,
                    request_id=request_id,
                )
    try:
        raw_response.raise_for_status()
    except httpx.HTTPStatusError as http_exc:
        raise self._http_exc_class.from_httpx_error(http_exc) from http_exc
    log_httpx_response(response=raw_response)
    return raw_response
async def async_request(self,
*,
http_method: str = 'POST',
payload: dict[str, Any] | None = None,
additional_path: str | None = None,
request_params: dict[str, Any] = {},
raise_api_errors: bool = True,
timeout_context: _TimeoutContext | None = None,
caller_function_name: str | None = None) ‑> dict[str, typing.Any]
Expand source code
async def async_request(
    self,
    *,
    http_method: str = HttpMethod.POST,
    payload: dict[str, Any] | None = None,
    additional_path: str | None = None,
    request_params: dict[str, Any] = {},
    raise_api_errors: bool = True,
    timeout_context: _TimeoutContext | None = None,
    caller_function_name: str | None = None,
) -> dict[str, Any]:
    """Send a request through the async client; return the parsed JSON response.

    A new ``uuid7``-based request id is generated and shared between the
    raw request and the response processing, so that all observer events
    for this call carry the same id.

    Args:
        http_method: HTTP method for the request.
        payload: body to encode as JSON, or None to send no body.
        additional_path: path segment appended to the commander full path.
        request_params: query parameters for the request.
        raise_api_errors: whether an "errors" field in the response
            results in an exception.
        timeout_context: timeout settings for the request.
        caller_function_name: name forwarded to logs and event observers.

    Returns:
        The parsed JSON response.
    """
    new_request_id = str(uuid7())
    response = await self.async_raw_request(
        caller_function_name=caller_function_name,
        request_id=new_request_id,
        http_method=http_method,
        payload=payload,
        additional_path=additional_path,
        request_params=request_params,
        timeout_context=timeout_context,
    )
    return self._raw_response_to_json(
        response,
        raise_api_errors=raise_api_errors,
        payload=payload,
        caller_function_name=caller_function_name,
        request_id=new_request_id,
    )
def raw_request(self,
*,
caller_function_name: str | None,
request_id: str | None,
http_method: str = 'POST',
payload: dict[str, Any] | None = None,
additional_path: str | None = None,
request_params: dict[str, Any] = {},
timeout_context: _TimeoutContext | None = None) ‑> httpx.Response
Expand source code
def raw_request(
    self,
    *,
    caller_function_name: str | None,
    request_id: str | None,
    http_method: str = HttpMethod.POST,
    payload: dict[str, Any] | None = None,
    additional_path: str | None = None,
    request_params: dict[str, Any] = {},
    timeout_context: _TimeoutContext | None = None,
) -> httpx.Response:
    """Send one synchronous HTTP request and return the raw response.

    Args:
        caller_function_name: name passed along to the request log and to
            the event observers.
        request_id: identifier attached to observer events; when None, a
            fresh ``uuid7`` string is generated.
        http_method: HTTP method for the request.
        payload: body to encode as JSON, or None to send no body.
        additional_path: path segment appended to the commander full path.
        request_params: query parameters for the request.
        timeout_context: timeout settings; when falsy, a
            ``_TimeoutContext(request_ms=None)`` is used.

    Returns:
        The ``httpx.Response`` of a request that passed ``raise_for_status``.

    Raises:
        The exception built by ``to_devopsapi_timeout_exception`` or
        ``to_dataapi_timeout_exception`` when httpx times out, and
        ``self._http_exc_class`` when ``raise_for_status`` fails. Each is
        explicitly chained to the originating httpx exception.
    """
    if request_id is None:
        request_id = str(uuid7())
    request_url = self._compose_request_url(additional_path)
    _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
    encoded_payload = (
        self._decimal_aware_encode_payload(payload)
        if self.handle_decimals_writes
        else self._decimal_unaware_encode_payload(payload)
    )
    log_httpx_request(
        http_method=http_method,
        full_url=request_url,
        request_params=request_params,
        redacted_request_headers=self._loggable_headers,
        encoded_payload=encoded_payload,
        timeout_context=_timeout_context,
        caller_function_name=caller_function_name,
    )
    httpx_timeout_s = to_httpx_timeout(_timeout_context)
    # Observers see the request (with redacted headers) before it is sent.
    if self.event_observers:
        req_event = ObservableRequest(
            payload=encoded_payload,
            http_method=http_method,
            url=request_url,
            query_parameters=request_params,
            redacted_headers=self._loggable_headers,
            dev_ops_api=self.dev_ops_api,
        )
        sender = self._get_spawner()
        for ev_obs in self.event_observers.values():
            if ev_obs is not None and ev_obs.enabled:
                ev_obs.receive(
                    req_event,
                    sender=sender,
                    function_name=caller_function_name,
                    request_id=request_id,
                )

    try:
        raw_response = self.client.request(
            method=http_method,
            url=request_url,
            content=encoded_payload.encode()
            if encoded_payload is not None
            else None,
            params=request_params,
            timeout=httpx_timeout_s,
            headers=self.full_headers,
        )
    except httpx.TimeoutException as timeout_exc:
        if self.dev_ops_api:
            raise to_devopsapi_timeout_exception(
                timeout_exc, timeout_context=_timeout_context
            ) from timeout_exc
        else:
            raise to_dataapi_timeout_exception(
                timeout_exc, timeout_context=_timeout_context
            ) from timeout_exc

    # Observers see every response, including those with an error status
    # (the status check comes afterwards).
    if self.event_observers:
        rsp_event = ObservableResponse(
            body=raw_response.text,
            status_code=raw_response.status_code,
        )
        # resolve the spawner once, as done for the request event
        sender = self._get_spawner()
        for ev_obs in self.event_observers.values():
            if ev_obs is not None and ev_obs.enabled:
                ev_obs.receive(
                    rsp_event,
                    sender=sender,
                    function_name=caller_function_name,
                    request_id=request_id,
                )
    try:
        raw_response.raise_for_status()
    except httpx.HTTPStatusError as http_exc:
        raise self._http_exc_class.from_httpx_error(http_exc) from http_exc
    log_httpx_response(response=raw_response)
    return raw_response
def request(self,
*,
http_method: str = 'POST',
payload: dict[str, Any] | None = None,
additional_path: str | None = None,
request_params: dict[str, Any] = {},
raise_api_errors: bool = True,
timeout_context: _TimeoutContext | None = None,
caller_function_name: str | None = None) ‑> dict[str, typing.Any]
Expand source code
def request(
    self,
    *,
    http_method: str = HttpMethod.POST,
    payload: dict[str, Any] | None = None,
    additional_path: str | None = None,
    request_params: dict[str, Any] = {},
    raise_api_errors: bool = True,
    timeout_context: _TimeoutContext | None = None,
    caller_function_name: str | None = None,
) -> dict[str, Any]:
    """Send a synchronous request and return the parsed JSON response.

    A new ``uuid7``-based request id is generated and shared between the
    raw request and the response processing, so that all observer events
    for this call carry the same id.

    Args:
        http_method: HTTP method for the request.
        payload: body to encode as JSON, or None to send no body.
        additional_path: path segment appended to the commander full path.
        request_params: query parameters for the request.
        raise_api_errors: whether an "errors" field in the response
            results in an exception.
        timeout_context: timeout settings for the request.
        caller_function_name: name forwarded to logs and event observers.

    Returns:
        The parsed JSON response.
    """
    new_request_id = str(uuid7())
    response = self.raw_request(
        caller_function_name=caller_function_name,
        request_id=new_request_id,
        http_method=http_method,
        payload=payload,
        additional_path=additional_path,
        request_params=request_params,
        timeout_context=timeout_context,
    )
    return self._raw_response_to_json(
        response,
        raise_api_errors=raise_api_errors,
        payload=payload,
        caller_function_name=caller_function_name,
        request_id=new_request_id,
    )