Skip to content

Commit 425a601

Browse files
authored
Fix a stacked borrows violation in futures/streams copy (#12872)
* Fix a stacked borrows violation in futures/streams copy This commit fixes an issue in the copy implementation for futures/streams related to component-model-async. Specifically the added tests here tripped an error in Miri which is related to intra-component copies and stacked borrows. This refactoring ends up using `copy_within` for the intra-component case and `copy_to_nonoverlapping` for the inter-component case to resolve this issue. This commit additionally refactors the validation/checks to happen in one location instead of in a few to ensure that all validation is done up-front in a shared manner. * Fix CI * Fix expected error message * Review comments
1 parent 9bc302a commit 425a601

6 files changed

Lines changed: 261 additions & 172 deletions

File tree

crates/wasi-http/tests/all/p3/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ async fn p3_http_middleware() -> Result<()> {
322322
async fn p3_http_middleware_host_to_host() {
323323
let error = format!("{:?}", test_http_middleware(true).await.unwrap_err());
324324

325-
let expected = "cannot read from and write to intra-component future with non-numeric payload";
325+
let expected =
326+
"cannot read from and write to intra-component future/stream with non-numeric payload";
326327

327328
assert!(
328329
error.contains(expected),

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 136 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,20 @@ impl TransmitIndex {
116116
TransmitIndex::Future(_) => TransmitKind::Future,
117117
}
118118
}
119-
}
120119

121-
/// Retrieve the payload type of the specified stream or future, or `None` if it
122-
/// has no payload type.
123-
fn payload(ty: TransmitIndex, types: &ComponentTypes) -> Option<InterfaceType> {
124-
match ty {
125-
TransmitIndex::Future(ty) => types[types[ty].ty].payload,
126-
TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
120+
/// Retrieve the payload type of the specified stream or future, or `None`
121+
/// if it has no payload type.
122+
fn payload<'a>(&self, types: &'a ComponentTypes) -> Option<&'a InterfaceType> {
123+
match self {
124+
TransmitIndex::Stream(i) => {
125+
let ty = types[*i].ty;
126+
types[ty].payload.as_ref()
127+
}
128+
TransmitIndex::Future(i) => {
129+
let ty = types[*i].ty;
130+
types[ty].payload.as_ref()
131+
}
132+
}
127133
}
128134
}
129135

@@ -177,8 +183,8 @@ fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
177183
.and_then(|b| b.get_mut(..T::SIZE32 * count))
178184
.ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
179185

180-
if let Some(ty) = payload(ty, lower.types) {
181-
T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?;
186+
if let Some(ty) = ty.payload(lower.types) {
187+
T::linear_store_list_to_memory(lower, *ty, address, &buffer.remaining()[..count])?;
182188
}
183189

184190
if let Some(old_thread) = old_thread {
@@ -805,11 +811,11 @@ impl<'a, T> Source<'a, T> {
805811
};
806812

807813
let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
808-
let ty = payload(ty, cx.types);
814+
let ty = ty.payload(cx.types);
809815
let old_remaining = buffer.remaining_capacity();
810816
lift::<T, B>(
811817
cx,
812-
ty,
818+
ty.copied(),
813819
buffer,
814820
address + (T::SIZE32 * guest_offset),
815821
count - guest_offset,
@@ -3173,7 +3179,7 @@ impl Instance {
31733179
/// specified stream or future.
31743180
fn copy<T: 'static>(
31753181
self,
3176-
mut store: StoreContextMut<T>,
3182+
store: StoreContextMut<T>,
31773183
flat_abi: Option<FlatAbi>,
31783184
write_caller_instance: RuntimeComponentInstanceIndex,
31793185
write_ty: TransmitIndex,
@@ -3187,146 +3193,146 @@ impl Instance {
31873193
count: usize,
31883194
rep: u32,
31893195
) -> Result<()> {
3190-
let types = self.id().get(store.0).component().types();
3196+
let (component, mut store) = self.component_and_store_mut(store.0);
3197+
let types = component.types();
3198+
3199+
// Validate `write_ty` w.r.t. `write_address` to ensure it's properly
3200+
// aligned and in-bounds.
3201+
let write_payload_ty = write_ty.payload(types);
3202+
let write_abi = match write_payload_ty {
3203+
Some(ty) => types.canonical_abi(ty),
3204+
None => &CanonicalAbiInfo::ZERO,
3205+
};
3206+
let write_length_in_bytes = match flat_abi {
3207+
Some(abi) => usize::try_from(abi.size)? * count,
3208+
None => usize::try_from(write_abi.size32)? * count,
3209+
};
3210+
if write_length_in_bytes > 0 {
3211+
if write_address % usize::try_from(write_abi.align32)? != 0 {
3212+
bail!("write pointer not aligned");
3213+
}
3214+
self.options_memory(store, write_options)
3215+
.get(write_address..)
3216+
.and_then(|b| b.get(..write_length_in_bytes))
3217+
.ok_or_else(|| crate::format_err!("write pointer out of bounds"))?;
3218+
}
3219+
3220+
let read_payload_ty = read_ty.payload(types);
3221+
let read_abi = match read_payload_ty {
3222+
Some(ty) => types.canonical_abi(ty),
3223+
None => &CanonicalAbiInfo::ZERO,
3224+
};
3225+
let read_length_in_bytes = match flat_abi {
3226+
Some(abi) => usize::try_from(abi.size)? * count,
3227+
None => usize::try_from(read_abi.size32)? * count,
3228+
};
3229+
if read_length_in_bytes > 0 {
3230+
if read_address % usize::try_from(read_abi.align32)? != 0 {
3231+
bail!("read pointer not aligned");
3232+
}
3233+
self.options_memory(store, read_options)
3234+
.get(read_address..)
3235+
.and_then(|b| b.get(..read_length_in_bytes))
3236+
.ok_or_else(|| crate::format_err!("read pointer out of bounds"))?;
3237+
}
3238+
3239+
if write_caller_instance == read_caller_instance
3240+
&& !allow_intra_component_read_write(write_payload_ty)
3241+
{
3242+
bail!(
3243+
"cannot read from and write to intra-component future/stream with non-numeric payload"
3244+
)
3245+
}
3246+
31913247
match (write_ty, read_ty) {
3192-
(TransmitIndex::Future(write_ty), TransmitIndex::Future(read_ty)) => {
3248+
(TransmitIndex::Future(_), TransmitIndex::Future(_)) => {
31933249
if count != 1 {
31943250
bail_bug!("futures can only send 1 item");
31953251
}
31963252

3197-
let payload = types[types[write_ty].ty].payload;
3198-
3199-
if write_caller_instance == read_caller_instance
3200-
&& !allow_intra_component_read_write(payload)
3201-
{
3202-
bail!(
3203-
"cannot read from and write to intra-component future with non-numeric payload"
3204-
)
3205-
}
3206-
3207-
let val = payload
3253+
let val = write_payload_ty
32083254
.map(|ty| {
3209-
let lift =
3210-
&mut LiftContext::new(store.0.store_opaque_mut(), write_options, self);
3211-
3212-
let abi = lift.types.canonical_abi(&ty);
3213-
// FIXME: needs to read an i64 for memory64
3214-
if write_address % usize::try_from(abi.align32)? != 0 {
3215-
bail!("write pointer not aligned");
3216-
}
3217-
3218-
let bytes = lift
3219-
.memory()
3220-
.get(write_address..)
3221-
.and_then(|b| b.get(..usize::try_from(abi.size32).ok()?))
3222-
.ok_or_else(|| {
3223-
crate::format_err!("write pointer out of bounds of memory")
3224-
})?;
3225-
3226-
Val::load(lift, ty, bytes)
3255+
let lift = &mut LiftContext::new(store, write_options, self);
3256+
let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
3257+
Val::load(lift, *ty, bytes)
32273258
})
32283259
.transpose()?;
32293260

32303261
if let Some(val) = val {
32313262
// Serializing the value may require calling the guest's realloc function, so we
32323263
// set the guest's thread context in case realloc requires it, and restore the original
32333264
// thread context after the copy is complete.
3234-
let old_thread = store.0.set_thread(read_caller_thread)?;
3265+
let old_thread = store.set_thread(read_caller_thread)?;
32353266
let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3236-
let types = lower.types;
3237-
let ty = match types[types[read_ty].ty].payload {
3238-
Some(ty) => ty,
3239-
None => bail_bug!("expected payload type to be present"),
3240-
};
32413267
let ptr = func::validate_inbounds_dynamic(
3242-
types.canonical_abi(&ty),
3268+
read_abi,
32433269
lower.as_slice_mut(),
32443270
&ValRaw::u32(read_address.try_into()?),
32453271
)?;
3246-
val.store(lower, ty, ptr)?;
3247-
store.0.set_thread(old_thread)?;
3272+
let ty = match read_payload_ty {
3273+
Some(ty) => ty,
3274+
None => bail_bug!("expected read payload type to be present"),
3275+
};
3276+
val.store(lower, *ty, ptr)?;
3277+
store.set_thread(old_thread)?;
32483278
}
32493279
}
3250-
(TransmitIndex::Stream(write_ty), TransmitIndex::Stream(read_ty)) => {
3251-
if write_caller_instance == read_caller_instance
3252-
&& !allow_intra_component_read_write(types[types[write_ty].ty].payload)
3253-
{
3254-
bail!(
3255-
"cannot read from and write to intra-component stream with non-numeric payload"
3256-
)
3280+
(TransmitIndex::Stream(_), TransmitIndex::Stream(_)) => {
3281+
if write_length_in_bytes == 0 {
3282+
return Ok(());
32573283
}
3258-
3259-
if let Some(flat_abi) = flat_abi {
3284+
let write_payload_ty = match write_payload_ty {
3285+
Some(ty) => ty,
3286+
None => bail_bug!("expected write payload type to be present"),
3287+
};
3288+
let read_payload_ty = match read_payload_ty {
3289+
Some(ty) => ty,
3290+
None => bail_bug!("expected read payload type to be present"),
3291+
};
3292+
if flat_abi.is_some() {
32603293
// Fast path memcpy for "flat" (i.e. no pointers or handles) payloads:
3261-
let length_in_bytes = usize::try_from(flat_abi.size)? * count;
3262-
if length_in_bytes > 0 {
3263-
if write_address % usize::try_from(flat_abi.align)? != 0 {
3264-
bail!("write pointer not aligned");
3265-
}
3266-
if read_address % usize::try_from(flat_abi.align)? != 0 {
3267-
bail!("read pointer not aligned");
3268-
}
3294+
let store_opaque = store.store_opaque_mut();
32693295

3270-
let store_opaque = store.0.store_opaque_mut();
3296+
assert_eq!(read_length_in_bytes, write_length_in_bytes);
32713297

3272-
{
3273-
let src = self
3274-
.options_memory(store_opaque, write_options)
3275-
.get(write_address..)
3276-
.and_then(|b| b.get(..length_in_bytes))
3277-
.ok_or_else(|| {
3278-
crate::format_err!("write pointer out of bounds of memory")
3279-
})?
3280-
.as_ptr();
3281-
let dst = self
3282-
.options_memory_mut(store_opaque, read_options)
3283-
.get_mut(read_address..)
3284-
.and_then(|b| b.get_mut(..length_in_bytes))
3285-
.ok_or_else(|| {
3286-
crate::format_err!("read pointer out of bounds of memory")
3287-
})?
3288-
.as_mut_ptr();
3289-
// SAFETY: Both `src` and `dst` have been validated
3290-
// above.
3291-
unsafe {
3292-
if write_caller_instance == read_caller_instance {
3293-
// If the same instance owns both ends of
3294-
// the stream, the source and destination
3295-
// buffers might overlap.
3296-
src.copy_to(dst, length_in_bytes)
3297-
} else {
3298-
// Since the read and write ends of the
3299-
// stream are owned by distinct instances,
3300-
// the buffers cannot possibly belong to the
3301-
// same memory and thus cannot overlap.
3302-
src.copy_to_nonoverlapping(dst, length_in_bytes)
3303-
}
3304-
}
3298+
if write_caller_instance == read_caller_instance {
3299+
let memory = self.options_memory_mut(store_opaque, read_options);
3300+
memory.copy_within(
3301+
write_address..write_address + write_length_in_bytes,
3302+
read_address,
3303+
);
3304+
} else {
3305+
let src = self.options_memory(store_opaque, write_options)[write_address..]
3306+
[..write_length_in_bytes]
3307+
.as_ptr();
3308+
let dst = self.options_memory_mut(store_opaque, read_options)
3309+
[read_address..][..read_length_in_bytes]
3310+
.as_mut_ptr();
3311+
3312+
// SAFETY: Both `src` and `dst` have been validated
3313+
// above to be valid pointers as they're derived from
3314+
// slices that have the desired length with the desired
3315+
// read/write permission. The `unsafe` bit here is that
3316+
// the memories are disjoint (present in different
3317+
// instances) and there's no easy way to borrow both
3318+
// simultaneously from the store. Different instances
3319+
// are guaranteed to be disjoint, however, so the
3320+
// `unsafe` here should be ok.
3321+
unsafe {
3322+
src.copy_to_nonoverlapping(dst, write_length_in_bytes);
33053323
}
33063324
}
33073325
} else {
3308-
let store_opaque = store.0.store_opaque_mut();
3326+
let store_opaque = store.store_opaque_mut();
33093327
let lift = &mut LiftContext::new(store_opaque, write_options, self);
3310-
let ty = match lift.types[lift.types[write_ty].ty].payload {
3311-
Some(ty) => ty,
3312-
None => bail_bug!("expected payload type to be present"),
3313-
};
3314-
let abi = lift.types.canonical_abi(&ty);
3315-
let size = usize::try_from(abi.size32)?;
3316-
if write_address % usize::try_from(abi.align32)? != 0 {
3317-
bail!("write pointer not aligned");
3318-
}
3319-
let bytes = lift
3320-
.memory()
3321-
.get(write_address..)
3322-
.and_then(|b| b.get(..size * count))
3323-
.ok_or_else(|| {
3324-
crate::format_err!("write pointer out of bounds of memory")
3325-
})?;
3328+
let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
33263329
lift.consume_fuel_array(count, size_of::<Val>())?;
33273330

33283331
let values = (0..count)
3329-
.map(|index| Val::load(lift, ty, &bytes[(index * size)..][..size]))
3332+
.map(|index| {
3333+
let size = usize::try_from(write_abi.size32)?;
3334+
Val::load(lift, *write_payload_ty, &bytes[(index * size)..][..size])
3335+
})
33303336
.collect::<Result<Vec<_>>>()?;
33313337

33323338
let id = TableId::<TransmitHandle>::new(rep);
@@ -3335,30 +3341,14 @@ impl Instance {
33353341
// Serializing the value may require calling the guest's realloc function, so we
33363342
// set the guest's thread context in case realloc requires it, and restore the original
33373343
// thread context after the copy is complete.
3338-
let old_thread = store.0.set_thread(read_caller_thread)?;
3344+
let old_thread = store.set_thread(read_caller_thread)?;
33393345
let lower = &mut LowerContext::new(store.as_context_mut(), read_options, self);
3340-
let ty = match lower.types[lower.types[read_ty].ty].payload {
3341-
Some(ty) => ty,
3342-
None => bail_bug!("expected payload type to be present"),
3343-
};
3344-
let abi = lower.types.canonical_abi(&ty);
3345-
if read_address % usize::try_from(abi.align32)? != 0 {
3346-
bail!("read pointer not aligned");
3347-
}
3348-
let size = usize::try_from(abi.size32)?;
3349-
lower
3350-
.as_slice_mut()
3351-
.get_mut(read_address..)
3352-
.and_then(|b| b.get_mut(..size * count))
3353-
.ok_or_else(|| {
3354-
crate::format_err!("read pointer out of bounds of memory")
3355-
})?;
33563346
let mut ptr = read_address;
33573347
for value in values {
3358-
value.store(lower, ty, ptr)?;
3359-
ptr += size
3348+
value.store(lower, *read_payload_ty, ptr)?;
3349+
ptr += usize::try_from(read_abi.size32)?;
33603350
}
3361-
store.0.set_thread(old_thread)?;
3351+
store.set_thread(old_thread)?;
33623352
}
33633353
}
33643354
_ => bail_bug!("mismatched transmit types in copy"),
@@ -3535,8 +3525,8 @@ impl Instance {
35353525

35363526
let instance = self.id().get_mut(store.0);
35373527
let types = instance.component().types();
3538-
let item_size = match payload(ty, types) {
3539-
Some(ty) => usize::try_from(types.canonical_abi(&ty).size32)?,
3528+
let item_size = match ty.payload(types) {
3529+
Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
35403530
None => 0,
35413531
};
35423532
let concurrent_state = store.0.concurrent_state_mut();
@@ -3761,8 +3751,8 @@ impl Instance {
37613751

37623752
let instance = self.id().get_mut(store.0);
37633753
let types = instance.component().types();
3764-
let item_size = match payload(ty, types) {
3765-
Some(ty) => usize::try_from(types.canonical_abi(&ty).size32)?,
3754+
let item_size = match ty.payload(types) {
3755+
Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
37663756
None => 0,
37673757
};
37683758
let concurrent_state = store.0.concurrent_state_mut();
@@ -4741,7 +4731,7 @@ impl Waitable {
47414731
/// Determine whether an intra-component read/write is allowed for the specified
47424732
/// `stream` or `future` payload type according to the component model
47434733
/// specification.
4744-
fn allow_intra_component_read_write(ty: Option<InterfaceType>) -> bool {
4734+
fn allow_intra_component_read_write(ty: Option<&InterfaceType>) -> bool {
47454735
matches!(
47464736
ty,
47474737
None | Some(

0 commit comments

Comments
 (0)