Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/nitypes/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,23 @@ def invalid_arg_type(arg_description: str, type_description: str, value: object)

def invalid_array_ndim(arg_description: str, valid_value_description: str, ndim: int) -> ValueError:
"""Create a ValueError for an array with an invalid number of dimensions."""
raise ValueError(
return ValueError(
f"The {arg_description} must be {_a(valid_value_description)}.\n\n"
f"Number of dimensions: {ndim}"
)


def invalid_requested_type(type_description: str, requested_type: type) -> TypeError:
"""Create a TypeError for an invalid requested type."""
raise TypeError(
return TypeError(
f"The requested type must be {_a(type_description)} type.\n\n"
f"Requested type: {requested_type}"
)


def unsupported_arg(arg_description: str, value: object) -> ValueError:
"""Create a ValueError for an unsupported argument."""
raise ValueError(
return ValueError(
f"The {arg_description} argument is not supported.\n\n"
f"Provided value: {reprlib.repr(value)}"
)
Expand Down
16 changes: 15 additions & 1 deletion src/nitypes/waveform/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Waveform data types for NI Python APIs."""

from nitypes.waveform._analog_waveform import AnalogWaveform
from nitypes.waveform._exceptions import TimingMismatchError
from nitypes.waveform._extended_properties import (
ExtendedPropertyDictionary,
ExtendedPropertyValue,
Expand All @@ -11,7 +12,13 @@
NoneScaleMode,
ScaleMode,
)
from nitypes.waveform._timing import BaseTiming, PrecisionTiming, SampleIntervalMode, Timing
from nitypes.waveform._timing import (
BaseTiming,
PrecisionTiming,
SampleIntervalMode,
Timing,
)
from nitypes.waveform._warnings import ScalingMismatchWarning, TimingMismatchWarning

__all__ = [
"AnalogWaveform",
Expand All @@ -24,7 +31,10 @@
"PrecisionTiming",
"SampleIntervalMode",
"ScaleMode",
"ScalingMismatchWarning",
"Timing",
"TimingMismatchError",
"TimingMismatchWarning",
]

# Hide that it was defined in a helper file
Expand All @@ -33,8 +43,12 @@
ExtendedPropertyDictionary.__module__ = __name__
# ExtendedPropertyValue is a TypeAlias
LinearScaleMode.__module__ = __name__
# NO_SCALING is a constant
NoneScaleMode.__module__ = __name__
PrecisionTiming.__module__ = __name__
SampleIntervalMode.__module__ = __name__
ScaleMode.__module__ = __name__
ScalingMismatchWarning.__module__ = __name__
Timing.__module__ = __name__
TimingMismatchError.__module__ = __name__
TimingMismatchWarning.__module__ = __name__
210 changes: 173 additions & 37 deletions src/nitypes/waveform/_analog_waveform.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
from __future__ import annotations

import datetime as dt
import sys
import warnings
from collections.abc import Sequence
from typing import (
Any,
Generic,
SupportsIndex,
TypeVar,
overload,
)
from typing import Any, Generic, SupportsIndex, TypeVar, Union, cast, overload

import hightime as ht
import numpy as np
import numpy.typing as npt

from nitypes._arguments import arg_to_uint, validate_dtype
from nitypes._arguments import arg_to_uint, validate_dtype, validate_unsupported_arg
from nitypes._exceptions import invalid_arg_type, invalid_array_ndim
from nitypes._typing import TypeAlias
from nitypes.waveform._extended_properties import (
CHANNEL_NAME,
UNIT_DESCRIPTION,
ExtendedPropertyDictionary,
)
from nitypes.waveform._scaling import NO_SCALING, ScaleMode
from nitypes.waveform._timing import PrecisionTiming, Timing, convert_timing
from nitypes.waveform._timing import BaseTiming, PrecisionTiming, Timing, convert_timing
from nitypes.waveform._warnings import scale_mode_mismatch

if sys.version_info < (3, 10):
import array as std_array


_ScalarType = TypeVar("_ScalarType", bound=np.generic)
_ScalarType_co = TypeVar("_ScalarType_co", bound=np.generic, covariant=True)

_AnyTiming: TypeAlias = Union[BaseTiming[Any, Any], Timing, PrecisionTiming]
_TTiming = TypeVar("_TTiming", bound=BaseTiming[Any, Any])

# Use the C types here because np.isdtype() considers some of them to be distinct types, even when
# they have the same size (e.g. np.intc vs. np.int_ vs. np.long).
_ANALOG_DTYPES = (
Expand Down Expand Up @@ -57,7 +60,6 @@
np.double,
)


# Note about NumPy type hints:
# - At time of writing (April 2025), shape typing is still under development, so we do not
# distinguish between 1D and 2D arrays in type hints.
Expand Down Expand Up @@ -229,7 +231,7 @@ def from_array_2d(
"_sample_count",
"_extended_properties",
"_timing",
"_precision_timing",
"_converted_timing_cache",
"_scale_mode",
"__weakref__",
]
Expand All @@ -238,8 +240,8 @@ def from_array_2d(
_start_index: int
_sample_count: int
_extended_properties: ExtendedPropertyDictionary
_timing: Timing | None
_precision_timing: PrecisionTiming | None
_timing: BaseTiming[Any, Any]
_converted_timing_cache: dict[type[_AnyTiming], _AnyTiming]
_scale_mode: ScaleMode

# If neither dtype nor _data is specified, the type parameter defaults to np.float64.
Expand Down Expand Up @@ -357,7 +359,7 @@ def _init_with_new_array(
self._sample_count = sample_count
self._extended_properties = ExtendedPropertyDictionary()
self._timing = Timing.empty
self._precision_timing = None
self._converted_timing_cache = {}
self._scale_mode = NO_SCALING

def _init_with_provided_array(
Expand Down Expand Up @@ -414,7 +416,7 @@ def _init_with_provided_array(
self._sample_count = sample_count
self._extended_properties = ExtendedPropertyDictionary()
self._timing = Timing.empty
self._precision_timing = None
self._converted_timing_cache = {}
self._scale_mode = NO_SCALING

@property
Expand Down Expand Up @@ -579,32 +581,47 @@ def unit_description(self, value: str) -> None:
raise invalid_arg_type("unit description", "str", value)
self._extended_properties[UNIT_DESCRIPTION] = value

def _get_timing(self, requested_type: type[_TTiming]) -> _TTiming:
if isinstance(self._timing, requested_type):
return self._timing
value = cast(_TTiming, self._converted_timing_cache.get(requested_type))
if value is None:
value = convert_timing(requested_type, self._timing)
self._converted_timing_cache[requested_type] = value
return value

def _set_timing(self, value: _TTiming) -> None:
if self._timing is not value:
self._timing = value
self._converted_timing_cache.clear()

def _validate_timing(self, value: _TTiming) -> None:
if value._timestamps is not None and len(value._timestamps) != self._sample_count:
raise ValueError(
"The number of irregular timestamps is not equal to the number of samples in the waveform.\n\n"
f"Number of timestamps: {len(value._timestamps)}\n"
f"Number of samples in the waveform: {self._sample_count}"
)

@property
def timing(self) -> Timing:
"""The timing information of the analog waveform.

The default value is Timing.empty.
"""
if self._timing is None:
if self._precision_timing is PrecisionTiming.empty:
self._timing = Timing.empty
elif self._precision_timing is not None:
self._timing = convert_timing(Timing, self._precision_timing)
else:
raise RuntimeError("The waveform has no timing information.")
return self._timing
return self._get_timing(Timing)

@timing.setter
def timing(self, value: Timing) -> None:
if not isinstance(value, Timing):
raise invalid_arg_type("timing information", "Timing object", value)
self._timing = value
self._precision_timing = None
self._validate_timing(value)
self._set_timing(value)

@property
def is_precision_timing_initialized(self) -> bool:
"""Indicates whether the waveform's precision timing information is initialized."""
return self._precision_timing is not None
"""Indicates whether the waveform's timing information was set using precision timing."""
return isinstance(self._timing, PrecisionTiming)

@property
def precision_timing(self) -> PrecisionTiming:
Expand All @@ -622,21 +639,14 @@ def precision_timing(self) -> PrecisionTiming:
set using AnalogWaveform.timing. Use AnalogWaveform.is_precision_timing_initialized to
determine if AnalogWaveform.precision_timing has been initialized.
"""
if self._precision_timing is None:
if self._timing is Timing.empty:
self._precision_timing = PrecisionTiming.empty
elif self._timing is not None:
self._precision_timing = convert_timing(PrecisionTiming, self._timing)
else:
raise RuntimeError("The waveform has no timing information.")
return self._precision_timing
return self._get_timing(PrecisionTiming)

@precision_timing.setter
def precision_timing(self, value: PrecisionTiming) -> None:
if not isinstance(value, PrecisionTiming):
raise invalid_arg_type("precision timing information", "PrecisionTiming object", value)
self._precision_timing = value
self._timing = None
self._validate_timing(value)
self._set_timing(value)

@property
def scale_mode(self) -> ScaleMode:
Expand All @@ -648,3 +658,129 @@ def scale_mode(self, value: ScaleMode) -> None:
if not isinstance(value, ScaleMode):
raise invalid_arg_type("scale mode", "ScaleMode object", value)
self._scale_mode = value

def append(
self,
other: (
npt.NDArray[_ScalarType_co]
| AnalogWaveform[_ScalarType_co]
| Sequence[AnalogWaveform[_ScalarType_co]]
),
/,
timestamps: Sequence[dt.datetime] | Sequence[ht.datetime] | None = None,
) -> None:
"""Append data to the analog waveform.

Args:
other: The array or waveform(s) to append.
timestamps: A sequence of timestamps. When the current waveform has
SampleIntervalMode.IRREGULAR, you must provide a sequence of timestamps with the
same length as the array.

Raises:
TimingMismatchError: The current and other waveforms have incompatible timing.
ValueError: The other array has the wrong number of dimensions or the length of the
timestamps argument does not match the length of the other array.
TypeError: The data types of the current waveform and other array or waveform(s) do not
match, or an argument has the wrong data type.

Warnings:
TimingMismatchWarning: The sample intervals of the waveform(s) do not match.
ScalingMismatchWarning: The scale modes of the waveform(s) do not match.

When appending waveforms:

* Timing information is merged based on the sample interval mode of the current
waveform:

* SampleIntervalMode.NONE or SampleIntervalMode.REGULAR: The other waveform(s) must also
have SampleIntervalMode.NONE or SampleIntervalMode.REGULAR. If the sample interval does
not match, a TimingMismatchWarning is generated. Otherwise, the timing information of
the other waveform(s) is discarded.

* SampleIntervalMode.IRREGULAR: The other waveforms(s) must also have
SampleIntervalMode.IRREGULAR. The timestamps of the other waveforms(s) are appended to
the current waveform's timing information.

* Extended properties of the other waveform(s) are merged into the current waveform if they
are not already set in the current waveform.

* If the scale mode of other waveform(s) does not match the scale mode of the current
waveform, a ScalingMismatchWarning is generated. Otherwise, the scaling information of the
other waveform(s) is discarded.
"""
if isinstance(other, np.ndarray):
self._append_array(other, timestamps)
elif isinstance(other, AnalogWaveform):
validate_unsupported_arg("timestamps", timestamps)
self._append_waveform(other)
elif isinstance(other, Sequence) and all(isinstance(x, AnalogWaveform) for x in other):
validate_unsupported_arg("timestamps", timestamps)
self._append_waveforms(other)
else:
raise invalid_arg_type("input", "array or waveform(s)", other)

def _append_array(
self,
array: npt.NDArray[_ScalarType_co],
timestamps: Sequence[dt.datetime] | Sequence[ht.datetime] | None = None,
) -> None:
if array.dtype != self.dtype:
raise TypeError(
"The data type of the input array must match the waveform data type.\n\n"
f"Input array data type: {array.dtype}\n"
f"Waveform data type: {self.dtype}"
)
if array.ndim != 1:
raise ValueError(
"The input array must be a one-dimensional array.\n\n"
f"Number of dimensions: {array.ndim}"
)
if timestamps is not None and len(array) != len(timestamps):
raise ValueError(
"The number of irregular timestamps must be equal to the input array length.\n\n"
f"Number of timestamps: {len(timestamps)}\n"
f"Array length: {len(array)}"
)

new_timing = self._timing._append_timestamps(timestamps)

self._increase_capacity(len(array))
self._set_timing(new_timing)

offset = self._start_index + self._sample_count
self._data[offset : offset + len(array)] = array
self._sample_count += len(array)

def _append_waveform(self, waveform: AnalogWaveform[_ScalarType_co]) -> None:
self._append_waveforms([waveform])

def _append_waveforms(self, waveforms: Sequence[AnalogWaveform[_ScalarType_co]]) -> None:
for waveform in waveforms:
if waveform.dtype != self.dtype:
raise TypeError(
"The data type of the input waveform must match the waveform data type.\n\n"
f"Input waveform data type: {waveform.dtype}\n"
f"Waveform data type: {self.dtype}"
)
if waveform._scale_mode != self._scale_mode:
warnings.warn(scale_mode_mismatch())

new_timing = self._timing
for waveform in waveforms:
new_timing = new_timing._append_timing(waveform._timing)

self._increase_capacity(sum(waveform.sample_count for waveform in waveforms))
self._set_timing(new_timing)

offset = self._start_index + self._sample_count
for waveform in waveforms:
self._data[offset : offset + waveform.sample_count] = waveform.raw_data
offset += waveform.sample_count
self._sample_count += waveform.sample_count
self._extended_properties._merge(waveform._extended_properties)

def _increase_capacity(self, amount: int) -> None:
new_capacity = self._start_index + self._sample_count + amount
if new_capacity > self.capacity:
self.capacity = new_capacity
Loading