Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Lib/asyncio/base_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self, loop, protocol, args, shell,
self._pending_calls = collections.deque()
self._pipes = {}
self._finished = False
self._pipes_connected = False

if stdin == subprocess.PIPE:
self._pipes[0] = None
Expand Down Expand Up @@ -213,6 +214,7 @@ async def _connect_pipes(self, waiter):
else:
if waiter is not None and not waiter.cancelled():
waiter.set_result(None)
self._pipes_connected = True

def _call(self, cb, *data):
if self._pending_calls is not None:
Expand Down Expand Up @@ -256,6 +258,15 @@ def _try_finish(self):
assert not self._finished
if self._returncode is None:
return
if not self._pipes_connected:
# self._pipes_connected can be False if not all pipes were connected
# because either the process failed to start or the self._connect_pipes task
# got cancelled. In this broken state we consider all pipes disconnected and
# to avoid hanging forever in self._wait as otherwise _exit_waiters
# would never be woken up, we wake them up here.
for waiter in self._exit_waiters:
if not waiter.cancelled():
waiter.set_result(self._returncode)
if all(p is not None and p.disconnected
for p in self._pipes.values()):
self._finished = True
Expand Down
40 changes: 39 additions & 1 deletion Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from asyncio import subprocess
from test.test_asyncio import utils as test_utils
from test import support
from test.support import os_helper
from test.support import os_helper, warnings_helper, gc_collect

if not support.has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")
Expand Down Expand Up @@ -879,6 +879,44 @@ async def main():

self.loop.run_until_complete(main())

@warnings_helper.ignore_warnings(category=ResourceWarning)
def test_subprocess_read_pipe_cancelled(self):
async def main():
loop = asyncio.get_running_loop()
loop.connect_read_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
with self.assertRaises(asyncio.CancelledError):
await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, stderr=asyncio.subprocess.PIPE)

asyncio.run(main())
gc_collect()

@warnings_helper.ignore_warnings(category=ResourceWarning)
def test_subprocess_write_pipe_cancelled(self):
async def main():
loop = asyncio.get_running_loop()
loop.connect_write_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
with self.assertRaises(asyncio.CancelledError):
await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, stdin=asyncio.subprocess.PIPE)

asyncio.run(main())
gc_collect()

@warnings_helper.ignore_warnings(category=ResourceWarning)
def test_subprocess_read_write_pipe_cancelled(self):
async def main():
loop = asyncio.get_running_loop()
loop.connect_read_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
loop.connect_write_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
with self.assertRaises(asyncio.CancelledError):
await asyncio.create_subprocess_exec(
*PROGRAM_BLOCKED,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

asyncio.run(main())
gc_collect()

if sys.platform != 'win32':
# Unix
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix hang when cancelling process created by :func:`asyncio.create_subprocess_exec` or :func:`asyncio.create_subprocess_shell`. Patch by Kumar Aditya.
Loading