Module astrapy.utils.api_commander

Expand source code
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
import logging
import re
from decimal import Decimal
from types import TracebackType
from typing import Any, Iterable, Sequence, cast

import httpx

from astrapy.constants import CallerType
from astrapy.exceptions import (
    DataAPIHttpException,
    DataAPIResponseException,
    DevOpsAPIHttpException,
    DevOpsAPIResponseException,
    UnexpectedDataAPIResponseException,
    UnexpectedDevOpsAPIResponseException,
    _TimeoutContext,
    to_dataapi_timeout_exception,
    to_devopsapi_timeout_exception,
)
from astrapy.settings.defaults import (
    CHECK_DECIMAL_ESCAPING_CONSISTENCY,
    DEFAULT_REDACTED_HEADER_NAMES,
    FIXED_SECRET_PLACEHOLDER,
)
from astrapy.utils.request_tools import (
    HttpMethod,
    log_httpx_request,
    log_httpx_response,
    to_httpx_timeout,
)
from astrapy.utils.user_agents import (
    compose_full_user_agent,
    detect_astrapy_user_agent,
)

user_agent_astrapy = detect_astrapy_user_agent()

logger = logging.getLogger(__name__)


# these are a mixture from disparate alphabet, to minimize the chance
# of a collision with user-provided actual content:
DECIMAL_MARKER_PREFIX_STR = "𐐏丂"
DECIMAL_MARKER_SUFFIX_STR = "∀🇦🇫"
DECIMAL_CLEANER_PATTERN = re.compile(
    f'"{DECIMAL_MARKER_PREFIX_STR}([0-9.]+){DECIMAL_MARKER_SUFFIX_STR}"'
)


class _MarkedDecimalDefuser(json.JSONEncoder):
    def default(self, obj: object) -> Any:
        if isinstance(obj, Decimal):
            return "(defused decimal)"
        return super().default(obj)


class _MarkedDecimalEncoder(json.JSONEncoder):
    def default(self, obj: object) -> Any:
        if isinstance(obj, Decimal):
            return f"{DECIMAL_MARKER_PREFIX_STR}{obj}{DECIMAL_MARKER_SUFFIX_STR}"
        return super().default(obj)

    @staticmethod
    def _check_mark_match(json_string: str) -> bool:
        return bool(DECIMAL_CLEANER_PATTERN.search(json_string))

    @staticmethod
    def _clean_encoded_string(json_string: str) -> str:
        return re.sub(DECIMAL_CLEANER_PATTERN, r"\1", json_string)


