Module astrapy.utils.duration_std_utils

Expand source code
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import re

DURATION_STD_MONTH_MULTIPLIER = {
    "Y": 12,
    "M": 1,
}
DURATION_STD_DAY_MULTIPLIER = {
    "D": 1,
}
DURATION_STD_NANOSECOND_MULTIPLIER = {
    "H": 3600000000000,
    "M": 60000000000,
    "S": 1000000000,
}

DURATION_STD_FORMAT_DESC = (
    "Durations must be expressed according to the '[-]P[nY][nM][nD][T[nH][nM][fS]]' "
    "pattern, with: an optional leading minus sign; a literal P; a sequence of at "
    "least one '<quantity><unit>' specification. These, if appearing, must be in "
    "strict order (resp. years, months, days, hours, minutes, seconds). If sub-day "
    "specifications are present, a literal 'T' must separate them from the whole-day "
    "part. Quantities are non-negative integers ('n') except for the seconds ('S') "
    "specification, which can be a decimal number. At least one time specification "
    'must be provided. Examples: "P1M10D", "P1Y3MT4H15.43S", "-PT2M55S".'
)


def _parse_std_duration_string(duration_string: str) -> tuple[int, int, int, int]:
    # format: "[-]P[nY][nM][nD][T[nH][nM][fS]]" (n=non-neg int, f=non-neg float)
    if duration_string == "":
        raise ValueError(
            f"Empty strings are not valid durations. {DURATION_STD_FORMAT_DESC}"
        )

    signum0: int
    # without the '[-]P' preamble:
    _stripped0: str
    if duration_string[:2] == "-P":
        signum0 = -1
        _stripped0 = duration_string[2:]
    elif duration_string[:1] == "P":
        signum0 = +1
        _stripped0 = duration_string[1:]
    else:
        raise ValueError(
            "A string not starting with '-P' or 'P' is not a valid "
            f"duration (received: {duration_string}. {DURATION_STD_FORMAT_DESC}"
        )

    # chunks before and after the "T" (even if absent, filled here)
    _pre_t: str
    _post_t: str
    if _stripped0 == "" or _stripped0 == "T":
        raise ValueError(
            "A string without quantity-unit specifications is not a valid "
            f"duration (received: {duration_string}. {DURATION_STD_FORMAT_DESC}"
        )
    t_blocks = _stripped0.split("T")
    if len(t_blocks) == 0:
        raise ValueError(
            "A string without quantity-unit specifications is not a valid "
            f"duration (received: {duration_string}. {DURATION_STD_FORMAT_DESC}"
        )
    elif len(t_blocks) == 1:
        _pre_t = t_blocks[0]
        _post_t = ""
    elif len(t_blocks) == 2:
        _pre_t = t_blocks[0]
        _post_t = t_blocks[1]
    else:
        raise ValueError(
            "A duration string must containt at most one single 'T' separator "
            f"(received: {duration_string}. {DURATION_STD_FORMAT_DESC}"
        )

    # parsing the pre-T block
    months: int
    days: int
    PRE_T_VALIDATOR = re.compile(r"^(\d+Y)?(\d+M)?(\d+D)?$")
    PRE_T_PARSER = re.compile(r"(\d+)(Y|M|D)")
    if PRE_T_VALIDATOR.fullmatch(_pre_t):
        pre_t_qunits = {
            match.group(2): match.group(1) for match in PRE_T_PARSER.finditer(_pre_t)
        }
        parsed_pre_t_vals = {unit: int(valstr) for unit, valstr in pre_t_qunits.items()}
        months = sum(
            umult * parsed_pre_t_vals.get(unit, 0)
            for unit, umult in DURATION_STD_MONTH_MULTIPLIER.items()
        )
        days = sum(
            umult * parsed_pre_t_vals.get(unit, 0)
            for unit, umult in DURATION_STD_DAY_MULTIPLIER.items()
        )
    else:
        raise ValueError(
            "Invalid whole-day component for a duration string "
            f"(received: {duration_string}. {DURATION_STD_FORMAT_DESC}"
        )

    # parsing the post-T block. Here only the last one is allowed to be float
    nanoseconds: int
    POST_T_VALIDATOR = re.compile(r"^(\d+H)?(\d+M)?(\d*\.?\d*S)?$")
    POST_T_PARSER = re.compile(r"(\d*\.?\d*)(H|M|S)")
    if POST_T_VALIDATOR.fullmatch(_post_t):
        post_t_qunits = {
            match.group(2): match.group(1) for match in POST_T_PARSER.finditer(_post_t)
        }
        try:
            parsed_post_t_vals = {
                unit: int(valstr) if unit != "S" else float(valstr)
                for unit, valstr in post_t_qunits.items()
            }
        except ValueError:
            raise ValueError(
                "Float values are accepted only for the seconds duration specification "
                f"(received: {duration_string}. {DURATION_STD_FORMAT_DESC}"
            )
        nanoseconds = sum(
            int(umult * parsed_post_t_vals.get(unit, 0) + (0.5 if unit == "S" else 0.0))
            for unit, umult in DURATION_STD_NANOSECOND_MULTIPLIER.items()
        )
    else:
        raise ValueError(
            "Invalid fraction-of-day component for a duration string "
            f"(received: {duration_string}. {DURATION_STD_FORMAT_DESC}"
        )

    # resolve signum ambiguity if null duration
    signum: int
    if months != 0 or days != 0 or nanoseconds != 0:
        signum = signum0
    else:
        signum = +1

    return (signum, months, days, nanoseconds)


def _build_std_duration_string(
    signum: int,
    months: int,
    days: int,
    nanoseconds: int,
) -> str:
    _signum_string: str | None = None
    _preamble_string = "P"
    _month_string: str | None = None
    _day_string: str | None = None
    _t_block: str | None = None
    _nanosecond_string: str | None = None

    if months == 0 and days == 0 and nanoseconds == 0:
        # a special case
        return "PT0S"

    if signum < 0:
        _signum_string = "-"
    if months:
        _month_string = ""
        _residual_months = months
        for u, div in DURATION_STD_MONTH_MULTIPLIER.items():
            u_qty = _residual_months // div
            if u_qty >= 1:
                _month_string += f"{u_qty}{u}"
                _residual_months -= u_qty * div
    _day_string = f"{days}D" if days > 0 else None
    if nanoseconds:
        _nanosecond_string = ""
        _residual_nanoseconds = nanoseconds
        for u, div in DURATION_STD_NANOSECOND_MULTIPLIER.items():
            if u != "S":
                u_qty = _residual_nanoseconds // div
                if u_qty >= 1:
                    _nanosecond_string += f"{u_qty}{u}"
                    _residual_nanoseconds -= u_qty * div
        if _residual_nanoseconds != 0:
            _seconds_part = f"{_residual_nanoseconds / 1000000000:.9f}".rstrip(
                "0"
            ).rstrip(".")
            _nanosecond_string += f"{_seconds_part}S"
    if _nanosecond_string is not None:
        _t_block = "T"
    return "".join(
        b
        for b in (
            _signum_string,
            _preamble_string,
            _month_string,
            _day_string,
            _t_block,
            _nanosecond_string,
        )
        if b is not None
    )