Skip to content

Commit d3aa8af

Browse files
authored
cm-async: Remove assertion when disposing a task (#13092)
* cm-async: Remove assertion when disposing a task This commit removes a now-older assertion when disposing of a guest task that all threads are empty. This assertion is hit in a situation where `TypedFunc::call_async` is used where the guest can't be entered due to backpressure. The assertion is a bit dated now and originally this helper function handled reparenting, but that's all gone now too. This commit removes the assertion entirely meaning that in this situation the guest thread will "leak" into the host, but the store is locked down at that point anyway, so this is in theory reasonable. * Fix miri testing
1 parent da45b53 commit d3aa8af

2 files changed

Lines changed: 65 additions & 13 deletions

File tree

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,7 +1578,7 @@ impl StoreOpaque {
15781578
let state = self.concurrent_state_mut();
15791579
let task = state.get_mut(thread.task)?;
15801580
if task.ready_to_delete() {
1581-
state.delete(thread.task)?.dispose(state)?;
1581+
state.delete(thread.task)?;
15821582
}
15831583

15841584
Ok(())
@@ -4612,12 +4612,6 @@ impl GuestTask {
46124612
async_function,
46134613
})
46144614
}
4615-
4616-
/// Dispose of this guest task.
4617-
fn dispose(self, _state: &mut ConcurrentState) -> Result<()> {
4618-
assert!(self.threads.is_empty());
4619-
Ok(())
4620-
}
46214615
}
46224616

46234617
impl TableDebug for GuestTask {
@@ -4778,7 +4772,7 @@ impl Waitable {
47784772
}
47794773
Self::Guest(task) => {
47804774
log::trace!("delete guest task {task:?}");
4781-
state.delete(*task)?.dispose(state)?;
4775+
state.delete(*task)?;
47824776
}
47834777
Self::Transmit(task) => {
47844778
state.delete(*task)?;
@@ -5335,13 +5329,14 @@ impl TaskId {
53355329
/// and delete the task when all threads are done.
53365330
pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
53375331
let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5338-
if !task.already_lowered_parameters() {
5339-
Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5332+
let delete = if !task.already_lowered_parameters() {
5333+
true
53405334
} else {
53415335
task.host_future_state = HostFutureState::Dropped;
5342-
if task.ready_to_delete() {
5343-
Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5344-
}
5336+
task.ready_to_delete()
5337+
};
5338+
if delete {
5339+
Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
53455340
}
53465341
Ok(())
53475342
}

tests/all/component_model/async.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1324,3 +1324,60 @@ async fn bytes_stream_producer() -> Result<()> {
13241324

13251325
Ok(())
13261326
}
1327+
1328+
#[tokio::test]
1329+
#[cfg_attr(miri, ignore)]
1330+
async fn drop_deadlocked_typed_future() -> Result<()> {
1331+
let mut config = Config::new();
1332+
config.wasm_component_model_async(true);
1333+
1334+
let engine = Engine::new(&config)?;
1335+
let component = Component::new(
1336+
&engine,
1337+
r#"
1338+
(component
1339+
(core func $backpressure_inc (canon backpressure.inc))
1340+
1341+
(core module $m
1342+
(import "" "backpressure.inc" (func $backpressure_inc))
1343+
(func (export "set-backpressure") (call $backpressure_inc))
1344+
(func (export "target") (result i32) unreachable)
1345+
(func (export "callback") (param i32 i32 i32) (result i32) unreachable)
1346+
)
1347+
1348+
(core instance $i (instantiate $m
1349+
(with "" (instance (export "backpressure.inc" (func $backpressure_inc))))
1350+
))
1351+
1352+
(func (export "set-backpressure")
1353+
(canon lift (core func $i "set-backpressure")))
1354+
1355+
(func (export "target")
1356+
(canon lift (core func $i "target") async (callback (func $i "callback"))))
1357+
)
1358+
"#,
1359+
)?;
1360+
let mut store = Store::new(&engine, ());
1361+
let linker = Linker::<()>::new(&engine);
1362+
let instance = linker.instantiate_async(&mut store, &component).await?;
1363+
1364+
// Force the instance to deadlock the next call by turning on backpressure
1365+
// meaning it can't enter the instance.
1366+
instance
1367+
.get_typed_func::<(), ()>(&mut store, "set-backpressure")?
1368+
.call_async(&mut store, ())
1369+
.await?;
1370+
1371+
// This call should fail due to deadlock since no progress is possible. When
1372+
// the future is dropped that shouldn't cause anything to go awry...
1373+
let result = instance
1374+
.get_typed_func::<(), ()>(&mut store, "target")?
1375+
.call_async(&mut store, ())
1376+
.await;
1377+
1378+
assert!(result.is_err(), "expected an error, got Ok");
1379+
let trap = result.unwrap_err().downcast::<Trap>()?;
1380+
assert_eq!(trap, Trap::AsyncDeadlock);
1381+
1382+
Ok(())
1383+
}

0 commit comments

Comments
 (0)