class APICommander:
    client = httpx.Client()

    def __init__(
        self,
        *,
        api_endpoint: str,
        path: str,
        headers: dict[str, str | None] = {},
        callers: Sequence[CallerType] = [],
        redacted_header_names: Iterable[str] | None = None,
        dev_ops_api: bool = False,
        handle_decimals_writes: bool = False,
        handle_decimals_reads: bool = False,
    ) -> None:
        self.async_client = httpx.AsyncClient()
        self.api_endpoint = api_endpoint.rstrip("/")
        self.path = path.lstrip("/")
        self.headers = headers
        self.callers = callers
        self.redacted_header_names = set(redacted_header_names or [])
        self.upper_full_redacted_header_names = {
            header_name.upper()
            for header_name in (
                self.redacted_header_names | DEFAULT_REDACTED_HEADER_NAMES
            )
        }
        self.dev_ops_api = dev_ops_api
        self.handle_decimals_writes = handle_decimals_writes
        self.handle_decimals_reads = handle_decimals_reads

        self._faulty_response_exc_class: (
            type[UnexpectedDevOpsAPIResponseException]
            | type[UnexpectedDataAPIResponseException]
        )
        self._response_exc_class: (
            type[DevOpsAPIResponseException] | type[DataAPIResponseException]
        )
        self._http_exc_class: type[DataAPIHttpException] | type[DevOpsAPIHttpException]
        if self.dev_ops_api:
            self._faulty_response_exc_class = UnexpectedDevOpsAPIResponseException
            self._response_exc_class = DevOpsAPIResponseException
            self._http_exc_class = DevOpsAPIHttpException
        else:
            self._faulty_response_exc_class = UnexpectedDataAPIResponseException
            self._response_exc_class = DataAPIResponseException
            self._http_exc_class = DataAPIHttpException
        self._api_description = "DevOps API" if self.dev_ops_api else "Data API"

        full_user_agent_string = compose_full_user_agent(
            list(self.callers) + [user_agent_astrapy]
        )
        self.caller_header: dict[str, str] = (
            {"User-Agent": full_user_agent_string} if full_user_agent_string else {}
        )
        self.full_headers: dict[str, str] = {
            k: v
            for k, v in {
                **{
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                },
                **self.caller_header,
                **self.headers,
            }.items()
            if v is not None
        }
        self._loggable_headers = {
            k: v
            if k.upper() not in self.upper_full_redacted_header_names
            else FIXED_SECRET_PLACEHOLDER
            for k, v in self.full_headers.items()
        }
        self.full_path = ("/".join([self.api_endpoint, self.path])).rstrip("/")

    def __repr__(self) -> str:
        pieces = [
            pc
            for pc in (
                f"api_endpoint={self.api_endpoint}",
                f"path={self.path}",
                f"callers={self.callers}",
                f"dev_ops_api={self.dev_ops_api}",
            )
            if pc is not None
        ]
        inner_desc = ", ".join(pieces)
        return f"{self.__class__.__name__}({inner_desc})"

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, APICommander):
            return all(
                [
                    self.api_endpoint == other.api_endpoint,
                    self.path == other.path,
                    self.headers == other.headers,
                    self.callers == other.callers,
                    self.redacted_header_names == other.redacted_header_names,
                    self.dev_ops_api == other.dev_ops_api,
                ]
            )
        else:
            return False

    async def __aenter__(self) -> APICommander:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        await self.async_client.aclose()

    def _copy(
        self,
        api_endpoint: str | None = None,
        path: str | None = None,
        headers: dict[str, str | None] | None = None,
        callers: Sequence[CallerType] | None = None,
        redacted_header_names: list[str] | None = None,
        dev_ops_api: bool | None = None,
    ) -> APICommander:
        # some care in allowing e.g. {} to override (but not None):
        return APICommander(
            api_endpoint=(
                api_endpoint if api_endpoint is not None else self.api_endpoint
            ),
            path=path if path is not None else self.path,
            headers=headers if headers is not None else self.headers,
            callers=callers if callers is not None else self.callers,
            redacted_header_names=(
                redacted_header_names
                if redacted_header_names is not None
                else self.redacted_header_names
            ),
            dev_ops_api=dev_ops_api if dev_ops_api is not None else self.dev_ops_api,
        )

    def _compose_request_url(self, additional_path: str | None) -> str:
        if additional_path:
            return "/".join([self.full_path.rstrip("/"), additional_path.lstrip("/")])
        else:
            return self.full_path

    def _raw_response_to_json(
        self,
        raw_response: httpx.Response,
        raise_api_errors: bool,
        payload: dict[str, Any] | None,
    ) -> dict[str, Any]:
        # try to process the httpx raw response into a JSON or throw a failure
        raw_response_json: dict[str, Any]
        try:
            if self.handle_decimals_reads:
                # for decimal-aware contents (aka 'tables'), all number-looking things
                # are made into Decimal.
                # (for collections, this will be it. for Tables, schema-aware
                # proper post-processing will refine types, e.g. back to int, ...)
                raw_response_json = self._decimal_aware_parse_json_response(
                    raw_response.text,
                )
            else:
                raw_response_json = self._decimal_unaware_parse_json_response(
                    raw_response.text,
                )
        except ValueError:
            # json() parsing has failed (e.g., empty body)
            if payload is not None:
                command_desc = "/".join(sorted(payload.keys()))
            else:
                command_desc = "(none)"
            raise self._faulty_response_exc_class(
                text=f"Unparseable response from API '{command_desc}' command.",
                raw_response={
                    "raw_response": raw_response.text,
                },
            )

        if raise_api_errors and "errors" in raw_response_json:
            logger.warning(
                f"APICommander about to raise from: {raw_response_json['errors']}"
            )
            raise self._response_exc_class.from_response(
                command=payload,
                raw_response=raw_response_json,
            )

        # no warnings check for DevOps API (there, 'status' may contain a string)
        if not self.dev_ops_api:
            warning_messages: list[str] = (raw_response_json.get("status") or {}).get(
                "warnings"
            ) or []
            if warning_messages:
                for warning_message in warning_messages:
                    full_warning = f"The {self._api_description} returned a warning: {warning_message}"
                    logger.warning(full_warning)

        return raw_response_json

    @staticmethod
    def _decimal_unaware_parse_json_response(response_text: str) -> dict[str, Any]:
        return cast(
            dict[str, Any],
            json.loads(response_text),
        )

    @staticmethod
    def _decimal_aware_parse_json_response(response_text: str) -> dict[str, Any]:
        return cast(
            dict[str, Any],
            json.loads(
                response_text,
                parse_float=Decimal,
                parse_int=Decimal,
            ),
        )

    @staticmethod
    def _decimal_unaware_encode_payload(payload: dict[str, Any] | None) -> str | None:
        # This is the JSON encoder in absence of the workaround to treat Decimals
        if payload is not None:
            return json.dumps(
                payload,
                allow_nan=False,
                separators=(",", ":"),
                ensure_ascii=False,
            )
        else:
            return None

    @staticmethod
    def _decimal_aware_encode_payload(payload: dict[str, Any] | None) -> str | None:
        if payload is not None:
            if CHECK_DECIMAL_ESCAPING_CONSISTENCY:
                # check if escaping collision. This is expensive and 99.9999999% useless
                _naive_dump = json.dumps(
                    payload,
                    allow_nan=False,
                    separators=(",", ":"),
                    ensure_ascii=False,
                    cls=_MarkedDecimalDefuser,
                )
                if _MarkedDecimalEncoder._check_mark_match(_naive_dump):
                    raise ValueError(
                        "The pattern to work around Decimals was detected in a "
                        "user-provided item. This payload cannot be JSON-encoded."
                    )
            dec_marked_dump = json.dumps(
                payload,
                allow_nan=False,
                separators=(",", ":"),
                ensure_ascii=False,
                cls=_MarkedDecimalEncoder,
            )
            return _MarkedDecimalEncoder._clean_encoded_string(dec_marked_dump)
        else:
            return None

    def raw_request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
    ) -> httpx.Response:
        request_url = self._compose_request_url(additional_path)
        _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
        encoded_payload = (
            self._decimal_aware_encode_payload(payload)
            if self.handle_decimals_writes
            else self._decimal_unaware_encode_payload(payload)
        )
        log_httpx_request(
            http_method=http_method,
            full_url=request_url,
            request_params=request_params,
            redacted_request_headers=self._loggable_headers,
            encoded_payload=encoded_payload,
            timeout_context=_timeout_context,
        )
        httpx_timeout_s = to_httpx_timeout(_timeout_context)

        try:
            raw_response = self.client.request(
                method=http_method,
                url=request_url,
                content=encoded_payload.encode()
                if encoded_payload is not None
                else None,
                params=request_params,
                timeout=httpx_timeout_s,
                headers=self.full_headers,
            )
        except httpx.TimeoutException as timeout_exc:
            if self.dev_ops_api:
                raise to_devopsapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                )
            else:
                raise to_dataapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                )

        try:
            raw_response.raise_for_status()
        except httpx.HTTPStatusError as http_exc:
            raise self._http_exc_class.from_httpx_error(http_exc)
        log_httpx_response(response=raw_response)
        return raw_response

    async def async_raw_request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
    ) -> httpx.Response:
        request_url = self._compose_request_url(additional_path)
        _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
        encoded_payload = (
            self._decimal_aware_encode_payload(payload)
            if self.handle_decimals_writes
            else self._decimal_unaware_encode_payload(payload)
        )
        log_httpx_request(
            http_method=http_method,
            full_url=request_url,
            request_params=request_params,
            redacted_request_headers=self._loggable_headers,
            encoded_payload=encoded_payload,
            timeout_context=_timeout_context,
        )
        httpx_timeout_s = to_httpx_timeout(_timeout_context)

        try:
            raw_response = await self.async_client.request(
                method=http_method,
                url=request_url,
                content=encoded_payload.encode()
                if encoded_payload is not None
                else None,
                params=request_params,
                timeout=httpx_timeout_s,
                headers=self.full_headers,
            )
        except httpx.TimeoutException as timeout_exc:
            if self.dev_ops_api:
                raise to_devopsapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                )
            else:
                raise to_dataapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                )

        try:
            raw_response.raise_for_status()
        except httpx.HTTPStatusError as http_exc:
            raise self._http_exc_class.from_httpx_error(http_exc)
        log_httpx_response(response=raw_response)
        return raw_response

    def request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
    ) -> dict[str, Any]:
        raw_response = self.raw_request(
            http_method=http_method,
            payload=payload,
            additional_path=additional_path,
            request_params=request_params,
            raise_api_errors=raise_api_errors,
            timeout_context=timeout_context,
        )
        return self._raw_response_to_json(
            raw_response, raise_api_errors=raise_api_errors, payload=payload
        )

    async def async_request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
    ) -> dict[str, Any]:
        raw_response = await self.async_raw_request(
            http_method=http_method,
            payload=payload,
            additional_path=additional_path,
            request_params=request_params,
            raise_api_errors=raise_api_errors,
            timeout_context=timeout_context,
        )
        return self._raw_response_to_json(
            raw_response, raise_api_errors=raise_api_errors, payload=payload
        )

