Skip to content

Commit 065eba5

Browse files
committed
Annoying fixes for the sync loop
Problem: $THING calls loop.stop() but then doesn't immediately return. * split the processing part of the mainloop so that we can cobble up a non-waiting single-step mode * call the single-step mode while there's something to do (whenever we wait for run_until_complete) * return a RuntimeError when the loop has been cancelled * improve error handling by copying the __cause__ of that cancellation (let's hope that it exists) to that error
1 parent 84c7645 commit 065eba5

3 files changed

Lines changed: 77 additions & 52 deletions

File tree

trio_asyncio/async_.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def stop(self, waiter=None):
6868

6969
def stop_me():
7070
waiter.set()
71-
raise StopIteration
71+
raise StopAsyncIteration
7272

7373
if self._stopped.is_set():
7474
waiter.set()

trio_asyncio/base.py

Lines changed: 54 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -636,59 +636,66 @@ async def _main_loop(self, task_status=trio.TASK_STATUS_IGNORED):
636636

637637
try:
638638
while True:
639-
obj = None
640-
if self._timers:
641-
timeout = self._timers[0]._when - self.time()
642-
if timeout <= 0:
643-
# If the timer ran out, process the object now.
644-
obj = heapq.heappop(self._timers)
645-
else:
646-
timeout = math.inf
647-
648-
if obj is None:
649-
with trio.move_on_after(timeout):
650-
obj = await self._q.get()
651-
if obj is None:
652-
# Timeout reached. Presumably now a timer is ready,
653-
# so restart from the beginning.
654-
continue
655-
656-
if isinstance(obj, trio.Event):
657-
# Events are used for synchronization.
658-
# Simply set them.
659-
if obj is self._stopped:
660-
break
661-
obj.set()
662-
continue
663-
664-
if isinstance(obj, TimerHandle):
665-
# A TimerHandle is added to the list of timers.
666-
heapq.heappush(self._timers, obj)
667-
continue
668-
669-
assert isinstance(obj, asyncio.Handle)
670-
# Hopefully one of ours
671-
# but it might be a standard asyncio handle
672-
673-
if obj._cancelled:
674-
# simply skip cancelled handlers
675-
continue
676-
677-
# Don't go through the expensive nursery dance
678-
# if this is a sync function.
679-
if getattr(obj, '_is_sync', True):
680-
obj._callback(*obj._args)
681-
else:
682-
await self._nursery.start(obj._call_async)
683-
684-
except StopIteration:
639+
await self._main_loop_one()
640+
except StopAsyncIteration:
685641
# raised by .stop_me() to interrupt the loop
686642
pass
687-
643+
except trio.Cancelled:
644+
import pdb;pdb.set_trace()
645+
raise
688646
finally:
689647
# Signal that the loop is no longer running
690648
self._stopped.set()
691649

650+
async def _main_loop_one(self, no_wait=False):
651+
obj = None
652+
if self._timers:
653+
timeout = self._timers[0]._when - self.time()
654+
if timeout <= 0:
655+
# If the timer ran out, process the object now.
656+
obj = heapq.heappop(self._timers)
657+
else:
658+
timeout = math.inf
659+
660+
if obj is None:
661+
if no_wait:
662+
obj = self._q.get_nowait()
663+
else:
664+
with trio.move_on_after(timeout):
665+
obj = await self._q.get()
666+
if obj is None:
667+
# Timeout reached. Presumably now a timer is ready,
668+
# so restart from the beginning.
669+
return
670+
671+
if isinstance(obj, trio.Event):
672+
# Events are used for synchronization.
673+
# Simply set them.
674+
if obj is self._stopped:
675+
raise StopAsyncIteration
676+
obj.set()
677+
return
678+
679+
if isinstance(obj, TimerHandle):
680+
# A TimerHandle is added to the list of timers.
681+
heapq.heappush(self._timers, obj)
682+
return
683+
684+
assert isinstance(obj, asyncio.Handle)
685+
# Hopefully one of ours
686+
# but it might be a standard asyncio handle
687+
688+
if obj._cancelled:
689+
# simply skip cancelled handlers
690+
return
691+
692+
# Don't go through the expensive nursery dance
693+
# if this is a sync function.
694+
if getattr(obj, '_is_sync', True):
695+
obj._callback(*obj._args)
696+
else:
697+
await self._nursery.start(obj._call_async)
698+
692699
async def _main_loop_exit(self):
693700
"""Finalize the loop. It may not be re-entered."""
694701
if self._closed:

trio_asyncio/sync.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import sys
12
import trio
23
import queue
34
import asyncio
45
import threading
6+
import traceback
57

68
from .base import BaseTrioEventLoop
79
from .handles import Handle
@@ -48,12 +50,12 @@ def stop(self):
4850
"""
4951

5052
def do_stop():
51-
raise StopIteration
53+
raise StopAsyncIteration
5254

5355

5456
# async def stop_me():
5557
# def kick_():
56-
# raise StopIteration
58+
# raise StopAsyncIteration
5759
# self._queue_handle(Handle(kick_, (), self, context=None, is_sync=True))
5860
# await self._main_loop()
5961
# if threading.current_thread() != self._thread:
@@ -141,10 +143,19 @@ def is_done(_):
141143
future.add_done_callback(is_done)
142144
try:
143145
await self._main_loop()
146+
try:
147+
while result is None:
148+
try:
149+
await self._main_loop_one(no_wait=True)
150+
except StopAsyncIteration:
151+
pass
152+
except trio.WouldBlock:
153+
pass
144154
finally:
145155
future.remove_done_callback(is_done)
156+
146157
if result is None:
147-
result = trio.hazmat.Error(RuntimeError('Event loop stopped before Future completed.'))
158+
raise RuntimeError('Event loop stopped before Future completed.')
148159
return result.unwrap()
149160

150161
def __run_in_thread(self, async_fn, *args):
@@ -155,6 +166,8 @@ def __run_in_thread(self, async_fn, *args):
155166
raise RuntimeError("The Trio thread is not running")
156167
self.__blocking_job_queue.put((async_fn, args))
157168
res = self.__blocking_result_queue.get()
169+
if res is None:
170+
raise RuntimeError("Loop has died / terminated")
158171
return res.unwrap()
159172

160173
def _start_loop(self):
@@ -188,8 +201,13 @@ async def __trio_thread_main(self):
188201
async_fn, args = req
189202

190203
result = await trio.hazmat.Result.acapture(async_fn, *args)
204+
if type(result) == trio.hazmat.Error and type(result.error) == trio.Cancelled:
205+
res = RuntimeError("Main loop cancelled")
206+
res.__cause__ = result.error.__cause__
207+
result = trio.hazmat.Error(res)
191208
self.__blocking_result_queue.put(result)
192-
await self._main_loop_exit()
209+
with trio.open_cancel_scope(shield=True):
210+
await self._main_loop_exit()
193211
self.__blocking_result_queue.put(None)
194212
nursery.cancel_scope.cancel()
195213

0 commit comments

Comments
 (0)