@@ -55,13 +55,17 @@ def abort_cb(raise_cancel_arg):
5555async def run_generator (loop , async_generator ):
5656 task = trio .hazmat .current_task ()
5757 raise_cancel = None
58+ current_read = None
5859
5960 async def consume_next ():
6061 try :
6162 item = await async_generator .__anext__ ()
6263 result = trio .hazmat .Value (value = item )
6364 except StopAsyncIteration :
6465 result = trio .hazmat .Value (value = STOP )
66+ except asyncio .CancelledError :
67+ # Once we are cancelled, we do not call reschedule() anymore
68+ return
6569 except Exception as e :
6670 result = trio .hazmat .Error (error = e )
6771
@@ -71,16 +75,29 @@ def abort_cb(raise_cancel_arg):
7175 # Save the cancel-raising function
7276 nonlocal raise_cancel
7377 raise_cancel = raise_cancel_arg
74- # XXX: we need to cancel any actice consume_next() call.
75- # Keep waiting
76- return trio .hazmat .Abort .FAILED
78+
79+ if not current_read :
80+ # There is no current read
81+ return trio .hazmat .Abort .SUCCEEDED
82+ else :
83+ # Attempt to cancel the current iterator read, do not
84+ # report success until the future was actually cancelled.
85+ already_cancelled = current_read .cancel ()
86+ if already_cancelled :
87+ return trio .hazmat .Abort .SUCCEEDED
88+ else :
89+ # Continue dealing with the cancellation once
90+ # future.cancel() goes to the result of
91+ # wait_task_rescheduled()
92+ return trio .hazmat .Abort .FAILED
7793
7894 try :
7995 while True :
80- # schedule that we read the next one from the iterator
81- asyncio .ensure_future (consume_next (), loop = loop )
96+ # Schedule in asyncio that we read the next item from the iterator
97+ current_read = asyncio .ensure_future (consume_next (), loop = loop )
8298
8399 item = await trio .hazmat .wait_task_rescheduled (abort_cb )
100+
84101 if item is STOP :
85102 break
86103 await yield_ (item )
0 commit comments