Skip to content

Commit 1ee1020

Browse files
authored
Fix state of futures/streams after cancellation (#12881)
* Fix state of futures/streams after cancellation This fixes two related but distinct issues with respect to delivering events to stream/future handles. First when delivering an event to a future or a stream the shared code is now more unified into one path. This fixes an issue with futures where they would always have `done` flagged as `false` accidentally. This then fixes an additional issue where this `on_delivery` function wasn't invoked when futures/streams had their operations cancelled. * Fix CI
1 parent f820750 commit 1ee1020

2 files changed

Lines changed: 290 additions & 45 deletions

File tree

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 39 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3901,13 +3901,13 @@ impl Instance {
39013901
transmit.read,
39023902
transmit.write
39033903
);
3904+
let waitable = Waitable::Transmit(transmit.write_handle);
39043905

3905-
let code = if let Some(event) =
3906-
Waitable::Transmit(transmit.write_handle).take_event(state)?
3907-
{
3906+
let code = if let Some(event) = waitable.take_event(state)? {
39083907
let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
39093908
bail_bug!("expected either a stream or future write event")
39103909
};
3910+
waitable.on_delivery(store, self, event)?;
39113911
match (code, event) {
39123912
(ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
39133913
ReturnCode::Cancelled(count)
@@ -3989,12 +3989,12 @@ impl Instance {
39893989
transmit.write
39903990
);
39913991

3992-
let code = if let Some(event) =
3993-
Waitable::Transmit(transmit.read_handle).take_event(state)?
3994-
{
3992+
let waitable = Waitable::Transmit(transmit.read_handle);
3993+
let code = if let Some(event) = waitable.take_event(state)? {
39953994
let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
39963995
bail_bug!("expected either a stream or future read event")
39973996
};
3997+
waitable.on_delivery(store, self, event)?;
39983998
match (code, event) {
39993999
(ReturnCode::Completed(count), Event::StreamRead { .. }) => {
40004000
ReturnCode::Cancelled(count)
@@ -4673,31 +4673,21 @@ impl Waitable {
46734673
instance: Instance,
46744674
event: Event,
46754675
) -> Result<()> {
4676-
match event {
4676+
let instance = instance.id().get_mut(store);
4677+
let (rep, state, code) = match event {
46774678
Event::FutureRead {
46784679
pending: Some((ty, handle)),
4679-
..
4680+
code,
46804681
}
46814682
| Event::FutureWrite {
46824683
pending: Some((ty, handle)),
4683-
..
4684+
code,
46844685
} => {
4685-
let instance = instance.id().get_mut(store);
46864686
let runtime_instance = instance.component().types()[ty].instance;
46874687
let (rep, state) = instance.instance_states().0[runtime_instance]
46884688
.handle_table()
46894689
.future_rep(ty, handle)?;
4690-
if rep != self.rep() {
4691-
bail_bug!("unexpected rep mismatch");
4692-
}
4693-
if *state != TransmitLocalState::Busy {
4694-
bail_bug!("expected state to be busy");
4695-
}
4696-
*state = match event {
4697-
Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4698-
Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4699-
_ => bail_bug!("unexpected event for future"),
4700-
};
4690+
(rep, state, code)
47014691
}
47024692
Event::StreamRead {
47034693
pending: Some((ty, handle)),
@@ -4707,37 +4697,41 @@ impl Waitable {
47074697
pending: Some((ty, handle)),
47084698
code,
47094699
} => {
4710-
let instance = instance.id().get_mut(store);
47114700
let runtime_instance = instance.component().types()[ty].instance;
47124701
let (rep, state) = instance.instance_states().0[runtime_instance]
47134702
.handle_table()
47144703
.stream_rep(ty, handle)?;
4715-
if rep != self.rep() {
4716-
bail_bug!("unexpected rep mismatch");
4717-
}
4718-
if *state != TransmitLocalState::Busy {
4719-
bail_bug!("expected state to be busy");
4720-
}
4721-
let done = matches!(code, ReturnCode::Dropped(_));
4722-
*state = match event {
4723-
Event::StreamRead { .. } => TransmitLocalState::Read { done },
4724-
Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4725-
_ => bail_bug!("unexpected event for stream"),
4726-
};
4704+
(rep, state, code)
4705+
}
4706+
_ => return Ok(()),
4707+
};
4708+
if rep != self.rep() {
4709+
bail_bug!("unexpected rep mismatch");
4710+
}
4711+
if *state != TransmitLocalState::Busy {
4712+
bail_bug!("expected state to be busy");
4713+
}
4714+
let done = matches!(code, ReturnCode::Dropped(_));
4715+
*state = match event {
4716+
Event::FutureRead { .. } | Event::StreamRead { .. } => {
4717+
TransmitLocalState::Read { done }
4718+
}
4719+
Event::FutureWrite { .. } | Event::StreamWrite { .. } => {
4720+
TransmitLocalState::Write { done }
4721+
}
4722+
_ => bail_bug!("unexpected event for stream"),
4723+
};
47274724

4728-
let transmit_handle = TableId::<TransmitHandle>::new(rep);
4729-
let state = store.concurrent_state_mut();
4730-
let transmit_id = state.get_mut(transmit_handle)?.state;
4731-
let transmit = state.get_mut(transmit_id)?;
4725+
let transmit_handle = TableId::<TransmitHandle>::new(rep);
4726+
let state = store.concurrent_state_mut();
4727+
let transmit_id = state.get_mut(transmit_handle)?.state;
4728+
let transmit = state.get_mut(transmit_id)?;
47324729

4733-
match event {
4734-
Event::StreamRead { .. } => {
4735-
transmit.read = ReadState::Open;
4736-
}
4737-
Event::StreamWrite { .. } => transmit.write = WriteState::Open,
4738-
_ => bail_bug!("unexpected event for stream"),
4739-
};
4730+
match event {
4731+
Event::StreamRead { .. } => {
4732+
transmit.read = ReadState::Open;
47404733
}
4734+
Event::StreamWrite { .. } => transmit.write = WriteState::Open,
47414735
_ => {}
47424736
}
47434737
Ok(())
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
;;! component_model_async = true
2+
3+
(component definition $A
4+
(core module $libc (memory (export "mem") 1))
5+
(core instance $libc (instantiate $libc))
6+
7+
(core module $m
8+
(import "" "mem" (memory 1))
9+
(import "" "stream.new" (func $stream.new (result i64)))
10+
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
11+
(import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32)))
12+
(import "" "stream.cancel-read" (func $stream.cancel-read (param i32) (result i32)))
13+
(import "" "stream.cancel-write" (func $stream.cancel-write (param i32) (result i32)))
14+
(import "" "stream.drop-writable" (func $stream.drop-writable (param i32)))
15+
(import "" "stream.drop-readable" (func $stream.drop-readable (param i32)))
16+
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
17+
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
18+
(import "" "waitable-set.poll" (func $waitable-set.poll (param i32 i32) (result i32)))
19+
(import "" "waitable.join" (func $waitable.join (param i32) (param i32)))
20+
(import "" "waitable-set.drop" (func $waitable-set.drop (param i32)))
21+
(import "" "future.new" (func $future.new (result i64)))
22+
(import "" "future.read" (func $future.read (param i32 i32) (result i32)))
23+
(import "" "future.write" (func $future.write (param i32 i32) (result i32)))
24+
(import "" "future.cancel-read" (func $future.cancel-read (param i32) (result i32)))
25+
(import "" "future.cancel-write" (func $future.cancel-write (param i32) (result i32)))
26+
(import "" "future.drop-writable" (func $future.drop-writable (param i32)))
27+
(import "" "future.drop-readable" (func $future.drop-readable (param i32)))
28+
29+
(table 5 funcref)
30+
(elem (i32.const 0)
31+
func
32+
$witness-dropped-via-cancel-read
33+
$witness-dropped-via-cancel-write
34+
$witness-dropped-via-waitable-set.wait
35+
$witness-dropped-via-waitable-set.poll
36+
$witness-dropped-via-future.cancel-write
37+
)
38+
39+
(func $witness-dropped-via-cancel-read (param i32)
40+
(local $ret i32)
41+
(local.set $ret (call $stream.cancel-read (local.get 0)))
42+
(if (i32.ne (local.get $ret) (i32.const 0x01 (; DROPPED ;)))
43+
(then unreachable))
44+
)
45+
46+
(func $witness-dropped-via-cancel-write (param i32)
47+
(local $ret i32)
48+
(local.set $ret (call $stream.cancel-write (local.get 0)))
49+
(if (i32.ne (local.get $ret) (i32.const 0x01 (; DROPPED ;)))
50+
(then unreachable))
51+
)
52+
53+
(func $witness-dropped-via-waitable-set.wait (param i32)
54+
(local $ws i32)
55+
(local.set $ws (call $waitable-set.new))
56+
(call $waitable.join (local.get 0) (local.get $ws))
57+
(call $waitable-set.wait (local.get $ws) (i32.const 4))
58+
drop ;; ignore the event
59+
(call $waitable.join (local.get 0) (i32.const 0))
60+
(call $waitable-set.drop (local.get $ws))
61+
62+
(if (i32.ne (i32.load (i32.const 4)) (local.get 0))
63+
(then unreachable))
64+
(if (i32.ne (i32.load (i32.const 8)) (i32.const 0x01 (; DROPPED ;)))
65+
(then unreachable))
66+
)
67+
68+
(func $witness-dropped-via-waitable-set.poll (param i32)
69+
(local $ws i32)
70+
(local.set $ws (call $waitable-set.new))
71+
(call $waitable.join (local.get 0) (local.get $ws))
72+
(call $waitable-set.poll (local.get $ws) (i32.const 4))
73+
drop ;; ignore the event
74+
(call $waitable.join (local.get 0) (i32.const 0))
75+
(call $waitable-set.drop (local.get $ws))
76+
77+
(if (i32.ne (i32.load (i32.const 4)) (local.get 0))
78+
(then unreachable))
79+
(if (i32.ne (i32.load (i32.const 8)) (i32.const 0x01 (; DROPPED ;)))
80+
(then unreachable))
81+
)
82+
83+
(func $witness-dropped-via-future.cancel-write (param i32)
84+
(local $ret i32)
85+
(local.set $ret (call $future.cancel-write (local.get 0)))
86+
(if (i32.ne (local.get $ret) (i32.const 0x01 (; DROPPED ;)))
87+
(then unreachable))
88+
)
89+
90+
(func (export "test-cancel-read") (param i32)
91+
(local $ret64 i64)
92+
(local $reader i32)
93+
(local $writer i32)
94+
(local $ret i32)
95+
96+
;; Create a new stream
97+
(local.set $ret64 (call $stream.new))
98+
(local.set $reader (i32.wrap_i64 (local.get $ret64)))
99+
(local.set $writer (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32))))
100+
101+
;; Start a read which will block (no writer ready)
102+
(local.set $ret (call $stream.read (local.get $reader) (i32.const 16) (i32.const 100)))
103+
(if (i32.ne (local.get $ret) (i32.const -1 (; BLOCKED ;)))
104+
(then unreachable))
105+
106+
;; Drop the writer, which queues a DROPPED event for the pending read
107+
(call $stream.drop-writable (local.get $writer))
108+
109+
;; Receive the dropped event
110+
local.get $reader
111+
local.get 0
112+
call_indirect (param i32)
113+
114+
;; attempting to read again should fail
115+
(drop (call $stream.read (local.get $reader) (i32.const 16) (i32.const 100)))
116+
117+
unreachable
118+
)
119+
120+
(func (export "test-cancel-write") (param i32)
121+
(local $ret64 i64)
122+
(local $reader i32)
123+
(local $writer i32)
124+
(local $ret i32)
125+
126+
;; Create a new stream
127+
(local.set $ret64 (call $stream.new))
128+
(local.set $reader (i32.wrap_i64 (local.get $ret64)))
129+
(local.set $writer (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32))))
130+
131+
;; Write some data (will block since no reader is ready)
132+
(local.set $ret (call $stream.write (local.get $writer) (i32.const 0) (i32.const 1)))
133+
(if (i32.ne (local.get $ret) (i32.const -1 (; BLOCKED ;)))
134+
(then unreachable))
135+
136+
;; Drop the reader, which queues a DROPPED event for the pending write
137+
(call $stream.drop-readable (local.get $reader))
138+
139+
;; Receive the dropped event
140+
local.get $writer
141+
local.get 0
142+
call_indirect (param i32)
143+
144+
;; attempting to write again should fail
145+
(drop (call $stream.write (local.get $writer) (i32.const 0) (i32.const 1)))
146+
147+
unreachable
148+
)
149+
150+
(func (export "test-cancel-future-write") (param i32)
151+
(local $ret64 i64)
152+
(local $reader i32)
153+
(local $writer i32)
154+
(local $ret i32)
155+
156+
;; Create a new future
157+
(local.set $ret64 (call $future.new))
158+
(local.set $reader (i32.wrap_i64 (local.get $ret64)))
159+
(local.set $writer (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32))))
160+
161+
;; Write some data (will block since no reader is ready)
162+
(local.set $ret (call $future.write (local.get $writer) (i32.const 0)))
163+
(if (i32.ne (local.get $ret) (i32.const -1 (; BLOCKED ;)))
164+
(then unreachable))
165+
166+
;; Drop the reader, which queues a DROPPED event for the pending write
167+
(call $future.drop-readable (local.get $reader))
168+
169+
;; Receive the dropped event
170+
local.get $writer
171+
local.get 0
172+
call_indirect (param i32)
173+
174+
;; attempting to write again should fail
175+
(drop (call $future.write (local.get $writer) (i32.const 0)))
176+
177+
unreachable
178+
)
179+
)
180+
181+
(core func $waitable-set.new (canon waitable-set.new))
182+
(core func $waitable.join (canon waitable.join))
183+
(core func $waitable-set.wait (canon waitable-set.wait (memory $libc "mem")))
184+
(core func $waitable-set.poll (canon waitable-set.poll (memory $libc "mem")))
185+
(core func $waitable-set.drop (canon waitable-set.drop))
186+
(type $s (stream u8))
187+
(core func $stream.new (canon stream.new $s))
188+
(core func $stream.read (canon stream.read $s async (memory $libc "mem")))
189+
(core func $stream.write (canon stream.write $s async (memory $libc "mem")))
190+
(core func $stream.cancel-read (canon stream.cancel-read $s))
191+
(core func $stream.cancel-write (canon stream.cancel-write $s))
192+
(core func $stream.drop-readable (canon stream.drop-readable $s))
193+
(core func $stream.drop-writable (canon stream.drop-writable $s))
194+
(type $f (future u8))
195+
(core func $future.new (canon future.new $f))
196+
(core func $future.read (canon future.read $f async (memory $libc "mem")))
197+
(core func $future.write (canon future.write $f async (memory $libc "mem")))
198+
(core func $future.cancel-read (canon future.cancel-read $f))
199+
(core func $future.cancel-write (canon future.cancel-write $f))
200+
(core func $future.drop-readable (canon future.drop-readable $f))
201+
(core func $future.drop-writable (canon future.drop-writable $f))
202+
203+
(core instance $i (instantiate $m (with "" (instance
204+
(export "mem" (memory $libc "mem"))
205+
(export "stream.new" (func $stream.new))
206+
(export "stream.read" (func $stream.read))
207+
(export "stream.write" (func $stream.write))
208+
(export "stream.cancel-read" (func $stream.cancel-read))
209+
(export "stream.cancel-write" (func $stream.cancel-write))
210+
(export "stream.drop-writable" (func $stream.drop-writable))
211+
(export "stream.drop-readable" (func $stream.drop-readable))
212+
(export "waitable-set.new" (func $waitable-set.new))
213+
(export "waitable.join" (func $waitable.join))
214+
(export "waitable-set.wait" (func $waitable-set.wait))
215+
(export "waitable-set.poll" (func $waitable-set.poll))
216+
(export "waitable-set.drop" (func $waitable-set.drop))
217+
(export "future.new" (func $future.new))
218+
(export "future.read" (func $future.read))
219+
(export "future.write" (func $future.write))
220+
(export "future.cancel-read" (func $future.cancel-read))
221+
(export "future.cancel-write" (func $future.cancel-write))
222+
(export "future.drop-writable" (func $future.drop-writable))
223+
(export "future.drop-readable" (func $future.drop-readable))
224+
))))
225+
226+
(func (export "test-cancel-read") async (param "x" u32) (canon lift (core func $i "test-cancel-read")))
227+
(func (export "test-cancel-write") async (param "x" u32) (canon lift (core func $i "test-cancel-write")))
228+
(func (export "test-cancel-future-write") async (param "x" u32) (canon lift (core func $i "test-cancel-future-write")))
229+
)
230+
231+
(component instance $A $A)
232+
(assert_trap (invoke "test-cancel-read" (u32.const 0)) "cannot read from stream after being notified that the writable end dropped")
233+
(component instance $A $A)
234+
(assert_trap (invoke "test-cancel-read" (u32.const 2)) "cannot read from stream after being notified that the writable end dropped")
235+
(component instance $A $A)
236+
(assert_trap (invoke "test-cancel-read" (u32.const 3)) "cannot read from stream after being notified that the writable end dropped")
237+
238+
(component instance $A $A)
239+
(assert_trap (invoke "test-cancel-write" (u32.const 1)) "cannot write to stream after being notified that the readable end dropped")
240+
(component instance $A $A)
241+
(assert_trap (invoke "test-cancel-write" (u32.const 2)) "cannot write to stream after being notified that the readable end dropped")
242+
(component instance $A $A)
243+
(assert_trap (invoke "test-cancel-write" (u32.const 3)) "cannot write to stream after being notified that the readable end dropped")
244+
245+
246+
(component instance $A $A)
247+
(assert_trap (invoke "test-cancel-future-write" (u32.const 2)) "cannot write to stream after being notified that the readable end dropped")
248+
(component instance $A $A)
249+
(assert_trap (invoke "test-cancel-future-write" (u32.const 3)) "cannot write to stream after being notified that the readable end dropped")
250+
(component instance $A $A)
251+
(assert_trap (invoke "test-cancel-future-write" (u32.const 4)) "cannot write to stream after being notified that the readable end dropped")

0 commit comments

Comments
 (0)