Skip to content

Commit d54924a

Browse files
authored
fix Pollable impl for OutgoingDatagramStream (#12629)
* fix `Pollable` impl for `OutgoingDatagramStream` Previously, `OutgoingDatagramStream` used its `send_state` field to determine whether the socket was ready for writing without necessarily polling the `tokio::net::UdpSocket`. The underlying assumption was that a freshly-bound socket would be immediately ready for writing, but that's not true for `tokio`. `tokio` assumes a socket is not ready for _anything_ by default and relies on e.g. `epoll` to tell it otherwise. In practice, this meant the `Pollable::ready` impl for `OutgoingDatagramStream` would resolve immediately for a fresh socket, leaving the guest to race with `tokio`'s use of `epoll`. Usually, `tokio` won and all was well, but occasionally it lost and the `OutgoingDatagramStream::send` call would return `Ok(0)` (meaning "would block") leaving the guest confused since it was just told that the socket was ready for writing. This commit removes `SendState` entirely, relying exclusively on `tokio::net::UdpSocket::ready` for the `Pollable` and `check_send` implementations. As a side effect, we no longer attempt to prevent the guest from sending more datagrams than `check_send` indicated since the number returned by `check_send` is only a guess anyway, and `tokio` will push back if it needs to. For `p3`, the whole `check_send`/`send` pattern is gone, so we won't need to concern ourselves with it going forward. Fixes #12612 * address review feedback
1 parent 3621d3e commit d54924a

4 files changed

Lines changed: 76 additions & 59 deletions

File tree

crates/test-programs/src/bin/p2_udp_connect.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use test_programs::wasi::sockets::network::{
22
ErrorCode, IpAddress, IpAddressFamily, IpSocketAddress, Network,
33
};
4-
use test_programs::wasi::sockets::udp::UdpSocket;
4+
use test_programs::wasi::sockets::udp::{OutgoingDatagram, UdpSocket};
55

66
const SOME_PORT: u16 = 47; // If the tests pass, this will never actually be connected to.
77

@@ -117,6 +117,30 @@ fn test_udp_connect_dual_stack(net: &Network) {
117117
));
118118
}
119119

120+
/// If `pollable.block` says a `OutgoingDatagramStream` is ready, it better be
121+
/// ready.
122+
fn test_udp_connect_and_send(net: &Network, family: IpAddressFamily) {
123+
let unspecified_port = IpSocketAddress::new(IpAddress::new_loopback(family), 0);
124+
let remote = IpSocketAddress::new(IpAddress::new_loopback(family), 4320);
125+
126+
let client = UdpSocket::new(family).unwrap();
127+
client.blocking_bind(&net, unspecified_port).unwrap();
128+
129+
let (_, tx) = client.stream(Some(remote)).unwrap();
130+
assert_eq!(client.remote_address(), Ok(remote));
131+
132+
tx.subscribe().block();
133+
134+
assert!(matches!(tx.check_send(), Ok(n) if n > 0));
135+
assert!(matches!(
136+
tx.send(&[OutgoingDatagram {
137+
data: b"hello".into(),
138+
remote_address: None
139+
}]),
140+
Ok(1)
141+
));
142+
}
143+
120144
fn main() {
121145
let net = Network::default();
122146

@@ -136,4 +160,11 @@ fn main() {
136160
test_udp_connect_without_bind(IpAddressFamily::Ipv6);
137161

138162
test_udp_connect_dual_stack(&net);
163+
164+
// `wasmtime-wasi`'s use of Tokio was once flaky such that these tests would
165+
// _usually_, but not always, pass, so we run them a bunch of times here:
166+
for _ in 0..256 {
167+
test_udp_connect_and_send(&net, IpAddressFamily::Ipv4);
168+
test_udp_connect_and_send(&net, IpAddressFamily::Ipv6);
169+
}
139170
}

crates/test-programs/src/bin/p3_sockets_udp_connect.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,26 +166,45 @@ fn test_udp_connect_dual_stack() {
166166
));
167167
}
168168

