Skip to content

Commit 37c4942

Browse files
authored
Fix a panic in Bytes{,Mut} StreamProducer impls (#12878)
This commit fixes the logic of these `impl`s to match the `Vec`-style blocks to relinquish the entire buffer to Wasmtime immediately. This fixes an issue where `split_off` is called with too large a value which can panic.
1 parent 2ab5f54 commit 37c4942

3 files changed

Lines changed: 64 additions & 29 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ rustix = { workspace = true, features = ["mm", "process"] }
9696

9797
[dev-dependencies]
9898
# depend again on wasmtime to activate its default features for tests
99-
wasmtime = { workspace = true, features = ['default', 'anyhow', 'winch', 'pulley', 'all-arch', 'call-hook', 'memory-protection-keys', 'component-model-async'] }
99+
wasmtime = { workspace = true, features = ['default', 'anyhow', 'winch', 'pulley', 'all-arch', 'call-hook', 'memory-protection-keys', 'component-model-async', 'component-model-async-bytes'] }
100100
env_logger = { workspace = true }
101101
log = { workspace = true }
102102
filecheck = { workspace = true }

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -732,24 +732,13 @@ impl<D> StreamProducer<D> for bytes::Bytes {
732732
type Buffer = Cursor<Self>;
733733

734734
fn poll_produce<'a>(
735-
mut self: Pin<&mut Self>,
735+
self: Pin<&mut Self>,
736736
_: &mut Context<'_>,
737-
mut store: StoreContextMut<'a, D>,
737+
_store: StoreContextMut<'a, D>,
738738
mut dst: Destination<'a, Self::Item, Self::Buffer>,
739739
_: bool,
740740
) -> Poll<Result<StreamResult>> {
741-
let cap = dst.remaining(&mut store);
742-
let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
743-
// on 0-length or host reads, buffer the bytes
744-
dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
745-
return Poll::Ready(Ok(StreamResult::Dropped));
746-
};
747-
let cap = cap.into();
748-
// data does not fit in destination, fill it and buffer the rest
749-
dst.set_buffer(Cursor::new(self.split_off(cap)));
750-
let mut dst = dst.as_direct(store, cap);
751-
dst.remaining().copy_from_slice(&self);
752-
dst.mark_written(cap);
741+
dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
753742
Poll::Ready(Ok(StreamResult::Dropped))
754743
}
755744
}
@@ -760,24 +749,13 @@ impl<D> StreamProducer<D> for bytes::BytesMut {
760749
type Buffer = Cursor<Self>;
761750

762751
fn poll_produce<'a>(
763-
mut self: Pin<&mut Self>,
752+
self: Pin<&mut Self>,
764753
_: &mut Context<'_>,
765-
mut store: StoreContextMut<'a, D>,
754+
_store: StoreContextMut<'a, D>,
766755
mut dst: Destination<'a, Self::Item, Self::Buffer>,
767756
_: bool,
768757
) -> Poll<Result<StreamResult>> {
769-
let cap = dst.remaining(&mut store);
770-
let Some(cap) = cap.and_then(core::num::NonZeroUsize::new) else {
771-
// on 0-length or host reads, buffer the bytes
772-
dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
773-
return Poll::Ready(Ok(StreamResult::Dropped));
774-
};
775-
let cap = cap.into();
776-
// data does not fit in destination, fill it and buffer the rest
777-
dst.set_buffer(Cursor::new(self.split_off(cap)));
778-
let mut dst = dst.as_direct(store, cap);
779-
dst.remaining().copy_from_slice(&self);
780-
dst.mark_written(cap);
758+
dst.set_buffer(Cursor::new(mem::take(self.get_mut())));
781759
Poll::Ready(Ok(StreamResult::Dropped))
782760
}
783761
}

tests/all/component_model/async.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,3 +1267,60 @@ async fn concurrent_sync_calls_to_async_host() -> Result<()> {
12671267

12681268
Ok(())
12691269
}
1270+
1271+
#[tokio::test]
1272+
#[cfg_attr(miri, ignore)]
1273+
async fn bytes_stream_producer() -> Result<()> {
1274+
let mut config = Config::new();
1275+
config.wasm_component_model_async(true);
1276+
let engine = Engine::new(&config)?;
1277+
1278+
let component = Component::new(
1279+
&engine,
1280+
r#"
1281+
(component
1282+
(core module $libc (memory (export "mem") 1))
1283+
(core instance $libc (instantiate $libc))
1284+
(core module $m
1285+
(import "" "mem" (memory 1))
1286+
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
1287+
1288+
(func (export "read") (param i32 i32) (result i32)
1289+
(call $stream.read (local.get 0) (i32.const 0) (local.get 1))
1290+
)
1291+
)
1292+
(type $s (stream u8))
1293+
(core func $stream.read (canon stream.read $s async (memory $libc "mem")))
1294+
(core instance $i (instantiate $m
1295+
(with "" (instance
1296+
(export "mem" (memory $libc "mem"))
1297+
(export "stream.read" (func $stream.read))
1298+
))
1299+
))
1300+
(func (export "read") (param "s" (stream u8)) (param "l" u32) (result u32)
1301+
(canon lift (core func $i "read")))
1302+
)
1303+
"#,
1304+
)?;
1305+
1306+
let linker = Linker::new(&engine);
1307+
let mut store = Store::new(&engine, ());
1308+
let instance = linker.instantiate_async(&mut store, &component).await?;
1309+
let func = instance.get_typed_func::<(StreamReader<u8>, u32), (u32,)>(&mut store, "read")?;
1310+
1311+
// read less than the capacity
1312+
let reader = StreamReader::new(&mut store, bytes::Bytes::from_static(b"hello"))?;
1313+
assert_eq!(
1314+
func.call_async(&mut store, (reader, 1)).await?,
1315+
((1 << 4) | 0,),
1316+
);
1317+
1318+
// read more than the capacity
1319+
let reader = StreamReader::new(&mut store, bytes::Bytes::from_static(b"hello"))?;
1320+
assert_eq!(
1321+
func.call_async(&mut store, (reader, 100)).await?,
1322+
((5 << 4) | 1,),
1323+
);
1324+
1325+
Ok(())
1326+
}

0 commit comments

Comments
 (0)