@@ -126,8 +126,66 @@ impl Reactor {
126126 }
127127 }
128128
129+ /// The reactor tracks the set of WASI pollables which have an associated
130+ /// Future pending on their readiness. This function returns indicating
131+ /// that set of pollables is not empty.
132+ pub ( crate ) fn nonempty_pending_pollables ( & self ) -> bool {
133+ !self . inner . borrow ( ) . wakers . is_empty ( )
134+ }
135+
129136 /// Block until at least one pending pollable is ready, waking a pending future.
137+ /// Precondition: self.nonempty_pending_pollables() is true.
130138 pub ( crate ) fn block_on_pollables ( & self ) {
139+ self . check_pollables ( |targets| {
140+ debug_assert_ne ! (
141+ targets. len( ) ,
142+ 0 ,
143+ "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
144+ ) ;
145+ wasi:: io:: poll:: poll ( targets)
146+
147+ } )
148+ }
149+
150+ /// Without blocking, check for any ready pollables and wake the
151+ /// associated futures.
152+ pub ( crate ) fn nonblock_check_pollables ( & self ) {
153+ // If there are no pollables with associated pending futures, there is
154+ // no work to do here, so return immediately.
155+ if !self . nonempty_pending_pollables ( ) {
156+ return ;
157+ }
158+ // Lazily create a pollable which always resolves to ready.
159+ use std:: sync:: LazyLock ;
160+ static READY_POLLABLE : LazyLock < Pollable > =
161+ LazyLock :: new ( || wasi:: clocks:: monotonic_clock:: subscribe_duration ( 0 ) ) ;
162+
163+ self . check_pollables ( |targets| {
164+ // Create a new set of targets, with the addition of the ready
165+ // pollable:
166+ let ready_index = targets. len ( ) ;
167+ let mut new_targets = Vec :: with_capacity ( ready_index + 1 ) ;
168+ new_targets. extend_from_slice ( targets) ;
169+ new_targets. push ( & * READY_POLLABLE ) ;
170+
171+ // Poll is now guaranteed to return immediately, because at least
172+ // one member is ready:
173+ let mut ready_list = wasi:: io:: poll:: poll ( & new_targets) ;
174+
175+ // Erase our extra ready pollable from the ready list:
176+ ready_list. retain ( |e| * e != ready_index as u32 ) ;
177+ ready_list
178+ } )
179+ }
180+
181+ /// Common core of blocking and nonblocking pollable checks. Wakes any
182+ /// futures which are pending on the pollables, according to the result of
183+ /// the check_ready function.
184+ /// Precondition: self.nonempty_pending_pollables() is true.
185+ fn check_pollables < F > ( & self , check_ready : F )
186+ where
187+ F : FnOnce ( & [ & Pollable ] ) -> Vec < u32 > ,
188+ {
131189 let reactor = self . inner . borrow ( ) ;
132190
133191 // We're about to wait for a number of pollables. When they wake we get
@@ -144,15 +202,9 @@ impl Reactor {
144202 targets. push ( & reactor. pollables [ pollable_index. 0 ] ) ;
145203 }
146204
147- debug_assert_ne ! (
148- targets. len( ) ,
149- 0 ,
150- "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
151- ) ;
152-
153- // Now that we have that association, we're ready to poll our targets.
154- // This will block until an event has completed.
155- let ready_indexes = wasi:: io:: poll:: poll ( & targets) ;
205+ // Now that we have that association, we're ready to check our targets for readiness.
206+ // (This is either a wasi poll, or the nonblocking variant.)
207+ let ready_indexes = check_ready ( & targets) ;
156208
157209 // Once we have the indexes for which pollables are available, we need
158210 // to convert it back to the right keys for the wakers. Earlier we
@@ -321,4 +373,31 @@ mod test {
321373 ) ;
322374 } )
323375 }
376+
377+ #[ test]
378+ fn cooperative_concurrency ( ) {
379+ crate :: runtime:: block_on ( async {
380+ let cpu_heavy = async move {
381+ // Simulating a CPU-heavy task that runs for 1 second and yields occasionally
382+ for _ in 0 ..10 {
383+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 100 ) ) ;
384+ futures_lite:: future:: yield_now ( ) . await ;
385+ }
386+ true
387+ } ;
388+ let timeout = async move {
389+ crate :: time:: Timer :: after ( crate :: time:: Duration :: from_millis ( 200 ) )
390+ . wait ( )
391+ . await ;
392+ false
393+ } ;
394+ let mut future_group = futures_concurrency:: future:: FutureGroup :: <
395+ Pin < Box < dyn std:: future:: Future < Output = bool > > > ,
396+ > :: new ( ) ;
397+ future_group. insert ( Box :: pin ( cpu_heavy) ) ;
398+ future_group. insert ( Box :: pin ( timeout) ) ;
399+ let result = futures_lite:: StreamExt :: next ( & mut future_group) . await ;
400+ assert_eq ! ( result, Some ( false ) , "cpu_heavy task should have timed out" ) ;
401+ } ) ;
402+ }
324403}
0 commit comments