Classes

class APICommander (*, api_endpoint: str, path: str, headers: dict[str, str | None] = {}, callers: Sequence[CallerType] = [], redacted_header_names: Iterable[str] | None = None, dev_ops_api: bool = False, handle_decimals_writes: bool = False, handle_decimals_reads: bool = False)
Expand source code
class APICommander:
    client = httpx.Client()

    def __init__(
        self,
        *,
        api_endpoint: str,
        path: str,
        headers: dict[str, str | None] = {},
        callers: Sequence[CallerType] = [],
        redacted_header_names: Iterable[str] | None = None,
        dev_ops_api: bool = False,
        handle_decimals_writes: bool = False,
        handle_decimals_reads: bool = False,
    ) -> None:
        self.async_client = httpx.AsyncClient()
        self.api_endpoint = api_endpoint.rstrip("/")
        self.path = path.lstrip("/")
        self.headers = headers
        self.callers = callers
        self.redacted_header_names = set(redacted_header_names or [])
        self.upper_full_redacted_header_names = {
            header_name.upper()
            for header_name in (
                self.redacted_header_names | DEFAULT_REDACTED_HEADER_NAMES
            )
        }
        self.dev_ops_api = dev_ops_api
        self.handle_decimals_writes = handle_decimals_writes
        self.handle_decimals_reads = handle_decimals_reads

        self._faulty_response_exc_class: (
            type[UnexpectedDevOpsAPIResponseException]
            | type[UnexpectedDataAPIResponseException]
        )
        self._response_exc_class: (
            type[DevOpsAPIResponseException] | type[DataAPIResponseException]
        )
        self._http_exc_class: type[DataAPIHttpException] | type[DevOpsAPIHttpException]
        if self.dev_ops_api:
            self._faulty_response_exc_class = UnexpectedDevOpsAPIResponseException
            self._response_exc_class = DevOpsAPIResponseException
            self._http_exc_class = DevOpsAPIHttpException
        else:
            self._faulty_response_exc_class = UnexpectedDataAPIResponseException
            self._response_exc_class = DataAPIResponseException
            self._http_exc_class = DataAPIHttpException
        self._api_description = "DevOps API" if self.dev_ops_api else "Data API"

        full_user_agent_string = compose_full_user_agent(
            list(self.callers) + [user_agent_astrapy]
        )
        self.caller_header: dict[str, str] = (
            {"User-Agent": full_user_agent_string} if full_user_agent_string else {}
        )
        self.full_headers: dict[str, str] = {
            k: v
            for k, v in {
                **{
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                },
                **self.caller_header,
                **self.headers,
            }.items()
            if v is not None
        }
        self._loggable_headers = {
            k: v
            if k.upper() not in self.upper_full_redacted_header_names
            else FIXED_SECRET_PLACEHOLDER
            for k, v in self.full_headers.items()
        }
        self.full_path = ("/".join([self.api_endpoint, self.path])).rstrip("/")

    def __repr__(self) -> str:
        pieces = [
            pc
            for pc in (
                f"api_endpoint={self.api_endpoint}",
                f"path={self.path}",
                f"callers={self.callers}",
                f"dev_ops_api={self.dev_ops_api}",
            )
            if pc is not None
        ]
        inner_desc = ", ".join(pieces)
        return f"{self.__class__.__name__}({inner_desc})"

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, APICommander):
            return all(
                [
                    self.api_endpoint == other.api_endpoint,
                    self.path == other.path,
                    self.headers == other.headers,
                    self.callers == other.callers,
                    self.redacted_header_names == other.redacted_header_names,
                    self.dev_ops_api == other.dev_ops_api,
                ]
            )
        else:
            return False

    async def __aenter__(self) -> APICommander:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        await self.async_client.aclose()

    def _copy(
        self,
        api_endpoint: str | None = None,
        path: str | None = None,
        headers: dict[str, str | None] | None = None,
        callers: Sequence[CallerType] | None = None,
        redacted_header_names: list[str] | None = None,
        dev_ops_api: bool | None = None,
    ) -> APICommander:
        # some care in allowing e.g. {} to override (but not None):
        return APICommander(
            api_endpoint=(
                api_endpoint if api_endpoint is not None else self.api_endpoint
            ),
            path=path if path is not None else self.path,
            headers=headers if headers is not None else self.headers,
            callers=callers if callers is not None else self.callers,
            redacted_header_names=(
                redacted_header_names
                if redacted_header_names is not None
                else self.redacted_header_names
            ),
            dev_ops_api=dev_ops_api if dev_ops_api is not None else self.dev_ops_api,
        )

    def _compose_request_url(self, additional_path: str | None) -> str:
        if additional_path:
            return "/".join([self.full_path.rstrip("/"), additional_path.lstrip("/")])
        else:
            return self.full_path

    def _raw_response_to_json(
        self,
        raw_response: httpx.Response,
        raise_api_errors: bool,
        payload: dict[str, Any] | None,
    ) -> dict[str, Any]:
        # try to process the httpx raw response into a JSON or throw a failure
        raw_response_json: dict[str, Any]
        try:
            if self.handle_decimals_reads:
                # for decimal-aware contents (aka 'tables'), all number-looking things
                # are made into Decimal.
                # (for collections, this will be it. for Tables, schema-aware
                # proper post-processing will refine types, e.g. back to int, ...)
                raw_response_json = self._decimal_aware_parse_json_response(
                    raw_response.text,
                )
            else:
                raw_response_json = self._decimal_unaware_parse_json_response(
                    raw_response.text,
                )
        except ValueError:
            # json() parsing has failed (e.g., empty body)
            if payload is not None:
                command_desc = "/".join(sorted(payload.keys()))
            else:
                command_desc = "(none)"
            raise self._faulty_response_exc_class(
                text=f"Unparseable response from API '{command_desc}' command.",
                raw_response={
                    "raw_response": raw_response.text,
                },
            )

        if raise_api_errors and "errors" in raw_response_json:
            logger.warning(
                f"APICommander about to raise from: {raw_response_json['errors']}"
            )
            raise self._response_exc_class.from_response(
                command=payload,
                raw_response=raw_response_json,
            )

        # no warnings check for DevOps API (there, 'status' may contain a string)
        if not self.dev_ops_api:
            warning_messages: list[str] = (raw_response_json.get("status") or {}).get(
                "warnings"
            ) or []
            if warning_messages:
                for warning_message in warning_messages:
                    full_warning = f"The {self._api_description} returned a warning: {warning_message}"
                    logger.warning(full_warning)

        return raw_response_json

    @staticmethod
    def _decimal_unaware_parse_json_response(response_text: str) -> dict[str, Any]:
        return cast(
            dict[str, Any],
            json.loads(response_text),
        )

    @staticmethod
    def _decimal_aware_parse_json_response(response_text: str) -> dict[str, Any]:
        return cast(
            dict[str, Any],
            json.loads(
                response_text,
                parse_float=Decimal,
                parse_int=Decimal,
            ),
        )

    @staticmethod
    def _decimal_unaware_encode_payload(payload: dict[str, Any] | None) -> str | None:
        # This is the JSON encoder in absence of the workaround to treat Decimals
        if payload is not None:
            return json.dumps(
                payload,
                allow_nan=False,
                separators=(",", ":"),
                ensure_ascii=False,
            )
        else:
            return None

    @staticmethod
    def _decimal_aware_encode_payload(payload: dict[str, Any] | None) -> str | None:
        if payload is not None:
            if CHECK_DECIMAL_ESCAPING_CONSISTENCY:
                # check if escaping collision. This is expensive and 99.9999999% useless
                _naive_dump = json.dumps(
                    payload,
                    allow_nan=False,
                    separators=(",", ":"),
                    ensure_ascii=False,
                    cls=_MarkedDecimalDefuser,
                )
                if _MarkedDecimalEncoder._check_mark_match(_naive_dump):
                    raise ValueError(
                        "The pattern to work around Decimals was detected in a "
                        "user-provided item. This payload cannot be JSON-encoded."
                    )
            dec_marked_dump = json.dumps(
                payload,
                allow_nan=False,
                separators=(",", ":"),
                ensure_ascii=False,
                cls=_MarkedDecimalEncoder,
            )
            return _MarkedDecimalEncoder._clean_encoded_string(dec_marked_dump)
        else:
            return None

    def raw_request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
    ) -> httpx.Response:
        request_url = self._compose_request_url(additional_path)
        _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
        encoded_payload = (
            self._decimal_aware_encode_payload(payload)
            if self.handle_decimals_writes
            else self._decimal_unaware_encode_payload(payload)
        )
        log_httpx_request(
            http_method=http_method,
            full_url=request_url,
            request_params=request_params,
            redacted_request_headers=self._loggable_headers,
            encoded_payload=encoded_payload,
            timeout_context=_timeout_context,
        )
        httpx_timeout_s = to_httpx_timeout(_timeout_context)

        try:
            raw_response = self.client.request(
                method=http_method,
                url=request_url,
                content=encoded_payload.encode()
                if encoded_payload is not None
                else None,
                params=request_params,
                timeout=httpx_timeout_s,
                headers=self.full_headers,
            )
        except httpx.TimeoutException as timeout_exc:
            if self.dev_ops_api:
                raise to_devopsapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                )
            else:
                raise to_dataapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                )

        try:
            raw_response.raise_for_status()
        except httpx.HTTPStatusError as http_exc:
            raise self._http_exc_class.from_httpx_error(http_exc)
        log_httpx_response(response=raw_response)
        return raw_response

    async def async_raw_request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
    ) -> httpx.Response:
        request_url = self._compose_request_url(additional_path)
        _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
        encoded_payload = (
            self._decimal_aware_encode_payload(payload)
            if self.handle_decimals_writes
            else self._decimal_unaware_encode_payload(payload)
        )
        log_httpx_request(
            http_method=http_method,
            full_url=request_url,
            request_params=request_params,
            redacted_request_headers=self._loggable_headers,
            encoded_payload=encoded_payload,
            timeout_context=_timeout_context,
        )
        httpx_timeout_s = to_httpx_timeout(_timeout_context)

        try:
            raw_response = await self.async_client.request(
                method=http_method,
                url=request_url,
                content=encoded_payload.encode()
                if encoded_payload is not None
                else None,
                params=request_params,
                timeout=httpx_timeout_s,
                headers=self.full_headers,
            )
        except httpx.TimeoutException as timeout_exc:
            if self.dev_ops_api:
                raise to_devopsapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                )
            else:
                raise to_dataapi_timeout_exception(
                    timeout_exc, timeout_context=_timeout_context
                )

        try:
            raw_response.raise_for_status()
        except httpx.HTTPStatusError as http_exc:
            raise self._http_exc_class.from_httpx_error(http_exc)
        log_httpx_response(response=raw_response)
        return raw_response

    def request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
    ) -> dict[str, Any]:
        raw_response = self.raw_request(
            http_method=http_method,
            payload=payload,
            additional_path=additional_path,
            request_params=request_params,
            raise_api_errors=raise_api_errors,
            timeout_context=timeout_context,
        )
        return self._raw_response_to_json(
            raw_response, raise_api_errors=raise_api_errors, payload=payload
        )

    async def async_request(
        self,
        *,
        http_method: str = HttpMethod.POST,
        payload: dict[str, Any] | None = None,
        additional_path: str | None = None,
        request_params: dict[str, Any] = {},
        raise_api_errors: bool = True,
        timeout_context: _TimeoutContext | None = None,
    ) -> dict[str, Any]:
        raw_response = await self.async_raw_request(
            http_method=http_method,
            payload=payload,
            additional_path=additional_path,
            request_params=request_params,
            raise_api_errors=raise_api_errors,
            timeout_context=timeout_context,
        )
        return self._raw_response_to_json(
            raw_response, raise_api_errors=raise_api_errors, payload=payload
        )

