Skip to content
Merged
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ click = ">=8.0.0"
distro = { version = ">=1.9.0", platform = "linux" }
requests = ">=2.25.0"
typing_extensions = { version = ">=4.0.0" }
nitypes = {version=">=1.0.0"}
nitypes = {version=">=1.1.0.dev1"}

[tool.poetry.group.codegen.dependencies]
Mako = "^1.2"
Expand Down
28 changes: 23 additions & 5 deletions tests/component/_digital_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from nitypes.waveform.typing import AnyDigitalState, TDigitalState

import nidaqmx
import nidaqmx.system

_D = TypeVar("_D", bound=numpy.generic)

Expand Down Expand Up @@ -119,17 +120,17 @@ def _get_digital_port_data_sample_major(task: nidaqmx.Task, num_samples: int) ->
return numpy.transpose(result).tolist()


def _bool_array_to_int(bool_array: numpy.typing.NDArray[numpy.bool_]) -> int:
def _bool_array_to_int_lsb(bool_array: numpy.typing.NDArray[numpy.bool_]) -> int:
result = 0
# Simulated data is little-endian
# Interpret data as little-endian
for bit in bool_array[::-1]:
result = (result << 1) | int(bit)
return result


def _bool_array_to_int_msb(bool_array: numpy.typing.NDArray[numpy.bool_]) -> int:
result = 0
# Data from ports is big-endian (see AB#3178052)
# Interpret data as big-endian
for bit in bool_array:
result = (result << 1) | int(bit)
return result
Expand All @@ -144,10 +145,10 @@ def _int_to_bool_array(num_lines: int, input: int) -> numpy.typing.NDArray[numpy

def _get_waveform_data(waveform: DigitalWaveform[Any]) -> list[int]:
assert isinstance(waveform, DigitalWaveform)
return [_bool_array_to_int(sample) for sample in waveform.data]
return [_bool_array_to_int_lsb(sample) for sample in waveform.data]


def _get_waveform_data_msb(waveform: DigitalWaveform[Any]) -> list[int]:
def _get_waveform_port_data(waveform: DigitalWaveform[Any]) -> list[int]:
assert isinstance(waveform, DigitalWaveform)
return [_bool_array_to_int_msb(sample) for sample in waveform.data]

Expand Down Expand Up @@ -232,3 +233,20 @@ def _read_and_copy(
) -> numpy.typing.NDArray[_D]:
read_func(array)
return array.copy()


def _validate_waveform_signals(
device: nidaqmx.system.device.Device,
waveform: DigitalWaveform[Any],
lines: list[int] | range, # signal index to line index mapping
) -> None:
lines_list = list(lines)
signal_count = waveform.signal_count
sample_count = waveform.sample_count
assert signal_count == len(lines_list)
for signal_index in range(signal_count):
line_index = lines_list[signal_index]
signal = waveform.signals[signal_index]
assert signal.signal_index == signal_index
assert signal.name == device.di_lines[line_index].name
assert signal.data.tolist() == _get_expected_data_for_line(sample_count, line_index)

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
_get_digital_port_data_sample_major,
_get_num_do_lines_in_task,
_get_waveform_data,
_get_waveform_data_msb,
_get_waveform_port_data,
_int_to_bool_array,
_start_do_task,
)
Expand Down Expand Up @@ -369,9 +369,9 @@ def test___digital_multi_channel_writer___write_waveforms_ports___outputs_match_
actual_value = di_multi_channel_port_loopback_task.read()
assert actual_value[0] != actual_value[1]
assert actual_value == [
_get_waveform_data_msb(waveforms[0])[-1],
_get_waveform_data_msb(waveforms[1])[-1],
] # TODO: AB#3178052 - change to _get_waveform_data()
_get_waveform_port_data(waveforms[0])[-1],
_get_waveform_port_data(waveforms[1])[-1],
]


def test___digital_multi_channel_writer___write_waveforms_port_and_lines___outputs_match_final_values(
Expand All @@ -395,9 +395,9 @@ def test___digital_multi_channel_writer___write_waveforms_port_and_lines___outpu
assert samples_written == num_samples
actual_value = di_multi_channel_port_and_lines_loopback_task.read()
assert actual_value == [
_get_waveform_data_msb(waveforms[0])[-1],
_get_waveform_port_data(waveforms[0])[-1],
_get_waveform_data(waveforms[1])[-1],
] # TODO: AB#3178052 - change to _get_waveform_data()
]


def test___digital_multi_channel_writer___write_waveforms_with_non_contiguous_data___outputs_match_final_values(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
_get_digital_data,
_get_num_do_lines_in_task,
_get_waveform_data,
_get_waveform_data_msb,
_get_waveform_port_data,
_int_to_bool_array,
)

Expand Down Expand Up @@ -426,9 +426,7 @@ def test___digital_single_channel_writer___write_waveform_port_uint8___outputs_m
actual_value = di_port1_loopback_task.read()
assert samples_written == num_samples
assert waveform.signal_count == num_lines
assert (
actual_value == _get_waveform_data_msb(waveform)[i - 1]
) # TODO: AB#3178052 - change to _get_waveform_data()
assert actual_value == _get_waveform_port_data(waveform)[i - 1]


def test___digital_single_channel_writer___write_waveform_port_uint32___outputs_match_final_values(
Expand All @@ -450,6 +448,4 @@ def test___digital_single_channel_writer___write_waveform_port_uint32___outputs_
actual_value = di_port0_loopback_task.read()
assert samples_written == num_samples
assert waveform.signal_count == num_lines
assert (
actual_value == _get_waveform_data_msb(waveform)[i - 1]
) # TODO: AB#3178052 - change to _get_waveform_data()
assert actual_value == _get_waveform_port_data(waveform)[i - 1]
Loading