diff --git a/poetry.lock b/poetry.lock index 15c39006..4a2416af 100644 --- a/poetry.lock +++ b/poetry.lock @@ -275,9 +275,14 @@ version = "0.2.2" description = "Hightime Python API" optional = false python-versions = "*" -files = [ - {file = "hightime-0.2.2-py3-none-any.whl", hash = "sha256:5109a449bb3a75dbf305147777de71634c91b943d47cfbee18ed2f34a8307e0b"}, -] +files = [] +develop = false + +[package.source] +type = "git" +url = "https://github.com/ni/hightime.git" +reference = "HEAD" +resolved_reference = "2cde13a89c443dcfbdbf3d00b483b62d7a3e9fc8" [[package]] name = "iniconfig" @@ -968,4 +973,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "40bb6c7868636a199d0fe96cc3879155eb23cb6b837e68a2c82ee136e7433439" +content-hash = "c366b73f5478503e15e680172753359d476906c979e10791b88e273e8ba65761" diff --git a/pyproject.toml b/pyproject.toml index 0f432636..236b1215 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,8 @@ numpy = [ {version = ">=1.26", python = ">=3.12,<3.13"}, {version = ">=2.1", python = "^3.13"}, ] -hightime = "^0.2.2" +# hightime = "^0.2.2" +hightime = { git = "https://github.com/ni/hightime.git" } [tool.poetry.group.lint.dependencies] bandit = { version = ">=1.7", extras = ["toml"] } @@ -43,13 +44,6 @@ plugins = "numpy.typing.mypy_plugin" namespace_packages = true strict = true -[[tool.mypy.overrides]] -module = [ - # https://github.com/ni/hightime/issues/4 - Add type annotations - "hightime.*", -] -ignore_missing_imports = true - [tool.bandit] skips = [ "B101", # assert_used diff --git a/src/nitypes/time/__init__.py b/src/nitypes/time/__init__.py new file mode 100644 index 00000000..9c3475c1 --- /dev/null +++ b/src/nitypes/time/__init__.py @@ -0,0 +1,5 @@ +"""Time data types for NI Python APIs.""" + +from nitypes.time._conversion import convert_datetime, convert_timedelta + +__all__ = ["convert_datetime", "convert_timedelta"] diff --git a/src/nitypes/time/_conversion.py b/src/nitypes/time/_conversion.py new file mode 100644 index 00000000..ad54c2cf --- /dev/null +++ b/src/nitypes/time/_conversion.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import datetime as dt +import sys +from collections.abc import Callable +from functools import singledispatch +from typing import Any, TypeVar, Union, cast + +import hightime as ht + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + +_AnyDateTime: TypeAlias = Union[dt.datetime, ht.datetime] +_TDateTime = TypeVar("_TDateTime", dt.datetime, ht.datetime) + +_AnyTimeDelta: TypeAlias = Union[dt.timedelta, ht.timedelta] +_TTimeDelta = TypeVar("_TTimeDelta", dt.timedelta, ht.timedelta) + + +def convert_datetime(requested_type: type[_TDateTime], value: _AnyDateTime, /) -> _TDateTime: + """Convert a datetime object to the specified type.""" + convert_func = _CONVERT_DATETIME_FOR_TYPE.get(requested_type) + if convert_func is None: + raise TypeError( + "The requested type must be a datetime type.\n\n" f"Requested type: {requested_type}" + ) + return cast(_TDateTime, convert_func(value)) + + +@singledispatch +def _convert_to_dt_datetime(value: object, /) -> dt.datetime: + raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value}") + + +@_convert_to_dt_datetime.register +def _(value: dt.datetime, /) -> dt.datetime: + return value + + +@_convert_to_dt_datetime.register +def _(value: ht.datetime, /) -> dt.datetime: + return dt.datetime( + value.year, + value.month, + value.day, + value.hour, + value.minute, + value.second, + value.microsecond, + value.tzinfo, + fold=value.fold, + ) + + +@singledispatch +def _convert_to_ht_datetime(value: object, /) -> ht.datetime: + raise TypeError("The value must be a datetime.\n\n" f"Provided value: {value}") + + +@_convert_to_ht_datetime.register +def _(value: dt.datetime, /) -> ht.datetime: + return ht.datetime( + value.year, + value.month, + value.day, + value.hour, + value.minute, + value.second, + value.microsecond, + value.tzinfo, + fold=value.fold, + ) + + +@_convert_to_ht_datetime.register +def _(value: ht.datetime, /) -> ht.datetime: + return value + + +_CONVERT_DATETIME_FOR_TYPE: dict[type[Any], Callable[[object], object]] = { + dt.datetime: _convert_to_dt_datetime, + ht.datetime: _convert_to_ht_datetime, +} + + +def convert_timedelta(requested_type: type[_TTimeDelta], value: _AnyTimeDelta, /) -> _TTimeDelta: + """Convert a timedelta object to the specified type.""" + convert_func = _CONVERT_TIMEDELTA_FOR_TYPE.get(requested_type) + if convert_func is None: + raise TypeError( + "The requested type must be a timedelta type.\n\n" f"Requested type: {requested_type}" + ) + return cast(_TTimeDelta, convert_func(value)) + + +@singledispatch +def _convert_to_dt_timedelta(value: object, /) -> dt.timedelta: + raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value}") + + +@_convert_to_dt_timedelta.register +def _(value: dt.timedelta, /) -> dt.timedelta: + return value + + +@_convert_to_dt_timedelta.register +def _(value: ht.timedelta, /) -> dt.timedelta: + return dt.timedelta(value.days, value.seconds, value.microseconds) + + +@singledispatch +def _convert_to_ht_timedelta(value: object, /) -> ht.timedelta: + raise TypeError("The value must be a timedelta.\n\n" f"Provided value: {value}") + + +@_convert_to_ht_timedelta.register +def _(value: dt.timedelta, /) -> ht.timedelta: + return ht.timedelta( + value.days, + value.seconds, + value.microseconds, + ) + + +@_convert_to_ht_timedelta.register +def _(value: ht.timedelta, /) -> ht.timedelta: + return value + + +_CONVERT_TIMEDELTA_FOR_TYPE: dict[type[Any], Callable[[object], object]] = { + dt.timedelta: _convert_to_dt_timedelta, + ht.timedelta: _convert_to_ht_timedelta, +} diff --git a/src/nitypes/waveform/__init__.py b/src/nitypes/waveform/__init__.py index 14c6731a..64f49953 100644 --- a/src/nitypes/waveform/__init__.py +++ b/src/nitypes/waveform/__init__.py @@ -5,9 +5,25 @@ ExtendedPropertyDictionary, ExtendedPropertyValue, ) +from nitypes.waveform._timing._base import BaseTiming, SampleIntervalMode +from nitypes.waveform._timing._precision import PrecisionTiming +from nitypes.waveform._timing._standard import Timing __all__ = [ "AnalogWaveform", + "BaseTiming", "ExtendedPropertyDictionary", "ExtendedPropertyValue", + "PrecisionTiming", + "SampleIntervalMode", + "Timing", ] + +# Hide that it was defined in a helper file +AnalogWaveform.__module__ = __name__ +BaseTiming.__module__ = __name__ +ExtendedPropertyDictionary.__module__ = __name__ +# ExtendedPropertyValue is a TypeAlias +PrecisionTiming.__module__ = __name__ +SampleIntervalMode.__module__ = __name__ +Timing.__module__ = __name__ diff --git a/src/nitypes/waveform/_analog_waveform.py b/src/nitypes/waveform/_analog_waveform.py index 816048db..3fc785ce 100644 --- a/src/nitypes/waveform/_analog_waveform.py +++ b/src/nitypes/waveform/_analog_waveform.py @@ -12,6 +12,9 @@ UNIT_DESCRIPTION, ExtendedPropertyDictionary, ) +from nitypes.waveform._timing._conversion import convert_timing +from nitypes.waveform._timing._precision import PrecisionTiming +from nitypes.waveform._timing._standard import Timing from nitypes.waveform._utils import arg_to_uint, validate_dtype if sys.version_info < (3, 10): @@ -211,12 +214,22 @@ def from_array_2d( for i in range(len(array)) ] - __slots__ = ["_data", "_start_index", "_sample_count", "_extended_properties", "__weakref__"] + __slots__ = [ + "_data", + "_start_index", + "_sample_count", + "_extended_properties", + "_timing", + "_precision_timing", + "__weakref__", + ] _data: npt.NDArray[_ScalarType_co] _start_index: int _sample_count: int _extended_properties: ExtendedPropertyDictionary + _timing: Timing | None + _precision_timing: PrecisionTiming | None # If neither dtype nor _data is specified, the type parameter defaults to np.float64. @overload @@ -332,6 +345,8 @@ def _init_with_new_array( self._start_index = start_index self._sample_count = sample_count self._extended_properties = ExtendedPropertyDictionary() + self._timing = Timing.empty + self._precision_timing = None def _init_with_provided_array( self, @@ -384,6 +399,8 @@ def _init_with_provided_array( self._start_index = start_index self._sample_count = sample_count self._extended_properties = ExtendedPropertyDictionary() + self._timing = Timing.empty + self._precision_timing = None @property def raw_data(self) -> npt.NDArray[_ScalarType_co]: @@ -464,3 +481,62 @@ def unit_description(self, value: str) -> None: "The unit description must be a str.\n\n" f"Unit description: {value!r}" ) self._extended_properties[UNIT_DESCRIPTION] = value + + @property + def timing(self) -> Timing: + """The timing information of the analog waveform. + + The default value is Timing.empty. + """ + if self._timing is None: + if self._precision_timing is PrecisionTiming.empty: + self._timing = Timing.empty + elif self._precision_timing is not None: + self._timing = convert_timing(Timing, self._precision_timing) + else: + raise RuntimeError("The waveform has no timing information.") + return self._timing + + @timing.setter + def timing(self, value: Timing) -> None: + if not isinstance(value, Timing): + raise TypeError("The timing information must be a Timing object.") + self._timing = value + self._precision_timing = None + + @property + def is_precision_timing_initialized(self) -> bool: + """Indicates whether the waveform's precision timing information is initialized.""" + return self._precision_timing is not None + + @property + def precision_timing(self) -> PrecisionTiming: + """The precision timing information of the analog waveform. + + The default value is PrecisionTiming.empty. + + Use AnalogWaveform.precision_timing instead of AnalogWaveform.timing to obtain timing + information with higher precision than AnalogWaveform.timing. If the timing information is + set using AnalogWaveform.precision_timing, then this property returns timing information + with up to yoctosecond precision. If the timing information is set using + AnalogWaveform.timing, then the timing information returned has up to microsecond precision. + + Accessing this property can potentially decrease performance if the timing information is + set using AnalogWaveform.timing. Use AnalogWaveform.is_precision_timing_initialized to + determine if AnalogWaveform.precision_timing has been initialized. + """ + if self._precision_timing is None: + if self._timing is Timing.empty: + self._precision_timing = PrecisionTiming.empty + elif self._timing is not None: + self._precision_timing = convert_timing(PrecisionTiming, self._timing) + else: + raise RuntimeError("The waveform has no timing information.") + return self._precision_timing + + @precision_timing.setter + def precision_timing(self, value: PrecisionTiming) -> None: + if not isinstance(value, PrecisionTiming): + raise TypeError("The precision timing information must be a PrecisionTiming object.") + self._precision_timing = value + self._timing = None diff --git a/src/nitypes/waveform/_timing/__init__.py b/src/nitypes/waveform/_timing/__init__.py new file mode 100644 index 00000000..527b989b --- /dev/null +++ b/src/nitypes/waveform/_timing/__init__.py @@ -0,0 +1 @@ +"""Waveform timing data types for NI Python APIs.""" diff --git a/src/nitypes/waveform/_timing/_base.py b/src/nitypes/waveform/_timing/_base.py new file mode 100644 index 00000000..1632e779 --- /dev/null +++ b/src/nitypes/waveform/_timing/_base.py @@ -0,0 +1,282 @@ +from __future__ import annotations + +import datetime as dt +import operator +from abc import ABC, abstractmethod +from collections.abc import Generator, Iterable, Sequence +from enum import Enum +from typing import Generic, SupportsIndex, TypeVar + +from nitypes.waveform._utils import add_note + + +class SampleIntervalMode(Enum): + """The sample interval mode that specifies how the waveform is sampled.""" + + NONE = 0 + """No sample interval.""" + + REGULAR = 1 + """Regular sample interval.""" + + IRREGULAR = 2 + """Irregular sample interval.""" + + +# TODO: should these be constrained types? I guess we'll find out when we add NI-BTF types. +_TDateTime_co = TypeVar("_TDateTime_co", bound=dt.datetime) +_TTimeDelta_co = TypeVar("_TTimeDelta_co", bound=dt.timedelta) + + +def _validate_unsupported_arg(arg_description: str, value: object) -> None: + if value is not None: + raise ValueError( + f"The {arg_description} argument is not supported.\n\n" f"Provided value: {value}" + ) + + +class BaseTiming(ABC, Generic[_TDateTime_co, _TTimeDelta_co]): + """Base class for waveform timing information.""" + + @staticmethod + @abstractmethod + def _get_datetime_type() -> type[_TDateTime_co]: + raise NotImplementedError() + + @staticmethod + @abstractmethod + def _get_timedelta_type() -> type[_TTimeDelta_co]: + raise NotImplementedError() + + @staticmethod + @abstractmethod + def _get_default_time_offset() -> _TTimeDelta_co: + raise NotImplementedError() + + __slots__ = [ + "_sample_interval_mode", + "_timestamp", + "_time_offset", + "_sample_interval", + "_timestamps", + "__weakref__", + ] + + _sample_interval_mode: SampleIntervalMode + _timestamp: _TDateTime_co | None + _time_offset: _TTimeDelta_co | None + _sample_interval: _TTimeDelta_co | None + _timestamps: list[_TDateTime_co] | None + + def __init__( + self, + sample_interval_mode: SampleIntervalMode, + timestamp: _TDateTime_co | None, + time_offset: _TTimeDelta_co | None, + sample_interval: _TTimeDelta_co | None, + timestamps: Sequence[_TDateTime_co] | None, + ) -> None: + """Construct a base waveform timing object.""" + self._validate_init_args( + sample_interval_mode, timestamp, time_offset, sample_interval, timestamps + ) + + if timestamps is not None and not isinstance(timestamps, list): + timestamps = list(timestamps) + + self._sample_interval_mode = sample_interval_mode + self._timestamp = timestamp + self._time_offset = time_offset + self._sample_interval = sample_interval + self._timestamps = timestamps + + def _validate_init_args( + self, + sample_interval_mode: SampleIntervalMode, + timestamp: _TDateTime_co | None, + time_offset: _TTimeDelta_co | None, + sample_interval: _TTimeDelta_co | None, + timestamps: Sequence[_TDateTime_co] | None, + ) -> None: + validate_func = self.__class__._VALIDATE_INIT_ARGS_FOR_MODE.get(sample_interval_mode) + if validate_func is None: + raise ValueError(f"Unsupported sample interval mode {sample_interval_mode}.") + + try: + validate_func(self, timestamp, time_offset, sample_interval, timestamps) + except (TypeError, ValueError) as e: + add_note(e, f"Sample interval mode: {sample_interval_mode}") + raise + + def _validate_init_args_none( + self, + timestamp: _TDateTime_co | None, + time_offset: _TTimeDelta_co | None, + sample_interval: _TTimeDelta_co | None, + timestamps: Sequence[_TDateTime_co] | None, + ) -> None: + datetime_type = self.__class__._get_datetime_type() + timedelta_type = self.__class__._get_timedelta_type() + if not isinstance(timestamp, (datetime_type, type(None))): + raise TypeError( + "The timestamp must be a datetime or None.\n\n" f"Provided value: {timestamp}" + ) + if not isinstance(time_offset, (timedelta_type, type(None))): + raise TypeError( + f"The time offset must be a timedelta or None.\n\n" f"Provided value: {time_offset}" + ) + _validate_unsupported_arg("sample interval", sample_interval) + _validate_unsupported_arg("timestamps", timestamps) + + def _validate_init_args_regular( + self, + timestamp: _TDateTime_co | None, + time_offset: _TTimeDelta_co | None, + sample_interval: _TTimeDelta_co | None, + timestamps: Sequence[_TDateTime_co] | None, + ) -> None: + datetime_type = self.__class__._get_datetime_type() + timedelta_type = self.__class__._get_timedelta_type() + if not isinstance(timestamp, (datetime_type, type(None))): + raise TypeError( + "The timestamp must be a datetime or None.\n\n" f"Provided value: {timestamp}" + ) + if not isinstance(time_offset, (timedelta_type, type(None))): + raise TypeError( + f"The time offset must be a timedelta or None.\n\n" f"Provided value: {time_offset}" + ) + if not isinstance(sample_interval, timedelta_type): + raise TypeError( + "The sample interval must be a timedelta.\n\n" f"Provided value: {sample_interval}" + ) + _validate_unsupported_arg("timestamps", timestamps) + + def _validate_init_args_irregular( + self, + timestamp: _TDateTime_co | None, + time_offset: _TTimeDelta_co | None, + sample_interval: _TTimeDelta_co | None, + timestamps: Sequence[_TDateTime_co] | None, + ) -> None: + datetime_type = self.__class__._get_datetime_type() + _validate_unsupported_arg("timestamp", timestamp) + _validate_unsupported_arg("time offset", time_offset) + _validate_unsupported_arg("sample interval", sample_interval) + if not isinstance(timestamps, Sequence) or not all( + isinstance(ts, datetime_type) for ts in timestamps + ): + raise TypeError( + "The timestamps must be a sequence of datetime objects.\n\n" + f"Provided value: {timestamps}" + ) + + _VALIDATE_INIT_ARGS_FOR_MODE = { + SampleIntervalMode.NONE: _validate_init_args_none, + SampleIntervalMode.REGULAR: _validate_init_args_regular, + SampleIntervalMode.IRREGULAR: _validate_init_args_irregular, + } + + @property + def has_timestamp(self) -> bool: + """Indicates whether the waveform timing has a timestamp.""" + return self._timestamp is not None + + @property + def timestamp(self) -> _TDateTime_co: + """A timestamp representing the start of an acquisition or a related occurrence.""" + value = self._timestamp + if value is None: + raise RuntimeError("The waveform timing does not have a timestamp.") + return value + + @property + def start_time(self) -> _TDateTime_co: + """The time that the first sample in the waveform was acquired.""" + return self.timestamp + self.time_offset + + @property + def time_offset(self) -> _TTimeDelta_co: + """The time difference between the timestamp and the first sample.""" + value = self._time_offset + if value is None: + return self.__class__._get_default_time_offset() + return value + + @property + def sample_interval(self) -> _TTimeDelta_co: + """The time interval between samples.""" + value = self._sample_interval + if value is None: + raise RuntimeError("The waveform timing does not have a sample interval.") + return value + + @property + def sample_interval_mode(self) -> SampleIntervalMode: + """The sample interval mode that specifies how the waveform is sampled.""" + return self._sample_interval_mode + + def get_timestamps( + self, start_index: SupportsIndex, count: SupportsIndex + ) -> Iterable[_TDateTime_co]: + """Retrieve the timestamps of the waveform samples. + + Args: + start_index: The sample index of the first timestamp to retrieve. + count: The number of timestamps to retrieve. + + Returns: + An iterable containing the requested timestamps. + """ + start_index = operator.index(start_index) + count = operator.index(count) + + if start_index < 0: + raise ValueError("The sample index must be a non-negative integer.") + if count < 0: + raise ValueError("The count must be a non-negative integer.") + + if self._sample_interval_mode == SampleIntervalMode.REGULAR and self.has_timestamp: + return self._generate_regular_timestamps(start_index, count) + elif self._sample_interval_mode == SampleIntervalMode.IRREGULAR: + assert self._timestamps is not None + if count > len(self._timestamps): + raise ValueError("The count must be less or equal to the number of timestamps.") + return self._timestamps[start_index : start_index + count] + else: + raise RuntimeError( + "The waveform timing does not have valid timestamp information. To obtain timestamps, the waveform must be irregular or must be initialized with a valid time stamp and sample interval." + ) + + def _generate_regular_timestamps( + self, start_index: int, count: int + ) -> Generator[_TDateTime_co]: + sample_interval = self.sample_interval + timestamp = self.start_time + start_index * sample_interval + for i in range(count): + if i != 0: + timestamp += sample_interval + yield timestamp + + def __eq__(self, value: object) -> bool: # noqa: D105 - Missing docstring in magic method + if not isinstance(value, self.__class__): + return NotImplemented + return ( + self._timestamp == value._timestamp + and self._time_offset == value._time_offset + and self._sample_interval == value._sample_interval + and self._sample_interval_mode == value._sample_interval_mode + and self._timestamps == value._timestamps + ) + + def __repr__(self) -> str: # noqa: D105 - Missing docstring in magic method + # For Enum, __str__ is an unqualified ctor expression like E.V and __repr__ is . + args = [f"{self.sample_interval_mode.__class__.__module__}.{self.sample_interval_mode}"] + if self._timestamp is not None: + args.append(f"timestamp={self._timestamp!r}") + if self._time_offset is not None: + args.append(f"time_offset={self._time_offset!r}") + if self._sample_interval is not None: + args.append(f"sample_interval={self._sample_interval!r}") + if self._timestamps is not None: + args.append(f"timestamps={self._timestamps!r}") + return f"{self.__class__.__module__}.{self.__class__.__name__}({', '.join(args)})" diff --git a/src/nitypes/waveform/_timing/_conversion.py b/src/nitypes/waveform/_timing/_conversion.py new file mode 100644 index 00000000..22ea0c96 --- /dev/null +++ b/src/nitypes/waveform/_timing/_conversion.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import datetime as dt +import sys +from collections.abc import Callable +from functools import singledispatch +from typing import Any, TypeVar, Union, cast + +import hightime as ht + +from nitypes.time._conversion import convert_datetime, convert_timedelta +from nitypes.waveform._timing._precision import PrecisionTiming +from nitypes.waveform._timing._standard import Timing + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + +_AnyTiming: TypeAlias = Union[Timing, PrecisionTiming] +_TTiming = TypeVar("_TTiming", Timing, PrecisionTiming) + + +def convert_timing(requested_type: type[_TTiming], value: _AnyTiming, /) -> _TTiming: + """Convert a waveform timing object to the specified type.""" + convert_func = _CONVERT_TIMING_FOR_TYPE.get(requested_type) + if convert_func is None: + raise TypeError( + "The requested type must be a waveform timing type.\n\n" + f"Requested type: {requested_type}" + ) + return cast(_TTiming, convert_func(value)) + + +@singledispatch +def _convert_to_standard_timing(value: object, /) -> Timing: + raise TypeError("The value must be a waveform timing object.\n\n" f"Provided value: {value}") + + +@_convert_to_standard_timing.register +def _(value: Timing, /) -> Timing: + return value + + +@_convert_to_standard_timing.register +def _(value: PrecisionTiming, /) -> Timing: + return Timing( + value._sample_interval_mode, + None if value._timestamp is None else convert_datetime(dt.datetime, value._timestamp), + ( + None + if value._time_offset is None + else convert_timedelta(dt.timedelta, value._time_offset) + ), + ( + None + if value._sample_interval is None + else convert_timedelta(dt.timedelta, value._sample_interval) + ), + ( + None + if value._timestamps is None + else [convert_datetime(dt.datetime, ts) for ts in value._timestamps] + ), + ) + + +@singledispatch +def _convert_to_precision_timing(value: object, /) -> PrecisionTiming: + raise TypeError("The value must be a waveform timing object.\n\n" f"Provided value: {value}") + + +@_convert_to_precision_timing.register +def _(value: Timing, /) -> PrecisionTiming: + return PrecisionTiming( + value._sample_interval_mode, + None if value._timestamp is None else convert_datetime(ht.datetime, value._timestamp), + ( + None + if value._time_offset is None + else convert_timedelta(ht.timedelta, value._time_offset) + ), + ( + None + if value._sample_interval is None + else convert_timedelta(ht.timedelta, value._sample_interval) + ), + ( + None + if value._timestamps is None + else [convert_datetime(ht.datetime, ts) for ts in value._timestamps] + ), + ) + + +@_convert_to_precision_timing.register +def _(value: PrecisionTiming, /) -> PrecisionTiming: + return value + + +_CONVERT_TIMING_FOR_TYPE: dict[type[Any], Callable[[object], object]] = { + Timing: _convert_to_standard_timing, + PrecisionTiming: _convert_to_precision_timing, +} diff --git a/src/nitypes/waveform/_timing/_precision.py b/src/nitypes/waveform/_timing/_precision.py new file mode 100644 index 00000000..dd8a5f64 --- /dev/null +++ b/src/nitypes/waveform/_timing/_precision.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +from collections.abc import Sequence +from typing import ClassVar + +import hightime as ht + +from nitypes.waveform._timing._base import BaseTiming, SampleIntervalMode + + +class PrecisionTiming(BaseTiming[ht.datetime, ht.timedelta]): + """High-precision waveform timing using the hightime package. + + The hightime package has up to yoctosecond precision. + """ + + _DEFAULT_TIME_OFFSET = ht.timedelta() + + empty: ClassVar[PrecisionTiming] + + @staticmethod + def create_with_no_interval( + timestamp: ht.datetime | None = None, time_offset: ht.timedelta | None = None + ) -> PrecisionTiming: + """Create a waveform timing object with no sample interval. + + Args: + timestamp: A timestamp representing the start of an acquisition or a related + occurrence. + time_offset: The time difference between the timestamp and the time that the first + sample was acquired. + + Returns: + A waveform timing object. + """ + return PrecisionTiming(SampleIntervalMode.NONE, timestamp, time_offset) + + @staticmethod + def create_with_regular_interval( + sample_interval: ht.timedelta, + timestamp: ht.datetime | None = None, + time_offset: ht.timedelta | None = None, + ) -> PrecisionTiming: + """Create a waveform timing object with a regular sample interval. + + Args: + sample_interval: The time difference between samples. + timestamp: A timestamp representing the start of an acquisition or a related + occurrence. + time_offset: The time difference between the timestamp and the time that the first + sample was acquired. + + Returns: + A waveform timing object. + """ + return PrecisionTiming(SampleIntervalMode.REGULAR, timestamp, time_offset, sample_interval) + + @staticmethod + def create_with_irregular_interval( + timestamps: Sequence[ht.datetime], + ) -> PrecisionTiming: + """Create a waveform timing object with an irregular sample interval. + + Args: + timestamps: A sequence containing a timestamp for each sample in the waveform, + specifying the time that the sample was acquired. + + Returns: + A waveform timing object. + """ + return PrecisionTiming(SampleIntervalMode.IRREGULAR, timestamps=timestamps) + + @staticmethod + def _get_datetime_type() -> type[ht.datetime]: + return ht.datetime + + @staticmethod + def _get_timedelta_type() -> type[ht.timedelta]: + return ht.timedelta + + @staticmethod + def _get_default_time_offset() -> ht.timedelta: + return PrecisionTiming._DEFAULT_TIME_OFFSET + + def __init__( + self, + sample_interval_mode: SampleIntervalMode, + timestamp: ht.datetime | None = None, + time_offset: ht.timedelta | None = None, + sample_interval: ht.timedelta | None = None, + timestamps: Sequence[ht.datetime] | None = None, + ) -> None: + """Construct a high-precision waveform timing object. + + Most applications should use the named constructors instead: + - PrecisionTiming.create_with_no_interval + - PrecisionTiming.create_with_regular_interval + - PrecisionTiming.create_with_irregular_interval + """ + super().__init__(sample_interval_mode, timestamp, time_offset, sample_interval, timestamps) + + +PrecisionTiming.empty = PrecisionTiming.create_with_no_interval() +"""A waveform timing object with no timestamp, time offset, or sample interval.""" diff --git a/src/nitypes/waveform/_timing/_standard.py b/src/nitypes/waveform/_timing/_standard.py new file mode 100644 index 00000000..8f7e918f --- /dev/null +++ b/src/nitypes/waveform/_timing/_standard.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import datetime as dt +from collections.abc import Sequence +from typing import ClassVar + +from nitypes.waveform._timing._base import BaseTiming, SampleIntervalMode + + +class Timing(BaseTiming[dt.datetime, dt.timedelta]): + """Waveform timing using the standard datetime module. + + The standard datetime module has up to microsecond precision. For higher precision, use + PrecisionTiming. + """ + + _DEFAULT_TIME_OFFSET = dt.timedelta() + + empty: ClassVar[Timing] + + # TODO: can these be classmethods in BaseTiming? + @staticmethod + def create_with_no_interval( + timestamp: dt.datetime | None = None, time_offset: dt.timedelta | None = None + ) -> Timing: + """Create a waveform timing object with no sample interval. + + Args: + timestamp: A timestamp representing the start of an acquisition or a related + occurrence. + time_offset: The time difference between the timestamp and the time that the first + sample was acquired. + + Returns: + A waveform timing object. + """ + return Timing(SampleIntervalMode.NONE, timestamp, time_offset) + + @staticmethod + def create_with_regular_interval( + sample_interval: dt.timedelta, + timestamp: dt.datetime | None = None, + time_offset: dt.timedelta | None = None, + ) -> Timing: + """Create a waveform timing object with a regular sample interval. + + Args: + sample_interval: The time difference between samples. + timestamp: A timestamp representing the start of an acquisition or a related + occurrence. + time_offset: The time difference between the timestamp and the time that the first + sample was acquired. + + Returns: + A waveform timing object. + """ + return Timing(SampleIntervalMode.REGULAR, timestamp, time_offset, sample_interval) + + @staticmethod + def create_with_irregular_interval( + timestamps: Sequence[dt.datetime], + ) -> Timing: + """Create a waveform timing object with an irregular sample interval. + + Args: + timestamps: A sequence containing a timestamp for each sample in the waveform, + specifying the time that the sample was acquired. + + Returns: + A waveform timing object. + """ + return Timing(SampleIntervalMode.IRREGULAR, timestamps=timestamps) + + @staticmethod + def _get_datetime_type() -> type[dt.datetime]: + return dt.datetime + + @staticmethod + def _get_timedelta_type() -> type[dt.timedelta]: + return dt.timedelta + + @staticmethod + def _get_default_time_offset() -> dt.timedelta: + return Timing._DEFAULT_TIME_OFFSET + + def __init__( + self, + sample_interval_mode: SampleIntervalMode, + timestamp: dt.datetime | None = None, + time_offset: dt.timedelta | None = None, + sample_interval: dt.timedelta | None = None, + timestamps: Sequence[dt.datetime] | None = None, + ) -> None: + """Construct a waveform timing object. + + Most applications should use the named constructors instead: + - Timing.create_with_no_interval + - Timing.create_with_regular_interval + - Timing.create_with_irregular_interval + """ + super().__init__(sample_interval_mode, timestamp, time_offset, sample_interval, timestamps) + + +Timing.empty = Timing.create_with_no_interval() +"""A waveform timing object with no timestamp, time offset, or sample interval.""" diff --git a/src/nitypes/waveform/_utils.py b/src/nitypes/waveform/_utils.py index 9b121cd4..a37a663a 100644 --- a/src/nitypes/waveform/_utils.py +++ b/src/nitypes/waveform/_utils.py @@ -1,12 +1,33 @@ from __future__ import annotations import operator +import sys from typing import SupportsIndex import numpy as np import numpy.typing as npt +def add_note(exception: Exception, note: str) -> None: + """Add a note to an exception. + + >>> try: + ... raise ValueError("Oh no") + ... except Exception as e: + ... add_note(e, "p.s. This is bad") + ... raise + Traceback (most recent call last): + ... + ValueError: Oh no + p.s. This is bad + """ + if sys.version_info >= (3, 11): + exception.add_note(note) + else: + message = exception.args[0] + "\n" + note + exception.args = (message,) + exception.args[1:] + + def arg_to_int(arg_description: str, value: SupportsIndex | None, default_value: int = 0) -> int: """Convert an argument to a signed integer.""" if value is None: @@ -19,7 +40,7 @@ def arg_to_uint(arg_description: str, value: SupportsIndex | None, default_value value = arg_to_int(arg_description, value, default_value) if value < 0: raise ValueError( - f"The {arg_description} must be a non-negative integer.\n\nProvided value: {value}" + f"The {arg_description} must be a non-negative integer.\n\n" f"Provided value: {value}" ) return value diff --git a/tests/unit/time/__init__.py b/tests/unit/time/__init__.py new file mode 100644 index 00000000..ea3d3827 --- /dev/null +++ b/tests/unit/time/__init__.py @@ -0,0 +1 @@ +"""Unit tests for the nitypes.time package.""" diff --git a/tests/unit/time/test_conversion.py b/tests/unit/time/test_conversion.py new file mode 100644 index 00000000..f281f8cb --- /dev/null +++ b/tests/unit/time/test_conversion.py @@ -0,0 +1,190 @@ +from __future__ import annotations + +import datetime as dt +import sys +from typing import Any + +import hightime as ht +import pytest + +from nitypes.time import convert_datetime, convert_timedelta + +if sys.version_info >= (3, 11): + from typing import assert_type +else: + from typing_extensions import assert_type + + +############################################################################### +# convert_datetime +############################################################################### +def test___dt_to_dt___convert_datetime___returns_original_object() -> None: + value_in = dt.datetime.now(dt.timezone.utc) + + value_out = convert_datetime(dt.datetime, value_in) + + assert_type(value_out, dt.datetime) + assert isinstance(value_out, dt.datetime) + assert value_out is value_in + + +def test___ht_to_ht___convert_datetime___returns_original_object() -> None: + value_in = ht.datetime.now(dt.timezone.utc) + + value_out = convert_datetime(ht.datetime, value_in) + + assert_type(value_out, ht.datetime) + assert value_out is value_in + + +def test___dt_to_ht___convert_datetime___returns_equivalant_ht_datetime() -> None: + value_in = dt.datetime.now(dt.timezone.utc) + + value_out = convert_datetime(ht.datetime, value_in) + + assert_type(value_out, ht.datetime) + assert isinstance(value_out, ht.datetime) + assert value_out == value_in + assert value_out.tzinfo is value_in.tzinfo + assert value_out.fold == value_in.fold + + +def test___ht_to_dt___convert_datetime___returns_equivalant_dt_datetime() -> None: + value_in = ht.datetime.now(dt.timezone.utc) + + value_out = convert_datetime(dt.datetime, value_in) + + assert_type(value_out, dt.datetime) + assert isinstance(value_out, dt.datetime) + assert value_out == value_in + assert value_out.tzinfo is value_in.tzinfo + assert value_out.fold == value_in.fold + + +def test___precise_ht_to_dt___convert_datetime___loses_precision() -> None: + # ht.datetime.now always sets femtosecond and yoctosecond to 0, so add an offset. + value_in = ht.datetime.now(dt.timezone.utc) + ht.timedelta(femtoseconds=1, yoctoseconds=2) + + value_out = convert_datetime(dt.datetime, value_in) + + assert_type(value_out, dt.datetime) + assert isinstance(value_out, dt.datetime) + assert value_out != value_in + assert value_out == value_in.replace(femtosecond=0, yoctosecond=0) + + +@pytest.mark.parametrize("requested_type", [dt.datetime, ht.datetime]) +def test___variable_requested_type___convert_datetime___static_return_type_unknown( + requested_type: type[Any], +) -> None: + value_in = dt.datetime.now(dt.timezone.utc) + + value_out = convert_datetime(requested_type, value_in) + + assert_type(value_out, Any) + assert isinstance(value_out, requested_type) + + +@pytest.mark.parametrize("value_in", [dt.datetime(2025, 1, 1), ht.datetime(2025, 1, 1)]) +def test___invalid_requested_type___convert_datetime___raises_type_error( + value_in: dt.datetime | ht.datetime, +) -> None: + with pytest.raises(TypeError) as exc: + _ = convert_datetime(str, value_in) # type: ignore[type-var] + + assert exc.value.args[0].startswith("The requested type must be a datetime type.") + + +@pytest.mark.parametrize("requested_type", [dt.datetime, ht.datetime]) +def test___invalid_value___convert_datetime___raises_type_error(requested_type: type[Any]) -> None: + value_in = "10:30 a.m." + + with pytest.raises(TypeError) as exc: + _ = convert_datetime(requested_type, value_in) # type: ignore[arg-type] + + assert exc.value.args[0].startswith("The value must be a datetime.") + + +############################################################################### +# convert_timedelta +############################################################################### +def test___dt_to_dt___convert_timedelta___returns_original_object() -> None: + value_in = dt.timedelta(days=1, seconds=2, microseconds=3) + + value_out = convert_timedelta(dt.timedelta, value_in) + + assert_type(value_out, dt.timedelta) + assert isinstance(value_out, dt.timedelta) + assert value_out is value_in + + +def test___ht_to_ht___convert_timedelta___returns_original_object() -> None: + value_in = ht.timedelta(days=1, seconds=2, microseconds=3, femtoseconds=4, yoctoseconds=5) + + value_out = convert_timedelta(ht.timedelta, value_in) + + assert_type(value_out, ht.timedelta) + assert value_out is value_in + + +def test___dt_to_ht___convert_timedelta___returns_equivalant_ht_timedelta() -> None: + value_in = dt.timedelta(days=1, seconds=2, microseconds=3) + + value_out = convert_timedelta(ht.timedelta, value_in) + + assert_type(value_out, ht.timedelta) + assert isinstance(value_out, ht.timedelta) + assert value_out == value_in + + +def test___ht_to_dt___convert_timedelta___returns_equivalant_dt_timedelta() -> None: + value_in = ht.timedelta(days=1, seconds=2, microseconds=3) + + value_out = convert_timedelta(dt.timedelta, value_in) + + assert_type(value_out, dt.timedelta) + assert isinstance(value_out, dt.timedelta) + assert value_out == value_in + + +def test___precise_ht_to_dt___convert_timedelta___loses_precision() -> None: + value_in = ht.timedelta(days=1, seconds=2, microseconds=3, femtoseconds=4, yoctoseconds=5) + + value_out = convert_timedelta(dt.timedelta, value_in) + + assert_type(value_out, dt.timedelta) + assert isinstance(value_out, dt.timedelta) + assert value_out != value_in + assert value_out == dt.timedelta(days=1, seconds=2, microseconds=3) + + +@pytest.mark.parametrize("requested_type", [dt.timedelta, ht.timedelta]) +def test___variable_requested_type___convert_timedelta___static_return_type_unknown( + requested_type: type[Any], +) -> None: + value_in = dt.timedelta(days=1, seconds=2, microseconds=3) + + value_out = convert_timedelta(requested_type, value_in) + + assert_type(value_out, Any) + assert isinstance(value_out, requested_type) + + +@pytest.mark.parametrize("value_in", [dt.timedelta(), ht.timedelta()]) +def test___invalid_requested_type___convert_timedelta___raises_type_error( + value_in: dt.timedelta | ht.timedelta, +) -> None: + with pytest.raises(TypeError) as exc: + _ = convert_timedelta(str, value_in) # type: ignore[type-var] + + assert exc.value.args[0].startswith("The requested type must be a timedelta type.") + + +@pytest.mark.parametrize("requested_type", [dt.timedelta, ht.timedelta]) +def test___invalid_value___convert_timedelta___raises_type_error(requested_type: type[Any]) -> None: + value_in = "10:30 a.m." + + with pytest.raises(TypeError) as exc: + _ = convert_timedelta(requested_type, value_in) # type: ignore[arg-type] + + assert exc.value.args[0].startswith("The value must be a timedelta.") diff --git a/tests/unit/waveform/_timing/__init__.py b/tests/unit/waveform/_timing/__init__.py new file mode 100644 index 00000000..d3f5ff72 --- /dev/null +++ b/tests/unit/waveform/_timing/__init__.py @@ -0,0 +1 @@ +"""Unit tests for the nitypes.waveform._timing package.""" diff --git a/tests/unit/waveform/_timing/test_conversion.py b/tests/unit/waveform/_timing/test_conversion.py new file mode 100644 index 00000000..90120f6b --- /dev/null +++ b/tests/unit/waveform/_timing/test_conversion.py @@ -0,0 +1,120 @@ +from __future__ import annotations + +import datetime as dt +import sys + +import hightime as ht + +from nitypes.waveform import PrecisionTiming, SampleIntervalMode, Timing +from nitypes.waveform._timing._conversion import convert_timing + +if sys.version_info >= (3, 11): + from typing import assert_type +else: + from typing_extensions import assert_type + + +def test___standard_to_standard___convert_timing___returns_original_object() -> None: + value_in = Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + ) + + value_out = convert_timing(Timing, value_in) + + assert_type(value_out, Timing) + assert value_out is value_in + + +def test___precision_to_precision___convert_timing___returns_original_object() -> None: + value_in = PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ) + + value_out = convert_timing(PrecisionTiming, value_in) + + assert_type(value_out, PrecisionTiming) + assert value_out is value_in + + +def test___standard_to_precision_empty___convert_timing___returns_equivalent_timing() -> None: + value_in = Timing.empty + + value_out = convert_timing(PrecisionTiming, value_in) + + assert_type(value_out, PrecisionTiming) + assert isinstance(value_out, PrecisionTiming) + assert value_out == PrecisionTiming.empty + + +def test___precision_to_standard_empty___convert_timing___returns_equivalent_timing() -> None: + value_in = PrecisionTiming.empty + + value_out = convert_timing(Timing, value_in) + + assert_type(value_out, Timing) + assert isinstance(value_out, Timing) + assert value_out == Timing.empty + + +def test___standard_to_precision_regular_interval___convert_timing___returns_equivalent_timing() -> ( + None +): + value_in = Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + ) + + value_out = convert_timing(PrecisionTiming, value_in) + + assert_type(value_out, PrecisionTiming) + assert isinstance(value_out, PrecisionTiming) + assert value_out.sample_interval_mode == SampleIntervalMode.REGULAR + assert value_out.sample_interval == ht.timedelta(milliseconds=1) + assert value_out.timestamp == ht.datetime(2025, 1, 1) + assert value_out.time_offset == ht.timedelta(seconds=1) + + +def test___precision_to_standard_regular_interval___convert_timing___returns_equivalent_timing() -> ( + None +): + value_in = PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ) + + value_out = convert_timing(Timing, value_in) + + assert_type(value_out, Timing) + assert isinstance(value_out, Timing) + assert value_out.sample_interval_mode == SampleIntervalMode.REGULAR + assert value_out.sample_interval == dt.timedelta(milliseconds=1) + assert value_out.timestamp == dt.datetime(2025, 1, 1) + assert value_out.time_offset == dt.timedelta(seconds=1) + + +def test___standard_to_precision_irregular_interval___convert_timing___returns_equivalent_timing() -> ( + None +): + value_in = Timing.create_with_irregular_interval( + [dt.datetime(2025, 1, 1), dt.datetime(2025, 1, 2)] + ) + + value_out = convert_timing(PrecisionTiming, value_in) + + assert_type(value_out, PrecisionTiming) + assert isinstance(value_out, PrecisionTiming) + assert value_out.sample_interval_mode == SampleIntervalMode.IRREGULAR + assert value_out._timestamps == [ht.datetime(2025, 1, 1), ht.datetime(2025, 1, 2)] + + +def test___precision_to_standard_irregular_interval___convert_timing___returns_equivalent_timing() -> ( + None +): + value_in = PrecisionTiming.create_with_irregular_interval( + [ht.datetime(2025, 1, 1), ht.datetime(2025, 1, 2)] + ) + + value_out = convert_timing(Timing, value_in) + + assert_type(value_out, Timing) + assert isinstance(value_out, Timing) + assert value_out.sample_interval_mode == SampleIntervalMode.IRREGULAR + assert value_out._timestamps == [dt.datetime(2025, 1, 1), dt.datetime(2025, 1, 2)] diff --git a/tests/unit/waveform/_timing/test_precision.py b/tests/unit/waveform/_timing/test_precision.py new file mode 100644 index 00000000..24c40185 --- /dev/null +++ b/tests/unit/waveform/_timing/test_precision.py @@ -0,0 +1,382 @@ +from __future__ import annotations + +import datetime as dt +import sys +from copy import deepcopy + +import hightime as ht +import pytest + +from nitypes.waveform import PrecisionTiming, SampleIntervalMode + +if sys.version_info >= (3, 11): + from typing import assert_type +else: + from typing_extensions import assert_type + + +############################################################################### +# empty +############################################################################### +def test___empty___is_waveform_timing() -> None: + assert_type(PrecisionTiming.empty, PrecisionTiming) + assert isinstance(PrecisionTiming.empty, PrecisionTiming) + + +def test___empty___no_timestamp() -> None: + assert not PrecisionTiming.empty.has_timestamp + with pytest.raises(RuntimeError) as exc: + _ = PrecisionTiming.empty.timestamp + + assert exc.value.args[0] == "The waveform timing does not have a timestamp." + + +def test___empty___no_start_time() -> None: + with pytest.raises(RuntimeError) as exc: + _ = PrecisionTiming.empty.start_time + + assert exc.value.args[0] == "The waveform timing does not have a timestamp." + + +def test___empty___default_time_offset() -> None: + assert PrecisionTiming.empty.time_offset == ht.timedelta() + + +def test___empty___no_sample_interval() -> None: + assert PrecisionTiming.empty._sample_interval is None + with pytest.raises(RuntimeError) as exc: + _ = PrecisionTiming.empty.sample_interval + + assert exc.value.args[0] == "The waveform timing does not have a sample interval." + + +def test___empty___sample_interval_mode_none() -> None: + assert PrecisionTiming.empty.sample_interval_mode == SampleIntervalMode.NONE + + +############################################################################### +# create_with_no_interval +############################################################################### +def test___no_args___create_with_no_interval___creates_empty_waveform_timing() -> None: + timing = PrecisionTiming.create_with_no_interval() + + assert_type(timing, PrecisionTiming) + assert not timing.has_timestamp + assert timing.time_offset == ht.timedelta() + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.NONE + + +def test___timestamp___create_with_no_interval___creates_waveform_timing_with_timestamp() -> None: + timestamp = ht.datetime.now(dt.timezone.utc) + timing = PrecisionTiming.create_with_no_interval(timestamp) + + assert_type(timing, PrecisionTiming) + assert timing.timestamp == timestamp + assert timing.time_offset == ht.timedelta() + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.NONE + + +def test___timestamp_and_time_offset___create_with_no_interval___creates_waveform_timing_with_timestamp_and_time_offset() -> ( + None +): + timestamp = ht.datetime.now(dt.timezone.utc) + time_offset = ht.timedelta(seconds=1.23) + timing = PrecisionTiming.create_with_no_interval(timestamp, time_offset) + + assert_type(timing, PrecisionTiming) + assert timing.timestamp == timestamp + assert timing.time_offset == time_offset + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.NONE + + +def test___time_offset___create_with_no_interval___creates_waveform_timing_with_time_offset() -> ( + None +): + time_offset = ht.timedelta(seconds=1.23) + timing = PrecisionTiming.create_with_no_interval(time_offset=time_offset) + + assert_type(timing, PrecisionTiming) + assert not timing.has_timestamp + assert timing.time_offset == time_offset + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.NONE + + +############################################################################### +# create_with_regular_interval +############################################################################### +def test___sample_interval___create_with_regular_interval___creates_waveform_timing_with_sample_interval() -> ( + None +): + sample_interval = ht.timedelta(milliseconds=1) + + timing = PrecisionTiming.create_with_regular_interval(sample_interval) + + assert_type(timing, PrecisionTiming) + assert not timing.has_timestamp + assert timing.time_offset == ht.timedelta() + assert timing.sample_interval == sample_interval + assert timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +def test___sample_interval_and_timestamp___create_with_regular_interval___creates_waveform_timing_with_sample_interval_and_timestamp() -> ( + None +): + sample_interval = ht.timedelta(milliseconds=1) + timestamp = ht.datetime.now(dt.timezone.utc) + + timing = PrecisionTiming.create_with_regular_interval(sample_interval, timestamp) + + assert_type(timing, PrecisionTiming) + assert timing.timestamp == timestamp + assert timing.time_offset == ht.timedelta() + assert timing.sample_interval == sample_interval + assert timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +def test___sample_interval_timestamp_and_time_offset___create_with_regular_interval___creates_waveform_timing_with_sample_interval_timestamp_and_time_offset() -> ( + None +): + sample_interval = ht.timedelta(milliseconds=1) + timestamp = ht.datetime.now(dt.timezone.utc) + time_offset = ht.timedelta(seconds=1.23) + + timing = PrecisionTiming.create_with_regular_interval(sample_interval, timestamp, time_offset) + + assert_type(timing, PrecisionTiming) + assert timing.timestamp == timestamp + assert timing.time_offset == time_offset + assert timing.sample_interval == sample_interval + assert timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +def test___sample_interval_and_time_offset___create_with_regular_interval___creates_waveform_timing_with_sample_interval_and_time_offset() -> ( + None +): + sample_interval = ht.timedelta(milliseconds=1) + time_offset = ht.timedelta(seconds=1.23) + + timing = PrecisionTiming.create_with_regular_interval(sample_interval, time_offset=time_offset) + + assert_type(timing, PrecisionTiming) + assert not timing.has_timestamp + assert timing.time_offset == time_offset + assert timing.sample_interval == sample_interval + assert timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +############################################################################### +# create_with_irregular_interval +############################################################################### +def test___timestamps___create_with_irregular_interval___creates_waveform_timing_with_timestamps() -> ( + None +): + start_time = ht.datetime.now(dt.timezone.utc) + timestamps = [ + start_time, + start_time + ht.timedelta(seconds=1), + start_time + ht.timedelta(seconds=2.3), + start_time + ht.timedelta(seconds=2.5), + ] + + timing = PrecisionTiming.create_with_irregular_interval(timestamps) + + assert_type(timing, PrecisionTiming) + assert not timing.has_timestamp + assert timing.time_offset == ht.timedelta() + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.IRREGULAR + assert timing._timestamps == timestamps + + +############################################################################### +# get_timestamps +############################################################################### +def test___no_interval___get_timestamps___raises_runtime_error() -> None: + start_time = ht.datetime.now(dt.timezone.utc) + timing = PrecisionTiming.create_with_no_interval(start_time) + + with pytest.raises(RuntimeError) as exc: + _ = timing.get_timestamps(0, 5) + + assert exc.value.args[0].startswith( + "The waveform timing does not have valid timestamp information." + ) + + +def test___regular_interval___get_timestamps___gets_timestamps() -> None: + start_time = ht.datetime.now(dt.timezone.utc) + sample_interval = ht.timedelta(milliseconds=1) + timing = PrecisionTiming.create_with_regular_interval(sample_interval, start_time) + + assert list(timing.get_timestamps(3, 4)) == [ + start_time + 3 * sample_interval, + start_time + 4 * sample_interval, + start_time + 5 * sample_interval, + start_time + 6 * sample_interval, + ] + + +def test___irregular_interval___get_timestamps___gets_timestamps() -> None: + start_time = ht.datetime.now(dt.timezone.utc) + sample_interval = ht.timedelta(milliseconds=1) + timestamps = [start_time + i * sample_interval for i in range(10)] + timing = PrecisionTiming.create_with_irregular_interval(timestamps) + + assert list(timing.get_timestamps(0, 10)) == timestamps + + +def test___irregular_interval_subset___get_timestamps___gets_timestamps() -> None: + start_time = ht.datetime.now(dt.timezone.utc) + sample_interval = ht.timedelta(milliseconds=1) + timestamps = [start_time + i * sample_interval for i in range(10)] + timing = PrecisionTiming.create_with_irregular_interval(timestamps) + + assert list(timing.get_timestamps(3, 4)) == timestamps[3:7] + + +############################################################################### +# magic methods +############################################################################### +@pytest.mark.xfail(raises=TypeError, reason="https://github.com/ni/hightime/issues/49") +@pytest.mark.parametrize( + "value", + [ + PrecisionTiming.create_with_no_interval(), + PrecisionTiming.create_with_no_interval(ht.datetime(2025, 1, 1)), + PrecisionTiming.create_with_no_interval(None, ht.timedelta(seconds=1)), + PrecisionTiming.create_with_no_interval(ht.datetime(2025, 1, 1), ht.timedelta(seconds=1)), + PrecisionTiming.create_with_regular_interval(ht.timedelta(milliseconds=1)), + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1) + ), + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ), + PrecisionTiming.create_with_irregular_interval( + [ht.datetime(2025, 1, 1), ht.datetime(2025, 1, 2)] + ), + ], +) +def test___deep_copy___equality___equal(value: PrecisionTiming) -> None: + other = deepcopy(value) + + assert value == other + assert not (value != other) + + +@pytest.mark.parametrize( + "lhs, rhs", + [ + ( + PrecisionTiming.create_with_no_interval( + ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ), + PrecisionTiming.create_with_no_interval( + ht.datetime(2025, 1, 2), ht.timedelta(seconds=1) + ), + ), + ( + PrecisionTiming.create_with_no_interval( + ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ), + PrecisionTiming.create_with_no_interval( + ht.datetime(2025, 1, 1), ht.timedelta(seconds=2) + ), + ), + ( + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ), + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=2), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ), + ), + ( + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ), + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 2), ht.timedelta(seconds=1) + ), + ), + ( + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ), + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=2) + ), + ), + ( + PrecisionTiming.create_with_irregular_interval( + [ht.datetime(2025, 1, 1), ht.datetime(2025, 1, 2)] + ), + PrecisionTiming.create_with_irregular_interval( + [ht.datetime(2025, 1, 3), ht.datetime(2025, 1, 2)] + ), + ), + ], +) +def test___different_value___equality___not_equal( + lhs: PrecisionTiming, + rhs: PrecisionTiming, +) -> None: + assert not (lhs == rhs) + assert lhs != rhs + + +@pytest.mark.parametrize( + "value, expected_repr", + [ + ( + PrecisionTiming.create_with_no_interval(), + "nitypes.waveform.PrecisionTiming(nitypes.waveform.SampleIntervalMode.NONE)", + ), + ( + PrecisionTiming.create_with_no_interval(ht.datetime(2025, 1, 1)), + "nitypes.waveform.PrecisionTiming(nitypes.waveform.SampleIntervalMode.NONE, timestamp=hightime.datetime(2025, 1, 1, 0, 0))", + ), + ( + PrecisionTiming.create_with_no_interval(None, ht.timedelta(seconds=1)), + "nitypes.waveform.PrecisionTiming(nitypes.waveform.SampleIntervalMode.NONE, time_offset=hightime.timedelta(seconds=1))", + ), + ( + PrecisionTiming.create_with_no_interval( + ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ), + "nitypes.waveform.PrecisionTiming(nitypes.waveform.SampleIntervalMode.NONE, timestamp=hightime.datetime(2025, 1, 1, 0, 0), time_offset=hightime.timedelta(seconds=1))", + ), + ( + PrecisionTiming.create_with_no_interval(ht.datetime(2025, 1, 1), ht.timedelta()), + "nitypes.waveform.PrecisionTiming(nitypes.waveform.SampleIntervalMode.NONE, timestamp=hightime.datetime(2025, 1, 1, 0, 0), time_offset=hightime.timedelta())", + ), + ( + PrecisionTiming.create_with_regular_interval(ht.timedelta(milliseconds=1)), + "nitypes.waveform.PrecisionTiming(nitypes.waveform.SampleIntervalMode.REGULAR, sample_interval=hightime.timedelta(microseconds=1000))", + ), + ( + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1) + ), + "nitypes.waveform.PrecisionTiming(nitypes.waveform.SampleIntervalMode.REGULAR, timestamp=hightime.datetime(2025, 1, 1, 0, 0), sample_interval=hightime.timedelta(microseconds=1000))", + ), + ( + PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ), + "nitypes.waveform.PrecisionTiming(nitypes.waveform.SampleIntervalMode.REGULAR, timestamp=hightime.datetime(2025, 1, 1, 0, 0), time_offset=hightime.timedelta(seconds=1), sample_interval=hightime.timedelta(microseconds=1000))", + ), + ( + PrecisionTiming.create_with_irregular_interval( + [ht.datetime(2025, 1, 1), ht.datetime(2025, 1, 2)] + ), + "nitypes.waveform.PrecisionTiming(nitypes.waveform.SampleIntervalMode.IRREGULAR, timestamps=[hightime.datetime(2025, 1, 1, 0, 0), hightime.datetime(2025, 1, 2, 0, 0)])", + ), + ], +) +def test___various_values___repr___looks_ok(value: PrecisionTiming, expected_repr: str) -> None: + assert repr(value) == expected_repr diff --git a/tests/unit/waveform/_timing/test_standard.py b/tests/unit/waveform/_timing/test_standard.py new file mode 100644 index 00000000..10001791 --- /dev/null +++ b/tests/unit/waveform/_timing/test_standard.py @@ -0,0 +1,366 @@ +from __future__ import annotations + +import datetime as dt +import sys +from copy import deepcopy + +import pytest + +from nitypes.waveform import SampleIntervalMode, Timing + +if sys.version_info >= (3, 11): + from typing import assert_type +else: + from typing_extensions import assert_type + + +############################################################################### +# empty +############################################################################### +def test___empty___is_waveform_timing() -> None: + assert_type(Timing.empty, Timing) + assert isinstance(Timing.empty, Timing) + + +def test___empty___no_timestamp() -> None: + assert not Timing.empty.has_timestamp + with pytest.raises(RuntimeError) as exc: + _ = Timing.empty.timestamp + + assert exc.value.args[0] == "The waveform timing does not have a timestamp." + + +def test___empty___no_start_time() -> None: + with pytest.raises(RuntimeError) as exc: + _ = Timing.empty.start_time + + assert exc.value.args[0] == "The waveform timing does not have a timestamp." + + +def test___empty___default_time_offset() -> None: + assert Timing.empty.time_offset == dt.timedelta() + + +def test___empty___no_sample_interval() -> None: + assert Timing.empty._sample_interval is None + with pytest.raises(RuntimeError) as exc: + _ = Timing.empty.sample_interval + + assert exc.value.args[0] == "The waveform timing does not have a sample interval." + + +def test___empty___sample_interval_mode_none() -> None: + assert Timing.empty.sample_interval_mode == SampleIntervalMode.NONE + + +############################################################################### +# create_with_no_interval +############################################################################### +def test___no_args___create_with_no_interval___creates_empty_waveform_timing() -> None: + timing = Timing.create_with_no_interval() + + assert_type(timing, Timing) + assert not timing.has_timestamp + assert timing.time_offset == dt.timedelta() + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.NONE + + +def test___timestamp___create_with_no_interval___creates_waveform_timing_with_timestamp() -> None: + timestamp = dt.datetime.now(dt.timezone.utc) + timing = Timing.create_with_no_interval(timestamp) + + assert_type(timing, Timing) + assert timing.timestamp == timestamp + assert timing.time_offset == dt.timedelta() + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.NONE + + +def test___timestamp_and_time_offset___create_with_no_interval___creates_waveform_timing_with_timestamp_and_time_offset() -> ( + None +): + timestamp = dt.datetime.now(dt.timezone.utc) + time_offset = dt.timedelta(seconds=1.23) + timing = Timing.create_with_no_interval(timestamp, time_offset) + + assert_type(timing, Timing) + assert timing.timestamp == timestamp + assert timing.time_offset == time_offset + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.NONE + + +def test___time_offset___create_with_no_interval___creates_waveform_timing_with_time_offset() -> ( + None +): + time_offset = dt.timedelta(seconds=1.23) + timing = Timing.create_with_no_interval(time_offset=time_offset) + + assert_type(timing, Timing) + assert not timing.has_timestamp + assert timing.time_offset == time_offset + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.NONE + + +############################################################################### +# create_with_regular_interval +############################################################################### +def test___sample_interval___create_with_regular_interval___creates_waveform_timing_with_sample_interval() -> ( + None +): + sample_interval = dt.timedelta(milliseconds=1) + + timing = Timing.create_with_regular_interval(sample_interval) + + assert_type(timing, Timing) + assert not timing.has_timestamp + assert timing.time_offset == dt.timedelta() + assert timing.sample_interval == sample_interval + assert timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +def test___sample_interval_and_timestamp___create_with_regular_interval___creates_waveform_timing_with_sample_interval_and_timestamp() -> ( + None +): + sample_interval = dt.timedelta(milliseconds=1) + timestamp = dt.datetime.now(dt.timezone.utc) + + timing = Timing.create_with_regular_interval(sample_interval, timestamp) + + assert_type(timing, Timing) + assert timing.timestamp == timestamp + assert timing.time_offset == dt.timedelta() + assert timing.sample_interval == sample_interval + assert timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +def test___sample_interval_timestamp_and_time_offset___create_with_regular_interval___creates_waveform_timing_with_sample_interval_timestamp_and_time_offset() -> ( + None +): + sample_interval = dt.timedelta(milliseconds=1) + timestamp = dt.datetime.now(dt.timezone.utc) + time_offset = dt.timedelta(seconds=1.23) + + timing = Timing.create_with_regular_interval(sample_interval, timestamp, time_offset) + + assert_type(timing, Timing) + assert timing.timestamp == timestamp + assert timing.time_offset == time_offset + assert timing.sample_interval == sample_interval + assert timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +def test___sample_interval_and_time_offset___create_with_regular_interval___creates_waveform_timing_with_sample_interval_and_time_offset() -> ( + None +): + sample_interval = dt.timedelta(milliseconds=1) + time_offset = dt.timedelta(seconds=1.23) + + timing = Timing.create_with_regular_interval(sample_interval, time_offset=time_offset) + + assert_type(timing, Timing) + assert not timing.has_timestamp + assert timing.time_offset == time_offset + assert timing.sample_interval == sample_interval + assert timing.sample_interval_mode == SampleIntervalMode.REGULAR + + +############################################################################### +# create_with_irregular_interval +############################################################################### +def test___timestamps___create_with_irregular_interval___creates_waveform_timing_with_timestamps() -> ( + None +): + start_time = dt.datetime.now(dt.timezone.utc) + timestamps = [ + start_time, + start_time + dt.timedelta(seconds=1), + start_time + dt.timedelta(seconds=2.3), + start_time + dt.timedelta(seconds=2.5), + ] + + timing = Timing.create_with_irregular_interval(timestamps) + + assert_type(timing, Timing) + assert not timing.has_timestamp + assert timing.time_offset == dt.timedelta() + assert timing._sample_interval is None + assert timing.sample_interval_mode == SampleIntervalMode.IRREGULAR + assert timing._timestamps == timestamps + + +############################################################################### +# get_timestamps +############################################################################### +def test___no_interval___get_timestamps___raises_runtime_error() -> None: + start_time = dt.datetime.now(dt.timezone.utc) + timing = Timing.create_with_no_interval(start_time) + + with pytest.raises(RuntimeError) as exc: + _ = timing.get_timestamps(0, 5) + + assert exc.value.args[0].startswith( + "The waveform timing does not have valid timestamp information." + ) + + +def test___regular_interval___get_timestamps___gets_timestamps() -> None: + start_time = dt.datetime.now(dt.timezone.utc) + sample_interval = dt.timedelta(milliseconds=1) + timing = Timing.create_with_regular_interval(sample_interval, start_time) + + assert list(timing.get_timestamps(3, 4)) == [ + start_time + 3 * sample_interval, + start_time + 4 * sample_interval, + start_time + 5 * sample_interval, + start_time + 6 * sample_interval, + ] + + +def test___irregular_interval___get_timestamps___gets_timestamps() -> None: + start_time = dt.datetime.now(dt.timezone.utc) + sample_interval = dt.timedelta(milliseconds=1) + timestamps = [start_time + i * sample_interval for i in range(10)] + timing = Timing.create_with_irregular_interval(timestamps) + + assert list(timing.get_timestamps(0, 10)) == timestamps + + +def test___irregular_interval_subset___get_timestamps___gets_timestamps() -> None: + start_time = dt.datetime.now(dt.timezone.utc) + sample_interval = dt.timedelta(milliseconds=1) + timestamps = [start_time + i * sample_interval for i in range(10)] + timing = Timing.create_with_irregular_interval(timestamps) + + assert list(timing.get_timestamps(3, 4)) == timestamps[3:7] + + +############################################################################### +# magic methods +############################################################################### +@pytest.mark.parametrize( + "value", + [ + Timing.create_with_no_interval(), + Timing.create_with_no_interval(dt.datetime(2025, 1, 1)), + Timing.create_with_no_interval(None, dt.timedelta(seconds=1)), + Timing.create_with_no_interval(dt.datetime(2025, 1, 1), dt.timedelta(seconds=1)), + Timing.create_with_regular_interval(dt.timedelta(milliseconds=1)), + Timing.create_with_regular_interval(dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1)), + Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + ), + Timing.create_with_irregular_interval([dt.datetime(2025, 1, 1), dt.datetime(2025, 1, 2)]), + ], +) +def test___deep_copy___equality___equal(value: Timing) -> None: + other = deepcopy(value) + + assert value == other + assert not (value != other) + + +@pytest.mark.parametrize( + "lhs, rhs", + [ + ( + Timing.create_with_no_interval(dt.datetime(2025, 1, 1), dt.timedelta(seconds=1)), + Timing.create_with_no_interval(dt.datetime(2025, 1, 2), dt.timedelta(seconds=1)), + ), + ( + Timing.create_with_no_interval(dt.datetime(2025, 1, 1), dt.timedelta(seconds=1)), + Timing.create_with_no_interval(dt.datetime(2025, 1, 1), dt.timedelta(seconds=2)), + ), + ( + Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + ), + Timing.create_with_regular_interval( + dt.timedelta(milliseconds=2), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + ), + ), + ( + Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + ), + Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 2), dt.timedelta(seconds=1) + ), + ), + ( + Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + ), + Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=2) + ), + ), + ( + Timing.create_with_irregular_interval( + [dt.datetime(2025, 1, 1), dt.datetime(2025, 1, 2)] + ), + Timing.create_with_irregular_interval( + [dt.datetime(2025, 1, 3), dt.datetime(2025, 1, 2)] + ), + ), + ], +) +def test___different_value___equality___not_equal( + lhs: Timing, + rhs: Timing, +) -> None: + assert not (lhs == rhs) + assert lhs != rhs + + +@pytest.mark.parametrize( + "value, expected_repr", + [ + ( + Timing.create_with_no_interval(), + "nitypes.waveform.Timing(nitypes.waveform.SampleIntervalMode.NONE)", + ), + ( + Timing.create_with_no_interval(dt.datetime(2025, 1, 1)), + "nitypes.waveform.Timing(nitypes.waveform.SampleIntervalMode.NONE, timestamp=datetime.datetime(2025, 1, 1, 0, 0))", + ), + ( + Timing.create_with_no_interval(None, dt.timedelta(seconds=1)), + "nitypes.waveform.Timing(nitypes.waveform.SampleIntervalMode.NONE, time_offset=datetime.timedelta(seconds=1))", + ), + ( + Timing.create_with_no_interval(dt.datetime(2025, 1, 1), dt.timedelta(seconds=1)), + "nitypes.waveform.Timing(nitypes.waveform.SampleIntervalMode.NONE, timestamp=datetime.datetime(2025, 1, 1, 0, 0), time_offset=datetime.timedelta(seconds=1))", + ), + ( + Timing.create_with_no_interval(dt.datetime(2025, 1, 1), dt.timedelta()), + "nitypes.waveform.Timing(nitypes.waveform.SampleIntervalMode.NONE, timestamp=datetime.datetime(2025, 1, 1, 0, 0), time_offset=datetime.timedelta(0))", + ), + ( + Timing.create_with_regular_interval(dt.timedelta(milliseconds=1)), + "nitypes.waveform.Timing(nitypes.waveform.SampleIntervalMode.REGULAR, sample_interval=datetime.timedelta(microseconds=1000))", + ), + ( + Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1) + ), + "nitypes.waveform.Timing(nitypes.waveform.SampleIntervalMode.REGULAR, timestamp=datetime.datetime(2025, 1, 1, 0, 0), sample_interval=datetime.timedelta(microseconds=1000))", + ), + ( + Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + ), + "nitypes.waveform.Timing(nitypes.waveform.SampleIntervalMode.REGULAR, timestamp=datetime.datetime(2025, 1, 1, 0, 0), time_offset=datetime.timedelta(seconds=1), sample_interval=datetime.timedelta(microseconds=1000))", + ), + ( + Timing.create_with_irregular_interval( + [dt.datetime(2025, 1, 1), dt.datetime(2025, 1, 2)] + ), + "nitypes.waveform.Timing(nitypes.waveform.SampleIntervalMode.IRREGULAR, timestamps=[datetime.datetime(2025, 1, 1, 0, 0), datetime.datetime(2025, 1, 2, 0, 0)])", + ), + ], +) +def test___various_values___repr___looks_ok(value: Timing, expected_repr: str) -> None: + assert repr(value) == expected_repr diff --git a/tests/unit/waveform/test_analog_waveform.py b/tests/unit/waveform/test_analog_waveform.py index 5bf69ab1..eca00e75 100644 --- a/tests/unit/waveform/test_analog_waveform.py +++ b/tests/unit/waveform/test_analog_waveform.py @@ -1,15 +1,17 @@ from __future__ import annotations import array +import datetime as dt import itertools import sys import weakref from typing import Any, SupportsIndex +import hightime as ht import numpy as np import pytest -from nitypes.waveform import AnalogWaveform +from nitypes.waveform import AnalogWaveform, PrecisionTiming, Timing if sys.version_info >= (3, 11): from typing import assert_type @@ -636,7 +638,7 @@ def test___array_with_external_buffer___set_capacity___raises_value_error() -> N ############################################################################### -# misc +# extended properties ############################################################################### def test___waveform___set_channel_name___sets_extended_property() -> None: waveform = AnalogWaveform() @@ -687,3 +689,45 @@ def test___waveform___take_weak_ref___references_waveform() -> None: waveform_ref = weakref.ref(waveform) assert waveform_ref() is waveform + + +############################################################################### +# timing +############################################################################### +def test___waveform___has_empty_timing() -> None: + waveform = AnalogWaveform() + + assert_type(waveform.timing, Timing) + assert waveform.timing is Timing.empty + assert_type(waveform.precision_timing, PrecisionTiming) + assert waveform.precision_timing is PrecisionTiming.empty + + +def test___waveform_with_timing___get_precision_timing___converts_timing() -> None: + waveform = AnalogWaveform() + waveform.timing = Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + ) + + precision_timing = waveform.precision_timing + + assert_type(precision_timing, PrecisionTiming) + assert isinstance(precision_timing, PrecisionTiming) + assert precision_timing == PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ) + + +def test___waveform_with_precision_timing___get_timing___converts_timing() -> None: + waveform = AnalogWaveform() + waveform.precision_timing = PrecisionTiming.create_with_regular_interval( + ht.timedelta(milliseconds=1), ht.datetime(2025, 1, 1), ht.timedelta(seconds=1) + ) + + timing = waveform.timing + + assert_type(timing, Timing) + assert isinstance(timing, Timing) + assert timing == Timing.create_with_regular_interval( + dt.timedelta(milliseconds=1), dt.datetime(2025, 1, 1), dt.timedelta(seconds=1) + )