diff --git a/src/nitypes/waveform/__init__.py b/src/nitypes/waveform/__init__.py index bf43d817..a4137c40 100644 --- a/src/nitypes/waveform/__init__.py +++ b/src/nitypes/waveform/__init__.py @@ -65,7 +65,7 @@ To construct a complex waveform, use the :any:`ComplexWaveform` class: >>> ComplexWaveform.from_array_1d([1 + 2j, 3 + 4j], np.complex128) -nitypes.waveform.ComplexWaveform(2, complex128, raw_data=array([1.+2.j, 3.+4.j])) +nitypes.waveform.ComplexWaveform(2, raw_data=array([1.+2.j, 3.+4.j])) Scaling complex-number data --------------------------- @@ -84,7 +84,22 @@ array([(1, 2), (3, 4)], dtype=[('real', '>> wfm.scaled_data array([2.5+4.j, 6.5+8.j]) -""" + +Frequency Spectrums +=================== + +A frequency spectrum represents an analog signal with frequency information and extended properties +such as units. + +Constructing spectrums +---------------------- + +To construct a spectrum, use the :any:`Spectrum` class: + +>>> Spectrum.from_array_1d([1, 2, 3], np.float64, start_frequency=100, frequency_increment=10) # doctest: +NORMALIZE_WHITESPACE +nitypes.waveform.Spectrum(3, data=array([1., 2., 3.]), start_frequency=100.0, + frequency_increment=10.0) +""" # noqa: W505 - doc line too long from nitypes.waveform._analog import AnalogWaveform from nitypes.waveform._complex import ComplexWaveform @@ -100,6 +115,7 @@ NoneScaleMode, ScaleMode, ) +from nitypes.waveform._spectrum import Spectrum from nitypes.waveform._timing import ( BaseTiming, PrecisionTiming, @@ -122,6 +138,7 @@ "SampleIntervalMode", "ScaleMode", "ScalingMismatchWarning", + "Spectrum", "Timing", "TimingMismatchError", "TimingMismatchWarning", @@ -141,6 +158,7 @@ SampleIntervalMode.__module__ = __name__ ScaleMode.__module__ = __name__ ScalingMismatchWarning.__module__ = __name__ +Spectrum.__module__ = __name__ Timing.__module__ = __name__ TimingMismatchError.__module__ = __name__ TimingMismatchWarning.__module__ = __name__ diff --git a/src/nitypes/waveform/_analog.py b/src/nitypes/waveform/_analog.py index 3e0c8d10..e8096ab9 100644 --- a/src/nitypes/waveform/_analog.py +++ b/src/nitypes/waveform/_analog.py @@ -265,6 +265,7 @@ def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa start_index: SupportsIndex | None = ..., capacity: SupportsIndex | None = ..., extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., timing: Timing | PrecisionTiming | None = ..., scale_mode: ScaleMode | None = ..., ) -> None: ... @@ -279,6 +280,7 @@ def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa start_index: SupportsIndex | None = ..., capacity: SupportsIndex | None = ..., extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., timing: Timing | PrecisionTiming | None = ..., scale_mode: ScaleMode | None = ..., ) -> None: ... @@ -293,6 +295,7 @@ def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa start_index: SupportsIndex | None = ..., capacity: SupportsIndex | None = ..., extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., timing: Timing | PrecisionTiming | None = ..., scale_mode: ScaleMode | None = ..., ) -> None: ... @@ -307,6 +310,7 @@ def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa start_index: SupportsIndex | None = ..., capacity: SupportsIndex | None = ..., extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., timing: Timing | PrecisionTiming | None = ..., scale_mode: ScaleMode | None = ..., ) -> None: ... diff --git a/src/nitypes/waveform/_complex.py b/src/nitypes/waveform/_complex.py index b82525c4..ddaab68e 100644 --- a/src/nitypes/waveform/_complex.py +++ b/src/nitypes/waveform/_complex.py @@ -253,6 +253,7 @@ def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa start_index: SupportsIndex | None = ..., capacity: SupportsIndex | None = ..., extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., timing: Timing | PrecisionTiming | None = ..., scale_mode: ScaleMode | None = ..., ) -> None: ... @@ -267,6 +268,7 @@ def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa start_index: SupportsIndex | None = ..., capacity: SupportsIndex | None = ..., extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., timing: Timing | PrecisionTiming | None = ..., scale_mode: ScaleMode | None = ..., ) -> None: ... @@ -281,6 +283,7 @@ def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa start_index: SupportsIndex | None = ..., capacity: SupportsIndex | None = ..., extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., timing: Timing | PrecisionTiming | None = ..., scale_mode: ScaleMode | None = ..., ) -> None: ... @@ -295,6 +298,7 @@ def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa start_index: SupportsIndex | None = ..., capacity: SupportsIndex | None = ..., extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., timing: Timing | PrecisionTiming | None = ..., scale_mode: ScaleMode | None = ..., ) -> None: ... diff --git a/src/nitypes/waveform/_exceptions.py b/src/nitypes/waveform/_exceptions.py index 6a6da901..c10cfdf9 100644 --- a/src/nitypes/waveform/_exceptions.py +++ b/src/nitypes/waveform/_exceptions.py @@ -1,5 +1,7 @@ from __future__ import annotations +from typing_extensions import Literal + class TimingMismatchError(RuntimeError): """Exception used when appending waveforms with mismatched timing.""" @@ -7,21 +9,121 @@ class TimingMismatchError(RuntimeError): pass -def input_array_data_type_mismatch(input_dtype: object, waveform_dtype: object) -> TypeError: - """Create a TypeError for an input array data type mismatch.""" - return TypeError( - "The data type of the input array must match the waveform data type.\n\n" - f"Input array data type: {input_dtype}\n" - f"Waveform data type: {waveform_dtype}" +def capacity_mismatch(capacity: int, array_length: int) -> ValueError: + """Create a ValueError for an invalid capacity.""" + return ValueError( + f"The capacity must match the input array length.\n\n" + f"Capacity: {capacity}\n" + f"Array length: {array_length}" ) -def input_waveform_data_type_mismatch(input_dtype: object, waveform_dtype: object) -> TypeError: - """Create a TypeError for an input waveform data type mismatch.""" +def capacity_too_small(capacity: int, min_capacity: int, object_description: str) -> ValueError: + """Create a ValueError for an invalid capacity argument.""" + return ValueError( + f"The capacity must be equal to or greater than the number of samples in the {object_description}.\n\n" + f"Capacity: {capacity}\n" + f"Number of samples: {min_capacity}" + ) + + +def data_type_mismatch( + arg_description: Literal["input array", "input spectrum", "input waveform"], + arg_dtype: object, + other_description: Literal["requested", "spectrum", "waveform"], + other_dtype: object, +) -> TypeError: + """Create a TypeError for a data type mismatch.""" + arg_key = { + "input array": "Input array data type", + "input spectrum": "Input spectrum data type", + "input waveform": "Input waveform data type", + } + other_key = { + "requested": "Requested data type", + "spectrum": "Spectrum data type", + "waveform": "Waveform data type", + } return TypeError( - "The data type of the input waveform must match the waveform data type.\n\n" - f"Input waveform data type: {input_dtype}\n" - f"Waveform data type: {waveform_dtype}" + f"The data type of the {arg_description} must match the {other_description} data type.\n\n" + f"{arg_key[arg_description]}: {arg_dtype}\n" + f"{other_key[other_description]}: {other_dtype}" + ) + + +def irregular_timestamp_count_mismatch( + irregular_timestamp_count: int, + other_description: Literal["input array length", "number of samples in the waveform"], + other: int, + *, + reversed: bool = False, +) -> ValueError: + """Create a ValueError for an irregular timestamp count mismatch.""" + other_key = { + "input array length": "Array length", + "number of samples in the waveform": "Number of samples", + } + if reversed: + raise ValueError( + "The input array length must be equal to the number of irregular timestamps.\n\n" + f"{other_key[other_description]}: {other}\n" + f"Number of timestamps: {irregular_timestamp_count}" + ) + else: + raise ValueError( + f"The number of irregular timestamps must be equal to the {other_description}.\n\n" + f"Number of timestamps: {irregular_timestamp_count}\n" + f"{other_key[other_description]}: {other}" + ) + + +def start_index_too_large( + start_index: int, + capacity_description: Literal[ + "capacity", + "input array length", + "number of samples in the spectrum", + "number of samples in the waveform", + ], + capacity: int, +) -> ValueError: + """Create a ValueError for an invalid start index argument.""" + capacity_key = { + "capacity": "Capacity", + "input array length": "Array length", + "number of samples in the spectrum": "Number of samples", + "number of samples in the waveform": "Number of samples", + } + return ValueError( + f"The start index must be less than or equal to the {capacity_description}.\n\n" + f"Start index: {start_index}\n" + f"{capacity_key[capacity_description]}: {capacity}" + ) + + +def start_index_or_sample_count_too_large( + start_index: int, + sample_count: int, + capacity_description: Literal[ + "capacity", + "input array length", + "number of samples in the spectrum", + "number of samples in the waveform", + ], + capacity: int, +) -> ValueError: + """Create a ValueError for an invalid start index or sample count argument.""" + capacity_key = { + "capacity": "Capacity", + "input array length": "Array length", + "number of samples in the spectrum": "Number of samples", + "number of samples in the waveform": "Number of samples", + } + return ValueError( + f"The sum of the start index and sample count must be less than or equal to the {capacity_description}.\n\n" + f"Start index: {start_index}\n" + f"Sample count: {sample_count}\n" + f"{capacity_key[capacity_description]}: {capacity}" ) diff --git a/src/nitypes/waveform/_numeric.py b/src/nitypes/waveform/_numeric.py index 5227ca47..a8c9e86e 100644 --- a/src/nitypes/waveform/_numeric.py +++ b/src/nitypes/waveform/_numeric.py @@ -15,8 +15,12 @@ from nitypes._arguments import arg_to_uint, validate_dtype, validate_unsupported_arg from nitypes._exceptions import invalid_arg_type, invalid_array_ndim from nitypes.waveform._exceptions import ( - input_array_data_type_mismatch, - input_waveform_data_type_mismatch, + capacity_mismatch, + capacity_too_small, + data_type_mismatch, + irregular_timestamp_count_mismatch, + start_index_or_sample_count_too_large, + start_index_too_large, ) from nitypes.waveform._extended_properties import ( CHANNEL_NAME, @@ -294,17 +298,10 @@ def _init_with_new_array( validate_dtype(dtype, self.__class__._get_supported_raw_dtypes()) if start_index > capacity: - raise ValueError( - "The start index must be less than or equal to the capacity.\n\n" - f"Start index: {start_index}\n" - f"Capacity: {capacity}" - ) + raise start_index_too_large(start_index, "capacity", capacity) if start_index + sample_count > capacity: - raise ValueError( - "The sum of the start index and sample count must be less than or equal to the capacity.\n\n" - f"Start index: {start_index}\n" - f"Sample count: {sample_count}\n" - f"Capacity: {capacity}" + raise start_index_or_sample_count_too_large( + start_index, sample_count, "capacity", capacity ) self._data = np.zeros(capacity, dtype) @@ -328,36 +325,21 @@ def _init_with_provided_array( if dtype is None: dtype = data.dtype if dtype != data.dtype: - raise TypeError( - "The data type of the input array must match the requested data type.\n\n" - f"Array data type: {data.dtype}\n" - f"Requested data type: {np.dtype(dtype)}" - ) + raise data_type_mismatch("input array", data.dtype, "requested", np.dtype(dtype)) validate_dtype(dtype, self.__class__._get_supported_raw_dtypes()) capacity = arg_to_uint("capacity", capacity, len(data)) if capacity != len(data): - raise ValueError( - "The capacity must match the input array length.\n\n" - f"Capacity: {capacity}\n" - f"Array length: {len(data)}" - ) + raise capacity_mismatch(capacity, len(data)) start_index = arg_to_uint("start index", start_index, 0) if start_index > capacity: - raise ValueError( - "The start index must be less than or equal to the input array length.\n\n" - f"Start index: {start_index}\n" - f"Capacity: {capacity}" - ) + raise start_index_too_large(start_index, "input array length", capacity) sample_count = arg_to_uint("sample count", sample_count, len(data) - start_index) if start_index + sample_count > len(data): - raise ValueError( - "The sum of the start index and sample count must be less than or equal to the input array length.\n\n" - f"Start index: {start_index}\n" - f"Sample count: {sample_count}\n" - f"Array length: {len(data)}" + raise start_index_or_sample_count_too_large( + start_index, sample_count, "input array length", len(data) ) self._data = data @@ -383,19 +365,14 @@ def get_raw_data( """ start_index = arg_to_uint("sample index", start_index, 0) if start_index > self.sample_count: - raise ValueError( - "The start index must be less than or equal to the number of samples in the waveform.\n\n" - f"Start index: {start_index}\n" - f"Number of samples: {self.sample_count}" + raise start_index_too_large( + start_index, "number of samples in the waveform", self.sample_count ) sample_count = arg_to_uint("sample count", sample_count, self.sample_count - start_index) if start_index + sample_count > self.sample_count: - raise ValueError( - "The sum of the start index and sample count must be less than or equal to the number of samples in the waveform.\n\n" - f"Start index: {start_index}\n" - f"Sample count: {sample_count}\n" - f"Number of samples: {self.sample_count}" + raise start_index_or_sample_count_too_large( + start_index, sample_count, "number of samples in the waveform", self.sample_count ) return self.raw_data[start_index : start_index + sample_count] @@ -492,12 +469,9 @@ def capacity(self) -> int: @capacity.setter def capacity(self, value: int) -> None: value = arg_to_uint("capacity", value) - if value < self._start_index + self._sample_count: - raise ValueError( - "The capacity must be equal to or greater than the number of samples in the waveform.\n\n" - f"Capacity: {value}\n" - f"Number of samples: {self._start_index + self._sample_count}" - ) + min_capacity = self._start_index + self._sample_count + if value < min_capacity: + raise capacity_too_small(value, min_capacity, "waveform") if value != len(self._data): self._data.resize(value, refcheck=False) @@ -553,10 +527,8 @@ def _set_timing(self, value: _AnyTiming) -> None: def _validate_timing(self, value: _AnyTiming) -> None: if value._timestamps is not None and len(value._timestamps) != self._sample_count: - raise ValueError( - "The number of irregular timestamps is not equal to the number of samples in the waveform.\n\n" - f"Number of timestamps: {len(value._timestamps)}\n" - f"Number of samples in the waveform: {self._sample_count}" + raise irregular_timestamp_count_mismatch( + len(value._timestamps), "number of samples in the waveform", self._sample_count ) @property @@ -681,14 +653,12 @@ def _append_array( timestamps: Sequence[dt.datetime] | Sequence[ht.datetime] | None = None, ) -> None: if array.dtype != self.dtype: - raise input_array_data_type_mismatch(array.dtype, self.dtype) + raise data_type_mismatch("input array", array.dtype, "waveform", self.dtype) if array.ndim != 1: raise invalid_array_ndim("input array", "one-dimensional array", array.ndim) if timestamps is not None and len(array) != len(timestamps): - raise ValueError( - "The number of irregular timestamps must be equal to the input array length.\n\n" - f"Number of timestamps: {len(timestamps)}\n" - f"Array length: {len(array)}" + raise irregular_timestamp_count_mismatch( + len(timestamps), "input array length", len(array) ) new_timing = self._timing._append_timestamps(timestamps) @@ -708,7 +678,7 @@ def _append_waveforms( ) -> None: for waveform in waveforms: if waveform.dtype != self.dtype: - raise input_waveform_data_type_mismatch(waveform.dtype, self.dtype) + raise data_type_mismatch("input waveform", waveform.dtype, "waveform", self.dtype) if waveform._scale_mode != self._scale_mode: warnings.warn(scale_mode_mismatch()) @@ -761,14 +731,12 @@ def _load_array( sample_count: SupportsIndex | None = None, ) -> None: if array.dtype != self.dtype: - raise input_array_data_type_mismatch(array.dtype, self.dtype) + raise data_type_mismatch("input array", array.dtype, "waveform", self.dtype) if array.ndim != 1: raise invalid_array_ndim("input array", "one-dimensional array", array.ndim) if self._timing._timestamps is not None and len(array) != len(self._timing._timestamps): - raise ValueError( - "The input array length must be equal to the number of irregular timestamps.\n\n" - f"Array length: {len(array)}\n" - f"Number of timestamps: {len(self._timing._timestamps)}" + raise irregular_timestamp_count_mismatch( + len(self._timing._timestamps), "input array length", len(array), reversed=True ) start_index = arg_to_uint("start index", start_index, 0) @@ -816,7 +784,7 @@ def _unpickle(cls, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Self: def __repr__(self) -> str: """Return repr(self).""" args = [f"{self._sample_count}"] - if self.dtype != np.float64: + if self.dtype != self.__class__._get_default_raw_dtype(): args.append(f"{self.dtype.name}") # start_index and capacity are not shown because they are allocation details. raw_data hides # the unused data before start_index and after start_index+sample_count. diff --git a/src/nitypes/waveform/_spectrum.py b/src/nitypes/waveform/_spectrum.py new file mode 100644 index 00000000..27d05a9c --- /dev/null +++ b/src/nitypes/waveform/_spectrum.py @@ -0,0 +1,732 @@ +from __future__ import annotations + +import sys +from collections.abc import Mapping, Sequence +from typing import Any, Generic, SupportsFloat, SupportsIndex, Union, final, overload + +import numpy as np +import numpy.typing as npt +from typing_extensions import Self, TypeVar + +from nitypes._arguments import arg_to_float, arg_to_uint, validate_dtype +from nitypes._exceptions import invalid_arg_type, invalid_array_ndim +from nitypes.waveform._exceptions import ( + capacity_mismatch, + capacity_too_small, + data_type_mismatch, + start_index_or_sample_count_too_large, + start_index_too_large, +) +from nitypes.waveform._extended_properties import ( + CHANNEL_NAME, + UNIT_DESCRIPTION, + ExtendedPropertyDictionary, + ExtendedPropertyValue, +) + +if sys.version_info < (3, 10): + import array as std_array + +_TData = TypeVar("_TData", bound=Union[np.floating, np.integer]) +_TData_co = TypeVar("_TData_co", bound=Union[np.floating, np.integer], covariant=True) + +# Use the C types here because np.isdtype() considers some of them to be distinct types, even when +# they have the same size (e.g. np.intc vs. np.int_ vs. np.long). +_DATA_DTYPES = ( + # Floating point + np.single, + np.double, + # Signed integers + np.byte, + np.short, + np.intc, + np.int_, + np.long, + np.longlong, + # Unsigned integers + np.ubyte, + np.ushort, + np.uintc, + np.uint, + np.ulong, + np.ulonglong, +) + + +@final +class Spectrum(Generic[_TData_co]): + """A frequency spectrum, which encapsulates analog data and frequency information.""" + + @overload + @classmethod + def from_array_1d( + cls, + array: npt.NDArray[_TData], + dtype: None = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + ) -> Spectrum[_TData]: ... + + @overload + @classmethod + def from_array_1d( + cls, + array: npt.NDArray[Any] | Sequence[Any], + dtype: type[_TData] | np.dtype[_TData], + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + ) -> Spectrum[_TData]: ... + + @overload + @classmethod + def from_array_1d( + cls, + array: npt.NDArray[Any] | Sequence[Any], + dtype: npt.DTypeLike = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + ) -> Spectrum[Any]: ... + + @classmethod + def from_array_1d( + cls, + array: npt.NDArray[Any] | Sequence[Any], + dtype: npt.DTypeLike = None, + *, + copy: bool = True, + start_index: SupportsIndex | None = 0, + sample_count: SupportsIndex | None = None, + start_frequency: SupportsFloat | None = None, + frequency_increment: SupportsFloat | None = None, + extended_properties: Mapping[str, ExtendedPropertyValue] | None = None, + ) -> Spectrum[Any]: + """Construct a spectrum from a one-dimensional array or sequence. + + Args: + array: The spectrum data as a one-dimensional array or a sequence. + dtype: The NumPy data type for the spectrum data. This argument is required + when array is a sequence. + copy: Specifies whether to copy the array or save a reference to it. + start_index: The sample index at which the spectrum data begins. + sample_count: The number of samples in the spectrum. + start_frequency: The start frequency of the spectrum. + frequency_increment: The frequency increment of the spectrum. + extended_properties: The extended properties of the spectrum. + + Returns: + A spectrum containing the specified data. + """ + if isinstance(array, np.ndarray): + if array.ndim != 1: + raise invalid_array_ndim( + "input array", "one-dimensional array or sequence", array.ndim + ) + elif isinstance(array, Sequence) or ( + sys.version_info < (3, 10) and isinstance(array, std_array.array) + ): + if dtype is None: + raise ValueError("You must specify a dtype when the input array is a sequence.") + else: + raise invalid_arg_type("input array", "one-dimensional array or sequence", array) + + return cls( + data=np.asarray(array, dtype, copy=copy), + start_index=start_index, + sample_count=sample_count, + start_frequency=start_frequency, + frequency_increment=frequency_increment, + extended_properties=extended_properties, + ) + + @overload + @classmethod + def from_array_2d( + cls, + array: npt.NDArray[_TData], + dtype: None = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + ) -> list[Spectrum[_TData]]: ... + + @overload + @classmethod + def from_array_2d( + cls, + array: npt.NDArray[Any] | Sequence[Sequence[Any]], + dtype: type[_TData] | np.dtype[_TData], + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + ) -> list[Spectrum[_TData]]: ... + + @overload + @classmethod + def from_array_2d( + cls, + array: npt.NDArray[Any] | Sequence[Sequence[Any]], + dtype: npt.DTypeLike = ..., + *, + copy: bool = ..., + start_index: SupportsIndex | None = ..., + sample_count: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + ) -> list[Spectrum[Any]]: ... + + @classmethod + def from_array_2d( + cls, + array: npt.NDArray[Any] | Sequence[Sequence[Any]], + dtype: npt.DTypeLike = None, + *, + copy: bool = True, + start_index: SupportsIndex | None = 0, + sample_count: SupportsIndex | None = None, + start_frequency: SupportsFloat | None = None, + frequency_increment: SupportsFloat | None = None, + extended_properties: Mapping[str, ExtendedPropertyValue] | None = None, + ) -> list[Spectrum[Any]]: + """Construct a list of spectrums from a two-dimensional array or nested sequence. + + Args: + array: The spectrum data as a two-dimensional array or a nested sequence. + dtype: The NumPy data type for the spectrum data. This argument is required + when array is a sequence. + copy: Specifies whether to copy the array or save a reference to it. + start_index: The sample index at which the spectrum data begins. + sample_count: The number of samples in the spectrum. + start_frequency: The start frequency of the spectrum. + frequency_increment: The frequency increment of the spectrum. + extended_properties: The extended properties of the spectrum. + + Returns: + A list containing a spectrum for each row of the specified data. + + When constructing multiple spectrums, the same extended properties, timing + information, and scale mode are applied to all spectrums. Consider assigning + these properties after construction. + """ + if isinstance(array, np.ndarray): + if array.ndim != 2: + raise invalid_array_ndim( + "input array", "two-dimensional array or nested sequence", array.ndim + ) + elif isinstance(array, Sequence) or ( + sys.version_info < (3, 10) and isinstance(array, std_array.array) + ): + if dtype is None: + raise ValueError("You must specify a dtype when the input array is a sequence.") + else: + raise invalid_arg_type("input array", "two-dimensional array or nested sequence", array) + + return [ + cls( + data=np.asarray(array[i], dtype, copy=copy), + start_index=start_index, + sample_count=sample_count, + start_frequency=start_frequency, + frequency_increment=frequency_increment, + extended_properties=extended_properties, + ) + for i in range(len(array)) + ] + + __slots__ = [ + "_data", + "_start_index", + "_sample_count", + "_start_frequency", + "_frequency_increment", + "_extended_properties", + "__weakref__", + ] + + _data: npt.NDArray[_TData_co] + _start_index: int + _sample_count: int + _start_frequency: float + _frequency_increment: float + _extended_properties: ExtendedPropertyDictionary + + # If neither dtype nor data is specified, _TData_co defaults to np.float64. + @overload + def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self: Spectrum[np.float64], + sample_count: SupportsIndex | None = ..., + dtype: None = ..., + *, + data: None = ..., + start_index: SupportsIndex | None = ..., + capacity: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., + ) -> None: ... + + @overload + def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self: Spectrum[_TData], + sample_count: SupportsIndex | None = ..., + dtype: type[_TData] | np.dtype[_TData] = ..., + *, + data: None = ..., + start_index: SupportsIndex | None = ..., + capacity: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., + ) -> None: ... + + @overload + def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self: Spectrum[_TData], + sample_count: SupportsIndex | None = ..., + dtype: None = ..., + *, + data: npt.NDArray[_TData] = ..., + start_index: SupportsIndex | None = ..., + capacity: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., + ) -> None: ... + + @overload + def __init__( # noqa: D107 - Missing docstring in __init__ (auto-generated noqa) + self: Spectrum[Any], + sample_count: SupportsIndex | None = ..., + dtype: npt.DTypeLike = ..., + *, + data: npt.NDArray[Any] | None = ..., + start_index: SupportsIndex | None = ..., + capacity: SupportsIndex | None = ..., + start_frequency: SupportsFloat | None = ..., + frequency_increment: SupportsFloat | None = ..., + extended_properties: Mapping[str, ExtendedPropertyValue] | None = ..., + copy_extended_properties: bool = ..., + ) -> None: ... + + def __init__( + self, + sample_count: SupportsIndex | None = None, + dtype: npt.DTypeLike = None, + *, + data: npt.NDArray[Any] | None = None, + start_index: SupportsIndex | None = None, + capacity: SupportsIndex | None = None, + start_frequency: SupportsFloat | None = None, + frequency_increment: SupportsFloat | None = None, + extended_properties: Mapping[str, ExtendedPropertyValue] | None = None, + copy_extended_properties: bool = True, + ) -> None: + """Construct a frequency spectrum. + + Args: + sample_count: The number of samples in the spectrum. + dtype: The NumPy data type for the spectrum data. + data: A NumPy ndarray to use for sample storage. The spectrum takes ownership + of this array. If not specified, an ndarray is created based on the specified dtype, + start index, sample count, and capacity. + start_index: The sample index at which the spectrum data begins. + sample_count: The number of samples in the spectrum. + capacity: The number of samples to allocate. Pre-allocating a larger buffer optimizes + appending samples to the spectrum. + start_frequency: The start frequency of the spectrum. + frequency_increment: The frequency increment of the spectrum. + extended_properties: The extended properties of the spectrum. + copy_extended_properties: Specifies whether to copy the extended properties or take + ownership. + + Returns: + A frequency spectrum. + """ + if data is None: + self._init_with_new_array( + sample_count, dtype, start_index=start_index, capacity=capacity + ) + elif isinstance(data, np.ndarray): + self._init_with_provided_array( + data, + dtype, + start_index=start_index, + sample_count=sample_count, + capacity=capacity, + ) + else: + raise invalid_arg_type("raw data", "NumPy ndarray", data) + + self._start_frequency = arg_to_float("start frequency", start_frequency, 0.0) + self._frequency_increment = arg_to_float("frequency increment", frequency_increment, 0.0) + + if copy_extended_properties or not isinstance( + extended_properties, ExtendedPropertyDictionary + ): + extended_properties = ExtendedPropertyDictionary(extended_properties) + self._extended_properties = extended_properties + + def _init_with_new_array( + self, + sample_count: SupportsIndex | None = None, + dtype: npt.DTypeLike = None, + *, + start_index: SupportsIndex | None = None, + capacity: SupportsIndex | None = None, + ) -> None: + start_index = arg_to_uint("start index", start_index, 0) + sample_count = arg_to_uint("sample count", sample_count, 0) + capacity = arg_to_uint("capacity", capacity, sample_count) + + if dtype is None: + dtype = np.float64 + validate_dtype(dtype, _DATA_DTYPES) + + if start_index > capacity: + raise start_index_too_large(start_index, "capacity", capacity) + if start_index + sample_count > capacity: + raise start_index_or_sample_count_too_large( + start_index, sample_count, "capacity", capacity + ) + + self._data = np.zeros(capacity, dtype) + self._start_index = start_index + self._sample_count = sample_count + + def _init_with_provided_array( + self, + data: npt.NDArray[_TData_co], + dtype: npt.DTypeLike = None, + *, + start_index: SupportsIndex | None = None, + sample_count: SupportsIndex | None = None, + capacity: SupportsIndex | None = None, + ) -> None: + if not isinstance(data, np.ndarray): + raise invalid_arg_type("input array", "one-dimensional array", data) + if data.ndim != 1: + raise invalid_array_ndim("input array", "one-dimensional array", data.ndim) + + if dtype is None: + dtype = data.dtype + if dtype != data.dtype: + raise data_type_mismatch("input array", data.dtype, "requested", np.dtype(dtype)) + validate_dtype(dtype, _DATA_DTYPES) + + capacity = arg_to_uint("capacity", capacity, len(data)) + if capacity != len(data): + raise capacity_mismatch(capacity, len(data)) + + start_index = arg_to_uint("start index", start_index, 0) + if start_index > capacity: + raise start_index_too_large(start_index, "input array length", capacity) + + sample_count = arg_to_uint("sample count", sample_count, len(data) - start_index) + if start_index + sample_count > len(data): + raise start_index_or_sample_count_too_large( + start_index, sample_count, "input array length", len(data) + ) + + self._data = data + self._start_index = start_index + self._sample_count = sample_count + + @property + def data(self) -> npt.NDArray[_TData_co]: + """The spectrum data.""" + return self._data[self._start_index : self._start_index + self._sample_count] + + def get_data( + self, start_index: SupportsIndex | None = 0, sample_count: SupportsIndex | None = None + ) -> npt.NDArray[_TData_co]: + """Get a subset of the spectrum data. + + Args: + start_index: The sample index at which the data begins. + sample_count: The number of samples to return. + + Returns: + A subset of the spectrum data. + """ + start_index = arg_to_uint("start index", start_index, 0) + if start_index > self.sample_count: + raise start_index_too_large( + start_index, "number of samples in the spectrum", self.sample_count + ) + + sample_count = arg_to_uint("sample count", sample_count, self.sample_count - start_index) + if start_index + sample_count > self.sample_count: + raise start_index_or_sample_count_too_large( + start_index, sample_count, "number of samples in the spectrum", self.sample_count + ) + + return self.data[start_index : start_index + sample_count] + + @property + def sample_count(self) -> int: + """The number of samples in the spectrum.""" + return self._sample_count + + @property + def capacity(self) -> int: + """The total capacity available for spectrum data. + + Setting the capacity resizes the underlying NumPy array in-place. + + * Other Python objects with references to the array will see the array size change. + * If the array has a reference to an external buffer (such as an array.array), attempting + to resize it raises ValueError. + """ + return len(self._data) + + @capacity.setter + def capacity(self, value: int) -> None: + value = arg_to_uint("capacity", value) + min_capacity = self._start_index + self._sample_count + if value < min_capacity: + raise capacity_too_small(value, min_capacity, "spectrum") + if value != len(self._data): + self._data.resize(value, refcheck=False) + + @property + def dtype(self) -> np.dtype[_TData_co]: + """The NumPy dtype for the spectrum data.""" + return self._data.dtype + + @property + def start_frequency(self) -> float: + """The start frequency of the spectrum.""" + return self._start_frequency + + @start_frequency.setter + def start_frequency(self, value: float) -> None: + if not isinstance(value, (float, int)): + raise invalid_arg_type("start frequency", "float", value) + self._start_frequency = value + + @property + def frequency_increment(self) -> float: + """The frequency increment of the spectrum.""" + return self._frequency_increment + + @frequency_increment.setter + def frequency_increment(self, value: float) -> None: + if not isinstance(value, (float, int)): + raise invalid_arg_type("frequency increment", "float", value) + self._frequency_increment = value + + @property + def extended_properties(self) -> ExtendedPropertyDictionary: + """The extended properties for the spectrum.""" + return self._extended_properties + + @property + def channel_name(self) -> str: + """The name of the device channel from which the spectrum was acquired.""" + value = self._extended_properties.get(CHANNEL_NAME, "") + assert isinstance(value, str) + return value + + @channel_name.setter + def channel_name(self, value: str) -> None: + if not isinstance(value, str): + raise invalid_arg_type("channel name", "str", value) + self._extended_properties[CHANNEL_NAME] = value + + @property + def unit_description(self) -> str: + """The unit of measurement, such as volts, of the spectrum.""" + value = self._extended_properties.get(UNIT_DESCRIPTION, "") + assert isinstance(value, str) + return value + + @unit_description.setter + def unit_description(self, value: str) -> None: + if not isinstance(value, str): + raise invalid_arg_type("unit description", "str", value) + self._extended_properties[UNIT_DESCRIPTION] = value + + def append( + self, + other: npt.NDArray[_TData_co] | Spectrum[_TData_co] | Sequence[Spectrum[_TData_co]], + /, + ) -> None: + """Append data to the spectrum. + + Args: + other: The array or spectrum(s) to append. + + Raises: + ValueError: The other array has the wrong number of dimensions. + TypeError: The data types of the current spectrum and other array or spectrum(s) do not + match, or an argument has the wrong data type. + + When appending spectrums: + + * Extended properties of the other spectrum(s) are merged into the current spectrum if they + are not already set in the current spectrum. + """ + if isinstance(other, np.ndarray): + self._append_array(other) + elif isinstance(other, Spectrum): + self._append_spectrum(other) + elif isinstance(other, Sequence) and all(isinstance(x, Spectrum) for x in other): + self._append_spectrums(other) + else: + raise invalid_arg_type("input", "array or spectrum(s)", other) + + def _append_array( + self, + array: npt.NDArray[_TData_co], + ) -> None: + if array.dtype != self.dtype: + raise data_type_mismatch("input array", array.dtype, "spectrum", self.dtype) + if array.ndim != 1: + raise invalid_array_ndim("input array", "one-dimensional array", array.ndim) + + self._increase_capacity(len(array)) + + offset = self._start_index + self._sample_count + self._data[offset : offset + len(array)] = array + self._sample_count += len(array) + + def _append_spectrum(self, spectrum: Spectrum[_TData_co]) -> None: + self._append_spectrums([spectrum]) + + def _append_spectrums(self, spectrums: Sequence[Spectrum[_TData_co]]) -> None: + for spectrum in spectrums: + if spectrum.dtype != self.dtype: + raise data_type_mismatch("input spectrum", spectrum.dtype, "spectrum", self.dtype) + + self._increase_capacity(sum(spectrum.sample_count for spectrum in spectrums)) + + offset = self._start_index + self._sample_count + for spectrum in spectrums: + self._data[offset : offset + spectrum.sample_count] = spectrum.data + offset += spectrum.sample_count + self._sample_count += spectrum.sample_count + self._extended_properties._merge(spectrum._extended_properties) + + def _increase_capacity(self, amount: int) -> None: + new_capacity = self._start_index + self._sample_count + amount + if new_capacity > self.capacity: + self.capacity = new_capacity + + def load_data( + self, + array: npt.NDArray[_TData_co], + *, + copy: bool = True, + start_index: SupportsIndex | None = 0, + sample_count: SupportsIndex | None = None, + ) -> None: + """Load new data into an existing spectrum. + + Args: + array: A NumPy array containing the data to load. + copy: Specifies whether to copy the array or save a reference to it. + start_index: The sample index at which the spectrum data begins. + sample_count: The number of samples in the spectrum. + """ + if isinstance(array, np.ndarray): + self._load_array(array, copy=copy, start_index=start_index, sample_count=sample_count) + else: + raise invalid_arg_type("input array", "array", array) + + def _load_array( + self, + array: npt.NDArray[_TData_co], + *, + copy: bool = True, + start_index: SupportsIndex | None = 0, + sample_count: SupportsIndex | None = None, + ) -> None: + if array.dtype != self.dtype: + raise data_type_mismatch("input array", array.dtype, "spectrum", self.dtype) + if array.ndim != 1: + raise invalid_array_ndim("input array", "one-dimensional array", array.ndim) + + start_index = arg_to_uint("start index", start_index, 0) + sample_count = arg_to_uint("sample count", sample_count, len(array) - start_index) + + if copy: + if sample_count > len(self._data): + self.capacity = sample_count + self._data[0:sample_count] = array[start_index : start_index + sample_count] + self._start_index = 0 + self._sample_count = sample_count + else: + self._data = array + self._start_index = start_index + self._sample_count = sample_count + + def __eq__(self, value: object, /) -> bool: + """Return self==value.""" + if not isinstance(value, self.__class__): + return NotImplemented + return ( + self.dtype == value.dtype + and np.array_equal(self.data, value.data) + and self.start_frequency == value.start_frequency + and self.frequency_increment == value.frequency_increment + and self._extended_properties == value._extended_properties + ) + + def __reduce__(self) -> tuple[Any, ...]: + """Return object state for pickling.""" + ctor_args = (self._sample_count, self.dtype) + ctor_kwargs: dict[str, Any] = { + "data": self.data, + "start_frequency": self._start_frequency, + "frequency_increment": self._frequency_increment, + "extended_properties": self._extended_properties, + "copy_extended_properties": False, + } + return (self.__class__._unpickle, (ctor_args, ctor_kwargs)) + + @classmethod + def _unpickle(cls, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Self: + return cls(*args, **kwargs) + + def __repr__(self) -> str: + """Return repr(self).""" + args = [f"{self._sample_count}"] + if self.dtype != np.float64: + args.append(f"{self.dtype.name}") + # start_index and capacity are not shown because they are allocation details. data hides + # the unused data before start_index and after start_index+sample_count. + if self._sample_count > 0: + args.append(f"data={self.data!r}") + if self._start_frequency != 0.0: + args.append(f"start_frequency={self.start_frequency!r}") + if self._frequency_increment != 0.0: + args.append(f"frequency_increment={self._frequency_increment!r}") + if self._extended_properties: + args.append(f"extended_properties={self._extended_properties._properties!r}") + return f"{self.__class__.__module__}.{self.__class__.__name__}({', '.join(args)})" diff --git a/tests/unit/waveform/test_analog_waveform.py b/tests/unit/waveform/test_analog_waveform.py index 13048316..714722e5 100644 --- a/tests/unit/waveform/test_analog_waveform.py +++ b/tests/unit/waveform/test_analog_waveform.py @@ -1903,7 +1903,8 @@ def test___waveform___pickle___references_public_modules() -> None: value_bytes = pickle.dumps(value) assert b"nitypes.waveform" in value_bytes - assert b"nitypes.waveform._analog_waveform" not in value_bytes + assert b"nitypes.waveform._analog" not in value_bytes assert b"nitypes.waveform._extended_properties" not in value_bytes + assert b"nitypes.waveform._numeric" not in value_bytes assert b"nitypes.waveform._timing" not in value_bytes assert b"nitypes.waveform._scaling" not in value_bytes diff --git a/tests/unit/waveform/test_complex_waveform.py b/tests/unit/waveform/test_complex_waveform.py index 25df9d94..cdd98dc3 100644 --- a/tests/unit/waveform/test_complex_waveform.py +++ b/tests/unit/waveform/test_complex_waveform.py @@ -1,14 +1,24 @@ from __future__ import annotations +import copy +import datetime as dt +import pickle from typing import Any +import hightime as ht import numpy as np import numpy.typing as npt import pytest from typing_extensions import assert_type from nitypes.complex import ComplexInt32Base, ComplexInt32DType -from nitypes.waveform import ComplexWaveform, LinearScaleMode +from nitypes.waveform import ( + NO_SCALING, + ComplexWaveform, + LinearScaleMode, + PrecisionTiming, + Timing, +) ############################################################################### @@ -322,3 +332,322 @@ def test___complexint32_waveform_with_unknown_structured_dtype___get_scaled_data assert exc.value.args[0].startswith("The requested data type is not supported.") assert "Data type: [('a', ' None: + assert left == right + assert not (left != right) + + +@pytest.mark.parametrize( + "left, right", + [ + (ComplexWaveform(), ComplexWaveform(10)), + (ComplexWaveform(10), ComplexWaveform(11)), + (ComplexWaveform(10, np.complex128), ComplexWaveform(10, ComplexInt32DType)), + ( + ComplexWaveform(15, ComplexInt32DType, start_index=5, capacity=20), + ComplexWaveform(10, ComplexInt32DType, start_index=5, capacity=20), + ), + ( + ComplexWaveform.from_array_1d([1 + 2j, 3 + 5j, 5 + 6j], np.complex128), + ComplexWaveform.from_array_1d([1 + 2j, 3 + 4j, 5 + 6j], np.complex128), + ), + ( + ComplexWaveform.from_array_1d([(1, 2), (3, 4), (5, 6)], ComplexInt32DType), + ComplexWaveform.from_array_1d([1 + 2j, 3 + 4j, 5 + 6j], np.complex128), + ), + ( + ComplexWaveform( + timing=Timing.create_with_regular_interval(dt.timedelta(milliseconds=1)) + ), + ComplexWaveform( + timing=Timing.create_with_regular_interval(dt.timedelta(milliseconds=2)) + ), + ), + ( + ComplexWaveform( + timing=PrecisionTiming.create_with_regular_interval(ht.timedelta(milliseconds=1)) + ), + ComplexWaveform( + timing=PrecisionTiming.create_with_regular_interval(ht.timedelta(milliseconds=2)) + ), + ), + ( + ComplexWaveform( + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"} + ), + ComplexWaveform( + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Amps"} + ), + ), + ( + ComplexWaveform(scale_mode=LinearScaleMode(2.0, 1.0)), + ComplexWaveform(scale_mode=LinearScaleMode(2.0, 1.1)), + ), + ( + ComplexWaveform(scale_mode=NO_SCALING), + ComplexWaveform(scale_mode=LinearScaleMode(2.0, 1.0)), + ), + # __eq__ does not convert timing, even if the values are equivalent. + ( + ComplexWaveform( + timing=Timing.create_with_regular_interval(dt.timedelta(milliseconds=1)) + ), + ComplexWaveform( + timing=PrecisionTiming.create_with_regular_interval(ht.timedelta(milliseconds=1)) + ), + ), + ( + ComplexWaveform( + timing=PrecisionTiming.create_with_regular_interval(ht.timedelta(milliseconds=1)) + ), + ComplexWaveform( + timing=Timing.create_with_regular_interval(dt.timedelta(milliseconds=1)) + ), + ), + ], +) +def test___different_value___equality___not_equal( + left: ComplexWaveform[Any], right: ComplexWaveform[Any] +) -> None: + assert not (left == right) + assert left != right + + +@pytest.mark.parametrize( + "value, expected_repr", + [ + (ComplexWaveform(), "nitypes.waveform.ComplexWaveform(0)"), + ( + ComplexWaveform(5), + "nitypes.waveform.ComplexWaveform(5, raw_data=array([0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j]))", + ), + ( + ComplexWaveform(5, np.complex128), + "nitypes.waveform.ComplexWaveform(5, raw_data=array([0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j]))", + ), + (ComplexWaveform(0, ComplexInt32DType), "nitypes.waveform.ComplexWaveform(0, void32)"), + ( + ComplexWaveform(5, ComplexInt32DType), + "nitypes.waveform.ComplexWaveform(5, void32, raw_data=array([(0, 0), (0, 0), (0, 0), (0, 0), (0, 0)],\n" + " dtype=[('real', ' None: + assert repr(value) == expected_repr + + +_VARIOUS_VALUES = [ + ComplexWaveform(), + ComplexWaveform(10), + ComplexWaveform(10, np.complex128), + ComplexWaveform(10, ComplexInt32DType), + ComplexWaveform(10, ComplexInt32DType, start_index=5, capacity=20), + ComplexWaveform.from_array_1d([123 + 3.45j, 6.78 - 9.01j], np.complex128), + ComplexWaveform.from_array_1d([(1, 2), (3, 4), (5, 6)], ComplexInt32DType), + ComplexWaveform(timing=Timing.create_with_regular_interval(dt.timedelta(milliseconds=1))), + ComplexWaveform( + timing=PrecisionTiming.create_with_regular_interval(ht.timedelta(milliseconds=1)) + ), + ComplexWaveform( + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"} + ), + ComplexWaveform(scale_mode=LinearScaleMode(2.0, 1.0)), + ComplexWaveform(10, ComplexInt32DType, start_index=5, capacity=20), + ComplexWaveform.from_array_1d( + [(0, 0), (0, 0), (1, 1), (2, -2), (3, 33), (4, -44), (5, 50), (0, 0)], + ComplexInt32DType, + start_index=2, + sample_count=5, + ), +] + + +@pytest.mark.parametrize("value", _VARIOUS_VALUES) +def test___various_values___copy___makes_shallow_copy(value: ComplexWaveform[Any]) -> None: + new_value = copy.copy(value) + + _assert_shallow_copy(new_value, value) + + +def _assert_shallow_copy(value: ComplexWaveform[Any], other: ComplexWaveform[Any]) -> None: + assert value == other + assert value is not other + # _data may be a view of the original array. + assert value._data is other._data or value._data.base is other._data + assert value._extended_properties is other._extended_properties + assert value._timing is other._timing + assert value._scale_mode is other._scale_mode + + +@pytest.mark.parametrize("value", _VARIOUS_VALUES) +def test___various_values___deepcopy___makes_shallow_copy(value: ComplexWaveform[Any]) -> None: + new_value = copy.deepcopy(value) + + _assert_deep_copy(new_value, value) + + +def _assert_deep_copy(value: ComplexWaveform[Any], other: ComplexWaveform[Any]) -> None: + assert value == other + assert value is not other + assert value._data is not other._data and value._data.base is not other._data + assert value._extended_properties is not other._extended_properties + if other._timing is not Timing.empty and other._timing is not PrecisionTiming.empty: + assert value._timing is not other._timing + if other._scale_mode is not NO_SCALING: + assert value._scale_mode is not other._scale_mode + + +@pytest.mark.parametrize("value", _VARIOUS_VALUES) +def test___various_values___pickle_unpickle___makes_deep_copy( + value: ComplexWaveform[Any], +) -> None: + new_value = pickle.loads(pickle.dumps(value)) + + _assert_deep_copy(new_value, value) + + +def test___waveform___pickle___references_public_modules() -> None: + value = ComplexWaveform( + raw_data=np.array([1, 2, 3], np.complex128), + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"}, + timing=Timing.create_with_regular_interval(dt.timedelta(milliseconds=1)), + scale_mode=LinearScaleMode(2.0, 1.0), + ) + + value_bytes = pickle.dumps(value) + + assert b"nitypes.waveform" in value_bytes + assert b"nitypes.waveform._complex" not in value_bytes + assert b"nitypes.waveform._extended_properties" not in value_bytes + assert b"nitypes.waveform._numeric" not in value_bytes + assert b"nitypes.waveform._timing" not in value_bytes + assert b"nitypes.waveform._scaling" not in value_bytes diff --git a/tests/unit/waveform/test_spectrum.py b/tests/unit/waveform/test_spectrum.py new file mode 100644 index 00000000..006dd158 --- /dev/null +++ b/tests/unit/waveform/test_spectrum.py @@ -0,0 +1,1301 @@ +from __future__ import annotations + +import array +import copy +import itertools +import pickle +import weakref +from typing import Any, SupportsIndex + +import numpy as np +import numpy.typing as npt +import pytest +from typing_extensions import assert_type + +from nitypes.waveform import Spectrum + + +############################################################################### +# create +############################################################################### +def test___no_args___create___creates_empty_spectrum_with_default_dtype() -> None: + spectrum = Spectrum() + + assert spectrum.sample_count == spectrum.capacity == len(spectrum.data) == 0 + assert spectrum.dtype == np.float64 + assert_type(spectrum, Spectrum[np.float64]) + + +def test___sample_count___create___creates_spectrum_with_sample_count_and_default_dtype() -> None: + spectrum = Spectrum(10) + + assert spectrum.sample_count == spectrum.capacity == len(spectrum.data) == 10 + assert spectrum.dtype == np.float64 + assert_type(spectrum, Spectrum[np.float64]) + + +def test___sample_count_and_dtype___create___creates_spectrum_with_sample_count_and_dtype() -> None: + spectrum = Spectrum(10, np.int32) + + assert spectrum.sample_count == spectrum.capacity == len(spectrum.data) == 10 + assert spectrum.dtype == np.int32 + assert_type(spectrum, Spectrum[np.int32]) + + +def test___sample_count_and_dtype_str___create___creates_spectrum_with_sample_count_and_dtype() -> ( + None +): + spectrum = Spectrum(10, "i4") + + assert spectrum.sample_count == spectrum.capacity == len(spectrum.data) == 10 + assert spectrum.dtype == np.int32 + assert_type(spectrum, Spectrum[Any]) # dtype not inferred from string + + +def test___sample_count_and_dtype_any___create___creates_spectrum_with_sample_count_and_dtype() -> ( + None +): + dtype: np.dtype[Any] = np.dtype(np.int32) + spectrum = Spectrum(10, dtype) + + assert spectrum.sample_count == spectrum.capacity == len(spectrum.data) == 10 + assert spectrum.dtype == np.int32 + assert_type(spectrum, Spectrum[Any]) # dtype not inferred from np.dtype[Any] + + +def test___sample_count_dtype_and_capacity___create___creates_spectrum_with_sample_count_dtype_and_capacity() -> ( + None +): + spectrum = Spectrum(10, np.int32, capacity=20) + + assert spectrum.sample_count == len(spectrum.data) == 10 + assert spectrum.capacity == 20 + assert spectrum.dtype == np.int32 + assert_type(spectrum, Spectrum[np.int32]) + + +@pytest.mark.parametrize("dtype", [np.complex128, np.str_, np.void, "i2, i2"]) +def test___sample_count_and_unsupported_dtype___create___raises_type_error( + dtype: npt.DTypeLike, +) -> None: + with pytest.raises(TypeError) as exc: + _ = Spectrum(10, dtype) + + assert exc.value.args[0].startswith("The requested data type is not supported.") + + +def test___dtype_str_with_unsupported_traw_hint___create___mypy_type_var_warning() -> None: + spectrum1: Spectrum[np.complex128] = Spectrum(dtype="int32") # type: ignore[type-var] + spectrum2: Spectrum[np.str_] = Spectrum(dtype="int32") # type: ignore[type-var] + spectrum3: Spectrum[np.void] = Spectrum(dtype="int32") # type: ignore[type-var] + _ = spectrum1, spectrum2, spectrum3 + + +def test___dtype_str_with_traw_hint___create___narrows_traw() -> None: + spectrum: Spectrum[np.int32] = Spectrum(dtype="int32") + + assert_type(spectrum, Spectrum[np.int32]) + + +############################################################################### +# from_array_1d +############################################################################### +def test___float64_ndarray___from_array_1d___creates_spectrum_with_float64_dtype() -> None: + data = np.array([1.1, 2.2, 3.3, 4.4, 5.5], np.float64) + + spectrum = Spectrum.from_array_1d(data) + + assert spectrum.data.tolist() == data.tolist() + assert spectrum.dtype == np.float64 + assert_type(spectrum, Spectrum[np.float64]) + + +def test___int32_ndarray___from_array_1d___creates_spectrum_with_int32_dtype() -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + spectrum = Spectrum.from_array_1d(data) + + assert spectrum.data.tolist() == data.tolist() + assert spectrum.dtype == np.int32 + assert_type(spectrum, Spectrum[np.int32]) + + +def test___int32_array_with_dtype___from_array_1d___creates_spectrum_with_specified_dtype() -> None: + data = array.array("i", [1, 2, 3, 4, 5]) + + spectrum = Spectrum.from_array_1d(data, np.int32) + + assert spectrum.data.tolist() == data.tolist() + assert spectrum.dtype == np.int32 + assert_type(spectrum, Spectrum[np.int32]) + + +def test___int16_ndarray_with_mismatched_dtype___from_array_1d___creates_spectrum_with_specified_dtype() -> ( + None +): + data = np.array([1, 2, 3, 4, 5], np.int16) + + spectrum = Spectrum.from_array_1d(data, np.int32) + + assert spectrum.data.tolist() == data.tolist() + assert spectrum.dtype == np.int32 + assert_type(spectrum, Spectrum[np.int32]) + + +def test___int_list_with_dtype___from_array_1d___creates_spectrum_with_specified_dtype() -> None: + data = [1, 2, 3, 4, 5] + + spectrum = Spectrum.from_array_1d(data, np.int32) + + assert spectrum.data.tolist() == data + assert spectrum.dtype == np.int32 + assert_type(spectrum, Spectrum[np.int32]) + + +def test___int_list_with_dtype_str___from_array_1d___creates_spectrum_with_specified_dtype() -> ( + None +): + data = [1, 2, 3, 4, 5] + + spectrum = Spectrum.from_array_1d(data, "int32") + + assert spectrum.data.tolist() == data + assert spectrum.dtype == np.int32 + assert_type(spectrum, Spectrum[Any]) # dtype not inferred from string + + +def test___int32_ndarray_2d___from_array_1d___raises_value_error() -> None: + data = np.array([[1, 2, 3], [4, 5, 6]], np.int32) + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_1d(data) + + assert exc.value.args[0].startswith( + "The input array must be a one-dimensional array or sequence." + ) + + +def test___int_list_without_dtype___from_array_1d___raises_value_error() -> None: + data = [1, 2, 3, 4, 5] + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_1d(data) + + assert exc.value.args[0].startswith( + "You must specify a dtype when the input array is a sequence." + ) + + +def test___bytes___from_array_1d___raises_value_error() -> None: + data = b"\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00" + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_1d(data, np.int32) + + assert exc.value.args[0].startswith("invalid literal for int() with base 10:") + + +def test___iterable___from_array_1d___raises_type_error() -> None: + data = itertools.repeat(3) + + with pytest.raises(TypeError) as exc: + _ = Spectrum.from_array_1d(data, np.int32) # type: ignore[call-overload] + + assert exc.value.args[0].startswith( + "The input array must be a one-dimensional array or sequence." + ) + + +def test___ndarray_with_unsupported_dtype___from_array_1d___raises_type_error() -> None: + data = np.zeros(3, np.str_) + + with pytest.raises(TypeError) as exc: + _ = Spectrum.from_array_1d(data) + + assert exc.value.args[0].startswith("The requested data type is not supported.") + + +def test___copy___from_array_1d___creates_spectrum_linked_to_different_buffer() -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + spectrum = Spectrum.from_array_1d(data, copy=True) + + assert spectrum._data is not data + assert spectrum.data.tolist() == data.tolist() + data[:] = [5, 4, 3, 2, 1] + assert spectrum.data.tolist() != data.tolist() + + +def test___int32_ndarray_no_copy___from_array_1d___creates_spectrum_linked_to_same_buffer() -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + spectrum = Spectrum.from_array_1d(data, copy=False) + + assert spectrum._data is data + assert spectrum.data.tolist() == data.tolist() + data[:] = [5, 4, 3, 2, 1] + assert spectrum.data.tolist() == data.tolist() + + +def test___int32_array_no_copy___from_array_1d___creates_spectrum_linked_to_same_buffer() -> None: + data = array.array("i", [1, 2, 3, 4, 5]) + + spectrum = Spectrum.from_array_1d(data, dtype=np.int32, copy=False) + + assert spectrum.data.tolist() == data.tolist() + data[:] = array.array("i", [5, 4, 3, 2, 1]) + assert spectrum.data.tolist() == data.tolist() + + +def test___int_list_no_copy___from_array_1d___raises_value_error() -> None: + data = [1, 2, 3, 4, 5] + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_1d(data, np.int32, copy=False) + + assert exc.value.args[0].startswith( + "Unable to avoid copy while creating an array as requested." + ) + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_data", + [ + (0, None, [1, 2, 3, 4, 5]), + (1, None, [2, 3, 4, 5]), + (4, None, [5]), + (5, None, []), + (0, 1, [1]), + (0, 4, [1, 2, 3, 4]), + (1, 1, [2]), + (1, 3, [2, 3, 4]), + (1, 4, [2, 3, 4, 5]), + ], +) +def test___array_subset___from_array_1d___creates_spectrum_with_array_subset( + start_index: SupportsIndex, sample_count: SupportsIndex | None, expected_data: list[int] +) -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + spectrum = Spectrum.from_array_1d(data, start_index=start_index, sample_count=sample_count) + + assert spectrum.data.tolist() == expected_data + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_message", + [ + (-2, None, "The start index must be a non-negative integer."), + (-1, None, "The start index must be a non-negative integer."), + (6, None, "The start index must be less than or equal to the input array length."), + (0, -2, "The sample count must be a non-negative integer."), + (0, -1, "The sample count must be a non-negative integer."), + ( + 0, + 6, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ( + 1, + 5, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ( + 5, + 1, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ], +) +def test___invalid_array_subset___from_array_1d___raises_value_error( + start_index: SupportsIndex, sample_count: SupportsIndex | None, expected_message: str +) -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_1d(data, start_index=start_index, sample_count=sample_count) + + assert exc.value.args[0].startswith(expected_message) + + +############################################################################### +# from_array_2d +############################################################################### +def test___float64_ndarray___from_array_2d___creates_spectrum_with_float64_dtype() -> None: + data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]], np.float64) + + spectrums = Spectrum.from_array_2d(data) + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i].tolist() + assert spectrums[i].dtype == np.float64 + assert_type(spectrums[i], Spectrum[np.float64]) + + +def test___int32_ndarray___from_array_2d___creates_spectrum_with_int32_dtype() -> None: + data = np.array([[1, 2, 3], [4, 5, 6]], np.int32) + + spectrums = Spectrum.from_array_2d(data) + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i].tolist() + assert spectrums[i].dtype == np.int32 + assert_type(spectrums[i], Spectrum[np.int32]) + + +def test___int16_ndarray_with_mismatched_dtype___from_array_2d___creates_spectrum_with_specified_dtype() -> ( + None +): + data = np.array([[1, 2, 3], [4, 5, 6]], np.int16) + + spectrums = Spectrum.from_array_2d(data, np.int32) + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i].tolist() + assert spectrums[i].dtype == np.int32 + assert_type(spectrums[i], Spectrum[np.int32]) + + +def test___int32_array_list_with_dtype___from_array_2d___creates_spectrum_with_specified_dtype() -> ( + None +): + data = [array.array("i", [1, 2, 3]), array.array("i", [4, 5, 6])] + + spectrums = Spectrum.from_array_2d(data, np.int32) + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i].tolist() + assert spectrums[i].dtype == np.int32 + assert_type(spectrums[i], Spectrum[np.int32]) + + +def test___int_list_list_with_dtype___from_array_2d___creates_spectrum_with_specified_dtype() -> ( + None +): + data = [[1, 2, 3], [4, 5, 6]] + + spectrums = Spectrum.from_array_2d(data, np.int32) + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i] + assert spectrums[i].dtype == np.int32 + assert_type(spectrums[i], Spectrum[np.int32]) + + +def test___int_list_list_with_dtype_str___from_array_2d___creates_spectrum_with_specified_dtype() -> ( + None +): + data = [[1, 2, 3], [4, 5, 6]] + + spectrums = Spectrum.from_array_2d(data, "int32") + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i] + assert spectrums[i].dtype == np.int32 + assert_type(spectrums[i], Spectrum[Any]) # dtype not inferred from string + + +def test___int32_ndarray_1d___from_array_2d___raises_value_error() -> None: + data = np.array([1, 2, 3, 4, 5], np.int32) + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_2d(data) + + assert exc.value.args[0].startswith( + "The input array must be a two-dimensional array or nested sequence." + ) + + +def test___int_list_list_without_dtype___from_array_2d___raises_value_error() -> None: + data = [[1, 2, 3], [4, 5, 6]] + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_2d(data) + + assert exc.value.args[0].startswith( + "You must specify a dtype when the input array is a sequence." + ) + + +def test___bytes_list___from_array_2d___raises_value_error() -> None: + data = [ + b"\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00", + b"\x04\x00\x00\x00\x05\x00\x00\x00\x06\x00\x00\x00", + ] + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_2d(data, np.int32) + + assert exc.value.args[0].startswith("invalid literal for int() with base 10:") + + +def test___list_iterable___from_array_2d___raises_type_error() -> None: + data = itertools.repeat([3]) + + with pytest.raises(TypeError) as exc: + _ = Spectrum.from_array_2d(data, np.int32) # type: ignore[call-overload] + + assert exc.value.args[0].startswith( + "The input array must be a two-dimensional array or nested sequence." + ) + + +def test___iterable_list___from_array_2d___raises_type_error() -> None: + data = [itertools.repeat(3), itertools.repeat(4)] + + with pytest.raises(TypeError) as exc: + _ = Spectrum.from_array_2d(data, np.int32) # type: ignore[arg-type] + + assert exc.value.args[0].startswith("int() argument must be") + + +def test___ndarray_with_unsupported_dtype___from_array_2d___raises_type_error() -> None: + data = np.zeros((2, 3), np.str_) + + with pytest.raises(TypeError) as exc: + _ = Spectrum.from_array_2d(data) + + assert exc.value.args[0].startswith("The requested data type is not supported.") + + +def test___copy___from_array_2d___creates_spectrum_linked_to_different_buffer() -> None: + data = np.array([[1, 2, 3], [4, 5, 6]], np.int32) + + spectrums = Spectrum.from_array_2d(data, copy=True) + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i].tolist() + data[0][:] = [3, 2, 1] + data[1][:] = [6, 5, 4] + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() != data[i].tolist() + + +def test___int32_ndarray_no_copy___from_array_2d___creates_spectrum_linked_to_same_buffer() -> None: + data = np.array([[1, 2, 3], [4, 5, 6]], np.int32) + + spectrums = Spectrum.from_array_2d(data, copy=False) + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i].tolist() + data[0][:] = [3, 2, 1] + data[1][:] = [6, 5, 4] + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i].tolist() + + +def test___int32_array_list_no_copy___from_array_2d___creates_spectrum_linked_to_same_buffer() -> ( + None +): + data = [array.array("i", [1, 2, 3]), array.array("i", [4, 5, 6])] + + spectrums = Spectrum.from_array_2d(data, dtype=np.int32, copy=False) + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i].tolist() + data[0][:] = array.array("i", [3, 2, 1]) + data[1][:] = array.array("i", [6, 5, 4]) + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == data[i].tolist() + + +def test___int_list_list_no_copy___from_array_2d___raises_value_error() -> None: + data = [[1, 2, 3], [4, 5, 6]] + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_2d(data, np.int32, copy=False) + + assert exc.value.args[0].startswith( + "Unable to avoid copy while creating an array as requested." + ) + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_data", + [ + (0, None, [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]), + (1, None, [[2, 3, 4, 5], [7, 8, 9, 10]]), + (4, None, [[5], [10]]), + (5, None, [[], []]), + (0, 1, [[1], [6]]), + (0, 4, [[1, 2, 3, 4], [6, 7, 8, 9]]), + (1, 1, [[2], [7]]), + (1, 3, [[2, 3, 4], [7, 8, 9]]), + (1, 4, [[2, 3, 4, 5], [7, 8, 9, 10]]), + ], +) +def test___array_subset___from_array_2d___creates_spectrum_with_array_subset( + start_index: SupportsIndex, sample_count: SupportsIndex | None, expected_data: list[list[int]] +) -> None: + data = np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]], np.int32) + + spectrums = Spectrum.from_array_2d(data, start_index=start_index, sample_count=sample_count) + + assert len(spectrums) == 2 + for i in range(len(spectrums)): + assert spectrums[i].data.tolist() == expected_data[i] + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_message", + [ + (-2, None, "The start index must be a non-negative integer."), + (-1, None, "The start index must be a non-negative integer."), + (6, None, "The start index must be less than or equal to the input array length."), + (0, -2, "The sample count must be a non-negative integer."), + (0, -1, "The sample count must be a non-negative integer."), + ( + 0, + 6, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ( + 1, + 5, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ( + 5, + 1, + "The sum of the start index and sample count must be less than or equal to the input array length.", + ), + ], +) +def test___invalid_array_subset___from_array_2d___raises_value_error( + start_index: SupportsIndex, sample_count: SupportsIndex | None, expected_message: str +) -> None: + data = np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]], np.int32) + + with pytest.raises(ValueError) as exc: + _ = Spectrum.from_array_2d(data, start_index=start_index, sample_count=sample_count) + + assert exc.value.args[0].startswith(expected_message) + + +############################################################################### +# data +############################################################################### +def test___int32_spectrum___data___returns_int32_data() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2, 3], np.int32) + + data = spectrum.data + + assert_type(data, npt.NDArray[np.int32]) + assert isinstance(data, np.ndarray) and data.dtype == np.int32 + assert list(data) == [0, 1, 2, 3] + + +############################################################################### +# get_data +############################################################################### +def test___int32_spectrum___get_data___returns_data() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2, 3], np.int32) + + scaled_data = spectrum.get_data() + + assert_type(scaled_data, npt.NDArray[np.int32]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.int32 + assert list(scaled_data) == [0, 1, 2, 3] + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_data", + [ + (None, None, [0, 1, 2, 3]), + (0, None, [0, 1, 2, 3]), + (1, None, [1, 2, 3]), + (3, None, [3]), + (4, None, []), + (None, None, [0, 1, 2, 3]), + (None, 1, [0]), + (None, 3, [0, 1, 2]), + (None, 4, [0, 1, 2, 3]), + (1, 2, [1, 2]), + (4, 0, []), + ], +) +def test___array_subset___get_data___returns_array_subset( + start_index: int, sample_count: int, expected_data: list[int] +) -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2, 3], np.int32) + + scaled_data = spectrum.get_data(start_index=start_index, sample_count=sample_count) + + assert_type(scaled_data, npt.NDArray[np.int32]) + assert isinstance(scaled_data, np.ndarray) and scaled_data.dtype == np.int32 + assert list(scaled_data) == expected_data + + +@pytest.mark.parametrize( + "start_index, sample_count, expected_message", + [ + ( + 5, + None, + "The start index must be less than or equal to the number of samples in the spectrum.", + ), + ( + 0, + 5, + "The sum of the start index and sample count must be less than or equal to the number of samples in the spectrum.", + ), + ( + 4, + 1, + "The sum of the start index and sample count must be less than or equal to the number of samples in the spectrum.", + ), + ], +) +def test___invalid_array_subset___get_data___returns_array_subset( + start_index: int, sample_count: int, expected_message: str +) -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2, 3], np.int32) + + with pytest.raises((TypeError, ValueError)) as exc: + _ = spectrum.get_data(start_index=start_index, sample_count=sample_count) + + assert exc.value.args[0].startswith(expected_message) + + +############################################################################### +# capacity +############################################################################### +@pytest.mark.parametrize( + "capacity, expected_data", + [(3, [1, 2, 3]), (4, [1, 2, 3, 0]), (10, [1, 2, 3, 0, 0, 0, 0, 0, 0, 0])], +) +def test___spectrum___set_capacity___resizes_array_and_pads_with_zeros( + capacity: int, expected_data: list[int] +) -> None: + data = [1, 2, 3] + spectrum = Spectrum.from_array_1d(data, np.int32) + + spectrum.capacity = capacity + + assert spectrum.capacity == capacity + assert spectrum.data.tolist() == data + assert spectrum._data.tolist() == expected_data + + +@pytest.mark.parametrize( + "capacity, expected_message", + [ + (-2, "The capacity must be a non-negative integer."), + (-1, "The capacity must be a non-negative integer."), + (0, "The capacity must be equal to or greater than the number of samples in the spectrum."), + (2, "The capacity must be equal to or greater than the number of samples in the spectrum."), + ], +) +def test___invalid_capacity___set_capacity___raises_value_error( + capacity: int, expected_message: str +) -> None: + data = [1, 2, 3] + spectrum = Spectrum.from_array_1d(data, np.int32) + + with pytest.raises(ValueError) as exc: + spectrum.capacity = capacity + + assert exc.value.args[0].startswith(expected_message) + + +def test___referenced_array___set_capacity___reference_sees_size_change() -> None: + data = np.array([1, 2, 3], np.int32) + spectrum = Spectrum.from_array_1d(data, np.int32, copy=False) + + spectrum.capacity = 10 + + assert len(data) == 10 + assert spectrum.capacity == 10 + assert data.tolist() == [1, 2, 3, 0, 0, 0, 0, 0, 0, 0] + assert spectrum.data.tolist() == [1, 2, 3] + assert spectrum._data.tolist() == [1, 2, 3, 0, 0, 0, 0, 0, 0, 0] + + +def test___array_with_external_buffer___set_capacity___raises_value_error() -> None: + data = array.array("i", [1, 2, 3]) + spectrum = Spectrum.from_array_1d(data, np.int32, copy=False) + + with pytest.raises(ValueError) as exc: + spectrum.capacity = 10 + + assert exc.value.args[0].startswith("cannot resize this array: it does not own its data") + + +############################################################################### +# extended properties +############################################################################### +def test___spectrum___set_channel_name___sets_extended_property() -> None: + spectrum = Spectrum() + + spectrum.channel_name = "Dev1/ai0" + + assert spectrum.channel_name == "Dev1/ai0" + assert spectrum.extended_properties["NI_ChannelName"] == "Dev1/ai0" + + +def test___invalid_type___set_channel_name___raises_type_error() -> None: + spectrum = Spectrum() + + with pytest.raises(TypeError) as exc: + spectrum.channel_name = 1 # type: ignore[assignment] + + assert exc.value.args[0].startswith("The channel name must be a str.") + + +def test___spectrum___set_unit_description___sets_extended_property() -> None: + spectrum = Spectrum() + + spectrum.unit_description = "Volts" + + assert spectrum.unit_description == "Volts" + assert spectrum.extended_properties["NI_UnitDescription"] == "Volts" + + +def test___invalid_type___set_unit_description___raises_type_error() -> None: + spectrum = Spectrum() + + with pytest.raises(TypeError) as exc: + spectrum.unit_description = None # type: ignore[assignment] + + assert exc.value.args[0].startswith("The unit description must be a str.") + + +def test___spectrum___set_undefined_property___raises_attribute_error() -> None: + spectrum = Spectrum() + + with pytest.raises(AttributeError): + spectrum.undefined_property = "Whatever" # type: ignore[attr-defined] + + +def test___spectrum___take_weak_ref___references_spectrum() -> None: + spectrum = Spectrum() + + spectrum_ref = weakref.ref(spectrum) + + assert spectrum_ref() is spectrum + + +############################################################################### +# frequency range +############################################################################### +def test___spectrum___has_default_frequency_range() -> None: + spectrum = Spectrum() + + assert spectrum.start_frequency == 0.0 + assert spectrum.frequency_increment == 0.0 + + +def test___spectrum_with_frequencies___has_specified_frequency_range() -> None: + spectrum = Spectrum(start_frequency=123.456, frequency_increment=0.1) + + assert spectrum.start_frequency == 123.456 + assert spectrum.frequency_increment == 0.1 + + +def test___spectrum_with_frequencies___set_frequencies___has_set_frequency_range() -> None: + spectrum = Spectrum(start_frequency=123.456, frequency_increment=0.1) + + spectrum.start_frequency = 234.567 + spectrum.frequency_increment = 0.2 + + assert spectrum.start_frequency == 234.567 + assert spectrum.frequency_increment == 0.2 + + +############################################################################### +# append array +############################################################################### +def test___empty_ndarray___append___no_effect() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([], np.int32) + + spectrum.append(array) + + assert list(spectrum.data) == [0, 1, 2] + + +def test___int32_ndarray___append___appends_array() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([3, 4, 5], np.int32) + + spectrum.append(array) + + assert list(spectrum.data) == [0, 1, 2, 3, 4, 5] + + +def test___float64_ndarray___append___appends_array() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + array = np.array([3, 4, 5], np.float64) + + spectrum.append(array) + + assert list(spectrum.data) == [0, 1, 2, 3, 4, 5] + + +def test___ndarray_with_mismatched_dtype___append___raises_type_error() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + array = np.array([3, 4, 5], np.int32) + + with pytest.raises(TypeError) as exc: + spectrum.append(array) # type: ignore[arg-type] + + assert exc.value.args[0].startswith( + "The data type of the input array must match the spectrum data type." + ) + + +def test___ndarray_2d___append___raises_value_error() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + array = np.array([[3, 4, 5], [6, 7, 8]], np.float64) + + with pytest.raises(ValueError) as exc: + spectrum.append(array) + + assert exc.value.args[0].startswith("The input array must be a one-dimensional array.") + + +############################################################################### +# append spectrum +############################################################################### +def test___empty_spectrum___append___no_effect() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + other = Spectrum(dtype=np.int32) + + spectrum.append(other) + + assert list(spectrum.data) == [0, 1, 2] + + +def test___int32_spectrum___append___appends_spectrum() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + other = Spectrum.from_array_1d([3, 4, 5], np.int32) + + spectrum.append(other) + + assert list(spectrum.data) == [0, 1, 2, 3, 4, 5] + + +def test___float64_spectrum___append___appends_spectrum() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + other = Spectrum.from_array_1d([3, 4, 5], np.float64) + + spectrum.append(other) + + assert list(spectrum.data) == [0, 1, 2, 3, 4, 5] + + +def test___spectrum_with_mismatched_dtype___append___raises_type_error() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + other = Spectrum.from_array_1d([3, 4, 5], np.int32) + + with pytest.raises(TypeError) as exc: + spectrum.append(other) # type: ignore[arg-type] + + assert exc.value.args[0].startswith( + "The data type of the input spectrum must match the spectrum data type." + ) + + +############################################################################### +# append spectrums +############################################################################### +def test___empty_spectrum_list___append___no_effect() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + other: list[Spectrum[np.int32]] = [] + + spectrum.append(other) + + assert list(spectrum.data) == [0, 1, 2] + + +def test___int32_spectrum_list___append___appends_spectrum() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + other = [ + Spectrum.from_array_1d([3, 4, 5], np.int32), + Spectrum.from_array_1d([6], np.int32), + Spectrum.from_array_1d([7, 8], np.int32), + ] + + spectrum.append(other) + + assert list(spectrum.data) == [0, 1, 2, 3, 4, 5, 6, 7, 8] + + +def test___float64_spectrum_tuple___append___appends_spectrum() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + other = ( + Spectrum.from_array_1d([3, 4, 5], np.float64), + Spectrum.from_array_1d([6, 7, 8], np.float64), + ) + + spectrum.append(other) + + assert list(spectrum.data) == [0, 1, 2, 3, 4, 5, 6, 7, 8] + + +def test___spectrum_list_with_mismatched_dtype___append___raises_type_error_and_does_not_append() -> ( + None +): + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + other = [ + Spectrum.from_array_1d([3, 4, 5], np.float64), + Spectrum.from_array_1d([6, 7, 8], np.int32), + ] + + with pytest.raises(TypeError) as exc: + spectrum.append(other) # type: ignore[arg-type] + + assert exc.value.args[0].startswith( + "The data type of the input spectrum must match the spectrum data type." + ) + assert list(spectrum.data) == [0, 1, 2] + + +############################################################################### +# load data +############################################################################### +def test___empty_ndarray___load_data___clears_data() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([], np.int32) + + spectrum.load_data(array) + + assert list(spectrum.data) == [] + + +def test___int32_ndarray___load_data___overwrites_data() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([3, 4, 5], np.int32) + + spectrum.load_data(array) + + assert list(spectrum.data) == [3, 4, 5] + + +def test___float64_ndarray___load_data___overwrites_data() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + array = np.array([3, 4, 5], np.float64) + + spectrum.load_data(array) + + assert list(spectrum.data) == [3, 4, 5] + + +def test___ndarray_with_mismatched_dtype___load_data___raises_type_error() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + array = np.array([3, 4, 5], np.int32) + + with pytest.raises(TypeError) as exc: + spectrum.load_data(array) # type: ignore[arg-type] + + assert exc.value.args[0].startswith( + "The data type of the input array must match the spectrum data type." + ) + + +def test___ndarray_2d___load_data___raises_value_error() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.float64) + array = np.array([[3, 4, 5], [6, 7, 8]], np.float64) + + with pytest.raises(ValueError) as exc: + spectrum.load_data(array) + + assert exc.value.args[0].startswith("The input array must be a one-dimensional array.") + + +def test___smaller_ndarray___load_data___preserves_capacity() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([3], np.int32) + + spectrum.load_data(array) + + assert list(spectrum.data) == [3] + assert spectrum.capacity == 3 + + +def test___larger_ndarray___load_data___grows_capacity() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([3, 4, 5, 6], np.int32) + + spectrum.load_data(array) + + assert list(spectrum.data) == [3, 4, 5, 6] + assert spectrum.capacity == 4 + + +def test___spectrum_with_start_index___load_data___clears_start_index() -> None: + spectrum = Spectrum.from_array_1d( + np.array([0, 1, 2], np.int32), np.int32, copy=False, start_index=1, sample_count=1 + ) + assert spectrum._start_index == 1 + array = np.array([3], np.int32) + + spectrum.load_data(array) + + assert list(spectrum.data) == [3] + assert spectrum._start_index == 0 + + +def test___ndarray_subset___load_data___overwrites_data() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([3, 4, 5], np.int32) + + spectrum.load_data(array, start_index=1, sample_count=1) + + assert list(spectrum.data) == [4] + assert spectrum._start_index == 0 + assert spectrum.capacity == 3 + + +def test___smaller_ndarray_no_copy___load_data___takes_ownership_of_array() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([3], np.int32) + + spectrum.load_data(array, copy=False) + + assert list(spectrum.data) == [3] + assert spectrum._data is array + + +def test___larger_ndarray_no_copy___load_data___takes_ownership_of_array() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([3, 4, 5, 6], np.int32) + + spectrum.load_data(array, copy=False) + + assert list(spectrum.data) == [3, 4, 5, 6] + assert spectrum._data is array + + +def test___ndarray_subset_no_copy___load_data___takes_ownership_of_array_subset() -> None: + spectrum = Spectrum.from_array_1d([0, 1, 2], np.int32) + array = np.array([3, 4, 5, 6], np.int32) + + spectrum.load_data(array, copy=False, start_index=1, sample_count=2) + + assert list(spectrum.data) == [4, 5] + assert spectrum._data is array + + +############################################################################### +# magic methods +############################################################################### +@pytest.mark.parametrize( + "left, right", + [ + (Spectrum(), Spectrum()), + (Spectrum(10), Spectrum(10)), + (Spectrum(10, np.float64), Spectrum(10, np.float64)), + (Spectrum(10, np.int32), Spectrum(10, np.int32)), + ( + Spectrum(10, np.int32, start_index=5, capacity=20), + Spectrum(10, np.int32, start_index=5, capacity=20), + ), + ( + Spectrum.from_array_1d([1, 2, 3], np.float64), + Spectrum.from_array_1d([1, 2, 3], np.float64), + ), + ( + Spectrum.from_array_1d([1, 2, 3], np.int32), + Spectrum.from_array_1d([1, 2, 3], np.int32), + ), + ( + Spectrum( + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"} + ), + Spectrum( + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"} + ), + ), + # start_index and capacity may differ as long as data and sample_count are the same. + ( + Spectrum(10, np.int32, start_index=5, capacity=20), + Spectrum(10, np.int32, start_index=10, capacity=25), + ), + ( + Spectrum.from_array_1d( + [0, 0, 1, 2, 3, 4, 5, 0], np.int32, start_index=2, sample_count=5 + ), + Spectrum.from_array_1d( + [0, 1, 2, 3, 4, 5, 0, 0, 0], np.int32, start_index=1, sample_count=5 + ), + ), + ], +) +def test___same_value___equality___equal(left: Spectrum[Any], right: Spectrum[Any]) -> None: + assert left == right + assert not (left != right) + + +@pytest.mark.parametrize( + "left, right", + [ + (Spectrum(), Spectrum(10)), + (Spectrum(10), Spectrum(11)), + (Spectrum(10, np.float64), Spectrum(10, np.int32)), + ( + Spectrum(15, np.int32, start_index=5, capacity=20), + Spectrum(10, np.int32, start_index=5, capacity=20), + ), + ( + Spectrum.from_array_1d([1, 4, 3], np.float64), + Spectrum.from_array_1d([1, 2, 3], np.float64), + ), + ( + Spectrum.from_array_1d([1, 2, 3], np.int32), + Spectrum.from_array_1d([1, 2, 3], np.float64), + ), + ( + Spectrum( + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"} + ), + Spectrum( + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Amps"} + ), + ), + ], +) +def test___different_value___equality___not_equal( + left: Spectrum[Any], right: Spectrum[Any] +) -> None: + assert not (left == right) + assert left != right + + +@pytest.mark.parametrize( + "value, expected_repr", + [ + (Spectrum(), "nitypes.waveform.Spectrum(0)"), + ( + Spectrum(5), + "nitypes.waveform.Spectrum(5, data=array([0., 0., 0., 0., 0.]))", + ), + ( + Spectrum(5, np.float64), + "nitypes.waveform.Spectrum(5, data=array([0., 0., 0., 0., 0.]))", + ), + (Spectrum(0, np.int32), "nitypes.waveform.Spectrum(0, int32)"), + ( + Spectrum(5, np.int32), + "nitypes.waveform.Spectrum(5, int32, data=array([0, 0, 0, 0, 0], dtype=int32))", + ), + ( + Spectrum(5, np.int32, start_index=5, capacity=20), + "nitypes.waveform.Spectrum(5, int32, data=array([0, 0, 0, 0, 0], dtype=int32))", + ), + ( + Spectrum.from_array_1d([1, 2, 3], np.float64), + "nitypes.waveform.Spectrum(3, data=array([1., 2., 3.]))", + ), + ( + Spectrum.from_array_1d([1, 2, 3], np.int32), + "nitypes.waveform.Spectrum(3, int32, data=array([1, 2, 3], dtype=int32))", + ), + ( + Spectrum( + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"} + ), + "nitypes.waveform.Spectrum(0, extended_properties={'NI_ChannelName': 'Dev1/ai0', 'NI_UnitDescription': 'Volts'})", + ), + ( + Spectrum.from_array_1d( + [1, 2, 3], + np.int32, + start_frequency=123.456, + frequency_increment=0.1, + ), + "nitypes.waveform.Spectrum(3, int32, data=array([1, 2, 3], dtype=int32), start_frequency=123.456, frequency_increment=0.1)", + ), + ( + Spectrum.from_array_1d( + [1, 2, 3], + np.int32, + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"}, + ), + "nitypes.waveform.Spectrum(3, int32, data=array([1, 2, 3], dtype=int32), extended_properties={'NI_ChannelName': 'Dev1/ai0', 'NI_UnitDescription': 'Volts'})", + ), + ], +) +def test___various_values___repr___looks_ok(value: Spectrum[Any], expected_repr: str) -> None: + assert repr(value) == expected_repr + + +_VARIOUS_VALUES = [ + Spectrum(), + Spectrum(10), + Spectrum(10, np.float64), + Spectrum(10, np.int32), + Spectrum(10, np.int32, start_index=5, capacity=20), + Spectrum.from_array_1d([1, 2, 3], np.float64), + Spectrum.from_array_1d([1, 2, 3], np.int32), + Spectrum(start_frequency=123.456, frequency_increment=0.1), + Spectrum(extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"}), + Spectrum(10, np.int32, start_index=5, capacity=20), + Spectrum.from_array_1d([0, 0, 1, 2, 3, 4, 5, 0], np.int32, start_index=2, sample_count=5), +] + + +@pytest.mark.parametrize("value", _VARIOUS_VALUES) +def test___various_values___copy___makes_shallow_copy(value: Spectrum[Any]) -> None: + new_value = copy.copy(value) + + _assert_shallow_copy(new_value, value) + + +def _assert_shallow_copy(value: Spectrum[Any], other: Spectrum[Any]) -> None: + assert value == other + assert value is not other + # _data may be a view of the original array. + assert value._data is other._data or value._data.base is other._data + assert value._start_frequency == other._start_frequency + assert value._frequency_increment == other._frequency_increment + assert value._extended_properties is other._extended_properties + + +@pytest.mark.parametrize("value", _VARIOUS_VALUES) +def test___various_values___deepcopy___makes_shallow_copy(value: Spectrum[Any]) -> None: + new_value = copy.deepcopy(value) + + _assert_deep_copy(new_value, value) + + +def _assert_deep_copy(value: Spectrum[Any], other: Spectrum[Any]) -> None: + assert value == other + assert value is not other + assert value._data is not other._data and value._data.base is not other._data + assert value._start_frequency == other._start_frequency + assert value._frequency_increment == other._frequency_increment + assert value._extended_properties is not other._extended_properties + + +@pytest.mark.parametrize("value", _VARIOUS_VALUES) +def test___various_values___pickle_unpickle___makes_deep_copy( + value: Spectrum[Any], +) -> None: + new_value = pickle.loads(pickle.dumps(value)) + + _assert_deep_copy(new_value, value) + + +def test___spectrum___pickle___references_public_modules() -> None: + value = Spectrum( + data=np.array([1, 2, 3], np.float64), + start_frequency=123.456, + frequency_increment=0.1, + extended_properties={"NI_ChannelName": "Dev1/ai0", "NI_UnitDescription": "Volts"}, + ) + + value_bytes = pickle.dumps(value) + + assert b"nitypes.waveform" in value_bytes + assert b"nitypes.waveform._extended_properties" not in value_bytes + assert b"nitypes.waveform._spectrum" not in value_bytes