1- use super :: {
2- polling:: { EventKey , Poller } ,
3- REACTOR ,
4- } ;
1+ use super :: REACTOR ;
52
63use core:: cell:: RefCell ;
74use core:: future;
8- use core:: task:: Poll ;
9- use core:: task:: Waker ;
5+ use core:: pin:: Pin ;
6+ use core:: task:: { Context , Poll , Waker } ;
7+ use slab:: Slab ;
108use std:: collections:: HashMap ;
119use std:: rc:: Rc ;
1210use wasi:: io:: poll:: Pollable ;
1311
12+ /// A key for a Pollable, which is an index into the Slab<Pollable> in Reactor.
13+ #[ repr( transparent) ]
14+ #[ derive( Debug , PartialEq , Eq , PartialOrd , Ord , Hash , Clone , Copy ) ]
15+ pub ( crate ) struct EventKey ( pub ( crate ) usize ) ;
16+
17+ /// A Registration is a reference to the Reactor's owned Pollable. When the registration is
18+ /// dropped, the reactor will drop the Pollable resource.
19+ #[ derive( Debug , PartialEq , Eq , Hash ) ]
20+ struct Registration {
21+ key : EventKey ,
22+ }
23+
24+ impl Drop for Registration {
25+ fn drop ( & mut self ) {
26+ Reactor :: current ( ) . deregister_event ( self . key )
27+ }
28+ }
29+
30+ /// An AsyncPollable is a reference counted Registration. It can be cloned, and used to create
31+ /// as many WaitFor futures on a Pollable that the user needs.
32+ #[ derive( Debug , Clone , PartialEq , Eq , Hash ) ]
33+ pub struct AsyncPollable ( Rc < Registration > ) ;
34+
35+ impl AsyncPollable {
36+ /// Create a Future that waits for the Pollable's readiness.
37+ pub fn wait_for ( & self ) -> WaitFor {
38+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
39+ static COUNTER : AtomicUsize = AtomicUsize :: new ( 0 ) ;
40+ let unique = COUNTER . fetch_add ( 1 , Ordering :: Relaxed ) ;
41+ WaitFor {
42+ waitee : Waitee {
43+ pollable : self . clone ( ) ,
44+ unique,
45+ } ,
46+ needs_deregistration : false ,
47+ }
48+ }
49+ }
50+
51+ #[ derive( Debug , PartialEq , Eq , Hash , Clone ) ]
52+ struct Waitee {
53+ /// This needs to be a reference counted registration, because it may outlive the AsyncPollable
54+ /// &self that it was created from.
55+ pollable : AsyncPollable ,
56+ unique : usize ,
57+ }
58+
59+ /// A Future that waits for the Pollable's readiness.
60+ #[ must_use = "futures do nothing unless polled or .awaited" ]
61+ #[ derive( Debug ) ]
62+ pub struct WaitFor {
63+ waitee : Waitee ,
64+ needs_deregistration : bool ,
65+ }
66+ impl future:: Future for WaitFor {
67+ type Output = ( ) ;
68+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
69+ let reactor = Reactor :: current ( ) ;
70+ if reactor. ready ( & self . as_ref ( ) . waitee , cx. waker ( ) ) {
71+ Poll :: Ready ( ( ) )
72+ } else {
73+ self . as_mut ( ) . needs_deregistration = true ;
74+ Poll :: Pending
75+ }
76+ }
77+ }
78+ impl Drop for WaitFor {
79+ fn drop ( & mut self ) {
80+ if self . needs_deregistration {
81+ Reactor :: current ( ) . deregister_waitee ( & self . waitee )
82+ }
83+ }
84+ }
85+
1486/// Manage async system resources for WASI 0.2
1587#[ derive( Debug , Clone ) ]
1688pub struct Reactor {
@@ -21,8 +93,8 @@ pub struct Reactor {
2193/// a lock of the whole.
2294#[ derive( Debug ) ]
2395struct InnerReactor {
24- poller : Poller ,
25- wakers : HashMap < EventKey , Waker > ,
96+ pollables : Slab < Pollable > ,
97+ wakers : HashMap < Waitee , Waker > ,
2698}
2799
28100impl Reactor {
@@ -43,7 +115,7 @@ impl Reactor {
43115 pub ( crate ) fn new ( ) -> Self {
44116 Self {
45117 inner : Rc :: new ( RefCell :: new ( InnerReactor {
46- poller : Poller :: new ( ) ,
118+ pollables : Slab :: new ( ) ,
47119 wakers : HashMap :: new ( ) ,
48120 } ) ) ,
49121 }
@@ -62,41 +134,152 @@ impl Reactor {
62134 /// reason that we have to call all the wakers - even if by default they
63135 /// will do nothing.
64136 pub ( crate ) fn block_until ( & self ) {
137+ let reactor = self . inner . borrow ( ) ;
138+
139+ // We're about to wait for a number of pollables. When they wake we get
140+ // the *indexes* back for the pollables whose events were available - so
141+ // we need to be able to associate the index with the right waker.
142+
143+ // We start by iterating over the pollables, and keeping note of which
144+ // pollable belongs to which waker
145+ let mut indexed_wakers = Vec :: with_capacity ( reactor. wakers . len ( ) ) ;
146+ let mut targets = Vec :: with_capacity ( reactor. wakers . len ( ) ) ;
147+ for ( waitee, waker) in reactor. wakers . iter ( ) {
148+ let pollable_index = waitee. pollable . 0 . key ;
149+ indexed_wakers. push ( waker) ;
150+ targets. push ( & reactor. pollables [ pollable_index. 0 ] ) ;
151+ }
152+
153+ debug_assert_ne ! (
154+ targets. len( ) ,
155+ 0 ,
156+ "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
157+ ) ;
158+
159+ // Now that we have that association, we're ready to poll our targets.
160+ // This will block until an event has completed.
161+ let ready_indexes = wasi:: io:: poll:: poll ( & targets) ;
162+
163+ // Once we have the indexes for which pollables are available, we need
164+ // to convert it back to the right keys for the wakers. Earlier we
165+ // established a positional index -> waker key relationship, so we can
166+ // go right ahead and perform a lookup there.
167+ let ready_wakers = ready_indexes
168+ . into_iter ( )
169+ . map ( |index| indexed_wakers[ index as usize ] ) ;
170+
171+ for waker in ready_wakers {
172+ waker. wake_by_ref ( )
173+ }
174+ }
175+
176+ /// Turn a Wasi [`Pollable`] into an [`AsyncPollable`]
177+ pub fn schedule ( & self , pollable : Pollable ) -> AsyncPollable {
178+ let mut reactor = self . inner . borrow_mut ( ) ;
179+ let key = EventKey ( reactor. pollables . insert ( pollable) ) ;
180+ AsyncPollable ( Rc :: new ( Registration { key } ) )
181+ }
182+
183+ fn deregister_event ( & self , key : EventKey ) {
184+ let mut reactor = self . inner . borrow_mut ( ) ;
185+ reactor. pollables . remove ( key. 0 ) ;
186+ }
187+
188+ fn deregister_waitee ( & self , waitee : & Waitee ) {
189+ let mut reactor = self . inner . borrow_mut ( ) ;
190+ reactor. wakers . remove ( waitee) ;
191+ }
192+
193+ fn ready ( & self , waitee : & Waitee , waker : & Waker ) -> bool {
65194 let mut reactor = self . inner . borrow_mut ( ) ;
66- for key in reactor. poller . block_until ( ) {
67- match reactor. wakers . get ( & key) {
68- Some ( waker) => waker. wake_by_ref ( ) ,
69- None => panic ! ( "tried to wake the waker for non-existent `{:?}`" , key) ,
70- }
195+ let ready = reactor
196+ . pollables
197+ . get ( waitee. pollable . 0 . key . 0 )
198+ . expect ( "only live EventKey can be checked for readiness" )
199+ . ready ( ) ;
200+ if !ready {
201+ reactor. wakers . insert ( waitee. clone ( ) , waker. clone ( ) ) ;
71202 }
203+ ready
72204 }
73205
74206 /// Wait for the pollable to resolve.
75207 pub async fn wait_for ( & self , pollable : Pollable ) {
76- let mut pollable = Some ( pollable) ;
77- let mut key = None ;
78- // This function is the core loop of our function; it will be called
79- // multiple times as the future is resolving.
80- future:: poll_fn ( |cx| {
81- // Start by taking a lock on the reactor. This is single-threaded
82- // and short-lived, so it will never be contended.
83- let mut reactor = self . inner . borrow_mut ( ) ;
84-
85- // Schedule interest in the `pollable` on the first iteration. On
86- // every iteration, register the waker with the reactor.
87- let key = key. get_or_insert_with ( || reactor. poller . insert ( pollable. take ( ) . unwrap ( ) ) ) ;
88- reactor. wakers . insert ( * key, cx. waker ( ) . clone ( ) ) ;
89-
90- // Check whether we're ready or need to keep waiting. If we're
91- // ready, we clean up after ourselves.
92- if reactor. poller . get ( key) . unwrap ( ) . ready ( ) {
93- reactor. poller . remove ( * key) ;
94- reactor. wakers . remove ( key) ;
95- Poll :: Ready ( ( ) )
96- } else {
97- Poll :: Pending
98- }
208+ let p = self . schedule ( pollable) ;
209+ p. wait_for ( ) . await
210+ }
211+ }
212+
213+ #[ cfg( test) ]
214+ mod test {
215+ use super :: * ;
216+ // Using WASMTIME_LOG, observe that this test doesn't even call poll() - the pollable is ready
217+ // immediately.
218+ #[ test]
219+ fn subscribe_no_duration ( ) {
220+ crate :: runtime:: block_on ( async {
221+ let reactor = Reactor :: current ( ) ;
222+ let pollable = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 0 ) ;
223+ let sched = reactor. schedule ( pollable) ;
224+ sched. wait_for ( ) . await ;
225+ } )
226+ }
227+ // Using WASMTIME_LOG, observe that this test calls poll() until the timer is ready.
228+ #[ test]
229+ fn subscribe_some_duration ( ) {
230+ crate :: runtime:: block_on ( async {
231+ let reactor = Reactor :: current ( ) ;
232+ let pollable = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 10_000_000 ) ;
233+ let sched = reactor. schedule ( pollable) ;
234+ sched. wait_for ( ) . await ;
235+ } )
236+ }
237+
238+ // Using WASMTIME_LOG, observe that this test results in a single poll() on the second
239+ // subscription, rather than spinning in poll() with first subscription, which is instantly
240+ // ready, but not what the waker requests.
241+ #[ test]
242+ fn subscribe_multiple_durations ( ) {
243+ crate :: runtime:: block_on ( async {
244+ let reactor = Reactor :: current ( ) ;
245+ let now = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 0 ) ;
246+ let soon = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 10_000_000 ) ;
247+ let now = reactor. schedule ( now) ;
248+ let soon = reactor. schedule ( soon) ;
249+ soon. wait_for ( ) . await ;
250+ drop ( now)
251+ } )
252+ }
253+
254+ // Using WASMTIME_LOG, observe that this test results in two calls to poll(), one with both
255+ // pollables because both are awaiting, and one with just the later pollable.
256+ #[ test]
257+ fn subscribe_multiple_durations_zipped ( ) {
258+ crate :: runtime:: block_on ( async {
259+ let reactor = Reactor :: current ( ) ;
260+ let start = wasi:: clocks:: monotonic_clock:: now ( ) ;
261+ let soon = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 10_000_000 ) ;
262+ let later = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 40_000_000 ) ;
263+ let soon = reactor. schedule ( soon) ;
264+ let later = reactor. schedule ( later) ;
265+
266+ futures_lite:: future:: zip (
267+ async move {
268+ soon. wait_for ( ) . await ;
269+ println ! (
270+ "*** subscribe_duration(soon) ready ({})" ,
271+ wasi:: clocks:: monotonic_clock:: now( ) - start
272+ ) ;
273+ } ,
274+ async move {
275+ later. wait_for ( ) . await ;
276+ println ! (
277+ "*** subscribe_duration(later) ready ({})" ,
278+ wasi:: clocks:: monotonic_clock:: now( ) - start
279+ ) ;
280+ } ,
281+ )
282+ . await ;
99283 } )
100- . await
101284 }
102285}
0 commit comments