diff --git a/tests/test_application.py b/tests/test_application.py index f51794d..1e7a85b 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -276,7 +276,7 @@ async def test_deconz_dev_remove_from_group(app, nwk, device_path): assert group.remove_member.call_count == 2 -def test_deconz_props(nwk, device_path): +def test_deconz_props(app, nwk, device_path): deconz = application.DeconzDevice("Conbee II", app, sentinel.ieee, nwk) assert deconz.manufacturer is not None assert deconz.model is not None @@ -305,20 +305,20 @@ async def test_deconz_new(app, nwk, device_path, monkeypatch): def test_tx_confirm_success(app): tsn = 123 - req = app._pending[tsn] = MagicMock() + req = app._pending_requests[tsn] = MagicMock() app.handle_tx_confirm(tsn, sentinel.status) - assert req.result.set_result.call_count == 1 - assert req.result.set_result.call_args[0][0] is sentinel.status + assert req.set_result.call_count == 1 + assert req.set_result.call_args[0][0] is sentinel.status def test_tx_confirm_dup(app, caplog): caplog.set_level(logging.DEBUG) tsn = 123 - req = app._pending[tsn] = MagicMock() - req.result.set_result.side_effect = asyncio.InvalidStateError + req = app._pending_requests[tsn] = MagicMock() + req.set_result.side_effect = asyncio.InvalidStateError app.handle_tx_confirm(tsn, sentinel.status) - assert req.result.set_result.call_count == 1 - assert req.result.set_result.call_args[0][0] is sentinel.status + assert req.set_result.call_count == 1 + assert req.set_result.call_args[0][0] is sentinel.status assert any(r.levelname == "DEBUG" for r in caplog.records) assert "probably duplicate response" in caplog.text diff --git a/zigpy_deconz/zigbee/application.py b/zigpy_deconz/zigbee/application.py index 9d08289..ae9766e 100644 --- a/zigpy_deconz/zigbee/application.py +++ b/zigpy_deconz/zigbee/application.py @@ -73,7 +73,7 @@ def __init__(self, config: dict[str, Any]): super().__init__(config=config) self._api = None - self._pending = zigpy.util.Requests() + self._pending_requests = {} self._delayed_neighbor_scan_task = None self._reconnect_task = None @@ -504,7 +504,14 @@ async def send_packet(self, packet): async with self._limit_concurrency(priority=packet.priority): req_id = self.get_sequence() - with self._pending.new(req_id) as req: + if req_id in self._pending_requests: + raise zigpy.exceptions.DeliveryError( + f"Request with id {req_id} is already pending, cannot send" + ) + + future = self._pending_requests[req_id] = asyncio.Future() + + try: try: await self._api.aps_data_request( req_id=req_id, @@ -525,12 +532,14 @@ async def send_packet(self, packet): ) async with asyncio_timeout(SEND_CONFIRM_TIMEOUT): - status = await req.result + status = await future if status != TXStatus.SUCCESS: raise zigpy.exceptions.DeliveryError( f"Failed to deliver packet: {status!r}", status ) + finally: + del self._pending_requests[req_id] async def permit_ncp(self, time_s=60): assert 0 <= time_s <= 254 @@ -538,18 +547,20 @@ async def permit_ncp(self, time_s=60): def handle_tx_confirm(self, req_id, status): try: - self._pending[req_id].result.set_result(status) - return + future = self._pending_requests[req_id] except KeyError: LOGGER.warning( "Unexpected transmit confirm for request id %s, Status: %s", req_id, status, ) - except asyncio.InvalidStateError as exc: - LOGGER.debug( - "Invalid state on future - probably duplicate response: %s", exc - ) + else: + try: + future.set_result(status) + except asyncio.InvalidStateError as exc: + LOGGER.debug( + "Invalid state on future - probably duplicate response: %s", exc + ) async def restore_neighbours(self) -> None: """Restore children.""" @@ -599,10 +610,10 @@ async def _delayed_neighbour_scan(self) -> None: class DeconzDevice(zigpy.device.Device): """Zigpy Device representing Coordinator.""" - def __init__(self, model: str, *args): + def __init__(self, model: str, *args, **kwargs): """Initialize instance.""" - super().__init__(*args) + super().__init__(*args, **kwargs) self._model = model async def add_to_group(self, grp_id: int, name: str = None) -> None: