Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pysquared/config/radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def __init__(self, radio_dict: dict) -> None:
self.start_time: int = radio_dict["start_time"]
self.fsk: FSKConfig = FSKConfig(radio_dict["fsk"])
self.lora: LORAConfig = LORAConfig(radio_dict["lora"])
self.default_callsigns: list = radio_dict["default_callsigns"]
self.main_sat_license: str = radio_dict["main_sat_license"]

self.RADIO_SCHEMA = {
"license": {"type": str},
Expand All @@ -57,6 +59,8 @@ def __init__(self, radio_dict: dict) -> None:
"min1": 915.0,
"max1": 915.0,
},
"default_callsigns": {"type": list},
"main_sat_license": {"type": str},
}

def validate(self, key: str, value) -> None:
Expand Down
28 changes: 24 additions & 4 deletions pysquared/hardware/radio/packetizer/packet_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ def __init__(
self._license: str = license
# 1 byte for packet identifier, 2 bytes for sequence number, 2 for total packets, 1 for rssi
self._header_size: int = 6
self._payload_size: int = radio.get_max_packet_size() - self._header_size
self._callsign_size: int = 6
self._payload_size: int = (
radio.get_max_packet_size() - self._header_size - self._callsign_size
)
self._message_counter: Counter = message_counter

def send(self, data: bytes) -> bool:
Expand Down Expand Up @@ -92,6 +95,7 @@ def _pack_data(self, data: bytes) -> list[bytes]:
- 2 bytes: sequence number (0-based)
- 2 bytes: total number of packets
- 1 byte: RSSI
- callsign: 6 bytes
- remaining bytes: payload

Args:
Expand Down Expand Up @@ -120,13 +124,17 @@ def _pack_data(self, data: bytes) -> list[bytes]:
+ abs(self._radio.get_rssi()).to_bytes(1, "big")
)

# Create callsign from license, ensuring exactly 6 bytes
license_bytes = self._license.encode() if self._license else b""
# Truncate to 6 chars, pad with nulls to exactly 6 bytes
callsign: bytes = license_bytes[:6] + b"\x00" * (6 - len(license_bytes[:6]))
# Get payload slice for this packet
start: int = sequence_number * self._payload_size
end: int = start + self._payload_size
payload: bytes = data[start:end]

# Combine header and payload
packet: bytes = header + payload
# Combine header, callsign and payload
packet: bytes = header + callsign + payload
packets.append(packet)

return packets
Expand Down Expand Up @@ -171,6 +179,7 @@ def listen(self, timeout: Optional[int] = None) -> bytes | None:
"Received packet",
packet_length=len(packet),
header=self._get_header(packet),
callsign=self._get_callsign(packet),
payload=self._get_payload(packet),
)

Expand Down Expand Up @@ -232,6 +241,17 @@ def _get_header(self, packet: bytes) -> tuple[int, int, int, int]:
-int.from_bytes(packet[5:6], "big"), # RSSI
)

def _get_callsign(self, packet: bytes) -> bytes:
"""Returns the callsign.

Args:
packet: The packet to extract the callsign from.

Returns:
A string containing the callsign (up to 6 characters).
"""
return packet[self._header_size : self._header_size + self._callsign_size]

def _get_payload(self, packet: bytes) -> bytes:
"""Returns the payload of the packet.

Expand All @@ -241,7 +261,7 @@ def _get_payload(self, packet: bytes) -> bytes:
Returns:
The payload of the packet.
"""
return packet[self._header_size :]
return packet[self._header_size + self._callsign_size :]

def _get_packet_identifier(self) -> int:
"""Increments message_counter and returns the current identifier for a packet"""
Expand Down
18 changes: 14 additions & 4 deletions tests/unit/hardware/radio/packetizer/test_packet_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ def test_packet_manager_init(mock_logger, mock_radio, mock_message_counter):
assert packet_manager._message_counter is mock_message_counter
assert packet_manager._send_delay == 0.5
assert packet_manager._header_size == 6
assert packet_manager._payload_size == 94 # 100 - 6 header bytes
assert packet_manager._callsign_size == len(license_str.encode())
# payload_size = max_packet_size - header_size - callsign_size = 100 - 6 - 12 = 82
assert packet_manager._payload_size == 82


def test_pack_data_single_packet(mock_logger, mock_radio, mock_message_counter):
Expand All @@ -85,19 +87,24 @@ def test_pack_data_single_packet(mock_logger, mock_radio, mock_message_counter):

# Check packet structure
packet = packets[0]
assert len(packet) == len(test_data) + packet_manager._header_size
expected_packet_size = (
len(test_data) + packet_manager._header_size + packet_manager._callsign_size
)
assert len(packet) == expected_packet_size

# Check header
packet_identifier = int.from_bytes(packet[0:1], "big")
sequence_number = int.from_bytes(packet[1:3], "big")
total_packets = int.from_bytes(packet[3:5], "big")
rssi = int.from_bytes(packet[5:6], "big")
payload = packet[6:]
callsign = packet[6 : 6 + packet_manager._callsign_size]
payload = packet[6 + packet_manager._callsign_size :]

assert packet_identifier == mock_message_counter.get()
assert sequence_number == 0
assert total_packets == 1
assert rssi == 70
assert callsign == license_str.encode()
assert payload == test_data


Expand All @@ -117,6 +124,7 @@ def test_pack_data_multiple_packets(mock_logger, mock_radio, mock_message_counte
# Create test data that requires multiple packets
# With a payload size of 94, this will require 3 packets
test_data = b"X" * 250

packets = packet_manager._pack_data(test_data)
assert len(packets) == 3

Expand All @@ -128,12 +136,14 @@ def test_pack_data_multiple_packets(mock_logger, mock_radio, mock_message_counte
sequence_number = int.from_bytes(packet[1:3], "big")
total_packets = int.from_bytes(packet[3:5], "big")
rssi = int.from_bytes(packet[5:6], "big")
payload = packet[6:]
callsign = packet[6 : 6 + packet_manager._callsign_size]
payload = packet[6 + packet_manager._callsign_size :]

assert packet_identifier == mock_message_counter.get()
assert sequence_number == i
assert total_packets == 3
assert rssi == 70
assert callsign == license_str.encode()
reconstructed_data += payload

# Verify the reconstructed data matches the original
Expand Down
Loading