Module astrapy.data.utils.table_converters

Functions

def create_key_ktpostprocessor(primary_key_schema: dict[str, TableColumnTypeDescriptor],
options: FullSerdesOptions) ‑> Callable[[list[Any]], tuple[tuple[Any, ...], dict[str, Any]]]
Expand source code
def create_key_ktpostprocessor(
    primary_key_schema: dict[str, TableColumnTypeDescriptor],
    options: FullSerdesOptions,
) -> Callable[[list[Any]], tuple[tuple[Any, ...], dict[str, Any]]]:
    """
    Build a converter for raw primary-key lists returned by the API.

    The returned callable turns a list of raw primary-key values (in schema
    order) into a pair: a tuple of postprocessed values and a dict mapping
    each primary-key column name to its postprocessed value.
    """
    # Precompute, in schema order, the column names and their per-column
    # postprocessors so the inner function does no schema work per call.
    pk_names: list[str] = list(primary_key_schema.keys())
    pk_converters: list[Callable[[Any], Any]] = [
        _create_column_tpostprocessor(col_definition, options=options)
        for col_definition in primary_key_schema.values()
    ]

    def _ktpostprocessor(
        primary_key_list: list[Any],
    ) -> tuple[tuple[Any, ...], dict[str, Any]]:
        if len(primary_key_list) != len(pk_converters):
            raise ValueError(
                "Primary key list length / schema mismatch "
                f"(expected {len(pk_converters)}, "
                f"received {len(primary_key_list)} fields)"
            )
        k_tuple = tuple(
            converter(raw_value)
            for raw_value, converter in zip(primary_key_list, pk_converters)
        )
        # Same values keyed by column name, preserving schema order:
        k_dict = dict(zip(pk_names, k_tuple))
        return k_tuple, k_dict

    return _ktpostprocessor
def create_row_tpostprocessor(columns: dict[str, TableColumnTypeDescriptor],
options: FullSerdesOptions,
similarity_pseudocolumn: str | None) ‑> Callable[[dict[str, Any]], dict[str, Any]]
Expand source code
def create_row_tpostprocessor(
    columns: dict[str, TableColumnTypeDescriptor],
    options: FullSerdesOptions,
    similarity_pseudocolumn: str | None,
) -> Callable[[dict[str, Any]], dict[str, Any]]:
    """
    Build a converter for raw row dicts returned by the API.

    The returned callable postprocesses every column value according to the
    schema, supplies a type-appropriate filler for columns absent from the
    raw row, and rejects rows carrying fields outside the schema.
    """
    converter_map: dict[str, Callable[[Any], Any]] = {}
    filler_map: dict[str, Any] = {}
    for col_name, col_definition in columns.items():
        converter_map[col_name] = _create_column_tpostprocessor(
            col_definition, options=options
        )
        filler_map[col_name] = _column_filler_value(col_definition)
    if similarity_pseudocolumn is not None:
        # whatever is in the passed schema, requiring similarity overrides that 'column':
        converter_map[similarity_pseudocolumn] = _create_scalar_tpostprocessor(
            column_type=ColumnType.FLOAT, options=options
        )
        filler_map[similarity_pseudocolumn] = None
    known_columns = set(converter_map)

    def _tpostprocessor(raw_dict: dict[str, Any]) -> dict[str, Any]:
        unexpected = set(raw_dict) - known_columns
        if unexpected:
            xf_desc = ", ".join(f'"{f}"' for f in sorted(unexpected))
            raise ValueError(f"Returned row has unexpected fields: {xf_desc}")
        processed: dict[str, Any] = {}
        for col_name, converter in converter_map.items():
            if col_name in raw_dict:
                processed[col_name] = converter(raw_dict[col_name])
            else:
                # copy the filler, since the user may mutate e.g. a map:
                processed[col_name] = converter(copy.copy(filler_map[col_name]))
        return processed

    return _tpostprocessor
def preprocess_table_payload(payload: dict[str, Any] | None,
options: FullSerdesOptions,
map2tuple_checker: Callable[[list[str]], bool] | None) ‑> dict[str, typing.Any] | None
Expand source code
def preprocess_table_payload(
    payload: dict[str, Any] | None,
    options: FullSerdesOptions,
    map2tuple_checker: Callable[[list[str]], bool] | None,
) -> dict[str, Any] | None:
    """
    Normalize a payload for API calls.
    This includes e.g. ensuring values for the "$vector" key
    are made into plain lists of floats.

    Args:
        payload (dict[str, Any]): A dict expressing a payload for an API call
        options: a FullSerdesOptions setting the preprocessing configuration
        map2tuple_checker: a boolean function of a path in the doc, that returns
            True for "doc-like" portions of a payload, i.e. whose maps/DataAPIMaps
            can be converted into association lists, if such autoconversion is
            turned on. If this parameter is None, no paths are autoconverted.

    Returns:
        dict[str, Any]: a payload dict, pre-processed, ready for HTTP requests.
    """
    # None and empty payloads pass through unchanged:
    if not payload:
        return payload
    return cast(
        dict[str, Any],
        preprocess_table_payload_value(
            [],
            payload,
            options=options,
            map2tuple_checker=map2tuple_checker,
        ),
    )

Normalize a payload for API calls. This includes e.g. ensuring values for the "$vector" key are made into plain lists of floats.

Args

payload : dict[str, Any]
A dict expressing a payload for an API call
options
a FullSerdesOptions setting the preprocessing configuration
map2tuple_checker
a boolean function of a path in the doc, that returns True for "doc-like" portions of a payload, i.e. whose maps/DataAPIMaps can be converted into association lists, if such autoconversion is turned on. If this parameter is None, no paths are autoconverted.

Returns

dict[str, Any]
a payload dict, pre-processed, ready for HTTP requests.
def preprocess_table_payload_value(path: list[str],
value: Any,
options: FullSerdesOptions,
map2tuple_checker: Callable[[list[str]], bool] | None) ‑> Any
Expand source code
def preprocess_table_payload_value(
    path: list[str],
    value: Any,
    options: FullSerdesOptions,
    map2tuple_checker: Callable[[list[str]], bool] | None,
) -> Any:
    """
    Walk a payload for Tables and apply the necessary and required conversions
    to make it into a ready-to-jsondumps object.

    Args:
        path: list of keys locating `value` within the overall payload; list
            items contribute an empty-string path segment.
        value: the payload fragment to convert (may be any nesting level).
        options: a FullSerdesOptions setting the preprocessing configuration.
        map2tuple_checker: a boolean function of a path, returning True for
            portions of the payload whose maps may be encoded as association
            lists. If None, no such autoconversion happens anywhere.

    Returns:
        a structure mirroring `value`, converted to JSON-serializable form.
    """

    # The check for UDT dict-wrapper must come before the "plain dict" check
    if isinstance(value, DataAPIDictUDT):
        # field-wise serialize and return as (JSON-ready) map:
        udt_dict = dict(value)
        return {
            udt_k: preprocess_table_payload_value(
                path + [udt_k],
                udt_v,
                options=options,
                map2tuple_checker=map2tuple_checker,
            )
            for udt_k, udt_v in udt_dict.items()
        }
    elif isinstance(value, (dict, DataAPIMap)):
        # This is a nesting structure (but not the dict-wrapper for UDTs).
        # First decide whether the options even allow map->tuple encoding here:
        maps_can_become_tuples: bool
        if options.encode_maps_as_lists_in_tables == MapEncodingMode.NEVER:
            maps_can_become_tuples = False
        elif options.encode_maps_as_lists_in_tables == MapEncodingMode.DATAAPIMAPS:
            maps_can_become_tuples = isinstance(value, DataAPIMap)
        else:
            # 'ALWAYS' setting
            maps_can_become_tuples = True

        # Then ask the path-based checker (absent checker => no conversion):
        maps_become_tuples: bool
        if maps_can_become_tuples:
            if map2tuple_checker is None:
                maps_become_tuples = False
            else:
                maps_become_tuples = map2tuple_checker(path)
        else:
            maps_become_tuples = False

        # empty maps must always be encoded as `{}`, never as `[]` (#2005)
        if maps_become_tuples and value:
            # association-list form: a list of [key, value] pairs;
            # keys recurse with the unchanged path, values extend it:
            return [
                [
                    preprocess_table_payload_value(
                        path,
                        k,
                        options=options,
                        map2tuple_checker=map2tuple_checker,
                    ),
                    preprocess_table_payload_value(
                        path + [k],
                        v,
                        options=options,
                        map2tuple_checker=map2tuple_checker,
                    ),
                ]
                for k, v in value.items()
            ]

        # regular map form: recurse into both keys and values:
        return {
            preprocess_table_payload_value(
                path, k, options=options, map2tuple_checker=map2tuple_checker
            ): preprocess_table_payload_value(
                path + [k], v, options=options, map2tuple_checker=map2tuple_checker
            )
            for k, v in value.items()
        }
    elif isinstance(value, (list, set, DataAPISet)):
        # sequence-like containers become JSON lists; items get a "" path step:
        return [
            preprocess_table_payload_value(
                path + [""], v, options=options, map2tuple_checker=map2tuple_checker
            )
            for v in value
        ]

    # it's a scalar of some kind (which includes DataAPIVector)
    if isinstance(value, float):
        # Non-numbers must be manually made into a string
        if math.isnan(value):
            return NAN_FLOAT_STRING_REPRESENTATION
        elif math.isinf(value):
            if value > 0:
                return PLUS_INFINITY_FLOAT_STRING_REPRESENTATION
            else:
                return MINUS_INFINITY_FLOAT_STRING_REPRESENTATION
        return value
    elif isinstance(value, bytes):
        return convert_to_ejson_bytes(value)
    elif isinstance(value, DataAPIVector):
        if options.binary_encode_vectors:
            return convert_to_ejson_bytes(value.to_bytes())
        else:
            # regular list of floats - which can contain non-numbers:
            return [
                preprocess_table_payload_value(
                    path + [""],
                    fval,
                    options=options,
                    map2tuple_checker=map2tuple_checker,
                )
                for fval in value.data
            ]
    elif isinstance(value, DataAPITimestamp):
        return value.to_string()
    elif isinstance(value, DataAPIDate):
        return value.to_string()
    elif isinstance(value, DataAPITime):
        return value.to_string()
    elif isinstance(value, datetime.datetime):
        # encoding in two steps (that's because the '%:z' strftime directive
        # is not in all supported Python versions).
        offset_tuple = _get_datetime_offset(value)
        if offset_tuple is None:
            # naive datetime: either accept (encode as epoch-based timestamp)
            # or refuse, per the options setting:
            if options.accept_naive_datetimes:
                return DataAPITimestamp(int(value.timestamp() * 1000)).to_string()
            raise ValueError(CANNOT_ENCODE_NAIVE_DATETIME_ERROR_MESSAGE)
        date_part_str = value.strftime(DATETIME_DATETIME_FORMAT)
        offset_h, offset_m = offset_tuple
        offset_part_str = f"{offset_h:+03}:{offset_m:02}"
        return f"{date_part_str}{offset_part_str}"
    elif isinstance(value, datetime.date):
        # there's no format to specify - and this is compliant anyway:
        return value.strftime(DATETIME_DATE_FORMAT)
    elif isinstance(value, datetime.time):
        return value.strftime(DATETIME_TIME_FORMAT)
    elif isinstance(value, decimal.Decimal):
        # Non-numbers must be manually made into a string, just like floats
        if math.isnan(value):
            return NAN_FLOAT_STRING_REPRESENTATION
        elif math.isinf(value):
            if value > 0:
                return PLUS_INFINITY_FLOAT_STRING_REPRESENTATION
            else:
                return MINUS_INFINITY_FLOAT_STRING_REPRESENTATION
        # actually-numeric decimals: leave them as they are for the encoding step,
        # which will apply the nasty trick to ensure all digits get there.
        return value
    elif isinstance(value, DataAPIDuration):
        # using to_c_string over to_string until the ISO-format parsing can
        # cope with subsecond fractions:
        return value.to_c_string()
    elif isinstance(value, UUID):
        return str(value)
    elif isinstance(value, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        return str(value)
    elif isinstance(value, datetime.timedelta):
        return DataAPIDuration.from_timedelta(value).to_c_string()
    elif isinstance(value, ObjectId):
        raise ValueError(
            "Values of type ObjectId are not supported. Consider switching to "
            "using UUID-based identifiers instead."
        )

    # try to unroll if applicable and then preprocess known types:
    _uvalue: Any
    if options.unroll_iterables_to_lists:
        _uvalue = ensure_unrolled_if_iterable(value)
    else:
        _uvalue = value
    # process the (possibly unrolled) value as a list if it became one:
    if isinstance(_uvalue, list):
        return [
            preprocess_table_payload_value(
                path + [""], v, options=options, map2tuple_checker=map2tuple_checker
            )
            for v in _uvalue
        ]

    # is it a well-known, natively-JSON-serializable type:
    if isinstance(_uvalue, (str, int, float, bool, type(None))):
        return _uvalue

    # check whether instance of a class with a registered serializer:
    for k_cls, k_serializer in options.serializer_by_class.items():
        if isinstance(_uvalue, k_cls) and k_serializer is not None:
            # serialize to a dict, then recurse field-wise as for a UDT:
            udt_dict_form = k_serializer(_uvalue)
            return {
                udt_k: preprocess_table_payload_value(
                    path + [udt_k],
                    udt_v,
                    options=options,
                    map2tuple_checker=map2tuple_checker,
                )
                for udt_k, udt_v in udt_dict_form.items()
            }

    # this is a last-ditch attempt. Likely results in a "not JSON serializable" error
    return _uvalue

Walk a payload for Tables and apply the necessary and required conversions to make it into a ready-to-jsondumps object.