Class variables

var client

Methods

async def async_raw_request(self, *, http_method: str = 'POST', payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, raise_api_errors: bool = True, timeout_context: _TimeoutContext | None = None) ‑> httpx.Response
Expand source code
async def async_raw_request(
    self,
    *,
    http_method: str = HttpMethod.POST,
    payload: dict[str, Any] | None = None,
    additional_path: str | None = None,
    request_params: dict[str, Any] = {},
    raise_api_errors: bool = True,
    timeout_context: _TimeoutContext | None = None,
) -> httpx.Response:
    request_url = self._compose_request_url(additional_path)
    _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
    encoded_payload = (
        self._decimal_aware_encode_payload(payload)
        if self.handle_decimals_writes
        else self._decimal_unaware_encode_payload(payload)
    )
    log_httpx_request(
        http_method=http_method,
        full_url=request_url,
        request_params=request_params,
        redacted_request_headers=self._loggable_headers,
        encoded_payload=encoded_payload,
        timeout_context=_timeout_context,
    )
    httpx_timeout_s = to_httpx_timeout(_timeout_context)

    try:
        raw_response = await self.async_client.request(
            method=http_method,
            url=request_url,
            content=encoded_payload.encode()
            if encoded_payload is not None
            else None,
            params=request_params,
            timeout=httpx_timeout_s,
            headers=self.full_headers,
        )
    except httpx.TimeoutException as timeout_exc:
        if self.dev_ops_api:
            raise to_devopsapi_timeout_exception(
                timeout_exc, timeout_context=_timeout_context
            )
        else:
            raise to_dataapi_timeout_exception(
                timeout_exc, timeout_context=_timeout_context
            )

    try:
        raw_response.raise_for_status()
    except httpx.HTTPStatusError as http_exc:
        raise self._http_exc_class.from_httpx_error(http_exc)
    log_httpx_response(response=raw_response)
    return raw_response