169+
/// A UDP socket should be immediately writable
170+
async fn test_udp_connect_and_send(family: IpAddressFamily) {
171+
let unspecified_port = IpSocketAddress::new(IpAddress::new_loopback(family), 0);
172+
let remote = IpSocketAddress::new(IpAddress::new_loopback(family), 4320);
173+
174+
let client = UdpSocket::create(family).unwrap();
175+
client.bind(unspecified_port).unwrap();
176+
177+
client.connect(remote).unwrap();
178+
assert_eq!(client.get_remote_address(), Ok(remote));
179+
180+
assert_eq!(client.send(b"hello".into(), None).await, Ok(()));
181+
}
182+
169183
impl test_programs::p3::exports::wasi::cli::run::Guest for Component {
170184
async fn run() -> Result<(), ()> {
185+
let supports_ipv6 = supports_ipv6();
186+
171187
test_udp_connect_disconnect_reconnect(IpAddressFamily::Ipv4);
172188
test_udp_connect_unspec(IpAddressFamily::Ipv4);
173189
test_udp_connect_local_address_change(IpAddressFamily::Ipv4);
174190
test_udp_connect_port_0(IpAddressFamily::Ipv4);
175191
test_udp_connect_wrong_family(IpAddressFamily::Ipv4);
176192
test_udp_connect_without_bind(IpAddressFamily::Ipv4);
177193
test_udp_connect_with_bind(IpAddressFamily::Ipv4);
194+
test_udp_connect_and_send(IpAddressFamily::Ipv4).await;
178195

179-
if supports_ipv6() {
196+
if supports_ipv6 {
180197
test_udp_connect_disconnect_reconnect(IpAddressFamily::Ipv6);
181198
test_udp_connect_unspec(IpAddressFamily::Ipv6);
182199
test_udp_connect_local_address_change(IpAddressFamily::Ipv6);
183200
test_udp_connect_port_0(IpAddressFamily::Ipv6);
184201
test_udp_connect_wrong_family(IpAddressFamily::Ipv6);
185202
test_udp_connect_without_bind(IpAddressFamily::Ipv6);
186203
test_udp_connect_with_bind(IpAddressFamily::Ipv6);
204+
test_udp_connect_and_send(IpAddressFamily::Ipv6).await;
187205
test_udp_connect_dual_stack();
188206
}
207+
189208
Ok(())
190209
}
191210
}

crates/wasi/src/p2/host/udp.rs

Lines changed: 24 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use crate::p2::bindings::sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network};
22
use crate::p2::bindings::sockets::udp;
3-
use crate::p2::udp::{IncomingDatagramStream, OutgoingDatagramStream, SendState};
3+
use crate::p2::udp::{IncomingDatagramStream, OutgoingDatagramStream};
44
use crate::p2::{Pollable, SocketError, SocketResult};
55
use crate::sockets::util::{is_valid_address_family, is_valid_remote_address};
66
use crate::sockets::{
77
MAX_UDP_DATAGRAM_SIZE, SocketAddrUse, SocketAddressFamily, UdpSocket, WasiSocketsCtxView,
88
};
99
use async_trait::async_trait;
1010
use std::net::SocketAddr;
11+
use std::pin::pin;
12+
use std::task::{Context, Poll, Waker};
1113
use tokio::io::Interest;
1214
use wasmtime::component::Resource;
1315
use wasmtime::format_err;
@@ -82,7 +84,6 @@ impl udp::HostUdpSocket for WasiSocketsCtxView<'_> {
8284
inner: socket.socket().clone(),
8385
remote_address,
8486
family: socket.address_family(),
85-
send_state: SendState::Idle,
8687
socket_addr_check: socket.socket_addr_check().cloned(),
8788
};
8889

