Skip to content

Commit 80be8d6

Browse files
authored
yield to executor when polling with zero-duration wait (#13085)
* yield to executor when polling with zero-duration wait This addresses #13040 for `wasmtime-wasi`'s WASIp2 implementation by calling `tokio::task::yield_now` in `<Deadline as Pollable>::ready` when `self` is a `Deadline::Past`. Previously, it did nothing, which had the effect of starving other pollables that rely on yielding control to `mio` in order to progress, e.g. when the guest polls with a zero timeout in a busy loop. This also addresses that issue for `wasmtime`'s `thread.yield` implementation by modifying the `poll_until` event loop in `concurrent.rs` to yield to the executor prior to executing any low-priority tasks. This works because `thread.yield` operates by queueing a low-priority task to resume the calling fiber just prior to suspending it. Note that, because the `wasmtime` crate does not depend on Tokio, we can't directly call `tokio::task::yield_now`. Instead, we return a `Future` which, when polled for the first time, wakes the `Context`'s `Waker` and returns `Poll::Pending`; then it returns `Poll::Ready(())` for subsequent polls. That's enough to ensure `mio` has a chance to run when using `tokio`. Once we have a mechanism to allow the embedder to configure a custom yield function, we'll be able to use that instead. Fixes #13040 * remove obsolete assertions from `p2_sleep` test Now that zero-duration `monotonic_clock` waits yield to the executor, they aren't ready immediately, so the assertions in this test are no longer appropriate. * address review feedback - Expand comments regarding why we yield to the runtime in `wasmtime` and `wasmtime-wasi` - Move `yield_to_executor` to `StoreOpaque::yield_now` and remove `vm/async_yield` in favor of the new function - Revert `take_low_priority` performance tweak (will open a separate PR for that) - Inline `collect_work_items_to_run` for clarity
1 parent b1c0b5d commit 80be8d6

14 files changed

Lines changed: 341 additions & 82 deletions

File tree

crates/test-programs/src/bin/p2_sleep.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,10 @@ fn sleep_0ms() {
1818
let p = monotonic_clock::subscribe_instant(monotonic_clock::now());
1919
p.block();
2020
let p = monotonic_clock::subscribe_duration(0);
21-
assert!(
22-
p.ready(),
23-
"timer subscription with duration 0 is ready immediately"
24-
);
21+
p.block();
2522
}
2623

2724
fn sleep_backwards_in_time() {
2825
let p = monotonic_clock::subscribe_instant(monotonic_clock::now() - 1);
29-
assert!(
30-
p.ready(),
31-
"timer subscription for instant which has passed is ready immediately"
32-
);
26+
p.block();
3327
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use test_programs::sockets::supports_ipv6;
2+
use test_programs::wasi::clocks::monotonic_clock;
3+
use test_programs::wasi::io::poll;
4+
use test_programs::wasi::sockets::network::{
5+
IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network,
6+
};
7+
use test_programs::wasi::sockets::tcp::TcpSocket;
8+
9+
// Historically, `wasmtime-wasi` had a bug such that polling with a
10+
// zero-duration (i.e. always-ready) wait would starve the host executor and
11+
// prevent e.g. socket readiness from being delivered. Here we verify that such
12+
// starvation does not happen.
13+
fn test_tcp_busy_poll(family: IpAddressFamily, address: IpSocketAddress) {
14+
let zero_wait = monotonic_clock::subscribe_duration(0);
15+
16+
let net = Network::default();
17+
18+
let listener = TcpSocket::new(family).unwrap();
19+
listener.blocking_bind(&net, address).unwrap();
20+
listener.set_listen_backlog_size(32).unwrap();
21+
listener.blocking_listen().unwrap();
22+
23+
let address = listener.local_address().unwrap();
24+
25+
let message = b"Hello, world!";
26+
27+
for _ in 0..100 {
28+
let client = TcpSocket::new(family).unwrap();
29+
let (_rx, tx) = client.blocking_connect(&net, address).unwrap();
30+
tx.blocking_write_util(message).unwrap();
31+
32+
let (_accepted, rx, _tx) = listener.blocking_accept().unwrap();
33+
let rx_ready = rx.subscribe();
34+
let mut counter = 0;
35+
loop {
36+
if counter > 1_000_000 {
37+
panic!("socket still not ready!");
38+
}
39+
40+
let ready = poll::poll(&[&zero_wait, &rx_ready]);
41+
if ready.contains(&1) {
42+
break;
43+
}
44+
counter += 1;
45+
}
46+
47+
let data = rx.read(message.len().try_into().unwrap()).unwrap();
48+
assert_eq!(data, message); // Not guaranteed to work but should work in practice.
49+
}
50+
}
51+
52+
fn main() {
53+
test_tcp_busy_poll(
54+
IpAddressFamily::Ipv4,
55+
IpSocketAddress::Ipv4(Ipv4SocketAddress {
56+
port: 0,
57+
address: (127, 0, 0, 1),
58+
}),
59+
);
60+
61+
if supports_ipv6() {
62+
test_tcp_busy_poll(
63+
IpAddressFamily::Ipv6,
64+
IpSocketAddress::Ipv6(Ipv6SocketAddress {
65+
port: 0,
66+
address: (0, 0, 0, 0, 0, 0, 0, 1),
67+
flow_info: 0,
68+
scope_id: 0,
69+
}),
70+
);
71+
}
72+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use futures::join;
2+
use std::ptr;
3+
use test_programs::async_::{
4+
BLOCKED, EVENT_NONE, EVENT_STREAM_READ, thread_yield, waitable_join, waitable_set_drop,
5+
waitable_set_new, waitable_set_poll,
6+
};
7+
use test_programs::p3::wasi::sockets::types::{
8+
IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, TcpSocket,
9+
};
10+
use test_programs::p3::wit_stream;
11+
use test_programs::sockets::supports_ipv6;
12+
use wit_bindgen::StreamResult;
13+
14+
// Raw canonical-ABI import used to issue a zero-length, non-blocking read on
// a stream handle, bypassing the generated bindings.
//
// NOTE(review): the link module is `wasi:http/types@...` and the link name
// mentions `request.new`, yet this is applied to a sockets stream below —
// presumably only the `[async-lower][stream-read-0]` prefix determines the
// intrinsic selected; confirm against the component-model async ABI.
#[link(wasm_import_module = "wasi:http/types@0.3.0-rc-2026-03-15")]
unsafe extern "C" {
    #[link_name = "[async-lower][stream-read-0][static]request.new"]
    fn stream_read(_: u32, _: *mut u8, _: usize) -> u32;
}
19+
20+
// Historically, `wasmtime-wasi` had a bug such that polling would starve the
// host executor and prevent e.g. socket readiness from being delivered. Here
// we verify that such starvation does not happen.
//
// The test loops a TCP send/receive round trip, busy-polling readiness of
// the receive stream via the raw `waitable-set.poll` intrinsic (which never
// blocks) interleaved with `thread.yield`, and fails if readiness is never
// observed within a bounded number of iterations.
async fn test_tcp_busy_poll(family: IpAddressFamily, address: IpSocketAddress) {
    // Set up the listener; `address` carries port 0, so the OS picks a port.
    let listener = TcpSocket::create(family).unwrap();
    listener.bind(address).unwrap();
    listener.set_listen_backlog_size(32).unwrap();
    let mut accept = listener.listen().unwrap();

    // Rebind `address` to the actual bound address.
    let address = listener.get_local_address().unwrap();

    let message = b"Hello, world!";

    for _ in 0..100 {
        // Connect a client and write `message`, driving both futures
        // concurrently since `send` consumes the read end of the stream.
        let client = TcpSocket::create(family).unwrap();
        client.connect(address).await.unwrap();
        let (mut data_tx, data_rx) = wit_stream::new();
        join!(
            async {
                client.send(data_rx).await.unwrap();
            },
            async {
                let remaining = data_tx.write_all(message.into()).await;
                assert!(remaining.is_empty());
                drop(data_tx);
            }
        );

        let sock = accept.next().await.unwrap();
        let (mut data_rx, fut) = sock.receive();

        unsafe {
            // Issue a zero-length read; if it would block, busy-poll the
            // waitable set until the stream-read event arrives.
            if (stream_read(data_rx.handle(), ptr::null_mut(), 0)) == BLOCKED {
                let set = waitable_set_new();
                waitable_join(data_rx.handle(), set);
                // Bound the busy loop so executor starvation shows up as a
                // test failure rather than a hang.
                let mut counter = 0;
                loop {
                    if counter > 1_000_000 {
                        panic!("socket still not ready!");
                    }

                    let (mut event, mut waitable, _) = waitable_set_poll(set);
                    if event == EVENT_NONE {
                        // `waitable-set.poll` does not yield, so we
                        // call `thread.yield` to give the executor a
                        // chance to run, then poll one more time.
                        assert!(thread_yield() == 0);
                        (event, waitable, _) = waitable_set_poll(set);
                    }
                    if event == EVENT_STREAM_READ && waitable == data_rx.handle() {
                        // Readiness observed: detach from the set and clean up.
                        waitable_join(data_rx.handle(), 0);
                        waitable_set_drop(set);
                        break;
                    }
                    counter += 1;
                }
            }
        }

        // Readiness was signalled, so the payload should now be readable.
        let (result, data) = data_rx.read(Vec::with_capacity(100)).await;
        assert_eq!(result, StreamResult::Complete(message.len()));
        assert_eq!(data, message); // Not guaranteed to work but should work in practice.

        // A subsequent read reports the stream as dropped with no data.
        let (result, data) = data_rx.read(Vec::with_capacity(1)).await;
        assert_eq!(result, StreamResult::Dropped);
        assert_eq!(data, []);

        fut.await.unwrap();
    }
}
90+
91+
/// Guest component type whose `run` export (see the
/// `wasi::cli::run::Guest` impl below) drives the test.
struct Component;

test_programs::p3::export!(Component);

impl test_programs::p3::exports::wasi::cli::run::Guest for Component {
    // Exercise IPv4 loopback unconditionally, and IPv6 loopback only when
    // the host network supports it. Port 0 lets the OS pick a free port.
    async fn run() -> Result<(), ()> {
        test_tcp_busy_poll(
            IpAddressFamily::Ipv4,
            IpSocketAddress::Ipv4(Ipv4SocketAddress {
                port: 0,
                address: (127, 0, 0, 1),
            }),
        )
        .await;

        if supports_ipv6() {
            test_tcp_busy_poll(
                IpAddressFamily::Ipv6,
                IpSocketAddress::Ipv6(Ipv6SocketAddress {
                    port: 0,
                    address: (0, 0, 0, 0, 0, 0, 0, 1),
                    flow_info: 0,
                    scope_id: 0,
                }),
            )
            .await;
        }

        Ok(())
    }
}
122+
123+
// Intentionally empty: the component's real entry point is the exported
// `run` function; presumably `main` exists only to satisfy the binary
// crate layout.
fn main() {}

crates/wasi/src/p2/host/clocks.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,26 @@ enum Deadline {
124124
impl Pollable for Deadline {
125125
async fn ready(&mut self) {
126126
match self {
127-
Deadline::Past => {}
127+
Deadline::Past => {
128+
// It is important we yield to Tokio here; otherwise we risk
129+
// starving `mio` such that it is unable to signal readiness for
130+
// other pollables (e.g. TCP sockets) when the guest is polling
131+
// in a busy loop.
132+
//
133+
// This is somewhat of a hack to ensure that
134+
// `wasmtime-wasi-io`'s implementation of `wasi:io/poll` does
135+
// not starve `mio` when the guest calls `wasi:io/poll#poll` in
136+
// a busy loop with a zero timeout. It relies on the guest
137+
// using the most natural approach to making a non-blocking call
138+
// to `wasi:io/poll#poll`, which is to include a zero-duration
139+
// `monotonic_clock::subscribe_{instant,duration}` in the list
140+
// of pollables. That's what `wasi-libc`'s `poll(2)`
141+
// implementation does as of this writing, for example. There
142+
// are hypothetically other ways to generate a pollable that's
143+
// always immediately ready, which this hack doesn't cover, but
144+
// we consider this sufficient for now.
145+
tokio::task::yield_now().await
146+
}
128147
Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await,
129148
Deadline::Never => std::future::pending().await,
130149
}

crates/wasi/tests/all/p2/async_.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,10 @@ async fn p2_tcp_listen() {
307307
run(P2_TCP_LISTEN_COMPONENT, false).await.unwrap()
308308
}
309309
#[test_log::test(tokio::test(flavor = "multi_thread"))]
310+
async fn p2_tcp_busy_poll() {
311+
run(P2_TCP_BUSY_POLL_COMPONENT, false).await.unwrap()
312+
}
313+
#[test_log::test(tokio::test(flavor = "multi_thread"))]
310314
async fn p2_udp_sockopts() {
311315
run(P2_UDP_SOCKOPTS_COMPONENT, false).await.unwrap()
312316
}

crates/wasi/tests/all/p2/sync.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,10 @@ fn p2_tcp_listen() {
293293
run(P2_TCP_LISTEN_COMPONENT, false).unwrap()
294294
}
295295
#[test_log::test]
296+
fn p2_tcp_busy_poll() {
297+
run(P2_TCP_BUSY_POLL_COMPONENT, false).unwrap()
298+
}
299+
#[test_log::test]
296300
fn p2_udp_sockopts() {
297301
run(P2_UDP_SOCKOPTS_COMPONENT, false).unwrap()
298302
}

crates/wasi/tests/all/p3/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ async fn p3_sockets_tcp_streams() -> wasmtime::Result<()> {
115115
run(P3_SOCKETS_TCP_STREAMS_COMPONENT).await
116116
}
117117

118+
#[test_log::test(tokio::test(flavor = "multi_thread"))]
119+
async fn p3_sockets_tcp_busy_poll() -> wasmtime::Result<()> {
120+
run(P3_SOCKETS_TCP_BUSY_POLL_COMPONENT).await
121+
}
122+
118123
#[test_log::test(tokio::test(flavor = "multi_thread"))]
119124
async fn p3_sockets_udp_bind() -> wasmtime::Result<()> {
120125
run(P3_SOCKETS_UDP_BIND_COMPONENT).await

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,8 +1196,12 @@ impl<T> StoreContextMut<'_, T> {
11961196

11971197
enum PollResult<R> {
11981198
Complete(R),
1199-
ProcessWork(Vec<WorkItem>),
1199+
ProcessWork {
1200+
ready: Vec<WorkItem>,
1201+
low_priority: bool,
1202+
},
12001203
}
1204+
12011205
let result = future::poll_fn(|cx| {
12021206
// First, poll the future we were passed as an argument and
12031207
// return immediately if it's ready.
@@ -1220,13 +1224,23 @@ impl<T> StoreContextMut<'_, T> {
12201224
Poll::Pending => Poll::Pending,
12211225
};
12221226

1223-
// Next, collect the next batch of work items to process, if any.
1224-
// This will be either all of the high-priority work items, or if
1225-
// there are none, a single low-priority work item.
1227+
// Next, collect the next batch of work items to process, if
1228+
// any. This will be either all of the high-priority work
1229+
// items, or if there are none, a single low-priority work item.
12261230
let state = reset.store.0.concurrent_state_mut();
1227-
let ready = state.collect_work_items_to_run();
1231+
let mut ready = mem::take(&mut state.high_priority);
1232+
let mut low_priority = false;
1233+
if ready.is_empty() {
1234+
if let Some(item) = state.low_priority.pop_back() {
1235+
ready.push(item);
1236+
low_priority = true;
1237+
}
1238+
}
12281239
if !ready.is_empty() {
1229-
return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1240+
return Poll::Ready(Ok(PollResult::ProcessWork {
1241+
ready,
1242+
low_priority,
1243+
}));
12301244
}
12311245

12321246
// Finally, if we have nothing else to do right now, determine what to do
@@ -1239,7 +1253,10 @@ impl<T> StoreContextMut<'_, T> {
12391253
// successfully, so we return now and continue
12401254
// the outer loop in case there is another one
12411255
// ready to complete.
1242-
Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1256+
Poll::Ready(Ok(PollResult::ProcessWork {
1257+
ready: Vec::new(),
1258+
low_priority: false,
1259+
}))
12431260
}
12441261
Poll::Ready(false) => {
12451262
// Poll the future we were passed one last time
@@ -1287,7 +1304,10 @@ impl<T> StoreContextMut<'_, T> {
12871304
PollResult::Complete(value) => break Ok(value),
12881305
// The future we were passed has not yet completed, so handle
12891306
// any work items and then loop again.
1290-
PollResult::ProcessWork(ready) => {
1307+
PollResult::ProcessWork {
1308+
ready,
1309+
low_priority,
1310+
} => {
12911311
struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
12921312
store: StoreContextMut<'a, T>,
12931313
ready: I,
@@ -1312,6 +1332,31 @@ impl<T> StoreContextMut<'_, T> {
13121332
ready: ready.into_iter(),
13131333
};
13141334

1335+
// If we're about to run a low-priority task, first yield to
1336+
// the executor. This ensures that it won't be starved of
1337+
// the ability to e.g. update the readiness of sockets,
1338+
// etc. which the guest may be using `thread.yield` along
1339+
// with `waitable-set.poll` to monitor in a CPU-heavy loop.
1340+
//
1341+
// This works for e.g. `thread.yield` and callbacks which
1342+
// return `CALLBACK_CODE_YIELD` because we queue a low
1343+
// priority item to resume the task (i.e. resume the thread
1344+
// or call the callback, respectively) just prior to
1345+
// suspending it. Indeed, as of this writing those are the
1346+
// _only_ situations we queue low-priority tasks.
1347+
// Therefore, we interpret the guest's request to yield as
1348+
// meaning "yield to other guest tasks _and_/_or_ host
1349+
// operations such as updating socket readiness", the latter
1350+
// being the async runtime's responsibility.
1351+
//
1352+
// In the future, if this ends up causing measurable
1353+
// performance issues, this could be optimized such that we
1354+
// only yield periodically (e.g. for batches of low priority
1355+
// items) and not for each and every individual item.
1356+
if low_priority {
1357+
dispose.store.0.yield_now().await
1358+
}
1359+
13151360
while let Some(item) = dispose.ready.next() {
13161361
dispose
13171362
.store
@@ -4972,19 +5017,6 @@ impl ConcurrentState {
49725017
}
49735018
}
49745019

4975-
/// Collect the next set of work items to run. This will be either all
4976-
/// high-priority items, or a single low-priority item if there are no
4977-
/// high-priority items.
4978-
fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4979-
let mut ready = mem::take(&mut self.high_priority);
4980-
if ready.is_empty() {
4981-
if let Some(item) = self.low_priority.pop_back() {
4982-
ready.push(item);
4983-
}
4984-
}
4985-
ready
4986-
}
4987-
49885020
fn push<V: Send + Sync + 'static>(
49895021
&mut self,
49905022
value: V,

0 commit comments

Comments
 (0)