Skip to content

Commit df64d9d

Browse files
authored
cm-async: Fix a zero-length write rendezvousing with a zero-length read (#13100)
Previously the state of the reader would end up getting corrupted and nothing would be able to make progress. Now the reader is updated back to its original state after the zero-length read/write is complete.
1 parent 4090d1b commit df64d9d

File tree

2 files changed

+208
-1
lines changed

2 files changed

+208
-1
lines changed

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3641,7 +3641,13 @@ impl Instance {
36413641
concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
36423642
}
36433643

3644-
if read_buffer_remaining {
3644+
// If the reader still has buffer remaining, or if this was a
3645+
// zero-length rendezvous, then restore the state of the reader
3646+
// back to what it was when we found it. Note that for the
3647+
// zero-length rendezvous case this specifically won't execute
3648+
// the `read_complete` logic above, which is intentional, as the
3649+
// reader remains blocked.
3650+
if read_buffer_remaining || (count == 0 && read_count == 0) {
36453651
let transmit = concurrent_state.get_mut(transmit_id)?;
36463652
transmit.read = ReadState::GuestReady {
36473653
ty: read_ty,
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
;;! component_model_async = true
2+
;;! reference_types = true
3+
4+
;; This test asserts some behavior with 0-length reads/writes rendezvous in the
5+
;; component model. Specifically when a writer meets a waiting reader it doesn't
6+
;; unblock the reader and the writer keeps going.
7+
(component
8+
(type $ST (stream))
9+
10+
(core module $libc (memory (export "m") 1))
11+
12+
(component $A
13+
(core module $m
14+
(import "libc" "m" (memory 1))
15+
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
16+
(import "" "stream.drop-readable" (func $stream.drop-readable (param i32)))
17+
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
18+
(import "" "waitable.join" (func $waitable.join (param i32 i32)))
19+
(import "" "waitable-set.drop" (func $waitable-set.drop (param i32)))
20+
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
21+
22+
(func (export "read") (param $r i32)
23+
(local $t32 i32)
24+
(local $ws i32)
25+
26+
;; Start a zero-length read on this stream, it should be blocked.
27+
(local.set $t32 (call $stream.read (local.get $r) (i32.const 100) (i32.const 0)))
28+
(if (i32.ne (local.get $t32) (i32.const -1 (; BLOCKED ;)))
29+
(then unreachable))
30+
31+
;; Wait for the zero-length read to complete, and assert the results of
32+
;; completion.
33+
(local.set $ws (call $waitable-set.new))
34+
(call $waitable.join (local.get $r) (local.get $ws))
35+
(local.set $t32 (call $waitable-set.wait (local.get $ws) (i32.const 0)))
36+
(if (i32.ne (local.get $t32) (i32.const 2 (; EVENT_STREAM_READ ;)))
37+
(then unreachable))
38+
(if (i32.ne (i32.load (i32.const 0)) (local.get $r))
39+
(then unreachable))
40+
(if (i32.ne (i32.load (i32.const 4)) (i32.const 0 (; (0<<4) | COMPLETED ;) ))
41+
(then unreachable))
42+
(call $waitable.join (local.get $r) (i32.const 0))
43+
44+
;; Perform a nonzero-length-read (of 2) and assert that one item is here
45+
;; immediately because that's what the writer gave us below.
46+
(local.set $t32 (call $stream.read (local.get $r) (i32.const 100) (i32.const 2)))
47+
(if (i32.ne (local.get $t32) (i32.const 0x10 (; (1<<4) | COMPLETED ;)))
48+
(then unreachable))
49+
50+
;; clean up
51+
(call $stream.drop-readable (local.get $r))
52+
(call $waitable-set.drop (local.get $ws))
53+
)
54+
)
55+
56+
(core func $stream.read (canon stream.read $ST async))
57+
(core func $stream.drop-readable (canon stream.drop-readable $ST))
58+
(core func $waitable-set.new (canon waitable-set.new))
59+
(core func $waitable.join (canon waitable.join))
60+
(core func $waitable-set.drop (canon waitable-set.drop))
61+
62+
(core instance $libc (instantiate $libc))
63+
(core func $waitable-set.wait (canon waitable-set.wait (memory $libc "m")))
64+
65+
(core instance $i (instantiate $m
66+
(with "libc" (instance $libc))
67+
(with "" (instance
68+
(export "stream.read" (func $stream.read))
69+
(export "stream.drop-readable" (func $stream.drop-readable))
70+
(export "waitable-set.new" (func $waitable-set.new))
71+
(export "waitable.join" (func $waitable.join))
72+
(export "waitable-set.wait" (func $waitable-set.wait))
73+
(export "waitable-set.drop" (func $waitable-set.drop))
74+
))
75+
))
76+
77+
(func (export "read") async (param "x" $ST) (canon lift (core func $i "read")))
78+
79+
)
80+
(instance $a (instantiate $A))
81+
82+
(component $B
83+
(import "a" (instance $a
84+
(export "read" (func async (param "x" $ST)))
85+
))
86+
87+
(core module $m
88+
(import "libc" "m" (memory 1))
89+
(import "" "stream.new" (func $stream.new (result i64)))
90+
(import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32)))
91+
(import "" "read" (func $read (param i32) (result i32)))
92+
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
93+
(import "" "waitable.join" (func $waitable.join (param i32 i32)))
94+
(import "" "waitable-set.drop" (func $waitable-set.drop (param i32)))
95+
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
96+
(import "" "subtask.drop" (func $subtask.drop (param i32)))
97+
(import "" "stream.drop-writable" (func $stream.drop-writable (param i32)))
98+
99+
(func (export "run")
100+
(local $t64 i64)
101+
(local $t32 i32)
102+
(local $r i32)
103+
(local $w i32)
104+
(local $ws i32)
105+
(local $subtask i32)
106+
107+
;; Create a stream and split its halves.
108+
(local.set $t64 (call $stream.new))
109+
(local.set $r (i32.wrap_i64 (local.get $t64)))
110+
(local.set $w (i32.wrap_i64 (i64.shr_u (local.get $t64) (i64.const 32))))
111+
112+
;; Start the subtask in the above component, and it shouldn't be done
113+
;; yet.
114+
(local.set $t32 (call $read (local.get $r)))
115+
(if (i32.ne (i32.and (local.get $t32) (i32.const 0xf)) (i32.const 1 (; STARTED ;)))
116+
(then unreachable))
117+
(local.set $subtask (i32.shr_u (local.get $t32) (i32.const 4)))
118+
119+
;; write of 0 values should be immediately ready
120+
(local.set $t32 (call $stream.write (local.get $w) (i32.const 100) (i32.const 0)))
121+
(if (i32.ne (local.get $t32) (i32.const 0 (; (0<<4) | COMPLETED ;)))
122+
(then unreachable))
123+
124+
;; write again with 0 values and it should be immediately ready
125+
(local.set $t32 (call $stream.write (local.get $w) (i32.const 100) (i32.const 0)))
126+
(if (i32.ne (local.get $t32) (i32.const 0 (; (0<<4) | COMPLETED ;)))
127+
(then unreachable))
128+
129+
;; write with a nonzero number of values should be blocked since this'll
130+
;; wake up the reader later on.
131+
(local.set $t32 (call $stream.write (local.get $w) (i32.const 100) (i32.const 1)))
132+
(if (i32.ne (local.get $t32) (i32.const -1 (; BLOCKED ;)))
133+
(then unreachable))
134+
135+
(local.set $ws (call $waitable-set.new))
136+
137+
;; Wait for the subtask to finish now that we've issue the write. Assert
138+
;; the results of the wait as well.
139+
(call $waitable.join (local.get $subtask) (local.get $ws))
140+
(local.set $t32 (call $waitable-set.wait (local.get $ws) (i32.const 0)))
141+
(if (i32.ne (local.get $t32) (i32.const 1 (; EVENT_SUBTASK ;)))
142+
(then unreachable))
143+
(if (i32.ne (i32.load (i32.const 0)) (local.get $subtask))
144+
(then unreachable))
145+
(if (i32.ne (i32.load (i32.const 4)) (i32.const 0x2 (; RETURNED ;) ))
146+
(then unreachable))
147+
(call $waitable.join (local.get $subtask) (i32.const 0))
148+
149+
;; Also wait on the pending write to complete.
150+
(call $waitable.join (local.get $w) (local.get $ws))
151+
(local.set $t32 (call $waitable-set.wait (local.get $ws) (i32.const 0)))
152+
(if (i32.ne (local.get $t32) (i32.const 3 (; EVENT_STREAM_WRITE ;)))
153+
(then unreachable))
154+
(if (i32.ne (i32.load (i32.const 0)) (local.get $w))
155+
(then unreachable))
156+
(if (i32.ne (i32.load (i32.const 4)) (i32.const 0x11 (; (1<<4) | DROPPED ;) ))
157+
(then unreachable))
158+
(call $waitable.join (local.get $w) (i32.const 0))
159+
160+
;; clean up
161+
(call $subtask.drop (local.get $subtask))
162+
(call $stream.drop-writable (local.get $w))
163+
(call $waitable-set.drop (local.get $ws))
164+
)
165+
)
166+
167+
(core func $stream.new (canon stream.new $ST))
168+
(core func $stream.write (canon stream.write $ST async))
169+
(core func $stream.drop-writable (canon stream.drop-writable $ST))
170+
(core func $read (canon lower (func $a "read") async))
171+
(core func $waitable-set.new (canon waitable-set.new))
172+
(core func $waitable.join (canon waitable.join))
173+
(core func $waitable-set.drop (canon waitable-set.drop))
174+
(core func $subtask.drop (canon subtask.drop))
175+
176+
(core instance $libc (instantiate $libc))
177+
(core func $waitable-set.wait (canon waitable-set.wait (memory $libc "m")))
178+
179+
(core instance $i (instantiate $m
180+
(with "libc" (instance $libc))
181+
(with "" (instance
182+
(export "stream.new" (func $stream.new))
183+
(export "stream.write" (func $stream.write))
184+
(export "read" (func $read))
185+
(export "waitable-set.new" (func $waitable-set.new))
186+
(export "waitable.join" (func $waitable.join))
187+
(export "waitable-set.wait" (func $waitable-set.wait))
188+
(export "waitable-set.drop" (func $waitable-set.drop))
189+
(export "subtask.drop" (func $subtask.drop))
190+
(export "stream.drop-writable" (func $stream.drop-writable))
191+
))
192+
))
193+
194+
(func (export "run") async (canon lift (core func $i "run")))
195+
)
196+
(instance $b (instantiate $B (with "a" (instance $a))))
197+
198+
(export "run" (func $b "run"))
199+
)
200+
201+
(assert_return (invoke "run"))

0 commit comments

Comments
 (0)