Module astrapy.utils.api_commander
Classes
class APICommander (*,
api_endpoint: str,
path: str,
spawner: object | None,
headers: dict[str, str | None] = {},
callers: Sequence[CallerType] = [],
redacted_header_names: Iterable[str] | None = None,
dev_ops_api: bool = False,
event_observers: dict[str, Observer | None] = {},
handle_decimals_writes: bool = False,
handle_decimals_reads: bool = False)-
Expand source code
class APICommander: client: httpx.Client async_client: httpx.AsyncClient def __init__( self, *, api_endpoint: str, path: str, spawner: object | None, headers: dict[str, str | None] = {}, callers: Sequence[CallerType] = [], redacted_header_names: Iterable[str] | None = None, dev_ops_api: bool = False, event_observers: dict[str, Observer | None] = {}, handle_decimals_writes: bool = False, handle_decimals_reads: bool = False, ) -> None: ssl_control_headers: dict[str, str | None] if disable_ssl_reuse: self.client = httpx.Client( limits=no_pooling_limits, verify=CLIENT_SSL_CONTEXT, ) self.async_client = httpx.AsyncClient( limits=no_pooling_limits, verify=CLIENT_SSL_CONTEXT, ) ssl_control_headers = {"Connection": "close"} else: self.client = httpx.Client(verify=CLIENT_SSL_CONTEXT) self.async_client = httpx.AsyncClient(verify=CLIENT_SSL_CONTEXT) ssl_control_headers = {} self.api_endpoint = api_endpoint.rstrip("/") self.path = path.lstrip("/") self.headers: dict[str, str | None] = {**ssl_control_headers, **headers} self.callers = callers self.redacted_header_names = set(redacted_header_names or []) self.upper_full_redacted_header_names = { header_name.upper() for header_name in ( self.redacted_header_names | DEFAULT_REDACTED_HEADER_NAMES ) } self.dev_ops_api = dev_ops_api self.event_observers = event_observers self.spawner_ref: weakref.ReferenceType[object] | None if spawner: self.spawner_ref = weakref.ref(spawner) else: self.spawner_ref = None self.handle_decimals_writes = handle_decimals_writes self.handle_decimals_reads = handle_decimals_reads self._faulty_response_exc_class: ( type[UnexpectedDevOpsAPIResponseException] | type[UnexpectedDataAPIResponseException] ) self._response_exc_class: ( type[DevOpsAPIResponseException] | type[DataAPIResponseException] ) self._http_exc_class: type[DataAPIHttpException] | type[DevOpsAPIHttpException] if self.dev_ops_api: self._faulty_response_exc_class = UnexpectedDevOpsAPIResponseException self._response_exc_class = DevOpsAPIResponseException self._http_exc_class = DevOpsAPIHttpException else: self._faulty_response_exc_class = UnexpectedDataAPIResponseException self._response_exc_class = DataAPIResponseException self._http_exc_class = DataAPIHttpException self._api_description = "DevOps API" if self.dev_ops_api else "Data API" full_user_agent_string = compose_full_user_agent( list(self.callers) + [user_agent_astrapy] ) self.caller_header: dict[str, str] = ( {"User-Agent": full_user_agent_string} if full_user_agent_string else {} ) self.full_headers: dict[str, str] = { k: v for k, v in { **{ "Content-Type": "application/json", "Accept": "application/json", }, **self.caller_header, **self.headers, }.items() if v is not None } self._loggable_headers = { k: v if k.upper() not in self.upper_full_redacted_header_names else FIXED_SECRET_PLACEHOLDER for k, v in self.full_headers.items() } self.full_path = ("/".join([self.api_endpoint, self.path])).rstrip("/") def __repr__(self) -> str: pieces = [ f"api_endpoint={self.api_endpoint}", f"path={self.path}", f"callers={self.callers}", f"dev_ops_api={self.dev_ops_api}", ] inner_desc = ", ".join(pieces) return f"{self.__class__.__name__}({inner_desc})" def __eq__(self, other: Any) -> bool: if isinstance(other, APICommander): return all( [ self.api_endpoint == other.api_endpoint, self.path == other.path, self.headers == other.headers, self.callers == other.callers, self.redacted_header_names == other.redacted_header_names, self.dev_ops_api == other.dev_ops_api, ] ) else: return False async def __aenter__(self) -> APICommander: return self async def __aexit__( self, exc_type: type[BaseException] | None = None, exc_value: BaseException | None = None, traceback: TracebackType | None = None, ) -> None: await self.async_client.aclose() def _get_spawner(self) -> object | None: if self.spawner_ref is None: return None return self.spawner_ref() def _copy( self, api_endpoint: str | None = None, path: str | None = None, headers: dict[str, str | None] | None = None, callers: Sequence[CallerType] | None = None, redacted_header_names: list[str] | None = None, dev_ops_api: bool | None = None, ) -> APICommander: # some care in allowing e.g. {} to override (but not None): return APICommander( api_endpoint=( api_endpoint if api_endpoint is not None else self.api_endpoint ), path=path if path is not None else self.path, spawner=self._get_spawner(), headers=headers if headers is not None else self.headers, callers=callers if callers is not None else self.callers, redacted_header_names=( redacted_header_names if redacted_header_names is not None else self.redacted_header_names ), dev_ops_api=dev_ops_api if dev_ops_api is not None else self.dev_ops_api, ) def _compose_request_url(self, additional_path: str | None) -> str: if additional_path: return "/".join([self.full_path.rstrip("/"), additional_path.lstrip("/")]) else: return self.full_path def _raw_response_to_json( self, raw_response: httpx.Response, raise_api_errors: bool, payload: dict[str, Any] | None, caller_function_name: str | None, request_id: str, ) -> dict[str, Any]: # try to process the httpx raw response into a JSON or throw a failure raw_response_json: dict[str, Any] try: if self.handle_decimals_reads: # for decimal-aware contents (aka 'tables'), all number-looking things # are made into Decimal. # (for collections, this will be it. for Tables, schema-aware # proper post-processing will refine types, e.g. back to int, ...) raw_response_json = self._decimal_aware_parse_json_response( raw_response.text, ) else: raw_response_json = self._decimal_unaware_parse_json_response( raw_response.text, ) except ValueError: # json() parsing has failed (e.g., empty body) if payload is not None: command_desc = "/".join(sorted(payload.keys())) else: command_desc = "(none)" raise self._faulty_response_exc_class( text=f"Unparseable response from API '{command_desc}' command.", raw_response={ "raw_response": raw_response.text, }, ) # no warnings check for DevOps API (there, 'status' may contain a string) dictforced_response: dict[str, Any] = ( raw_response_json if isinstance(raw_response_json, dict) else {} ) if not self.dev_ops_api: warning_items: list[str | dict[str, Any]] = ( dictforced_response.get("status") or {} ).get("warnings") or [] if warning_items: warning_descriptors = [ DataAPIWarningDescriptor(warning_item) for warning_item in warning_items ] if self.event_observers: wrn_events = [ ObservableWarning(warning=warning_descriptor) for warning_descriptor in warning_descriptors ] sender = self._get_spawner() for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: for wrn_event in wrn_events: ev_obs.receive( wrn_event, sender=sender, function_name=caller_function_name, request_id=request_id, ) for warning_descriptor in warning_descriptors: full_warning = ( f"The {self._api_description} returned " f"a warning: {warning_descriptor}" ) logger.warning(full_warning) if "errors" in dictforced_response: if self.event_observers: err_events = [ ObservableError(error=DataAPIErrorDescriptor(err_dict)) for err_dict in dictforced_response["errors"] ] sender = self._get_spawner() for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: for err_event in err_events: ev_obs.receive( err_event, sender=sender, function_name=caller_function_name, request_id=request_id, ) if raise_api_errors: logger.warning( f"APICommander about to raise from: {dictforced_response['errors']}" ) raise self._response_exc_class.from_response( command=payload, raw_response=dictforced_response, ) return raw_response_json @staticmethod def _decimal_unaware_parse_json_response(response_text: str) -> dict[str, Any]: return cast( dict[str, Any], json.loads(response_text), ) @staticmethod def _decimal_aware_parse_json_response(response_text: str) -> dict[str, Any]: return cast( dict[str, Any], json.loads( response_text, parse_float=Decimal, parse_int=Decimal, ), ) @staticmethod def _decimal_unaware_encode_payload(payload: dict[str, Any] | None) -> str | None: # This is the JSON encoder in absence of the workaround to treat Decimals if payload is not None: return json.dumps( payload, allow_nan=False, separators=(",", ":"), ensure_ascii=False, ) else: return None @staticmethod def _decimal_aware_encode_payload(payload: dict[str, Any] | None) -> str | None: if payload is not None: if CHECK_DECIMAL_ESCAPING_CONSISTENCY: # check if escaping collision. This is expensive and 99.9999999% useless _naive_dump = json.dumps( payload, allow_nan=False, separators=(",", ":"), ensure_ascii=False, cls=_MarkedDecimalDefuser, ) if _MarkedDecimalEncoder._check_mark_match(_naive_dump): raise ValueError( "The pattern to work around Decimals was detected in a " "user-provided item. This payload cannot be JSON-encoded." ) dec_marked_dump = json.dumps( payload, allow_nan=False, separators=(",", ":"), ensure_ascii=False, cls=_MarkedDecimalEncoder, ) return _MarkedDecimalEncoder._clean_encoded_string(dec_marked_dump) else: return None def raw_request( self, *, caller_function_name: str | None, request_id: str | None, http_method: str = HttpMethod.POST, payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, timeout_context: _TimeoutContext | None = None, ) -> httpx.Response: if request_id is None: request_id = str(uuid7()) request_url = self._compose_request_url(additional_path) _timeout_context = timeout_context or _TimeoutContext(request_ms=None) encoded_payload = ( self._decimal_aware_encode_payload(payload) if self.handle_decimals_writes else self._decimal_unaware_encode_payload(payload) ) log_httpx_request( http_method=http_method, full_url=request_url, request_params=request_params, redacted_request_headers=self._loggable_headers, encoded_payload=encoded_payload, timeout_context=_timeout_context, caller_function_name=caller_function_name, ) httpx_timeout_s = to_httpx_timeout(_timeout_context) if self.event_observers: req_event = ObservableRequest( payload=encoded_payload, http_method=http_method, url=request_url, query_parameters=request_params, redacted_headers=self._loggable_headers, dev_ops_api=self.dev_ops_api, ) sender = self._get_spawner() for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: ev_obs.receive( req_event, sender=sender, function_name=caller_function_name, request_id=request_id, ) try: raw_response = self.client.request( method=http_method, url=request_url, content=encoded_payload.encode() if encoded_payload is not None else None, params=request_params, timeout=httpx_timeout_s, headers=self.full_headers, ) except httpx.TimeoutException as timeout_exc: if self.dev_ops_api: raise to_devopsapi_timeout_exception( timeout_exc, timeout_context=_timeout_context ) else: raise to_dataapi_timeout_exception( timeout_exc, timeout_context=_timeout_context ) if self.event_observers: rsp_event = ObservableResponse( body=raw_response.text, status_code=raw_response.status_code, ) for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: ev_obs.receive( rsp_event, sender=self._get_spawner(), function_name=caller_function_name, request_id=request_id, ) try: raw_response.raise_for_status() except httpx.HTTPStatusError as http_exc: raise self._http_exc_class.from_httpx_error(http_exc) log_httpx_response(response=raw_response) return raw_response async def async_raw_request( self, *, caller_function_name: str | None, request_id: str | None, http_method: str = HttpMethod.POST, payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, timeout_context: _TimeoutContext | None = None, ) -> httpx.Response: if request_id is None: request_id = str(uuid7()) request_url = self._compose_request_url(additional_path) _timeout_context = timeout_context or _TimeoutContext(request_ms=None) encoded_payload = ( self._decimal_aware_encode_payload(payload) if self.handle_decimals_writes else self._decimal_unaware_encode_payload(payload) ) log_httpx_request( http_method=http_method, full_url=request_url, request_params=request_params, redacted_request_headers=self._loggable_headers, encoded_payload=encoded_payload, timeout_context=_timeout_context, caller_function_name=caller_function_name, ) httpx_timeout_s = to_httpx_timeout(_timeout_context) if self.event_observers: req_event = ObservableRequest( payload=encoded_payload, http_method=http_method, url=request_url, query_parameters=request_params, redacted_headers=self._loggable_headers, dev_ops_api=self.dev_ops_api, ) sender = self._get_spawner() for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: ev_obs.receive( req_event, sender=sender, function_name=caller_function_name, request_id=request_id, ) try: raw_response = await self.async_client.request( method=http_method, url=request_url, content=encoded_payload.encode() if encoded_payload is not None else None, params=request_params, timeout=httpx_timeout_s, headers=self.full_headers, ) except httpx.TimeoutException as timeout_exc: if self.dev_ops_api: raise to_devopsapi_timeout_exception( timeout_exc, timeout_context=_timeout_context ) else: raise to_dataapi_timeout_exception( timeout_exc, timeout_context=_timeout_context ) if self.event_observers: rsp_event = ObservableResponse( body=raw_response.text, status_code=raw_response.status_code, ) sender = self._get_spawner() for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: ev_obs.receive( rsp_event, sender=sender, function_name=caller_function_name, request_id=request_id, ) try: raw_response.raise_for_status() except httpx.HTTPStatusError as http_exc: raise self._http_exc_class.from_httpx_error(http_exc) log_httpx_response(response=raw_response) return raw_response def request( self, *, http_method: str = HttpMethod.POST, payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, raise_api_errors: bool = True, timeout_context: _TimeoutContext | None = None, caller_function_name: str | None = None, ) -> dict[str, Any]: request_id = str(uuid7()) raw_response = self.raw_request( http_method=http_method, payload=payload, additional_path=additional_path, request_params=request_params, timeout_context=timeout_context, caller_function_name=caller_function_name, request_id=request_id, ) return self._raw_response_to_json( raw_response, raise_api_errors=raise_api_errors, payload=payload, caller_function_name=caller_function_name, request_id=request_id, ) async def async_request( self, *, http_method: str = HttpMethod.POST, payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, raise_api_errors: bool = True, timeout_context: _TimeoutContext | None = None, caller_function_name: str | None = None, ) -> dict[str, Any]: request_id = str(uuid7()) raw_response = await self.async_raw_request( http_method=http_method, payload=payload, additional_path=additional_path, request_params=request_params, timeout_context=timeout_context, caller_function_name=caller_function_name, request_id=request_id, ) return self._raw_response_to_json( raw_response, raise_api_errors=raise_api_errors, payload=payload, caller_function_name=caller_function_name, request_id=request_id, )Class variables
var async_client : httpx.AsyncClient-
The type of the None singleton.
var client : httpx.Client-
The type of the None singleton.
Methods
async def async_raw_request(self,
*,
caller_function_name: str | None,
request_id: str | None,
http_method: str = 'POST',
payload: dict[str, Any] | None = None,
additional_path: str | None = None,
request_params: dict[str, Any] = {},
timeout_context: _TimeoutContext | None = None) ‑> httpx.Response-
Expand source code
async def async_raw_request( self, *, caller_function_name: str | None, request_id: str | None, http_method: str = HttpMethod.POST, payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, timeout_context: _TimeoutContext | None = None, ) -> httpx.Response: if request_id is None: request_id = str(uuid7()) request_url = self._compose_request_url(additional_path) _timeout_context = timeout_context or _TimeoutContext(request_ms=None) encoded_payload = ( self._decimal_aware_encode_payload(payload) if self.handle_decimals_writes else self._decimal_unaware_encode_payload(payload) ) log_httpx_request( http_method=http_method, full_url=request_url, request_params=request_params, redacted_request_headers=self._loggable_headers, encoded_payload=encoded_payload, timeout_context=_timeout_context, caller_function_name=caller_function_name, ) httpx_timeout_s = to_httpx_timeout(_timeout_context) if self.event_observers: req_event = ObservableRequest( payload=encoded_payload, http_method=http_method, url=request_url, query_parameters=request_params, redacted_headers=self._loggable_headers, dev_ops_api=self.dev_ops_api, ) sender = self._get_spawner() for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: ev_obs.receive( req_event, sender=sender, function_name=caller_function_name, request_id=request_id, ) try: raw_response = await self.async_client.request( method=http_method, url=request_url, content=encoded_payload.encode() if encoded_payload is not None else None, params=request_params, timeout=httpx_timeout_s, headers=self.full_headers, ) except httpx.TimeoutException as timeout_exc: if self.dev_ops_api: raise to_devopsapi_timeout_exception( timeout_exc, timeout_context=_timeout_context ) else: raise to_dataapi_timeout_exception( timeout_exc, timeout_context=_timeout_context ) if self.event_observers: rsp_event = ObservableResponse( body=raw_response.text, status_code=raw_response.status_code, ) sender = self._get_spawner() for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: ev_obs.receive( rsp_event, sender=sender, function_name=caller_function_name, request_id=request_id, ) try: raw_response.raise_for_status() except httpx.HTTPStatusError as http_exc: raise self._http_exc_class.from_httpx_error(http_exc) log_httpx_response(response=raw_response) return raw_response async def async_request(self,
*,
http_method: str = 'POST',
payload: dict[str, Any] | None = None,
additional_path: str | None = None,
request_params: dict[str, Any] = {},
raise_api_errors: bool = True,
timeout_context: _TimeoutContext | None = None,
caller_function_name: str | None = None) ‑> dict[str, typing.Any]-
Expand source code
async def async_request( self, *, http_method: str = HttpMethod.POST, payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, raise_api_errors: bool = True, timeout_context: _TimeoutContext | None = None, caller_function_name: str | None = None, ) -> dict[str, Any]: request_id = str(uuid7()) raw_response = await self.async_raw_request( http_method=http_method, payload=payload, additional_path=additional_path, request_params=request_params, timeout_context=timeout_context, caller_function_name=caller_function_name, request_id=request_id, ) return self._raw_response_to_json( raw_response, raise_api_errors=raise_api_errors, payload=payload, caller_function_name=caller_function_name, request_id=request_id, ) def raw_request(self,
*,
caller_function_name: str | None,
request_id: str | None,
http_method: str = 'POST',
payload: dict[str, Any] | None = None,
additional_path: str | None = None,
request_params: dict[str, Any] = {},
timeout_context: _TimeoutContext | None = None) ‑> httpx.Response-
Expand source code
def raw_request( self, *, caller_function_name: str | None, request_id: str | None, http_method: str = HttpMethod.POST, payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, timeout_context: _TimeoutContext | None = None, ) -> httpx.Response: if request_id is None: request_id = str(uuid7()) request_url = self._compose_request_url(additional_path) _timeout_context = timeout_context or _TimeoutContext(request_ms=None) encoded_payload = ( self._decimal_aware_encode_payload(payload) if self.handle_decimals_writes else self._decimal_unaware_encode_payload(payload) ) log_httpx_request( http_method=http_method, full_url=request_url, request_params=request_params, redacted_request_headers=self._loggable_headers, encoded_payload=encoded_payload, timeout_context=_timeout_context, caller_function_name=caller_function_name, ) httpx_timeout_s = to_httpx_timeout(_timeout_context) if self.event_observers: req_event = ObservableRequest( payload=encoded_payload, http_method=http_method, url=request_url, query_parameters=request_params, redacted_headers=self._loggable_headers, dev_ops_api=self.dev_ops_api, ) sender = self._get_spawner() for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: ev_obs.receive( req_event, sender=sender, function_name=caller_function_name, request_id=request_id, ) try: raw_response = self.client.request( method=http_method, url=request_url, content=encoded_payload.encode() if encoded_payload is not None else None, params=request_params, timeout=httpx_timeout_s, headers=self.full_headers, ) except httpx.TimeoutException as timeout_exc: if self.dev_ops_api: raise to_devopsapi_timeout_exception( timeout_exc, timeout_context=_timeout_context ) else: raise to_dataapi_timeout_exception( timeout_exc, timeout_context=_timeout_context ) if self.event_observers: rsp_event = ObservableResponse( body=raw_response.text, status_code=raw_response.status_code, ) for ev_obs in self.event_observers.values(): if ev_obs is not None and ev_obs.enabled: ev_obs.receive( rsp_event, sender=self._get_spawner(), function_name=caller_function_name, request_id=request_id, ) try: raw_response.raise_for_status() except httpx.HTTPStatusError as http_exc: raise self._http_exc_class.from_httpx_error(http_exc) log_httpx_response(response=raw_response) return raw_response def request(self,
*,
http_method: str = 'POST',
payload: dict[str, Any] | None = None,
additional_path: str | None = None,
request_params: dict[str, Any] = {},
raise_api_errors: bool = True,
timeout_context: _TimeoutContext | None = None,
caller_function_name: str | None = None) ‑> dict[str, typing.Any]-
Expand source code
def request( self, *, http_method: str = HttpMethod.POST, payload: dict[str, Any] | None = None, additional_path: str | None = None, request_params: dict[str, Any] = {}, raise_api_errors: bool = True, timeout_context: _TimeoutContext | None = None, caller_function_name: str | None = None, ) -> dict[str, Any]: request_id = str(uuid7()) raw_response = self.raw_request( http_method=http_method, payload=payload, additional_path=additional_path, request_params=request_params, timeout_context=timeout_context, caller_function_name=caller_function_name, request_id=request_id, ) return self._raw_response_to_json( raw_response, raise_api_errors=raise_api_errors, payload=payload, caller_function_name=caller_function_name, request_id=request_id, )