Commit d5a7fe72 authored by M. Huang's avatar M. Huang
Browse files

add test leapseconds, fix before 1972

parent cf013035
......@@ -30,7 +30,7 @@ from collections import namedtuple
from datetime import datetime, timedelta, timezone
from struct import Struct
from warnings import warn
from functools import partial
from functools import partial, lru_cache
__all__ = ['leapseconds', 'LeapSecond',
'dTAI_UTC_from_utc', 'dTAI_UTC_from_tai',
......@@ -99,8 +99,11 @@ def leapseconds(tzfiles=['/usr/share/zoneinfo/right/UTC',
(magic, version, _, _, leapcnt, timecnt, typecnt,
charcnt) = header.unpack_from(file.read(header.size))
if magic != "TZif".encode():
raise ValueError('Wrong magic %r in tzfile: %s' % (
magic, file.name))
if not use_fallback:
raise ValueError('Wrong magic %r in tzfile: %s' % (
magic, file.name))
else:
return _fallback()
if version not in '\x0023'.encode():
warn('Unsupported version %r in tzfile: %s' % (
version, file.name), RuntimeWarning)
......@@ -128,7 +131,8 @@ def leapseconds(tzfiles=['/usr/share/zoneinfo/right/UTC',
"""
file.read(timecnt * 5 + typecnt * 6 + charcnt) # skip
result = [LeapSecond(datetime(1972, 1, 1, tzinfo=timezone.utc), timedelta(seconds=10))]
result = [LeapSecond(
datetime(1972, 1, 1, tzinfo=timezone.utc), timedelta(seconds=10))]
nleap_seconds = 10
tai_epoch_as_tai = datetime(1970, 1, 1, 0, 0, 10, tzinfo=timezone.utc)
buf = Struct(">2i")
......@@ -136,12 +140,14 @@ def leapseconds(tzfiles=['/usr/share/zoneinfo/right/UTC',
t, cnt = buf.unpack_from(file.read(buf.size))
dTAI_UTC = nleap_seconds + cnt
utc = tai_epoch_as_tai + timedelta(seconds=t - dTAI_UTC + 1)
assert utc - datetime(utc.year, utc.month, utc.day, tzinfo=timezone.utc) == timedelta(0)
assert utc - datetime(utc.year, utc.month,
utc.day, tzinfo=timezone.utc) == timedelta(0)
result.append(LeapSecond(utc, timedelta(seconds=dTAI_UTC)))
result.append(sentinel)
return result
@lru_cache(maxsize=8)
def _fallback():
"""Leap seconds list if no tzfiles are available."""
td = timedelta
......@@ -228,8 +234,12 @@ def _dTAI_UTC(time, leapsecond_to_time, leapseconds=leapseconds):
leapseconds_list = leapseconds()
transition_times = list(map(leapsecond_to_time, leapseconds_list))
if time < transition_times[0]:
raise ValueError("Dates before %s are not supported, got %r" % (
transition_times[0], time))
# try fallback first
leapseconds_list = _fallback()
transition_times = list(map(leapsecond_to_time, leapseconds_list))
if time < transition_times[0]:
raise ValueError("Dates before %s are not supported, got %r" % (
transition_times[0], time))
for i, (start, end) in enumerate(zip(transition_times,
transition_times[1:])):
if start <= time < end:
......
# -*- coding: utf-8 -*-
from fdi.utils.leapseconds import utc_to_tai, tai_to_utc, dTAI_UTC_from_utc, _fallback
from fdi.dataset.eq import deepcmp
from fdi.dataset.metadata import make_jsonable
from fdi.dataset.datatypes import Vector, Quaternion
......@@ -15,6 +16,7 @@ from fdi.utils.fetch import fetch
import traceback
import copy
from datetime import timezone, timedelta, datetime
import sys
import os
import os.path
......@@ -346,3 +348,24 @@ def test_getConfig_conf(getConfig):
check_conf(cp, typ, getConfig)
# non-existing. the file has been deleted by the check_conf in the last line
w = getConfig(conf=typ)
def test_leapseconds():
t0 = datetime(2019, 2, 19, 1, 2, 3, 456789, tzinfo=timezone.utc)
assert dTAI_UTC_from_utc(t0) == timedelta(seconds=37)
# the above just means ...
assert utc_to_tai(t0) - t0 == timedelta(seconds=37)
t1 = datetime(1972, 1, 1, 0, 0, 0, 000000, tzinfo=timezone.utc)
assert dTAI_UTC_from_utc(t1) == timedelta(seconds=10)
t2 = datetime(1970, 1, 1, 0, 0, 0, 000000, tzinfo=timezone.utc)
# interpolation not implemented
assert dTAI_UTC_from_utc(t2) == timedelta(seconds=4.213170)
t3 = datetime(1968, 2, 1, 0, 0, 0, 000000, tzinfo=timezone.utc)
assert dTAI_UTC_from_utc(t2) == timedelta(seconds=4.213170)
t1958 = datetime(1958, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc)
assert dTAI_UTC_from_utc(t1958) == timedelta(seconds=0)
# leap seconds is added on transition
t4 = datetime(2017, 1, 1, 0, 0, 0, 000000, tzinfo=timezone.utc)
assert utc_to_tai(t4) - utc_to_tai(t4 - timedelta(seconds=1)) == \
timedelta(seconds=2)
print(_fallback.cache_info())
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment