From 416ab958e749f707d3666febe72bd94b67c49cc0 Mon Sep 17 00:00:00 2001 From: rle Date: Wed, 20 Aug 2025 00:29:21 +0700 Subject: [PATCH 1/2] fix: errors when triggerring 'Restart kernel and run all cells' --- jupyter_server_nbmodel/actions.py | 51 +++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/jupyter_server_nbmodel/actions.py b/jupyter_server_nbmodel/actions.py index a59eb9a..34fc75e 100644 --- a/jupyter_server_nbmodel/actions.py +++ b/jupyter_server_nbmodel/actions.py @@ -250,6 +250,50 @@ async def _execute_snippet( } +async def dedup_task_queue(q: asyncio.Queue): + """ + Deduplicate tasks in an asyncio.Queue by keeping only the last + submitted task per cell_id. + + Problem: + After "Restart Kernel and Run All Cells", tasks from the previous + run may still remain in the queue. This causes duplicate tasks + for the same cell_id to exist. When consumed, both the old and + new tasks run, leading to duplicated execution. + + Solution: + This function drains the queue, keeps only the last occurrence + of each cell_id, and puts those tasks back into the queue in the + correct order. That way, each cell_id has only one pending task. + """ + if q.empty(): + return + + # Drain the queue into a list + items = [] + while not q.empty(): + items.append(await q.get()) + + # Track last occurrence index of each cell_id + last_idx = {} + for i, item in enumerate(items): + cell_id = item[2]["cell_id"] + last_idx[cell_id] = i + + # Collect items in the order of last appearances + keep = [] + seen = set() + for i, item in enumerate(items): + cell_id = item[2]["cell_id"] + if i == last_idx[cell_id] and cell_id not in seen: + keep.append(item) + seen.add(cell_id) + + # Put back only the deduplicated items + for item in keep: + await q.put(item) + + async def kernel_worker( kernel_id: str, client: jupyter_client.asynchronous.client.AsyncKernelClient, @@ -264,6 +308,7 @@ async def kernel_worker( while True: try: uid, snippet, metadata = await queue.get() + await dedup_task_queue(queue) get_logger().debug(f"Processing execution request {uid} for kernel {kernel_id}…") get_logger().debug("%s %s %s", uid, snippet, metadata) client.session.session = uid @@ -278,6 +323,12 @@ async def kernel_worker( ) queue.task_done() get_logger().debug(f"Execution request {uid} processed for kernel {kernel_id}.") + + # stop other tasks if one hits error + if results[uid]['status'] == 'error': + while not queue.empty(): + queue.get_nowait() + queue.task_done() except (asyncio.CancelledError, KeyboardInterrupt, RuntimeError) as e: results[uid] = {"error": str(e)} get_logger().debug( From d8f8dd99bbee80081736d4ad52d710085797bdc7 Mon Sep 17 00:00:00 2001 From: rle Date: Wed, 20 Aug 2025 00:45:17 +0700 Subject: [PATCH 2/2] fix: correct task cleanup --- jupyter_server_nbmodel/actions.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/jupyter_server_nbmodel/actions.py b/jupyter_server_nbmodel/actions.py index 34fc75e..73b6bbc 100644 --- a/jupyter_server_nbmodel/actions.py +++ b/jupyter_server_nbmodel/actions.py @@ -336,6 +336,7 @@ async def kernel_worker( ) # Empty the queue while not queue.empty(): + queue.get_nowait() queue.task_done() to_raise = e break @@ -344,6 +345,7 @@ async def kernel_worker( f"Failed to process execution request {uid} for kernel {kernel_id}.", exc_info=e ) if not queue.empty(): + queue.get_nowait() queue.task_done() if to_raise is not None: raise to_raise