Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 51 additions & 16 deletions src/nitypes/waveform/_digital/_port.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import sys
from collections.abc import Sequence
from typing import Literal

import numpy as np
import numpy.typing as npt
Expand Down Expand Up @@ -76,15 +78,28 @@ def _get_port_dtype(mask: int) -> np.dtype[AnyDigitalPort]:
)


def port_to_line_data(port_data: npt.NDArray[AnyDigitalPort], mask: int) -> npt.NDArray[np.uint8]:
def port_to_line_data(
port_data: npt.NDArray[AnyDigitalPort], mask: int, bitorder: Literal["big", "little"] = "big"
) -> npt.NDArray[np.uint8]:
"""Convert a 1D array of port data to a 2D array of line data, using the specified mask.

>>> port_to_line_data(np.array([0,1,2,3], np.uint8), 0xFF)
array([[0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 1],
[0, 0, 0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 0, 1, 1]], dtype=uint8)

>>> port_to_line_data(np.array([0,1,2,3], np.uint8), 0xFF, bitorder="little")
array([[0, 0, 0, 0, 0, 0, 0, 0],
[1, 0, 0, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0, 0, 0],
[1, 1, 0, 0, 0, 0, 0, 0]], dtype=uint8)
>>> port_to_line_data(np.array([0,1,2,3], np.uint8), 0x3)
array([[0, 0],
[0, 1],
[1, 0],
[1, 1]], dtype=uint8)
>>> port_to_line_data(np.array([0,1,2,3], np.uint8), 0x3, bitorder="little")
array([[0, 0],
[1, 0],
[0, 1],
Expand All @@ -97,29 +112,42 @@ def port_to_line_data(port_data: npt.NDArray[AnyDigitalPort], mask: int) -> npt.
>>> port_to_line_data(np.array([0,1,2,3], np.uint8), 0)
array([], shape=(4, 0), dtype=uint8)
>>> port_to_line_data(np.array([0x12000000,0xFE000000], np.uint32), 0xFF000000)
array([[0, 1, 0, 0, 1, 0, 0, 0],
[0, 1, 1, 1, 1, 1, 1, 1]], dtype=uint8)
array([[0, 0, 0, 1, 0, 0, 1, 0],
[1, 1, 1, 1, 1, 1, 1, 0]], dtype=uint8)
"""
port_size = port_data.dtype.itemsize * 8
line_data = np.unpackbits(port_data.view(np.uint8), bitorder="little")
# Convert to big-endian byte order to ensure MSB comes first when bitorder='big'
# For multi-byte types on little-endian systems, we need to byteswap
if bitorder != sys.byteorder and port_data.dtype.itemsize > 1:
port_data = port_data.byteswap()

line_data = np.unpackbits(port_data.view(np.uint8), bitorder=bitorder)
line_data = line_data.reshape(len(port_data), port_size)

if mask == bit_mask(port_size):
return line_data
else:
return line_data[:, _mask_to_line_indices(mask)]
return line_data[:, _mask_to_column_indices(mask, port_size, bitorder)]


def _mask_to_line_indices(mask: int, /) -> list[int]:
"""Return the line indices for the given mask.
def _mask_to_column_indices(
mask: int, port_size: int, bitorder: Literal["big", "little"], /
) -> list[int]:
"""Return the column indices for the given mask.

>>> _mask_to_line_indices(0xF)
>>> _mask_to_column_indices(0xF, 8, "big")
[4, 5, 6, 7]
>>> _mask_to_column_indices(0x100, 16, "big")
[7]
>>> _mask_to_column_indices(0xDEADBEEF, 32, "big")
[0, 1, 3, 4, 5, 6, 8, 10, 12, 13, 15, 16, 18, 19, 20, 21, 22, 24, 25, 26, 28, 29, 30, 31]
>>> _mask_to_column_indices(0xF, 8, "little")
[0, 1, 2, 3]
>>> _mask_to_line_indices(0x100)
>>> _mask_to_column_indices(0x100, 16, "little")
[8]
>>> _mask_to_line_indices(0xDEADBEEF)
>>> _mask_to_column_indices(0xDEADBEEF, 32, "little")
[0, 1, 2, 3, 5, 6, 7, 9, 10, 11, 12, 13, 15, 16, 18, 19, 21, 23, 25, 26, 27, 28, 30, 31]
>>> _mask_to_line_indices(-1)
>>> _mask_to_column_indices(-1, 8)
Traceback (most recent call last):
...
ValueError: The mask must be a non-negative integer.
Expand All @@ -128,11 +156,18 @@ def _mask_to_line_indices(mask: int, /) -> list[int]:
"""
if mask < 0:
raise ValueError("The mask must be a non-negative integer.\n\n" f"Mask: {mask}")
line_indices = []
line_index = 0
column_indices = []
bit_position = 0
while mask != 0:
if mask & 1:
line_indices.append(line_index)
line_index += 1
if bitorder == "big":
column_indices.append(port_size - 1 - bit_position)
else: # little
column_indices.append(bit_position)
bit_position += 1
mask >>= 1
return line_indices

if bitorder == "big":
column_indices.reverse()

return column_indices
39 changes: 32 additions & 7 deletions src/nitypes/waveform/_digital/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,26 @@ class DigitalWaveformSignal(Generic[TDigitalState]):
collection, e.g. ``waveform.signals[0]`` or ``waveform.signals["Dev1/port0/line0"]``.
"""

__slots__ = ["_owner", "_signal_index", "__weakref__"]
__slots__ = ["_owner", "_signal_index", "_column_index", "__weakref__"]

_owner: DigitalWaveform[TDigitalState]
_column_index: int
_signal_index: int

def __init__(self, owner: DigitalWaveform[TDigitalState], signal_index: SupportsIndex) -> None:
def __init__(
self,
owner: DigitalWaveform[TDigitalState],
signal_index: SupportsIndex,
column_index: SupportsIndex | None = None,
) -> None:
"""Initialize a new digital waveform signal."""
if column_index is None:
# when unpickling an old version, column_index may not be provided
column_index = signal_index

self._owner = owner
self._signal_index = arg_to_uint("signal index", signal_index)
self._column_index = arg_to_uint("column index", column_index)

@property
def owner(self) -> DigitalWaveform[TDigitalState]:
Expand All @@ -40,22 +51,36 @@ def owner(self) -> DigitalWaveform[TDigitalState]:

@property
def signal_index(self) -> int:
"""The signal index."""
"""The signal's position in the DigitalWaveform.signals collection (0-based)."""
return self._signal_index

@property
def column_index(self) -> int:
"""The signal's position in the DigitalWaveform.data array's second dimension (0-based).

This index is used to access the signal's data within the waveform's data array:
`waveform.data[:, column_index]`.

Note: The column_index is reversed compared to the signal_index. column_index 0 (the
leftmost column) corresponds to the highest signal_index and highest line number. The
highest column_index (the rightmost column) corresponds to signal_index 0 and line 0. This
matches industry conventions where line 0 is the LSB and appears as the rightmost bit.
"""
return self._column_index

@property
def data(self) -> npt.NDArray[TDigitalState]:
"""The signal data, indexed by sample."""
return self._owner.data[:, self._signal_index]
return self._owner.data[:, self._column_index]

@property
def name(self) -> str:
"""The signal name."""
return self._owner._get_signal_name(self._signal_index)
return self._owner._get_line_name(self._column_index)

@name.setter
def name(self, value: str) -> None:
self._owner._set_signal_name(self._signal_index, value)
self._owner._set_line_name(self._column_index, value)

def __eq__(self, value: object, /) -> bool:
"""Return self==value."""
Expand All @@ -66,7 +91,7 @@ def __eq__(self, value: object, /) -> bool:

def __reduce__(self) -> tuple[Any, ...]:
"""Return object state for pickling."""
ctor_args = (self._owner, self._signal_index)
ctor_args = (self._owner, self._signal_index, self._column_index)
return (self.__class__, ctor_args)

def __repr__(self) -> str:
Expand Down
16 changes: 10 additions & 6 deletions src/nitypes/waveform/_digital/_signal_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,25 @@ def __getitem__(
self, index: int | str | slice
) -> DigitalWaveformSignal[TDigitalState] | Sequence[DigitalWaveformSignal[TDigitalState]]:
"""Get self[index]."""
if isinstance(index, int):
if isinstance(index, int): # index is the signal index
if index < 0:
index += len(self._signals)
value = self._signals[index]
if value is None:
value = self._signals[index] = DigitalWaveformSignal(self._owner, index)
column_index = self._owner._reverse_index(index)
value = self._signals[index] = DigitalWaveformSignal(
self._owner, index, column_index
)
return value
elif isinstance(index, str):
signal_names = self._owner._get_signal_names()
elif isinstance(index, str): # index is the line name
line_names = self._owner._get_line_names()
try:
signal_index = signal_names.index(index)
column_index = line_names.index(index)
except ValueError:
raise IndexError(index)
signal_index = self._owner._reverse_index(column_index)
return self[signal_index]
elif isinstance(index, slice):
elif isinstance(index, slice): # index is a slice of signal indices
return [self[i] for i in range(*index.indices(len(self)))]
else:
raise invalid_arg_type("index", "int or str", index)
Loading