async def async_request(self, *, http_method: str = 'POST', payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, raise_api_errors: bool = True, timeout_context: _TimeoutContext | None = None) ‑> dict[str, typing.Any]
Expand source code
async def async_request(
    self,
    *,
    http_method: str = HttpMethod.POST,
    payload: dict[str, Any] | None = None,
    additional_path: str | None = None,
    request_params: dict[str, Any] = {},
    raise_api_errors: bool = True,
    timeout_context: _TimeoutContext | None = None,
) -> dict[str, Any]:
    raw_response = await self.async_raw_request(
        http_method=http_method,
        payload=payload,
        additional_path=additional_path,
        request_params=request_params,
        raise_api_errors=raise_api_errors,
        timeout_context=timeout_context,
    )
    return self._raw_response_to_json(
        raw_response, raise_api_errors=raise_api_errors, payload=payload
    )
def raw_request(self, *, http_method: str = 'POST', payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, raise_api_errors: bool = True, timeout_context: _TimeoutContext | None = None) ‑> httpx.Response
Expand source code
def raw_request(
    self,
    *,
    http_method: str = HttpMethod.POST,
    payload: dict[str, Any] | None = None,
    additional_path: str | None = None,
    request_params: dict[str, Any] = {},
    raise_api_errors: bool = True,
    timeout_context: _TimeoutContext | None = None,
) -> httpx.Response:
    request_url = self._compose_request_url(additional_path)
    _timeout_context = timeout_context or _TimeoutContext(request_ms=None)
    encoded_payload = (
        self._decimal_aware_encode_payload(payload)
        if self.handle_decimals_writes
        else self._decimal_unaware_encode_payload(payload)
    )
    log_httpx_request(
        http_method=http_method,
        full_url=request_url,
        request_params=request_params,
        redacted_request_headers=self._loggable_headers,
        encoded_payload=encoded_payload,
        timeout_context=_timeout_context,
    )
    httpx_timeout_s = to_httpx_timeout(_timeout_context)

    try:
        raw_response = self.client.request(
            method=http_method,
            url=request_url,
            content=encoded_payload.encode()
            if encoded_payload is not None
            else None,
            params=request_params,
            timeout=httpx_timeout_s,
            headers=self.full_headers,
        )
    except httpx.TimeoutException as timeout_exc:
        if self.dev_ops_api:
            raise to_devopsapi_timeout_exception(
                timeout_exc, timeout_context=_timeout_context
            )
        else:
            raise to_dataapi_timeout_exception(
                timeout_exc, timeout_context=_timeout_context
            )

    try:
        raw_response.raise_for_status()
    except httpx.HTTPStatusError as http_exc:
        raise self._http_exc_class.from_httpx_error(http_exc)
    log_httpx_response(response=raw_response)
    return raw_response
def request(self, *, http_method: str = 'POST', payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, raise_api_errors: bool = True, timeout_context: _TimeoutContext | None = None) ‑> dict[str, typing.Any]
Expand source code
def request(
    self,
    *,
    http_method: str = HttpMethod.POST,
    payload: dict[str, Any] | None = None,
    additional_path: str | None = None,
    request_params: dict[str, Any] = {},
    raise_api_errors: bool = True,
    timeout_context: _TimeoutContext | None = None,
) -> dict[str, Any]:
    raw_response = self.raw_request(
        http_method=http_method,
        payload=payload,
        additional_path=additional_path,
        request_params=request_params,
        raise_api_errors=raise_api_errors,
        timeout_context=timeout_context,
    )
    return self._raw_response_to_json(
        raw_response, raise_api_errors=raise_api_errors, payload=payload
    )