Skip to content

Commit f5a0690

Browse files
fix some asyncio cancellation bugs
1 parent 578d726 commit f5a0690

5 files changed

Lines changed: 164 additions & 19 deletions

File tree

Lib/asyncio/base_subprocess.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def __init__(self, loop, protocol, args, shell,
2222
self._proc = None
2323
self._pid = None
2424
self._returncode = None
25-
self._exit_waiters = []
25+
self._exit_waiters = set()
2626
self._pending_calls = collections.deque()
2727
self._pipes = {}
2828
self._finished = False
@@ -209,6 +209,11 @@ async def _connect_pipes(self, waiter):
209209
except (SystemExit, KeyboardInterrupt):
210210
raise
211211
except BaseException as exc:
212+
# Close any pipes that were already connected before the
213+
# error/cancellation to avoid leaking file descriptors.
214+
for proto in self._pipes.values():
215+
if proto is not None:
216+
proto.pipe.close()
212217
if waiter is not None and not waiter.cancelled():
213218
waiter.set_exception(exc)
214219
else:
@@ -251,8 +256,11 @@ async def _wait(self):
251256
return self._returncode
252257

253258
waiter = self._loop.create_future()
254-
self._exit_waiters.append(waiter)
255-
return await waiter
259+
self._exit_waiters.add(waiter)
260+
try:
261+
return await waiter
262+
finally:
263+
self._exit_waiters.discard(waiter)
256264

257265
def _try_finish(self):
258266
assert not self._finished
@@ -280,7 +288,6 @@ def _call_connection_lost(self, exc):
280288
for waiter in self._exit_waiters:
281289
if not waiter.done():
282290
waiter.set_result(self._returncode)
283-
self._exit_waiters = None
284291
self._loop = None
285292
self._proc = None
286293
self._protocol = None

Lib/asyncio/subprocess.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,10 @@ async def _feed_stdin(self, input):
160160
# write() and drain() can raise these exceptions.
161161
if debug:
162162
logger.debug('%r communicate: stdin got %r', self, exc)
163-
164-
if debug:
165-
logger.debug('%r communicate: close stdin', self)
166-
self.stdin.close()
163+
finally:
164+
if debug:
165+
logger.debug('%r communicate: close stdin', self)
166+
self.stdin.close()
167167

168168
async def _noop(self):
169169
return None
@@ -178,12 +178,14 @@ async def _read_stream(self, fd):
178178
if self._loop.get_debug():
179179
name = 'stdout' if fd == 1 else 'stderr'
180180
logger.debug('%r communicate: read %s', self, name)
181-
output = await stream.read()
182-
if self._loop.get_debug():
183-
name = 'stdout' if fd == 1 else 'stderr'
184-
logger.debug('%r communicate: close %s', self, name)
185-
transport.close()
186-
return output
181+
try:
182+
output = await stream.read()
183+
if self._loop.get_debug():
184+
name = 'stdout' if fd == 1 else 'stderr'
185+
logger.debug('%r communicate: close %s', self, name)
186+
return output
187+
finally:
188+
transport.close()
187189

188190
async def communicate(self, input=None):
189191
if self.stdin is not None:
@@ -198,8 +200,13 @@ async def communicate(self, input=None):
198200
stderr = self._read_stream(2)
199201
else:
200202
stderr = self._noop()
201-
stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
202-
await self.wait()
203+
try:
204+
stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
205+
except:
206+
self.kill()
207+
raise
208+
finally:
209+
await self.wait()
203210
return (stdout, stderr)
204211

205212

Lib/asyncio/unix_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ async def _make_subprocess_transport(self, protocol, args, shell,
211211
raise
212212
except BaseException:
213213
transp.close()
214-
await transp._wait()
214+
await tasks.shield(transp._wait())
215215
raise
216216

217217
return transp

Lib/asyncio/windows_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ async def _make_subprocess_transport(self, protocol, args, shell,
408408
raise
409409
except BaseException:
410410
transp.close()
411-
await transp._wait()
411+
await tasks.shield(transp._wait())
412412
raise
413413

414414
return transp

Lib/test/test_asyncio/test_subprocess.py

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def test_proc_exited_no_invalid_state_error_on_exit_waiters(self):
125125

126126
# Simulate a waiter registered via _wait() before the process exits.
127127
exit_waiter = self.loop.create_future()
128-
transport._exit_waiters.append(exit_waiter)
128+
transport._exit_waiters.add(exit_waiter)
129129

130130
# _connect_pipes hasn't completed, so _pipes_connected is False.
131131
self.assertFalse(transport._pipes_connected)
@@ -910,6 +910,137 @@ async def main():
910910

911911
self.loop.run_until_complete(main())
912912

913+
def test_communicate_cancellation_kills_process(self):
914+
async def run():
915+
proc = await asyncio.create_subprocess_exec(
916+
*PROGRAM_BLOCKED,
917+
stdout=subprocess.PIPE,
918+
)
919+
with self.assertRaises(asyncio.TimeoutError):
920+
await asyncio.wait_for(proc.communicate(), 0.1)
921+
922+
returncode = await asyncio.wait_for(proc.wait(), 5.0)
923+
self.assertIsNotNone(returncode)
924+
925+
self.loop.run_until_complete(run())
926+
927+
def test_communicate_cancellation_closes_stdout_transport(self):
928+
async def run():
929+
proc = await asyncio.create_subprocess_exec(
930+
*PROGRAM_BLOCKED,
931+
stdout=subprocess.PIPE,
932+
)
933+
try:
934+
with self.assertRaises(asyncio.TimeoutError):
935+
await asyncio.wait_for(proc.communicate(), 0.1)
936+
937+
await asyncio.sleep(0)
938+
939+
stdout_transport = proc._transport.get_pipe_transport(1)
940+
self.assertTrue(
941+
stdout_transport is None or stdout_transport.is_closing(),
942+
"stdout pipe transport not closed after cancellation")
943+
finally:
944+
if proc.returncode is None:
945+
proc.kill()
946+
await proc.wait()
947+
948+
self.loop.run_until_complete(run())
949+
950+
def test_communicate_cancellation_closes_stdin(self):
951+
async def run():
952+
proc = await asyncio.create_subprocess_exec(
953+
*PROGRAM_BLOCKED,
954+
stdin=subprocess.PIPE,
955+
stdout=subprocess.PIPE,
956+
)
957+
try:
958+
large_input = b'x' * (1024 * 1024)
959+
with self.assertRaises(asyncio.TimeoutError):
960+
await asyncio.wait_for(
961+
proc.communicate(large_input), 0.5)
962+
963+
await asyncio.sleep(0)
964+
965+
stdin_transport = proc._transport.get_pipe_transport(0)
966+
self.assertTrue(
967+
stdin_transport is None or stdin_transport.is_closing(),
968+
"stdin pipe transport not closed after cancellation")
969+
finally:
970+
if proc.returncode is None:
971+
proc.kill()
972+
await proc.wait()
973+
974+
self.loop.run_until_complete(run())
975+
976+
def test_communicate_cancellation_closes_stderr_transport(self):
977+
async def run():
978+
proc = await asyncio.create_subprocess_exec(
979+
*PROGRAM_BLOCKED,
980+
stderr=subprocess.PIPE,
981+
)
982+
try:
983+
with self.assertRaises(asyncio.TimeoutError):
984+
await asyncio.wait_for(proc.communicate(), 0.1)
985+
986+
await asyncio.sleep(0)
987+
988+
stderr_transport = proc._transport.get_pipe_transport(2)
989+
self.assertTrue(
990+
stderr_transport is None or stderr_transport.is_closing(),
991+
"stderr pipe transport not closed after cancellation")
992+
finally:
993+
if proc.returncode is None:
994+
proc.kill()
995+
await proc.wait()
996+
997+
self.loop.run_until_complete(run())
998+
999+
def test_wait_cancellation_removes_exit_waiters(self):
1000+
async def run():
1001+
proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
1002+
try:
1003+
for _ in range(5):
1004+
task = self.loop.create_task(proc.wait())
1005+
self.loop.call_soon(task.cancel)
1006+
try:
1007+
await task
1008+
except asyncio.CancelledError:
1009+
pass
1010+
1011+
self.assertEqual(len(proc._transport._exit_waiters), 0)
1012+
finally:
1013+
proc.kill()
1014+
await proc.wait()
1015+
1016+
self.loop.run_until_complete(run())
1017+
1018+
def test_communicate_cancellation_all_pipes(self):
1019+
async def run():
1020+
proc = await asyncio.create_subprocess_exec(
1021+
*PROGRAM_BLOCKED,
1022+
stdin=subprocess.PIPE,
1023+
stdout=subprocess.PIPE,
1024+
stderr=subprocess.PIPE,
1025+
)
1026+
large_input = b'x' * (1024 * 1024)
1027+
with self.assertRaises(asyncio.TimeoutError):
1028+
await asyncio.wait_for(
1029+
proc.communicate(large_input), 0.5)
1030+
1031+
await asyncio.sleep(0)
1032+
1033+
for fd, name in [(0, 'stdin'), (1, 'stdout'), (2, 'stderr')]:
1034+
transport = proc._transport.get_pipe_transport(fd)
1035+
self.assertTrue(
1036+
transport is None or transport.is_closing(),
1037+
f"{name} pipe transport not closed after cancellation")
1038+
1039+
returncode = await asyncio.wait_for(proc.wait(), 5.0)
1040+
self.assertIsNotNone(returncode)
1041+
1042+
self.loop.run_until_complete(run())
1043+
9131044
@warnings_helper.ignore_warnings(category=ResourceWarning)
9141045
def test_subprocess_read_pipe_cancelled(self):
9151046
async def main():

0 commit comments

Comments
 (0)