Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions jupyter_server_nbmodel/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,50 @@ async def _execute_snippet(
}


async def dedup_task_queue(q: asyncio.Queue) -> None:
    """Deduplicate pending execution tasks, keeping only the last one per cell_id.

    Problem:
        After "Restart Kernel and Run All Cells", tasks from the previous
        run may still remain in the queue. This causes duplicate tasks
        for the same cell_id to exist. When consumed, both the old and
        new tasks run, leading to duplicated execution.

    Solution:
        Drain the queue, keep only the last occurrence of each cell_id
        (tasks whose metadata carries no cell_id are always kept), and
        put the survivors back in their original relative order.

    Parameters
    ----------
    q : asyncio.Queue
        Queue of ``(uid, snippet, metadata)`` tuples, where ``metadata``
        is a dict that may contain a ``"cell_id"`` key.

    Notes
    -----
    Every drained item is acknowledged with ``task_done()`` (kept items
    are re-counted by the ``put_nowait`` below and acknowledged later by
    the worker), so ``q.join()`` does not hang on dropped duplicates.
    """
    if q.empty():
        return

    # Drain synchronously: get_nowait() cannot suspend, so no other
    # consumer can interleave with the drain (unlike `await q.get()`,
    # which could block if a racing consumer emptied the queue).
    items = []
    while not q.empty():
        items.append(q.get_nowait())

    # Index of the last occurrence of each cell_id. Items without a
    # cell_id are never deduplicated.
    last_idx = {}
    for i, (_uid, _snippet, metadata) in enumerate(items):
        cell_id = metadata.get("cell_id")
        if cell_id is not None:
            last_idx[cell_id] = i

    for i, item in enumerate(items):
        # Balance the original put() for this drained item so the
        # queue's unfinished-task counter stays correct for q.join().
        q.task_done()
        cell_id = item[2].get("cell_id")
        if cell_id is None or last_idx[cell_id] == i:
            # We drained at least as many items as we put back, so the
            # queue cannot be full here; put_nowait() is safe.
            q.put_nowait(item)


async def kernel_worker(
kernel_id: str,
client: jupyter_client.asynchronous.client.AsyncKernelClient,
Expand All @@ -264,6 +308,7 @@ async def kernel_worker(
while True:
try:
uid, snippet, metadata = await queue.get()
await dedup_task_queue(queue)
get_logger().debug(f"Processing execution request {uid} for kernel {kernel_id}…")
get_logger().debug("%s %s %s", uid, snippet, metadata)
client.session.session = uid
Expand All @@ -278,13 +323,20 @@ async def kernel_worker(
)
queue.task_done()
get_logger().debug(f"Execution request {uid} processed for kernel {kernel_id}.")

# stop other tasks if one hits error
if results[uid]['status'] == 'error':
while not queue.empty():
queue.get_nowait()
queue.task_done()
except (asyncio.CancelledError, KeyboardInterrupt, RuntimeError) as e:
results[uid] = {"error": str(e)}
get_logger().debug(
f"Stopping execution requests worker for kernel {kernel_id}…", exc_info=e
)
# Empty the queue
while not queue.empty():
queue.get_nowait()
queue.task_done()
to_raise = e
break
Expand All @@ -293,6 +345,7 @@ async def kernel_worker(
f"Failed to process execution request {uid} for kernel {kernel_id}.", exc_info=e
)
if not queue.empty():
queue.get_nowait()
queue.task_done()
if to_raise is not None:
raise to_raise