Skip to content

Commit 2ab5f54

Browse files
authored
Fix a copy/paste typo in StreamAny (#12871)
When closing, close a stream, not a future. Fixes the test added here as well.
1 parent bc6585e commit 2ab5f54

2 files changed

Lines changed: 106 additions & 1 deletion

File tree

crates/wasmtime/src/runtime/component/concurrent/future_stream_any.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ impl StreamAny {
304304
///
305305
/// Panics if the `store` does not own this stream.
306306
pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
307-
futures_and_streams::future_close(store.as_context_mut().0, &mut self.id)
307+
futures_and_streams::stream_close(store.as_context_mut().0, &mut self.id)
308308
}
309309
}
310310

tests/all/component_model/async_dynamic.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,108 @@ fn simple_type_assertions() -> Result<()> {
178178

179179
Ok(())
180180
}
181+
182+
#[tokio::test]
183+
#[cfg_attr(miri, ignore)]
184+
async fn stream_any_smoke() -> Result<()> {
185+
let mut config = Config::new();
186+
config.wasm_component_model_async(true);
187+
let engine = Engine::new(&config)?;
188+
let mut store = Store::new(&engine, ());
189+
let component = Component::new(
190+
&engine,
191+
r#"
192+
(component
193+
(type $s (stream u8))
194+
195+
(core module $libc (memory (export "mem") 1))
196+
(core instance $libc (instantiate $libc))
197+
198+
(core module $m
199+
(import "" "stream.new" (func $stream.new (result i64)))
200+
(import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32)))
201+
(import "" "task.return" (func $task.return))
202+
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
203+
(import "" "waitable.join" (func $waitable.join (param i32 i32)))
204+
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
205+
(import "" "waitable-set.drop" (func $waitable-set.drop (param i32)))
206+
(import "" "mem" (memory 1))
207+
208+
(global $w (mut i32) (i32.const 0))
209+
210+
(func (export "mk") (result i32)
211+
(local $r i32) (local $tmp i64)
212+
(local.set $tmp (call $stream.new))
213+
(local.set $r (i32.wrap_i64 (local.get $tmp)))
214+
(global.set $w (i32.wrap_i64 (i64.shr_u (local.get $tmp) (i64.const 32))))
215+
local.get $r
216+
)
217+
218+
(func (export "run") (result i32)
219+
(local $ws i32)
220+
(local.set $ws (call $waitable-set.new))
221+
(call $waitable.join (global.get $w) (local.get $ws))
222+
(call $waitable-set.wait (local.get $ws) (i32.const 0))
223+
i32.const 3 ;; EVENT_STREAM_WRITE
224+
i32.ne
225+
if unreachable end
226+
227+
(if (i32.ne (i32.load (i32.const 0)) (global.get $w))
228+
(then unreachable))
229+
(if (i32.ne (i32.load (i32.const 4)) (i32.const 1)) ;; DROPPED | (0 << 4)
230+
(then unreachable))
231+
232+
call $task.return
233+
234+
i32.const 0 ;; CALLBACK_CODE_EXIT
235+
)
236+
237+
(func (export "cb") (param i32 i32 i32) (result i32) unreachable)
238+
)
239+
(core func $stream.new (canon stream.new $s))
240+
(core func $stream.write (canon stream.write $s (memory $libc "mem")))
241+
(core func $task.return (canon task.return))
242+
(core func $waitable-set.new (canon waitable-set.new))
243+
(core func $waitable.join (canon waitable.join))
244+
(core func $waitable-set.wait (canon waitable-set.wait (memory $libc "mem")))
245+
(core func $waitable-set.drop (canon waitable-set.drop))
246+
(core instance $i (instantiate $m
247+
(with "" (instance
248+
(export "stream.new" (func $stream.new))
249+
(export "stream.write" (func $stream.write))
250+
(export "task.return" (func $task.return))
251+
(export "waitable-set.new" (func $waitable-set.new))
252+
(export "waitable.join" (func $waitable.join))
253+
(export "waitable-set.wait" (func $waitable-set.wait))
254+
(export "waitable-set.drop" (func $waitable-set.drop))
255+
(export "mem" (memory $libc "mem"))
256+
))
257+
))
258+
(func (export "mk") (result (stream u8))
259+
(canon lift (core func $i "mk")))
260+
(func (export "run") async
261+
(canon lift (core func $i "run") async (callback (func $i "cb"))))
262+
)
263+
"#,
264+
)?;
265+
let instance = Linker::new(&engine).instantiate(&mut store, &component)?;
266+
let mk = instance.get_typed_func::<(), (StreamAny,)>(&mut store, "mk")?;
267+
let run = instance.get_typed_func::<(), ()>(&mut store, "run")?;
268+
store
269+
.run_concurrent(async |store| {
270+
let (mut stream,) = mk.call_concurrent(store, ()).await?;
271+
tokio::try_join! {
272+
async {
273+
run.call_concurrent(store, ()).await?;
274+
wasmtime::error::Ok(())
275+
},
276+
async {
277+
store.with(|store| stream.close(store))?;
278+
wasmtime::error::Ok(())
279+
}
280+
}?;
281+
wasmtime::error::Ok(())
282+
})
283+
.await??;
284+
Ok(())
285+
}

0 commit comments

Comments
 (0)