Skip to content

Commit 337be4c

Browse files
committed
Merge PR #19
2 parents 921344d + 2bf24e8 commit 337be4c

2 files changed

Lines changed: 42 additions & 5 deletions

File tree

tests/interop/test_calls.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,3 +259,23 @@ async def dly_asyncio():
259259
with pytest.raises(RuntimeError) as err:
260260
await async_gen_to_list(loop.wrap_generator(dly_asyncio))
261261
assert err.value.args[0] == "I has an owie"
262+
263+
@pytest.mark.trio
264+
async def test_trio_asyncio_generator_with_cancellation(self, loop):
265+
async def dly_asyncio(hold, seen):
266+
yield 1
267+
seen.flag |= 1
268+
await hold.wait()
269+
270+
async def cancel_soon(nursery):
271+
await trio.sleep(0.01)
272+
nursery.cancel_scope.cancel()
273+
274+
hold = asyncio.Event(loop=loop)
275+
seen = Seen()
276+
277+
async with trio.open_nursery() as nursery:
278+
nursery.start_soon(async_gen_to_list, loop.wrap_generator(dly_asyncio, hold, seen))
279+
nursery.start_soon(cancel_soon, nursery)
280+
assert nursery.cancel_scope.cancel_called
281+
assert seen.flag == 1

trio_asyncio/util.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,17 @@ def abort_cb(raise_cancel_arg):
5555
async def run_generator(loop, async_generator):
5656
task = trio.hazmat.current_task()
5757
raise_cancel = None
58+
current_read = None
5859

5960
async def consume_next():
6061
try:
6162
item = await async_generator.__anext__()
6263
result = trio.hazmat.Value(value=item)
6364
except StopAsyncIteration:
6465
result = trio.hazmat.Value(value=STOP)
66+
except asyncio.CancelledError:
67+
# Once we are cancelled, we do not call reschedule() anymore
68+
return
6569
except Exception as e:
6670
result = trio.hazmat.Error(error=e)
6771

@@ -71,16 +75,29 @@ def abort_cb(raise_cancel_arg):
7175
# Save the cancel-raising function
7276
nonlocal raise_cancel
7377
raise_cancel = raise_cancel_arg
74-
# XXX: we need to cancel any actice consume_next() call.
75-
# Keep waiting
76-
return trio.hazmat.Abort.FAILED
78+
79+
if not current_read:
80+
# There is no current read
81+
return trio.hazmat.Abort.SUCCEEDED
82+
else:
83+
# Attempt to cancel the current iterator read, do not
84+
# report success until the future was actually cancelled.
85+
already_cancelled = current_read.cancel()
86+
if already_cancelled:
87+
return trio.hazmat.Abort.SUCCEEDED
88+
else:
89+
# Continue dealing with the cancellation once
90+
# future.cancel() goes to the result of
91+
# wait_task_rescheduled()
92+
return trio.hazmat.Abort.FAILED
7793

7894
try:
7995
while True:
80-
# schedule that we read the next one from the iterator
81-
asyncio.ensure_future(consume_next(), loop=loop)
96+
# Schedule in asyncio that we read the next item from the iterator
97+
current_read = asyncio.ensure_future(consume_next(), loop=loop)
8298

8399
item = await trio.hazmat.wait_task_rescheduled(abort_cb)
100+
84101
if item is STOP:
85102
break
86103
await yield_(item)

0 commit comments

Comments
 (0)