From 2b1c1289f76f19614537e01aa0580e33571b82a6 Mon Sep 17 00:00:00 2001 From: Emmanuel Briot Date: Fri, 7 Feb 2025 11:23:56 +0100 Subject: [PATCH] Clear queues on terminating the pool On code that dynamically add work to pool, for instance async with aiomultiprocess.Pool() as pool: async with asyncio.TaskGroup() as tg: _ = tg.create_task(monitor.listen() Where monitor.listen() gets work (for instance by listening to postgresql notifications) and then calls pool.queue_work). When we exit the application with Ctrl-C, it hangs forever. --- aiomultiprocess/pool.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/aiomultiprocess/pool.py b/aiomultiprocess/pool.py index 93e7603..5c66602 100644 --- a/aiomultiprocess/pool.py +++ b/aiomultiprocess/pool.py @@ -371,6 +371,28 @@ def terminate(self) -> None: for process in self.processes: process.terminate() + # Remove all remaining work in the queues, since otherwise this will + # prevent proper exit while multiprocessing waits until the threads + # associated with the queue has terminated, and that thread itself is + # waiting for queues to empty (but the processes have exited, so will + # not be doing the work). + for (tx, rx) in self.queues.values(): + while True: + try: + _ = tx.get_nowait() + except queue.Empty: + break + tx.close() + + while True: + try: + _ = rx.get_nowait() + except queue.Empty: + break + rx.close() + + self.queues = {} + async def join(self) -> None: """Wait for the pool to finish gracefully.""" if self.running: