"""Misc public utilities, e.g. to manage the timezone cache, or patch the time"""
from __future__ import annotations
import os.path # NOTE: we don't use pathlib here to keep our imports light
import sysconfig
from contextlib import contextmanager
from functools import partial
from typing import Any, Iterable, Iterator, no_type_check
from ._core import (
Instant,
OffsetDateTime,
ZonedDateTime,
_clear_tz_cache,
_clear_tz_cache_by_keys,
_ignore_days_not_always_24h_warning,
_ignore_potentially_stale_offset_warning,
_ignore_timezone_unaware_arithmetic_warning,
_patch_time_frozen,
_patch_time_keep_ticking,
_set_tzpath,
_unpatch_time,
)
# Maintainer's notes:
# - Yes I dislike the name `utils` too, but it seems to fit OK in this case.
# - These functions are implemented in Python regardless of whether the Rust
# extension is active. This is fine because they are not performance-critical,
# and build upon the core API.
__all__ = [
"patch_current_time",
"reset_tzpath",
"clear_tzcache",
"available_timezones",
"ignore_days_not_always_24h_warning",
"ignore_potentially_stale_offset_warning",
"ignore_timezone_unaware_arithmetic_warning",
]
[docs]
@contextmanager
def ignore_days_not_always_24h_warning() -> Iterator[None]:
"""Context manager to ignore warnings about days that are not always 24 hours
long (due to DST transitions).
Example
-------
>>> from whenever import TimeDelta
>>> d = TimeDelta(hours=100)
>>> with ignore_days_not_always_24h_warning():
... d.total("days") # no warning
"""
token = _ignore_days_not_always_24h_warning.set(True)
try:
yield
finally:
_ignore_days_not_always_24h_warning.reset(token)
[docs]
@contextmanager
def ignore_potentially_stale_offset_warning() -> Iterator[None]:
"""Context manager to suppress :class:`~whenever.PotentiallyStaleOffsetWarning`.
This warning is emitted when operations on an :class:`~whenever.OffsetDateTime`
may produce an incorrect UTC offset (e.g. shifting, rounding, replacing fields,
or constructing from the current time or a UNIX timestamp).
Use this context manager when the fixed offset is intentional and correct.
Example
-------
>>> from whenever import OffsetDateTime, TimeDelta
>>> dt = OffsetDateTime(2023, 3, 23, offset=+1)
>>> with ignore_potentially_stale_offset_warning():
... dt + TimeDelta(hours=1_000) # no warning
"""
token = _ignore_potentially_stale_offset_warning.set(True)
try:
yield
finally:
_ignore_potentially_stale_offset_warning.reset(token)
[docs]
@contextmanager
def ignore_timezone_unaware_arithmetic_warning() -> Iterator[None]:
"""Context manager to suppress :class:`~whenever.TimeZoneUnawareArithmeticWarning`.
This warning is always emitted when performing arithmetic with exact time units
on a :class:`~whenever.PlainDateTime`, or when measuring the difference between
two :class:`~whenever.PlainDateTime` values.
Use this context manager if you: (a) explicitly accept potentially incorrect
results, (b) know no timezone transitions occur in the interval, or (c) are
working with clock times not representing a real-world timezone.
Example
-------
>>> from whenever import PlainDateTime, TimeDelta
>>> dt = PlainDateTime(2023, 3, 23, hour=12)
>>> with ignore_timezone_unaware_arithmetic_warning():
... dt.add(hours=2) # no warning
PlainDateTime("2023-03-23 14:00:00")
"""
token = _ignore_timezone_unaware_arithmetic_warning.set(True)
try:
yield
finally:
_ignore_timezone_unaware_arithmetic_warning.reset(token)
class _TimePatch:
_pin: Instant | ZonedDateTime | OffsetDateTime
_keep_ticking: bool
def __init__(
self,
pin: Instant | ZonedDateTime | OffsetDateTime,
keep_ticking: bool,
):
self._pin = pin
self._keep_ticking = keep_ticking
# NOTE: permissively typechecked, but that's OK for a testing utility
def shift(self, *args: Any, **kwargs: Any) -> None:
if self._keep_ticking:
self._pin = new = (self._pin + (Instant.now() - self._pin)).add(
*args, **kwargs
)
_patch_time_keep_ticking(
new if isinstance(new, Instant) else new.to_instant()
)
else:
self._pin = new = self._pin.add(*args, **kwargs)
_patch_time_frozen(
new if isinstance(new, Instant) else new.to_instant()
)
[docs]
@contextmanager
def patch_current_time(
dt: Instant | ZonedDateTime | OffsetDateTime,
/,
*,
keep_ticking: bool,
) -> Iterator[_TimePatch]:
"""Patch the current time to a fixed value (for testing purposes).
Behaves as a context manager or decorator, with similar semantics to
``unittest.mock.patch``.
Important
---------
* This function should be used only for testing purposes. It is not
thread-safe or part of the stable API.
* This function only affects whenever's ``now`` functions. It does not
affect the standard library's time functions or any other libraries.
Use the ``time_machine`` package if you also want to patch other libraries.
* It doesn't affect the system timezone.
If you need to patch the system timezone, set the ``TZ`` environment
variable in combination with :func:`~whenever.reset_system_tz`.
Example
-------
>>> from whenever import Instant, patch_current_time
>>> i = Instant.from_utc(1980, 3, 2, hour=2)
>>> with patch_current_time(i, keep_ticking=False) as p:
... assert Instant.now() == i
... p.shift(hours=4)
... assert i.now() == i.add(hours=4)
...
>>> assert Instant.now() != i
...
>>> @patch_current_time(i, keep_ticking=True)
... def test_thing(p):
... assert (Instant.now() - i) < seconds(1)
... p.shift(hours=8)
... sleep(0.000001)
... assert hours(8) < (Instant.now() - i) < hours(8.1)
"""
instant = dt if isinstance(dt, Instant) else dt.to_instant()
if keep_ticking:
_patch_time_keep_ticking(instant)
else:
_patch_time_frozen(instant)
try:
yield _TimePatch(dt, keep_ticking)
finally:
_unpatch_time()
TZPATH: tuple[str, ...] = ()
[docs]
def reset_tzpath(
target: Iterable[str | os.PathLike[str]] | None = None, /
) -> None:
"""Reset or set the paths in which ``whenever`` will search for timezone data.
It does not affect the :mod:`zoneinfo` module or other libraries.
Note
----
Due to caching, you may find that looking up a timezone after setting the tzpath
doesn't load the timezone data from the new path. You may need to call
:func:`clear_tzcache` if you want to force loading *all* timezones from the new path.
Note that clearing the cache may have unexpected side effects, however.
Behaves similarly to :func:`zoneinfo.reset_tzpath`
"""
global TZPATH
if target is not None:
# This is such a common mistake, that we raise a descriptive error
if isinstance(target, (str, bytes)):
raise TypeError("tzpath must be an iterable of paths")
if not all(map(os.path.isabs, target)):
raise ValueError("tzpaths must be absolute paths")
# mypy doesn't seem to follow, but it appears correct
TZPATH = tuple(map(os.fspath, target)) # type: ignore[arg-type]
else:
TZPATH = _tzpath_from_env()
_set_tzpath(TZPATH)
def _tzpath_from_env() -> tuple[str, ...]:
try:
env_var = os.environ["PYTHONTZPATH"]
except KeyError:
env_var = sysconfig.get_config_var("TZPATH")
# FUTURE: include in test coverage
if not env_var:
return () # pragma: no cover
raw_tzpath = env_var.split(os.pathsep)
# according to spec, we're allowed to silently ignore invalid paths
new_tzpath = tuple(filter(os.path.isabs, raw_tzpath))
return new_tzpath
[docs]
def clear_tzcache(*, only_keys: Iterable[str] | None = None) -> None:
"""Clear the timezone cache. If ``only_keys`` is provided, only the cache for those
keys will be cleared.
Caution
-------
Calling this function may change the behavior of existing ``ZonedDateTime``
instances in surprising ways. Most significantly, ``exact_eq()`` may
return ``False`` between two timezone instances with the same TZ ID,
if this timezone definition was changed on disk.
**Use this function only if you know that you need to.**
Behaves similarly to :meth:`zoneinfo.ZoneInfo.clear_cache`.
"""
if only_keys is None:
_clear_tz_cache()
else:
_clear_tz_cache_by_keys(tuple(only_keys))
[docs]
def available_timezones() -> set[str]:
"""Gather the set of all available timezones.
Each call to this function will recalculate the available timezone names
depending on the currently configured ``TZPATH``, and the
presence of the ``tzdata`` package.
Warning
-------
This function may open a large number of files, since the first few bytes
of timezone files must be read to determine if they are valid.
Note
----
This function behaves similarly to :func:`zoneinfo.available_timezones`,
which means it ignores the "special" zones (e.g. posixrules, right/posix, etc.)
It should give the same result as :func:`zoneinfo.available_timezones`,
unless ``whenever`` was configured to use a different tzpath
using :func:`reset_tzpath`.
"""
zones: set[str] = set()
# Get the zones from the tzdata package, if available
try:
# NOTE: we don't use importlib.resources here,
# to keep our imports lighter
tzdata = __import__("tzdata").__path__[0]
with open(os.path.join(tzdata, "zones")) as f:
zones.update(map(str.strip, f))
# coverage note: we *do* test tzdata and non-tzdata installs in CI
except (ImportError, FileNotFoundError): # pragma: no cover
pass
# Get the zones from the tzpath directories
for base in TZPATH:
zones.update(_find_all_tznames(base))
zones.discard("posixrules") # a special file that shouldn't be included
return zones
# Recursively find all tzfiles in the tzpath directories.
# Recursion is safe here since the file tree is trusted, and nesting doesn't
# even approach the recursion limit.
# NOTE: we don't use pathlib here, since we want to keep our imports light
def _find_all_tznames(base: str) -> Iterator[str]:
if not os.path.isdir(base):
return
for name in os.listdir(base):
entry = os.path.join(base, name)
if os.path.isdir(entry):
# FUTURE: expand test coverage for this
if name in ("right", "posix"): # pragma: no cover
# These directories contain special files that shouldn't be included
continue
else:
for path in _find_nested_tzfiles(entry):
yield os.path.relpath(path, base).replace("\\", "/")
elif _is_tzifile(entry):
yield name
def _find_nested_tzfiles(path: str) -> Iterator[str]:
assert os.path.isdir(path)
for name in os.listdir(path):
entry = os.path.join(path, name)
if os.path.isdir(entry):
yield from _find_nested_tzfiles(entry)
elif _is_tzifile(entry):
yield entry
def _is_tzifile(p: str) -> bool:
"""Check if the file is a tzifile."""
try:
with open(p, "rb") as f:
return f.read(4) == b"TZif"
except OSError: # pragma: no cover
return False
@no_type_check
def _pydantic_parse(cls: type, v: object) -> object:
# exact type comparison is OK: whenever types don't allow subclassing
if type(v) is cls:
return v
# whenever also doesn't allow string subclasses
elif type(v) is str:
return cls.parse_iso(v)
else:
raise ValueError(f"Cannot parse {cls.__name__} from type {type(v)}")
@no_type_check
def pydantic_schema(cls):
from pydantic_core import core_schema
return core_schema.json_or_python_schema(
# NOTE: We can't use no_info_plain_validator_function here, because
# this breaks JSON schema generation...but only when used with the
# "serialization" mode for some reason...
json_schema=core_schema.no_info_after_validator_function(
cls.parse_iso,
core_schema.str_schema(strict=True),
serialization=core_schema.to_string_ser_schema(),
),
python_schema=core_schema.no_info_plain_validator_function(
partial(_pydantic_parse, cls),
# NOTE: not setting serializer here somehow breaks the JSON schema
# generation when defaults are present...yeah...
serialization=core_schema.to_string_ser_schema(),
),
)