@@ -252,9 +253,8 @@ impl udp::HostIncomingDatagramStream for WasiSocketsCtxView<'_> {
252253
#[async_trait]
253254
impl Pollable for IncomingDatagramStream {
254255
async fn ready(&mut self) {
255-
// FIXME: Add `Interest::ERROR` when we update to tokio 1.32.
256256
self.inner
257-
.ready(Interest::READABLE)
257+
.ready(Interest::READABLE.add(Interest::ERROR))
258258
.await
259259
.expect("failed to await UDP socket readiness");
260260
}
@@ -264,17 +264,20 @@ impl udp::HostOutgoingDatagramStream for WasiSocketsCtxView<'_> {
264264
fn check_send(&mut self, this: Resource<udp::OutgoingDatagramStream>) -> SocketResult<u64> {
265265
let stream = self.table.get_mut(&this)?;
266266

267-
let permit = match stream.send_state {
268-
SendState::Idle => {
269-
const PERMIT: usize = 16;
270-
stream.send_state = SendState::Permitted(PERMIT);
271-
PERMIT
272-
}
273-
SendState::Permitted(n) => n,
274-
SendState::Waiting => 0,
275-
};
276-
277-
Ok(permit.try_into().unwrap())
267+
Ok(
268+
if let Poll::Ready(_) =
269+
pin!(stream.inner.ready(Interest::WRITABLE.add(Interest::ERROR)))
270+
.poll(&mut Context::from_waker(Waker::noop()))
271+
{
272+
// We don't know how many Tokio will accept, so we make up a
273+
// reasonable number here. If we're wrong and `send` returns
274+
// `Ok(0)`, the guest will just have to deal with that, e.g. by
275+
// looping or returning `EWOULDBLOCK`.
276+
16
277+
} else {
278+
0
279+
},
280+
)
278281
}
279282

280283
async fn send(
@@ -324,22 +327,6 @@ impl udp::HostOutgoingDatagramStream for WasiSocketsCtxView<'_> {
324327

325328
let stream = self.table.get_mut(&this)?;
326329

327-
match stream.send_state {
328-
SendState::Permitted(n) if n >= datagrams.len() => {
329-
stream.send_state = SendState::Idle;
330-
}
331-
SendState::Permitted(_) => {
332-
return Err(SocketError::trap(wasmtime::format_err!(
333-
"unpermitted: argument exceeds permitted size"
334-
)));
335-
}
336-
SendState::Idle | SendState::Waiting => {
337-
return Err(SocketError::trap(wasmtime::format_err!(
338-
"unpermitted: must call check-send first"
339-
)));
340-
}
341-
}
342-
343330
if datagrams.is_empty() {
344331
return Ok(0);
345332
}
@@ -354,8 +341,8 @@ impl udp::HostOutgoingDatagramStream for WasiSocketsCtxView<'_> {
354341
return Ok(count);
355342
}
356343
Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => {
357-
stream.send_state = SendState::Waiting;
358-
return Ok(count);
344+
debug_assert!(count == 0);
345+
return Ok(0);
359346
}
360347
Err(e) => {
361348
return Err(e);
@@ -386,17 +373,10 @@ impl udp::HostOutgoingDatagramStream for WasiSocketsCtxView<'_> {
386373
#[async_trait]
387374
impl Pollable for OutgoingDatagramStream {
388375
async fn ready(&mut self) {
389-
match self.send_state {
390-
SendState::Idle | SendState::Permitted(_) => {}
391-
SendState::Waiting => {
392-
// FIXME: Add `Interest::ERROR` when we update to tokio 1.32.
393-
self.inner
394-
.ready(Interest::WRITABLE)
395-
.await
396-
.expect("failed to await UDP socket readiness");
397-
self.send_state = SendState::Idle;
398-
}
399-
}
376+
self.inner
377+
.ready(Interest::WRITABLE.add(Interest::ERROR))
378+
.await
379+
.expect("failed to await UDP socket readiness");
400380
}
401381
}
402382

crates/wasi/src/p2/udp.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,6 @@ pub struct OutgoingDatagramStream {
1818
/// Socket address family.
1919
pub(crate) family: SocketAddressFamily,
2020

21-
pub(crate) send_state: SendState,
22-
2321
/// The check of allowed addresses
2422
pub(crate) socket_addr_check: Option<SocketAddrCheck>,
2523
}
26-
27-
pub(crate) enum SendState {
28-
/// Waiting for the API consumer to call `check-send`.
29-
Idle,
30-
31-
/// Ready to send up to x datagrams.
32-
Permitted(usize),
33-
34-
/// Waiting for the OS.
35-
Waiting,
36-
}

0 commit comments

Comments
 (0)