diff --git a/pyproject.toml b/pyproject.toml index 08b631c6..0f432636 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ line-length = 100 [tool.pytest.ini_options] addopts = "--doctest-modules --strict-markers" -testpaths = ["tests"] +testpaths = ["src/nitypes", "tests"] [build-system] requires = ["poetry>=1.2"] diff --git a/src/nitypes/waveform/__init__.py b/src/nitypes/waveform/__init__.py new file mode 100644 index 00000000..14c6731a --- /dev/null +++ b/src/nitypes/waveform/__init__.py @@ -0,0 +1,13 @@ +"""Waveform data types for NI Python APIs.""" + +from nitypes.waveform._analog_waveform import AnalogWaveform +from nitypes.waveform._extended_properties import ( + ExtendedPropertyDictionary, + ExtendedPropertyValue, +) + +__all__ = [ + "AnalogWaveform", + "ExtendedPropertyDictionary", + "ExtendedPropertyValue", +] diff --git a/src/nitypes/waveform/_analog_waveform.py b/src/nitypes/waveform/_analog_waveform.py new file mode 100644 index 00000000..816048db --- /dev/null +++ b/src/nitypes/waveform/_analog_waveform.py @@ -0,0 +1,466 @@ +from __future__ import annotations + +import sys +from collections.abc import Sequence +from typing import Any, Generic, SupportsIndex, TypeVar, overload + +import numpy as np +import numpy.typing as npt + +from nitypes.waveform._extended_properties import ( + CHANNEL_NAME, + UNIT_DESCRIPTION, + ExtendedPropertyDictionary, +) +from nitypes.waveform._utils import arg_to_uint, validate_dtype + +if sys.version_info < (3, 10): + import array as std_array + +_ScalarType = TypeVar("_ScalarType", bound=np.generic) +_ScalarType_co = TypeVar("_ScalarType_co", bound=np.generic, covariant=True) + +# Use the C types here because np.isdtype() considers some of them to be distinct types, even when +# they have the same size (e.g. np.intc vs. np.int_ vs. np.long). +_ANALOG_DTYPES = ( + # Floating point + np.single, + np.double, + # Signed integers + np.byte, + np.short, + np.intc, + np.int_, + np.long, + np.longlong, + # Unsigned integers + np.ubyte, + np.ushort, + np.uintc, + np.uint, + np.ulong, + np.ulonglong, +) + +# Note about NumPy type hints: +# - At time of writing (April 2025), shape typing is still under development, so we do not +# distinguish between 1D and 2D arrays in type hints. +# - npt.ArrayLike accepts some types that np.asarray() does not, such as buffers, so we are +# explicitly using npt.NDArray | Sequence instead of npt.ArrayLike. +# - _ScalarType is bound to np.generic, so Sequence[_ScalarType] will not match list[int]. +# - We are not using PEP 696 – Type Defaults for Type Parameters because it makes the type parameter +# default to np.float64 in some cases where it should be inferred as Any, such as when dtype is +# specified as a str. + + +class AnalogWaveform(Generic[_ScalarType_co]): + """An analog waveform, which encapsulates analog data and timing information.""" + + @overload + @staticmethod + def from_array_1d( + array: npt.NDArray[_ScalarType], + dtype: None = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + ) -> AnalogWaveform[_ScalarType]: ... + + @overload + @staticmethod + def from_array_1d( + array: npt.NDArray[Any] | Sequence[Any], + dtype: type[_ScalarType] | np.dtype[_ScalarType] = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + ) -> AnalogWaveform[_ScalarType]: ... + + @overload + @staticmethod + def from_array_1d( + array: npt.NDArray[Any] | Sequence[Any], + dtype: npt.DTypeLike = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + ) -> AnalogWaveform[Any]: ... + + @staticmethod + def from_array_1d( + array: npt.NDArray[Any] | Sequence[Any], + dtype: npt.DTypeLike = None, + *, + copy: bool = True, + start_index: SupportsIndex | None = 0, + sample_count: SupportsIndex | None = None, + ) -> AnalogWaveform[_ScalarType]: + """Construct an analog waveform from a one-dimensional array or sequence. + + Args: + array: The analog waveform data as a one-dimensional array or a sequence. + dtype: The NumPy data type for the analog waveform data. This argument is required + when array is a sequence. + copy: Specifies whether to copy the array or save a reference to it. + start_index: The sample index at which the analog waveform data begins. + sample_count: The number of samples in the analog waveform. + + Returns: + An analog waveform containing the specified data. + """ + if isinstance(array, np.ndarray): + if array.ndim != 1: + raise ValueError( + f"The input array must be a one-dimensional array or sequence.\n\nNumber of dimensions: {array.ndim}" + ) + elif isinstance(array, Sequence) or ( + sys.version_info < (3, 10) and isinstance(array, std_array.array) + ): + if dtype is None: + raise ValueError("You must specify a dtype when the input array is a sequence.") + else: + raise TypeError( + f"The input array must be a one-dimensional array or sequence.\n\nType: {type(array)}" + ) + + return AnalogWaveform( + _data=np.asarray(array, dtype, copy=copy), + start_index=start_index, + sample_count=sample_count, + ) + + @overload + @staticmethod + def from_array_2d( + array: npt.NDArray[_ScalarType], + dtype: None = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + ) -> list[AnalogWaveform[_ScalarType]]: ... + + @overload + @staticmethod + def from_array_2d( + array: npt.NDArray[Any] | Sequence[Sequence[Any]], + dtype: type[_ScalarType] | np.dtype[_ScalarType] = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + ) -> list[AnalogWaveform[_ScalarType]]: ... + + @overload + @staticmethod + def from_array_2d( + array: npt.NDArray[Any] | Sequence[Sequence[Any]], + dtype: npt.DTypeLike = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + ) -> list[AnalogWaveform[Any]]: ... + + @staticmethod + def from_array_2d( + array: npt.NDArray[Any] | Sequence[Sequence[Any]], + dtype: npt.DTypeLike = None, + *, + copy: bool = True, + start_index: SupportsIndex | None = 0, + sample_count: SupportsIndex | None = None, + ) -> list[AnalogWaveform[_ScalarType]]: + """Construct a list of analog waveforms from a two-dimensional array or nested sequence. + + Args: + array: The analog waveform data as a two-dimensional array or a nested sequence. + dtype: The NumPy data type for the analog waveform data. This argument is required + when array is a sequence. + copy: Specifies whether to copy the array or save a reference to it. + start_index: The sample index at which the analog waveform data begins. + sample_count: The number of samples in the analog waveform. + + Returns: + A list containing an analog waveform for each row of the specified data. + """ + if isinstance(array, np.ndarray): + if array.ndim != 2: + raise ValueError( + f"The input array must be a two-dimensional array or nested sequence.\n\nNumber of dimensions: {array.ndim}" + ) + elif isinstance(array, Sequence) or ( + sys.version_info < (3, 10) and isinstance(array, std_array.array) + ): + if dtype is None: + raise ValueError("You must specify a dtype when the input array is a sequence.") + else: + raise TypeError( + f"The input array must be a two-dimensional array or nested sequence.\n\nType: {type(array)}" + ) + + return [ + AnalogWaveform( + _data=np.asarray(array[i], dtype, copy=copy), + start_index=start_index, + sample_count=sample_count, + ) + for i in range(len(array)) + ] + + __slots__ = ["_data", "_start_index", "_sample_count", "_extended_properties", "__weakref__"] + + _data: npt.NDArray[_ScalarType_co] + _start_index: int + _sample_count: int + _extended_properties: ExtendedPropertyDictionary + + # If neither dtype nor _data is specified, the type parameter defaults to np.float64. + @overload + def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self: AnalogWaveform[np.float64], + sample_count: SupportsIndex | None = ..., + dtype: None = ..., + *, + start_index: SupportsIndex | None = ..., + capacity: SupportsIndex | None = ..., + _data: None = ..., + ) -> None: ... + + @overload + def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self: AnalogWaveform[_ScalarType_co], + sample_count: SupportsIndex | None = ..., + dtype: type[_ScalarType_co] | np.dtype[_ScalarType_co] = ..., + *, + start_index: SupportsIndex | None = ..., + capacity: SupportsIndex | None = ..., + _data: None = ..., + ) -> None: ... + + @overload + def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self: AnalogWaveform[_ScalarType_co], + sample_count: SupportsIndex | None = ..., + dtype: None = ..., + *, + start_index: SupportsIndex | None = ..., + capacity: SupportsIndex | None = ..., + _data: npt.NDArray[_ScalarType_co] | None = ..., + ) -> None: ... + + @overload + def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self: AnalogWaveform[Any], + sample_count: SupportsIndex | None = ..., + dtype: npt.DTypeLike = ..., + *, + start_index: SupportsIndex | None = ..., + capacity: SupportsIndex | None = ..., + _data: npt.NDArray[Any] | None = ..., + ) -> None: ... + + def __init__( + self, + sample_count: SupportsIndex | None = None, + dtype: npt.DTypeLike = None, + *, + start_index: SupportsIndex | None = None, + capacity: SupportsIndex | None = None, + _data: npt.NDArray[_ScalarType_co] | None = None, + ) -> None: + """Construct an analog waveform. + + Args: + sample_count: The number of samples in the analog waveform. + dtype: The NumPy data type for the analog waveform data. If not specified, the data + type defaults to np.float64. + start_index: The sample index at which the analog waveform data begins. + sample_count: The number of samples in the analog waveform. + capacity: The number of samples to allocate. Pre-allocating a larger buffer optimizes + appending samples to the waveform. + + Returns: + An analog waveform. + + Arguments that are prefixed with an underscore are internal implementation details and are + subject to change. + """ + if _data is None: + self._init_with_new_array( + sample_count, dtype, start_index=start_index, capacity=capacity + ) + else: + self._init_with_provided_array( + _data, dtype, start_index=start_index, sample_count=sample_count, capacity=capacity + ) + + def _init_with_new_array( + self, + sample_count: SupportsIndex | None = None, + dtype: npt.DTypeLike = None, + *, + start_index: SupportsIndex | None = None, + capacity: SupportsIndex | None = None, + ) -> None: + start_index = arg_to_uint("start index", start_index) + sample_count = arg_to_uint("sample count", sample_count) + capacity = arg_to_uint("capacity", capacity, sample_count) + + if dtype is None: + dtype = np.float64 + validate_dtype(dtype, _ANALOG_DTYPES) + + if start_index > capacity: + raise ValueError( + "The start index must be less than or equal to the capacity.\n\n" + f"Start index: {start_index}\n" + f"Capacity: {capacity}" + ) + if start_index + sample_count > capacity: + raise ValueError( + "The sum of the start index and sample count must be less than or equal to the capacity.\n\n" + f"Start index: {start_index}\n" + f"Sample count: {sample_count}\n" + f"Capacity: {capacity}" + ) + + self._data = np.zeros(capacity, dtype) + self._start_index = start_index + self._sample_count = sample_count + self._extended_properties = ExtendedPropertyDictionary() + + def _init_with_provided_array( + self, + data: npt.NDArray[_ScalarType_co], + dtype: npt.DTypeLike = None, + *, + start_index: SupportsIndex | None = None, + sample_count: SupportsIndex | None = None, + capacity: SupportsIndex | None = None, + ) -> None: + if not isinstance(data, np.ndarray): + raise TypeError("The input array must be a one-dimensional array.") + if data.ndim != 1: + raise ValueError("The input array must be a one-dimensional array.") + + if dtype is None: + dtype = data.dtype + if dtype != data.dtype: + # from_array_1d() converts the input array to the requested dtype, so this error may be + # unreachable. + raise TypeError("The data type of the input array must match the requested data type.") + validate_dtype(dtype, _ANALOG_DTYPES) + + capacity = arg_to_uint("capacity", capacity, len(data)) + if capacity != len(data): + raise ValueError( + "The capacity must match the input array length.\n\n" + f"Capacity: {capacity}\n" + f"Array length: {len(data)}" + ) + + start_index = arg_to_uint("start index", start_index) + if start_index > capacity: + raise ValueError( + "The start index must be less than or equal to the input array length.\n\n" + f"Start index: {start_index}\n" + f"Capacity: {capacity}" + ) + + sample_count = arg_to_uint("sample count", sample_count, len(data) - start_index) + if start_index + sample_count > len(data): + raise ValueError( + "The sum of the start index and sample count must be less than or equal to the input array length.\n\n" + f"Start index: {start_index}\n" + f"Sample count: {sample_count}\n" + f"Array length: {len(data)}" + ) + + self._data = data + self._start_index = start_index + self._sample_count = sample_count + self._extended_properties = ExtendedPropertyDictionary() + + @property + def raw_data(self) -> npt.NDArray[_ScalarType_co]: + """The raw analog waveform data.""" + return self._data[self._start_index : self._start_index + self._sample_count] + + @property + def scaled_data(self) -> npt.NDArray[np.float64]: + """The scaled analog waveform data.""" + # TODO: implement scaling + return self.raw_data.astype(np.float64) + + @property + def sample_count(self) -> int: + """The number of samples in the analog waveform.""" + return self._sample_count + + @property + def capacity(self) -> int: + """The total capacity available for analog waveform data. + + Setting the capacity resizes the underlying NumPy array in-place. + - Other Python objects with references to the array will see the array size change. + - If the array has a reference to an external buffer (such as an array.array), attempting + to resize it raises ValueError. + """ + return len(self._data) + + @capacity.setter + def capacity(self, value: int) -> None: + if value < 0: + raise ValueError( + "The capacity must be a non-negative integer.\n\n" f"Capacity: {value}" + ) + if value < self._start_index + self._sample_count: + raise ValueError( + "The capacity must be equal to or greater than the number of samples in the waveform.\n\n" + f"Capacity: {value}\n" + f"Number of samples: {self._start_index + self._sample_count}" + ) + if value != len(self._data): + self._data.resize(value, refcheck=False) + + @property + def dtype(self) -> np.dtype[_ScalarType_co]: + """The NumPy dtype for the analog waveform data.""" + return self._data.dtype + + @property + def extended_properties(self) -> ExtendedPropertyDictionary: + """The extended properties for the analog waveform.""" + return self._extended_properties + + @property + def channel_name(self) -> str: + """The name of the device channel from which the analog waveform was acquired.""" + value = self._extended_properties.get(CHANNEL_NAME, "") + assert isinstance(value, str) + return value + + @channel_name.setter + def channel_name(self, value: str) -> None: + if not isinstance(value, str): + raise TypeError("The channel name must be a str.\n\n" f"Channel name: {value!r}") + self._extended_properties[CHANNEL_NAME] = value + + @property + def unit_description(self) -> str: + """The unit of measurement, such as volts, of the analog waveform.""" + value = self._extended_properties.get(UNIT_DESCRIPTION, "") + assert isinstance(value, str) + return value + + @unit_description.setter + def unit_description(self, value: str) -> None: + if not isinstance(value, str): + raise TypeError( + "The unit description must be a str.\n\n" f"Unit description: {value!r}" + ) + self._extended_properties[UNIT_DESCRIPTION] = value diff --git a/src/nitypes/waveform/_extended_properties.py b/src/nitypes/waveform/_extended_properties.py new file mode 100644 index 00000000..2aa73126 --- /dev/null +++ b/src/nitypes/waveform/_extended_properties.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import operator +import sys +from typing import MutableMapping, Iterator + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing import Union + from typing_extensions import TypeAlias + +# Extended property keys +CHANNEL_NAME = "NI_ChannelName" +LINE_NAMES = "NI_LineNames" +UNIT_DESCRIPTION = "NI_UnitDescription" + + +if sys.version_info >= (3, 10): + ExtendedPropertyValue: TypeAlias = bool | float | int | str + """An ExtendedPropertyDictionary value.""" +else: + ExtendedPropertyValue: TypeAlias = Union[bool, float, int, str] + """An ExtendedPropertyDictionary value.""" + + +class ExtendedPropertyDictionary(MutableMapping[str, ExtendedPropertyValue]): + """A dictionary of extended properties.""" + + def __init__(self) -> None: + """Construct an ExtendedPropertyDictionary.""" + self._properties: dict[str, ExtendedPropertyValue] = {} + + def __len__( # noqa: D105 - Missing docstring in magic method (auto-generated noqa) + self, + ) -> int: + return len(self._properties) + + def __iter__( # noqa: D105 - Missing docstring in magic method (auto-generated noqa) + self, + ) -> Iterator[str]: + return iter(self._properties) + + def __contains__( # noqa: D105 - Missing docstring in magic method (auto-generated noqa) + self, x: object, / + ) -> bool: + return operator.contains(self._properties, x) + + def __getitem__( # noqa: D105 - Missing docstring in magic method (auto-generated noqa) + self, key: str, / + ) -> ExtendedPropertyValue: + return operator.getitem(self._properties, key) + + def __setitem__( # noqa: D105 - Missing docstring in magic method (auto-generated noqa) + self, key: str, value: ExtendedPropertyValue, / + ) -> None: + operator.setitem(self._properties, key, value) + + def __delitem__( # noqa: D105 - Missing docstring in magic method (auto-generated noqa) + self, key: str, / + ) -> None: + operator.delitem(self._properties, key) diff --git a/src/nitypes/waveform/_utils.py b/src/nitypes/waveform/_utils.py new file mode 100644 index 00000000..9b121cd4 --- /dev/null +++ b/src/nitypes/waveform/_utils.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import operator +from typing import SupportsIndex + +import numpy as np +import numpy.typing as npt + + +def arg_to_int(arg_description: str, value: SupportsIndex | None, default_value: int = 0) -> int: + """Convert an argument to a signed integer.""" + if value is None: + return default_value + return operator.index(value) + + +def arg_to_uint(arg_description: str, value: SupportsIndex | None, default_value: int = 0) -> int: + """Convert an argument to an unsigned integer.""" + value = arg_to_int(arg_description, value, default_value) + if value < 0: + raise ValueError( + f"The {arg_description} must be a non-negative integer.\n\nProvided value: {value}" + ) + return value + + +def validate_dtype(dtype: npt.DTypeLike, supported_dtypes: tuple[npt.DTypeLike, ...]) -> None: + """Validate a dtype-like object against a tuple of supported dtype-like objects. + + >>> validate_dtype(np.float64, (np.float64, np.intc, np.long,)) + >>> validate_dtype("float64", (np.float64, np.intc, np.long,)) + >>> validate_dtype(np.float64, (np.byte, np.short, np.intc, np.int_, np.long, np.longlong)) + Traceback (most recent call last): + ... + TypeError: The requested data type is not supported. + + Data type: float64 + Supported data types: int8, int16, int32, int64 + """ + if not isinstance(dtype, (type, np.dtype)): + dtype = np.dtype(dtype) + if not np.isdtype(dtype, supported_dtypes): + # Remove duplicate names because distinct types (e.g. int vs. long) may have the same name + # ("int32"). + supported_dtype_names = {np.dtype(d).name: None for d in supported_dtypes}.keys() + raise TypeError( + "The requested data type is not supported.\n\n" + f"Data type: {np.dtype(dtype)}\n" + f"Supported data types: {', '.join(supported_dtype_names)}" + ) diff --git a/tests/unit/waveform/__init__.py b/tests/unit/waveform/__init__.py new file mode 100644 index 00000000..9bfbbf2e --- /dev/null +++ b/tests/unit/waveform/__init__.py @@ -0,0 +1 @@ +"""Unit tests for the nitypes.waveform package.""" diff --git a/tests/unit/waveform/test_analog_waveform.py b/tests/unit/waveform/test_analog_waveform.py new file mode 100644 index 00000000..5bf69ab1 --- /dev/null +++ b/tests/unit/waveform/test_analog_waveform.py @@ -0,0 +1,689 @@ +from __future__ import annotations + +import array +import itertools +import sys +import weakref +from typing import Any, SupportsIndex + +import numpy as np +import pytest + +from nitypes.waveform import AnalogWaveform + +if sys.version_info >= (3, 11): + from typing import assert_type +else: + from typing_extensions import assert_type + + +############################################################################### +# create +############################################################################### +def test___no_args___create___creates_empty_waveform_with_default_dtype() -> None: + waveform = AnalogWaveform() + + assert waveform.sample_count == waveform.capacity == len(waveform.raw_data) == 0 + assert waveform.dtype == np.float64 + assert_type(waveform, AnalogWaveform[np.float64]) + + +def test___sample_count___create___creates_waveform_with_sample_count_and_default_dtype() -> None: + waveform = AnalogWaveform(10) + + assert waveform.sample_count == waveform.capacity == len(waveform.raw_data) == 10 + assert waveform.dtype == np.float64 + assert_type(waveform, AnalogWaveform[np.float64]) + + +def test___sample_count_and_dtype___create___creates_waveform_with_sample_count_and_dtype() -> None: + waveform = AnalogWaveform(10, np.int32) + + assert waveform.sample_count == waveform.capacity == len(waveform.raw_data) == 10 + assert waveform.dtype == np.int32 + assert_type(waveform, AnalogWaveform[np.int32]) + + +def test___sample_count_and_dtype_str___create___creates_waveform_with_sample_count_and_dtype() -> ( + None +): + waveform = AnalogWaveform(10, "i4") + + assert waveform.sample_count == waveform.capacity == len(waveform.raw_data) == 10 + assert waveform.dtype == np.int32 + assert_type(waveform, AnalogWaveform[Any]) # dtype not inferred from string + + +def test___sample_count_and_dtype_any___create___creates_waveform_with_sample_count_and_dtype() -> ( + None +): + dtype: np.dtype[Any] = np.dtype(np.int32) + waveform = AnalogWaveform(10, dtype) + + assert waveform.sample_count == waveform.capacity == len(waveform.raw_data) == 10 + assert waveform.dtype == np.int32 + assert_type(waveform, AnalogWaveform[Any]) # dtype not inferred from np.dtype[Any] + + +def test___sample_count_dtype_and_capacity___create___creates_waveform_with_sample_count_dtype_and_capacity() -> ( + None +): + waveform = AnalogWaveform(10, np.int32, capacity=20) + + assert waveform.sample_count == len(waveform.raw_data) == 10 + assert waveform.capacity == 20 + assert waveform.dtype == np.int32 + assert_type(waveform, AnalogWaveform[np.int32]) + + +def test___sample_count_and_unsupported_dtype___create___raises_type_error() -> None: + with pytest.raises(TypeError) as exc: + _ = AnalogWaveform(10, np.complex128) + + assert exc.value.args[0].startswith("The requested data type is not supported.") + + +############################################################################### +# from_array_1d +############################################################################### +def test___float64_ndarray___from_array_1d___creates_waveform_with_float64_dtype() -> None: + data = np.array([1.1, 2.2, 3.3, 4.4, 5.5], np.float64) + + waveform = AnalogWaveform.from_array_1d(data) + + assert waveform.raw_data.tolist() == data.tolist() + assert waveform.dtype == np.float64 + assert_type(waveform, AnalogWaveform[np.float64]) + + +def test___int32_ndarray___from_array_1d___creates_waveform_with_int32_dtype() -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + waveform = AnalogWaveform.from_array_1d(data) + + assert waveform.raw_data.tolist() == data.tolist() + assert waveform.dtype == np.int32 + assert_type(waveform, AnalogWaveform[np.int32]) + + +def test___int32_array_with_dtype___from_array_1d___creates_waveform_with_specified_dtype() -> None: + data = array.array("i", [1, 2, 3, 4, 5]) + + waveform = AnalogWaveform.from_array_1d(data, np.int32) + + assert waveform.raw_data.tolist() == data.tolist() + assert waveform.dtype == np.int32 + assert_type(waveform, AnalogWaveform[np.int32]) + + +def test___int16_ndarray_with_mismatched_dtype___from_array_1d___creates_waveform_with_specified_dtype() -> ( + None +): + data = np.array([1, 2, 3, 4, 5], np.int16) + + waveform = AnalogWaveform.from_array_1d(data, np.int32) + + assert waveform.raw_data.tolist() == data.tolist() + assert waveform.dtype == np.int32 + assert_type(waveform, AnalogWaveform[np.int32]) + + +def test___int_list_with_dtype___from_array_1d___creates_waveform_with_specified_dtype() -> None: + data = [1, 2, 3, 4, 5] + + waveform = AnalogWaveform.from_array_1d(data, np.int32) + + assert waveform.raw_data.tolist() == data + assert waveform.dtype == np.int32 + assert_type(waveform, AnalogWaveform[np.int32]) + + +def test___int_list_with_dtype_str___from_array_1d___creates_waveform_with_specified_dtype() -> ( + None +): + data = [1, 2, 3, 4, 5] + + waveform = AnalogWaveform.from_array_1d(data, "int32") + + assert waveform.raw_data.tolist() == data # type: ignore[comparison-overlap] + assert waveform.dtype == np.int32 + assert_type(waveform, AnalogWaveform[Any]) # dtype not inferred from string + + +def test___int32_ndarray_2d___from_array_1d___raises_value_error() -> None: + data = np.array([[1, 2, 3], [4, 5, 6]], np.int32) + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_1d(data) + + assert exc.value.args[0].startswith( + "The input array must be a one-dimensional array or sequence." + ) + + +def test___int_list_without_dtype___from_array_1d___raises_value_error() -> None: + data = [1, 2, 3, 4, 5] + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_1d(data) + + assert exc.value.args[0].startswith( + "You must specify a dtype when the input array is a sequence." + ) + + +def test___bytes___from_array_1d___raises_value_error() -> None: + data = b"\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00" + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_1d(data, np.int32) + + assert exc.value.args[0].startswith("invalid literal for int() with base 10:") + + +def test___iterable___from_array_1d___raises_type_error() -> None: + data = itertools.repeat(3) + + with pytest.raises(TypeError) as exc: + _ = AnalogWaveform.from_array_1d(data, np.int32) # type: ignore[call-overload] + + assert exc.value.args[0].startswith( + "The input array must be a one-dimensional array or sequence." + ) + + +def test___ndarray_with_unsupported_dtype___from_array_1d___raises_type_error() -> None: + data = np.zeros(3, np.complex128) + + with pytest.raises(TypeError) as exc: + _ = AnalogWaveform.from_array_1d(data) + + assert exc.value.args[0].startswith("The requested data type is not supported.") + + +def test___copy___from_array_1d___creates_waveform_linked_to_different_buffer() -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + waveform = AnalogWaveform.from_array_1d(data, copy=True) + + assert waveform._data is not data + assert waveform.raw_data.tolist() == data.tolist() + data[:] = [5, 4, 3, 2, 1] + assert waveform.raw_data.tolist() != data.tolist() + + +def test___int32_ndarray_no_copy___from_array_1d___creates_waveform_linked_to_same_buffer() -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + waveform = AnalogWaveform.from_array_1d(data, copy=False) + + assert waveform._data is data + assert waveform.raw_data.tolist() == data.tolist() + data[:] = [5, 4, 3, 2, 1] + assert waveform.raw_data.tolist() == data.tolist() + + +def test___int32_array_no_copy___from_array_1d___creates_waveform_linked_to_same_buffer() -> None: + data = array.array("i", [1, 2, 3, 4, 5]) + + waveform = AnalogWaveform.from_array_1d(data, dtype=np.int32, copy=False) + + assert waveform.raw_data.tolist() == data.tolist() + data[:] = array.array("i", [5, 4, 3, 2, 1]) + assert waveform.raw_data.tolist() == data.tolist() + + +def test___int_list_no_copy___from_array_1d___raises_value_error() -> None: + data = [1, 2, 3, 4, 5] + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_1d(data, np.int32, copy=False) + + assert exc.value.args[0].startswith( + "Unable to avoid copy while creating an array as requested." + ) + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_data", + [ + (0, None, [1, 2, 3, 4, 5]), + (1, None, [2, 3, 4, 5]), + (4, None, [5]), + (5, None, []), + (0, 1, [1]), + (0, 4, [1, 2, 3, 4]), + (1, 1, [2]), + (1, 3, [2, 3, 4]), + (1, 4, [2, 3, 4, 5]), + ], +) +def test___array_subset___from_array_1d___creates_waveform_with_array_subset( + start_index: SupportsIndex, sample_count: SupportsIndex | None, expected_data: list[int] +) -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + waveform = AnalogWaveform.from_array_1d( + data, start_index=start_index, sample_count=sample_count + ) + + assert waveform.raw_data.tolist() == expected_data + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_message", + [ + (-2, None, "The start index must be a non-negative integer."), + (-1, None, "The start index must be a non-negative integer."), + (6, None, "The start index must be less than or equal to the input array length."), + (0, -2, "The sample count must be a non-negative integer."), + (0, -1, "The sample count must be a non-negative integer."), + ( + 0, + 6, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ( + 1, + 5, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ( + 5, + 1, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ], +) +def test___invalid_array_subset___from_array_1d___raises_value_error( + start_index: SupportsIndex, sample_count: SupportsIndex | None, expected_message: str +) -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_1d(data, start_index=start_index, sample_count=sample_count) + + assert exc.value.args[0].startswith(expected_message) + + +############################################################################### +# from_array_2d +############################################################################### +def test___float64_ndarray___from_array_2d___creates_waveform_with_float64_dtype() -> None: + data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]], np.float64) + + waveforms = AnalogWaveform.from_array_2d(data) + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i].tolist() + assert waveforms[i].dtype == np.float64 + assert_type(waveforms[i], AnalogWaveform[np.float64]) + + +def test___int32_ndarray___from_array_2d___creates_waveform_with_int32_dtype() -> None: + data = np.array([[1, 2, 3], [4, 5, 6]], np.int32) + + waveforms = AnalogWaveform.from_array_2d(data) + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i].tolist() + assert waveforms[i].dtype == np.int32 + assert_type(waveforms[i], AnalogWaveform[np.int32]) + + +def test___int16_ndarray_with_mismatched_dtype___from_array_2d___creates_waveform_with_specified_dtype() -> ( + None +): + data = np.array([[1, 2, 3], [4, 5, 6]], np.int16) + + waveforms = AnalogWaveform.from_array_2d(data, np.int32) + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i].tolist() + assert waveforms[i].dtype == np.int32 + assert_type(waveforms[i], AnalogWaveform[np.int32]) + + +def test___int32_array_list_with_dtype___from_array_2d___creates_waveform_with_specified_dtype() -> ( + None +): + data = [array.array("i", [1, 2, 3]), array.array("i", [4, 5, 6])] + + waveforms = AnalogWaveform.from_array_2d(data, np.int32) + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i].tolist() + assert waveforms[i].dtype == np.int32 + assert_type(waveforms[i], AnalogWaveform[np.int32]) + + +def test___int_list_list_with_dtype___from_array_2d___creates_waveform_with_specified_dtype() -> ( + None +): + data = [[1, 2, 3], [4, 5, 6]] + + waveforms = AnalogWaveform.from_array_2d(data, np.int32) + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i] + assert waveforms[i].dtype == np.int32 + assert_type(waveforms[i], AnalogWaveform[np.int32]) + + +def test___int_list_list_with_dtype_str___from_array_2d___creates_waveform_with_specified_dtype() -> ( + None +): + data = [[1, 2, 3], [4, 5, 6]] + + waveforms = AnalogWaveform.from_array_2d(data, "int32") + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i] # type: ignore[comparison-overlap] + assert waveforms[i].dtype == np.int32 + assert_type(waveforms[i], AnalogWaveform[Any]) # dtype not inferred from string + + +def test___int32_ndarray_1d___from_array_2d___raises_value_error() -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_2d(data) + + assert exc.value.args[0].startswith( + "The input array must be a two-dimensional array or nested sequence." + ) + + +def test___int_list_list_without_dtype___from_array_2d___raises_value_error() -> None: + data = [[1, 2, 3], [4, 5, 6]] + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_2d(data) + + assert exc.value.args[0].startswith( + "You must specify a dtype when the input array is a sequence." + ) + + +def test___bytes_list___from_array_2d___raises_value_error() -> None: + data = [ + b"\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00", + b"\x04\x00\x00\x00\x05\x00\x00\x00\x06\x00\x00\x00", + ] + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_2d(data, np.int32) + + assert exc.value.args[0].startswith("invalid literal for int() with base 10:") + + +def test___list_iterable___from_array_2d___raises_type_error() -> None: + data = itertools.repeat([3]) + + with pytest.raises(TypeError) as exc: + _ = AnalogWaveform.from_array_2d(data, np.int32) # type: ignore[call-overload] + + assert exc.value.args[0].startswith( + "The input array must be a two-dimensional array or nested sequence." + ) + + +def test___iterable_list___from_array_2d___raises_type_error() -> None: + data = [itertools.repeat(3), itertools.repeat(4)] + + with pytest.raises(TypeError) as exc: + _ = AnalogWaveform.from_array_2d(data, np.int32) # type: ignore[arg-type] + + assert exc.value.args[0].startswith("int() argument must be") + + +def test___ndarray_with_unsupported_dtype___from_array_2d___raises_type_error() -> None: + data = np.zeros((2, 3), np.complex128) + + with pytest.raises(TypeError) as exc: + _ = AnalogWaveform.from_array_2d(data) + + assert exc.value.args[0].startswith("The requested data type is not supported.") + + +def test___copy___from_array_2d___creates_waveform_linked_to_different_buffer() -> None: + data = np.array([[1, 2, 3], [4, 5, 6]], np.int32) + + waveforms = AnalogWaveform.from_array_2d(data, copy=True) + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i].tolist() + data[0][:] = [3, 2, 1] + data[1][:] = [6, 5, 4] + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() != data[i].tolist() + + +def test___int32_ndarray_no_copy___from_array_2d___creates_waveform_linked_to_same_buffer() -> None: + data = np.array([[1, 2, 3], [4, 5, 6]], np.int32) + + waveforms = AnalogWaveform.from_array_2d(data, copy=False) + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i].tolist() + data[0][:] = [3, 2, 1] + data[1][:] = [6, 5, 4] + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i].tolist() + + +def test___int32_array_list_no_copy___from_array_2d___creates_waveform_linked_to_same_buffer() -> ( + None +): + data = [array.array("i", [1, 2, 3]), array.array("i", [4, 5, 6])] + + waveforms = AnalogWaveform.from_array_2d(data, dtype=np.int32, copy=False) + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i].tolist() + data[0][:] = array.array("i", [3, 2, 1]) + data[1][:] = array.array("i", [6, 5, 4]) + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == data[i].tolist() + + +def test___int_list_list_no_copy___from_array_2d___raises_value_error() -> None: + data = [[1, 2, 3], [4, 5, 6]] + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_2d(data, np.int32, copy=False) + + assert exc.value.args[0].startswith( + "Unable to avoid copy while creating an array as requested." + ) + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_data", + [ + (0, None, [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]), + (1, None, [[2, 3, 4, 5], [7, 8, 9, 10]]), + (4, None, [[5], [10]]), + (5, None, [[], []]), + (0, 1, [[1], [6]]), + (0, 4, [[1, 2, 3, 4], [6, 7, 8, 9]]), + (1, 1, [[2], [7]]), + (1, 3, [[2, 3, 4], [7, 8, 9]]), + (1, 4, [[2, 3, 4, 5], [7, 8, 9, 10]]), + ], +) +def test___array_subset___from_array_2d___creates_waveform_with_array_subset( + start_index: SupportsIndex, sample_count: SupportsIndex | None, expected_data: list[list[int]] +) -> None: + data = np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]], np.int32) + + waveforms = AnalogWaveform.from_array_2d( + data, start_index=start_index, sample_count=sample_count + ) + + assert len(waveforms) == 2 + for i in range(len(waveforms)): + assert waveforms[i].raw_data.tolist() == expected_data[i] + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_message", + [ + (-2, None, "The start index must be a non-negative integer."), + (-1, None, "The start index must be a non-negative integer."), + (6, None, "The start index must be less than or equal to the input array length."), + (0, -2, "The sample count must be a non-negative integer."), + (0, -1, "The sample count must be a non-negative integer."), + ( + 0, + 6, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ( + 1, + 5, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ( + 5, + 1, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ], +) +def test___invalid_array_subset___from_array_2d___raises_value_error( + start_index: SupportsIndex, sample_count: SupportsIndex | None, expected_message: str +) -> None: + data = np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]], np.int32) + + with pytest.raises(ValueError) as exc: + _ = AnalogWaveform.from_array_2d(data, start_index=start_index, sample_count=sample_count) + + assert exc.value.args[0].startswith(expected_message) + + +############################################################################### +# capacity +############################################################################### +@pytest.mark.parametrize( + "capacity, expected_data", + [(3, [1, 2, 3]), (4, [1, 2, 3, 0]), (10, [1, 2, 3, 0, 0, 0, 0, 0, 0, 0])], +) +def test___waveform___set_capacity___resizes_array_and_pads_with_zeros( + capacity: int, expected_data: list[int] +) -> None: + data = [1, 2, 3] + waveform = AnalogWaveform.from_array_1d(data, np.int32) + + waveform.capacity = capacity + + assert waveform.capacity == capacity + assert waveform.raw_data.tolist() == data + assert waveform._data.tolist() == expected_data + + +@pytest.mark.parametrize( + "capacity, expected_message", + [ + (-2, "The capacity must be a non-negative integer."), + (-1, "The capacity must be a non-negative integer."), + (0, "The capacity must be equal to or greater than the number of samples in the waveform."), + (2, "The capacity must be equal to or greater than the number of samples in the waveform."), + ], +) +def test___invalid_capacity___set_capacity___raises_value_error( + capacity: int, expected_message: str +) -> None: + data = [1, 2, 3] + waveform = AnalogWaveform.from_array_1d(data, np.int32) + + with pytest.raises(ValueError) as exc: + waveform.capacity = capacity + + assert exc.value.args[0].startswith(expected_message) + + +def test___referenced_array___set_capacity___reference_sees_size_change() -> None: + data = np.array([1, 2, 3], np.int32) + waveform = AnalogWaveform.from_array_1d(data, np.int32, copy=False) + + waveform.capacity = 10 + + assert len(data) == 10 + assert waveform.capacity == 10 + assert data.tolist() == [1, 2, 3, 0, 0, 0, 0, 0, 0, 0] + assert waveform.raw_data.tolist() == [1, 2, 3] + assert waveform._data.tolist() == [1, 2, 3, 0, 0, 0, 0, 0, 0, 0] + + +def test___array_with_external_buffer___set_capacity___raises_value_error() -> None: + data = array.array("i", [1, 2, 3]) + waveform = AnalogWaveform.from_array_1d(data, np.int32, copy=False) + + with pytest.raises(ValueError) as exc: + waveform.capacity = 10 + + assert exc.value.args[0].startswith("cannot resize this array: it does not own its data") + + +############################################################################### +# misc +############################################################################### +def test___waveform___set_channel_name___sets_extended_property() -> None: + waveform = AnalogWaveform() + + waveform.channel_name = "Dev1/ai0" + + assert waveform.channel_name == "Dev1/ai0" + assert waveform.extended_properties["NI_ChannelName"] == "Dev1/ai0" + + +def test___invalid_type___set_channel_name___raises_type_error() -> None: + waveform = AnalogWaveform() + + with pytest.raises(TypeError) as exc: + waveform.channel_name = 1 # type: ignore[assignment] + + assert exc.value.args[0].startswith("The channel name must be a str.") + + +def test___waveform___set_unit_description___sets_extended_property() -> None: + waveform = AnalogWaveform() + + waveform.unit_description = "Volts" + + assert waveform.unit_description == "Volts" + assert waveform.extended_properties["NI_UnitDescription"] == "Volts" + + +def test___invalid_type___set_unit_description___raises_type_error() -> None: + waveform = AnalogWaveform() + + with pytest.raises(TypeError) as exc: + waveform.unit_description = None # type: ignore[assignment] + + assert exc.value.args[0].startswith("The unit description must be a str.") + + +def test___waveform___set_undefined_property___raises_attribute_error() -> None: + waveform = AnalogWaveform() + + with pytest.raises(AttributeError): + waveform.undefined_property = "Whatever" # type: ignore[attr-defined] + + +def test___waveform___take_weak_ref___references_waveform() -> None: + waveform = AnalogWaveform() + + waveform_ref = weakref.ref(waveform) + + assert waveform_ref() is waveform