Module astrapy.data.utils.table_converters
Functions
def create_key_ktpostprocessor(primary_key_schema: dict[str, TableColumnTypeDescriptor],
options: FullSerdesOptions) -> Callable[[list[Any]], tuple[tuple[Any, ...], dict[str, Any]]]
Expand source code
def create_key_ktpostprocessor(
    primary_key_schema: dict[str, TableColumnTypeDescriptor],
    options: FullSerdesOptions,
) -> Callable[[list[Any]], tuple[tuple[Any, ...], dict[str, Any]]]:
    """
    Build a converter for API-returned primary-key value lists.

    The returned callable takes a list of raw primary-key values (one per
    column of `primary_key_schema`, in schema order) and returns a pair:
    a tuple of the post-processed values, and a dict mapping each primary-key
    column name to its post-processed value.

    Args:
        primary_key_schema: mapping of primary-key column names to their
            column type descriptors.
        options: a FullSerdesOptions driving the per-column conversions.

    Returns:
        a callable converting a raw primary-key list into (tuple, dict) form.
        The callable raises ValueError if the number of received values does
        not match the schema.
    """

    # Per-column converters, precomputed once; kept parallel to pk_names.
    pk_names: list[str] = list(primary_key_schema.keys())
    pk_converters: list[Callable[[Any], Any]] = [
        _create_column_tpostprocessor(col_definition, options=options)
        for col_definition in primary_key_schema.values()
    ]
    expected_length = len(pk_names)

    def _ktpostprocessor(
        primary_key_list: list[Any],
    ) -> tuple[tuple[Any, ...], dict[str, Any]]:
        if len(primary_key_list) != expected_length:
            raise ValueError(
                "Primary key list length / schema mismatch "
                f"(expected {expected_length}, "
                f"received {len(primary_key_list)} fields)"
            )
        processed_tuple = tuple(
            converter(raw_value)
            for converter, raw_value in zip(pk_converters, primary_key_list)
        )
        # the dict view reuses the already-converted values:
        return processed_tuple, dict(zip(pk_names, processed_tuple))

    return _ktpostprocessor
options: FullSerdesOptions,
similarity_pseudocolumn: str | None) -> Callable[[dict[str, Any]], dict[str, Any]]
Expand source code
def create_row_tpostprocessor(
    columns: dict[str, TableColumnTypeDescriptor],
    options: FullSerdesOptions,
    similarity_pseudocolumn: str | None,
) -> Callable[[dict[str, Any]], dict[str, Any]]:
    """
    Build a converter for API-returned table rows.

    The returned callable takes a raw row dict and produces a dict with one
    entry per schema column, each value post-processed per its column type.
    Columns absent from the raw row are given a per-type filler value (run
    through the same post-processor). Fields not in the schema cause a
    ValueError.

    Args:
        columns: mapping of column names to their column type descriptors.
        options: a FullSerdesOptions driving the per-column conversions.
        similarity_pseudocolumn: if not None, that field is treated as a
            FLOAT column regardless of the passed schema.

    Returns:
        a callable converting a raw row dict into its post-processed form.
    """

    converters: dict[str, Callable[[Any], Any]] = {}
    fillers: dict[str, Any] = {}
    for col_name, col_definition in columns.items():
        converters[col_name] = _create_column_tpostprocessor(
            col_definition, options=options
        )
        fillers[col_name] = _column_filler_value(col_definition)
    if similarity_pseudocolumn is not None:
        # whatever in the passed schema, requiring similarity overrides that 'column':
        converters[similarity_pseudocolumn] = _create_scalar_tpostprocessor(
            column_type=ColumnType.FLOAT, options=options
        )
        fillers[similarity_pseudocolumn] = None
    known_columns = set(converters)

    def _tpostprocessor(raw_dict: dict[str, Any]) -> dict[str, Any]:
        unexpected = set(raw_dict) - known_columns
        if unexpected:
            xf_desc = ", ".join(f'"{f}"' for f in sorted(unexpected))
            raise ValueError(f"Returned row has unexpected fields: {xf_desc}")
        converted: dict[str, Any] = {}
        for col_name, converter in converters.items():
            if col_name in raw_dict:
                converted[col_name] = converter(raw_dict[col_name])
            else:
                # making a copy here, since the user may mutate e.g. a map:
                converted[col_name] = converter(copy.copy(fillers[col_name]))
        return converted

    return _tpostprocessor
options: FullSerdesOptions,
map2tuple_checker: Callable[[list[str]], bool] | None) -> dict[str, typing.Any] | None
Expand source code
def preprocess_table_payload(
    payload: dict[str, Any] | None,
    options: FullSerdesOptions,
    map2tuple_checker: Callable[[list[str]], bool] | None,
) -> dict[str, Any] | None:
    """
    Normalize a payload for API calls.

    This includes e.g. ensuring values for "$vector" key
    are made into plain lists of floats.

    Args:
        payload (dict[str, Any]): A dict expressing a payload for an API call
        options: a FullSerdesOptions setting the preprocessing configuration
        map2tuple_checker: a boolean function of a path in the doc, that
            returns True for "doc-like" portions of a payload, i.e. whose
            maps/DataAPIMaps can be converted into association lists, if
            such autoconversion is turned on. If this parameter is None,
            no paths are autoconverted.

    Returns:
        dict[str, Any]: a payload dict, pre-processed, ready for HTTP requests.
    """

    # None and empty payloads pass through unchanged:
    if not payload:
        return payload
    processed = preprocess_table_payload_value(
        [],
        payload,
        options=options,
        map2tuple_checker=map2tuple_checker,
    )
    return cast(dict[str, Any], processed)
Args
payload (dict[str, Any]): A dict expressing a payload for an API call
options: a FullSerdesOptions setting the preprocessing configuration
map2tuple_checker: a boolean function of a path in the doc, that returns True for "doc-like" portions of a payload, i.e. whose maps/DataAPIMaps can be converted into association lists, if such autoconversion is turned on. If this parameter is None, no paths are autoconverted.
Returns
dict[str, Any]: a payload dict, pre-processed, ready for HTTP requests.
def preprocess_table_payload_value(path: list[str],
value: Any,
options: FullSerdesOptions,
map2tuple_checker: Callable[[list[str]], bool] | None) -> Any
Expand source code
def preprocess_table_payload_value(
    path: list[str],
    value: Any,
    options: FullSerdesOptions,
    map2tuple_checker: Callable[[list[str]], bool] | None,
) -> Any:
    """
    Walk a payload for Tables and apply the necessary and required conversions
    to make it into a ready-to-jsondumps object.

    Args:
        path: the list of keys leading to `value` within the overall payload
            (the root call passes an empty list; list items contribute "").
        value: the payload fragment to convert.
        options: a FullSerdesOptions setting the preprocessing configuration.
        map2tuple_checker: a boolean function of a path, returning True for
            portions of the payload whose maps/DataAPIMaps may be encoded as
            association lists; None disables that autoconversion entirely.

    Returns:
        the converted, JSON-serializable counterpart of `value`.

    Raises:
        ValueError: for naive datetimes when options forbid them, and for
            ObjectId values (unsupported in Tables).
    """

    # NOTE: the order of the isinstance branches below is significant
    # (e.g. datetime.datetime is a subclass of datetime.date and must be
    # tested first); do not reorder.

    # The check for UDT dict-wrapper must come before the "plain dict" check
    if isinstance(value, DataAPIDictUDT):
        # field-wise serialize and return as (JSON-ready) map:
        udt_dict = dict(value)
        return {
            udt_k: preprocess_table_payload_value(
                path + [udt_k],
                udt_v,
                options=options,
                map2tuple_checker=map2tuple_checker,
            )
            for udt_k, udt_v in udt_dict.items()
        }
    elif isinstance(value, (dict, DataAPIMap)):
        # This is a nesting structure (but not the dict-wrapper for UDTs)

        # step 1: does the serdes configuration allow map->tuples at all here?
        maps_can_become_tuples: bool
        if options.encode_maps_as_lists_in_tables == MapEncodingMode.NEVER:
            maps_can_become_tuples = False
        elif options.encode_maps_as_lists_in_tables == MapEncodingMode.DATAAPIMAPS:
            maps_can_become_tuples = isinstance(value, DataAPIMap)
        else:
            # 'ALWAYS' setting
            maps_can_become_tuples = True

        # step 2: does the current path qualify, per the checker?
        maps_become_tuples: bool
        if maps_can_become_tuples:
            if map2tuple_checker is None:
                maps_become_tuples = False
            else:
                maps_become_tuples = map2tuple_checker(path)
        else:
            maps_become_tuples = False

        # empty maps must always be encoded as `{}`, never as `[]` (#2005)
        if maps_become_tuples and value:
            # association-list form: a list of [key, value] pairs
            return [
                [
                    preprocess_table_payload_value(
                        path,
                        k,
                        options=options,
                        map2tuple_checker=map2tuple_checker,
                    ),
                    preprocess_table_payload_value(
                        path + [k],
                        v,
                        options=options,
                        map2tuple_checker=map2tuple_checker,
                    ),
                ]
                for k, v in value.items()
            ]
        # plain-map form (keys converted with the parent path, values with
        # the extended path):
        return {
            preprocess_table_payload_value(
                path, k, options=options, map2tuple_checker=map2tuple_checker
            ): preprocess_table_payload_value(
                path + [k], v, options=options, map2tuple_checker=map2tuple_checker
            )
            for k, v in value.items()
        }
    elif isinstance(value, (list, set, DataAPISet)):
        # sequences/sets become JSON lists; items get a "" path component
        return [
            preprocess_table_payload_value(
                path + [""], v, options=options, map2tuple_checker=map2tuple_checker
            )
            for v in value
        ]

    # it's a scalar of some kind (which includes DataAPIVector)
    if isinstance(value, float):
        # Non-numbers must be manually made into a string
        if math.isnan(value):
            return NAN_FLOAT_STRING_REPRESENTATION
        elif math.isinf(value):
            if value > 0:
                return PLUS_INFINITY_FLOAT_STRING_REPRESENTATION
            else:
                return MINUS_INFINITY_FLOAT_STRING_REPRESENTATION
        return value
    elif isinstance(value, bytes):
        return convert_to_ejson_bytes(value)
    elif isinstance(value, DataAPIVector):
        if options.binary_encode_vectors:
            return convert_to_ejson_bytes(value.to_bytes())
        else:
            # regular list of floats - which can contain non-numbers:
            return [
                preprocess_table_payload_value(
                    path + [""],
                    fval,
                    options=options,
                    map2tuple_checker=map2tuple_checker,
                )
                for fval in value.data
            ]
    elif isinstance(value, DataAPITimestamp):
        return value.to_string()
    elif isinstance(value, DataAPIDate):
        return value.to_string()
    elif isinstance(value, DataAPITime):
        return value.to_string()
    elif isinstance(value, datetime.datetime):
        # encoding in two steps (that's because the '%:z' strftime directive
        # is not in all supported Python versions).
        offset_tuple = _get_datetime_offset(value)
        if offset_tuple is None:
            # naive datetime: either coerce (as an epoch-ms timestamp) or refuse
            if options.accept_naive_datetimes:
                return DataAPITimestamp(int(value.timestamp() * 1000)).to_string()
            raise ValueError(CANNOT_ENCODE_NAIVE_DATETIME_ERROR_MESSAGE)
        date_part_str = value.strftime(DATETIME_DATETIME_FORMAT)
        offset_h, offset_m = offset_tuple
        # hand-rolled "+HH:MM" offset suffix:
        offset_part_str = f"{offset_h:+03}:{offset_m:02}"
        return f"{date_part_str}{offset_part_str}"
    elif isinstance(value, datetime.date):
        # there's no format to specify - and this is compliant anyway:
        return value.strftime(DATETIME_DATE_FORMAT)
    elif isinstance(value, datetime.time):
        return value.strftime(DATETIME_TIME_FORMAT)
    elif isinstance(value, decimal.Decimal):
        # Non-numbers must be manually made into a string, just like floats
        if math.isnan(value):
            return NAN_FLOAT_STRING_REPRESENTATION
        elif math.isinf(value):
            if value > 0:
                return PLUS_INFINITY_FLOAT_STRING_REPRESENTATION
            else:
                return MINUS_INFINITY_FLOAT_STRING_REPRESENTATION
        # actually-numeric decimals: leave them as they are for the encoding step,
        # which will apply the nasty trick to ensure all digits get there.
        return value
    elif isinstance(value, DataAPIDuration):
        # using to_c_string over to_string until the ISO-format parsing can
        # cope with subsecond fractions:
        return value.to_c_string()
    elif isinstance(value, UUID):
        return str(value)
    elif isinstance(value, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        return str(value)
    elif isinstance(value, datetime.timedelta):
        return DataAPIDuration.from_timedelta(value).to_c_string()
    elif isinstance(value, ObjectId):
        raise ValueError(
            "Values of type ObjectId are not supported. Consider switching to "
            "using UUID-based identifiers instead."
        )

    # try to unroll if applicable and then preprocess known types:
    _uvalue: Any
    if options.unroll_iterables_to_lists:
        _uvalue = ensure_unrolled_if_iterable(value)
    else:
        _uvalue = value
    # process it as
    if isinstance(_uvalue, list):
        return [
            preprocess_table_payload_value(
                path + [""], v, options=options, map2tuple_checker=map2tuple_checker
            )
            for v in _uvalue
        ]
    # is it a well-known, natively-JSON-serializable type:
    if isinstance(_uvalue, (str, int, float, bool, type(None))):
        return _uvalue
    # check whether instance of a class with a registered serializer:
    for k_cls, k_serializer in options.serializer_by_class.items():
        if isinstance(_uvalue, k_cls) and k_serializer is not None:
            # serializer yields a dict, recursively preprocessed like a UDT map
            udt_dict_form = k_serializer(_uvalue)
            return {
                udt_k: preprocess_table_payload_value(
                    path + [udt_k],
                    udt_v,
                    options=options,
                    map2tuple_checker=map2tuple_checker,
                )
                for udt_k, udt_v in udt_dict_form.items()
            }
    # this is a last-ditch attempt. Likely results in a "not JSON serializable" error"
    return _uvalue