Skip to content

Commit 13de370

Browse files
fix(bindgen): stream timing issue
This commit fixes some stream timing issues -- in particular when hosts read streams *right* after a final write, but before they were removed by the component. In this case the host would inconsistently hang, waiting for a read from the writer that would never come as it had been dropped.
1 parent 82a1d19 commit 13de370

2 files changed

Lines changed: 48 additions & 79 deletions

File tree

crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs

Lines changed: 45 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -556,13 +556,6 @@ impl AsyncStreamIntrinsic {
556556
}}
557557
558558
onCopyDoneFn({stream_end_class}.CopyResult.COMPLETED);
559-
560-
// After successfully doing a guest write, we may need to
561-
// notify a blocked/waiting host read that it can continue
562-
//
563-
// We notify the other end of the stream (likely held by the hsot)
564-
// *after* the transfer (above) and book-keeping is done, if one occurred.
565-
if (transferred) {{ this.#otherEndNotify(); }}
566559
}}
567560
"#,
568561
),
@@ -618,13 +611,6 @@ impl AsyncStreamIntrinsic {
618611
619612
onCopyDoneFn({stream_end_class}.CopyResult.COMPLETED);
620613
621-
// After successfully doing a guest read, we may need to
622-
// notify a blocked/waiting host write that it can continue
623-
//
624-
// We must only notify the other end of the stream (likely held
625-
// by the host) after a transfer (above) and book-keeping is done.
626-
if (transferred) {{ this.#otherEndNotify(); }}
627-
628614
return;
629615
}}
630616
@@ -648,7 +634,16 @@ impl AsyncStreamIntrinsic {
648634
let copy_impl = format!(
649635
r#"
650636
async copy(args) {{
651-
const {{ isAsync, memory, componentIdx, ptr, count, eventCode, initial }} = args;
637+
const {{
638+
isAsync,
639+
memory,
640+
componentIdx,
641+
ptr,
642+
count,
643+
eventCode,
644+
initial,
645+
skipStateCheck,
646+
}} = args;
652647
if (eventCode === undefined) {{ throw new TypeError('missing/invalid event code'); }}
653648
654649
if (this.isDropped()) {{
@@ -669,6 +664,7 @@ impl AsyncStreamIntrinsic {
669664
buffer: args.buffer,
670665
bufferID: args.bufferID,
671666
initial,
667+
skipStateCheck,
672668
}});
673669
674670
// Perform the read/write
@@ -683,7 +679,7 @@ impl AsyncStreamIntrinsic {
683679
if (!this.hasPendingEvent()) {{
684680
if (isAsync) {{
685681
this.setCopyState({stream_end_class}.CopyState.ASYNC_COPYING);
686-
{debug_log_fn}('[{stream_end_class}#copy()] blocked');
682+
{debug_log_fn}('[{stream_end_class}#copy()] blocked', {{ componentIdx, eventCode, self: this }});
687683
return {async_blocked_const};
688684
}} else {{
689685
this.setCopyState({stream_end_class}.CopyState.SYNC_COPYING);
@@ -807,18 +803,32 @@ impl AsyncStreamIntrinsic {
807803
// If the write was blocked, we can only make progress when
808804
// the read side notifies us of a read, then we must attempt the copy again
809805
810-
await this.#otherEndWait();
806+
await new Promise((resolve) => {{
807+
let waitInterval = setInterval(async () => {{
808+
if (!this.hasPendingEvent()) {{ return; }}
809+
clearInterval(waitInterval);
810+
resolve();
811+
}});
812+
}});
811813
812814
packedResult = await this.copy({{
813815
isAsync: true,
814816
count,
815817
bufferID,
816818
buffer,
817819
eventCode: {async_event_code_enum}.STREAM_WRITE,
820+
// NOTE: we skip state checks only when dealing with a post blocked
821+
// read/write in the host. This enables the host to quickly pick up the
822+
// guest operation on the otherside quickly.
818823
skipStateCheck: true,
819824
componentIdx: -1,
820825
}});
821826
827+
const copied = packedResult >> 4;
828+
if (copied === 0 && this.isDone()) {{
829+
reject(new Error("read end dropped during write"));
830+
}}
831+
822832
if (packedResult === {async_blocked_const}) {{
823833
throw new Error("unexpected double block during write");
824834
}}
@@ -896,39 +906,37 @@ impl AsyncStreamIntrinsic {
896906
// If the read was blocked, we can only make progress when
897907
// the write side notifies us of a write, then we must attempt the copy again
898908
899-
await this.#otherEndWait();
909+
await new Promise((resolve) => {{
910+
let waitInterval = setInterval(() => {{
911+
if (!this.hasPendingEvent()) {{ return; }}
912+
clearInterval(waitInterval);
913+
resolve();
914+
}});
915+
}});
900916
901917
packedResult = await this.copy({{
902918
isAsync: true,
903919
count,
904920
bufferID,
905921
buffer,
906922
eventCode: {async_event_code_enum}.STREAM_READ,
923+
// NOTE: we skip state checks only when dealing with a post blocked
924+
// read/write in the host. This enables the host to quickly pick up the
925+
// guest operation on the otherside quickly.
907926
skipStateCheck: true,
908927
componentIdx: -1,
909928
}});
910929
930+
const copied = packedResult >> 4;
931+
if (copied === 0 && this.isDone()) {{
932+
reject(new Error("write end dropped during read"));
933+
}}
934+
911935
if (packedResult === {async_blocked_const}) {{
912936
throw new Error("unexpected double block during read");
913937
}}
914938
}}
915939
916-
let copied = packedResult >> 4;
917-
let result = packedResult & 0x000F;
918-
919-
// Due to async timing vagaries, it is possible to get to this point
920-
// and have an event have come out from the copy despite the writer end
921-
// being closed or the reader being otherwise done:
922-
//
923-
// - The current CopyState is done (indicating a CopyResult.DROPPED being received)
924-
// - The current CopyResult is DROPPED
925-
//
926-
// These two cases often overlap
927-
//
928-
if (this.isDone() || result === {stream_end_class}.CopyResult.DROPPED) {{
929-
reject(new Error("read end is closed"));
930-
}}
931-
932940
const vs = buffer.read(count);
933941
const res = count === 1 ? vs[0] : vs;
934942
this.#result = null;
@@ -961,9 +969,6 @@ impl AsyncStreamIntrinsic {
961969
962970
#result = null;
963971
964-
#otherEndWait = null;
965-
#otherEndNotify = null;
966-
967972
constructor(args) {{
968973
{debug_log_fn}('[{end_class_name}#constructor()] args', args);
969974
super(args);
@@ -976,12 +981,6 @@ impl AsyncStreamIntrinsic {
976981
977982
if (args.tableIdx === undefined) {{ throw new Error('missing index for stream table idx'); }}
978983
this.#streamTableIdx = args.tableIdx;
979-
980-
if (args.otherEndNotify === undefined) {{ throw new Error('missing fn for notification'); }}
981-
this.#otherEndNotify = args.otherEndNotify;
982-
983-
if (args.otherEndWait === undefined) {{ throw new Error('missing fn for awaiting notification'); }}
984-
this.#otherEndWait = args.otherEndWait;
985984
}}
986985
987986
streamTableIdx() {{ return this.#streamTableIdx; }}
@@ -1004,9 +1003,9 @@ impl AsyncStreamIntrinsic {
10041003
10051004
{type_getter_impl}
10061005
1007-
isDone() {{ this.getCopyState() === {stream_end_class}.CopyState.DONE; }}
1008-
isCompleted() {{ this.getCopyState() === {stream_end_class}.CopyState.COMPLETED; }}
1009-
isDropped() {{ this.getCopyState() === {stream_end_class}.CopyState.DROPPED; }}
1006+
isDone() {{ return this.getCopyState() === {stream_end_class}.CopyState.DONE; }}
1007+
isCompleted() {{ return this.getCopyState() === {stream_end_class}.CopyState.COMPLETED; }}
1008+
isDropped() {{ return this.getCopyState() === {stream_end_class}.CopyState.DROPPED; }}
10101009
10111010
{action_impl}
10121011
{inner_rw_impl}
@@ -1055,7 +1054,6 @@ impl AsyncStreamIntrinsic {
10551054
let internal_stream_class_name = self.name();
10561055
let read_end_class = Self::StreamReadableEndClass.name();
10571056
let write_end_class = Self::StreamWritableEndClass.name();
1058-
let promise_with_resolvers_fn = Intrinsic::PromiseWithResolversPonyfill.name();
10591057

10601058
output.push_str(&format!(
10611059
r#"
@@ -1086,50 +1084,20 @@ impl AsyncStreamIntrinsic {
10861084
10871085
this.#elemMeta = elemMeta;
10881086
1089-
const writeNotify = () => {{
1090-
if (this.#writeWaitPromise === null) {{ return; }}
1091-
const resolve = this.#writeWaitPromise.resolve;
1092-
this.#writeWaitPromise = null;
1093-
resolve();
1094-
}};
1095-
const writeWait = () => {{
1096-
if (this.#writeWaitPromise === null) {{
1097-
this.#writeWaitPromise = {promise_with_resolvers_fn}();
1098-
}}
1099-
return this.#writeWaitPromise.promise;
1100-
}};
1101-
11021087
this.#readEnd = new {read_end_class}({{
11031088
tableIdx,
11041089
elemMeta: this.#elemMeta,
11051090
pendingBufferMeta: this.#pendingBufferMeta,
11061091
target: "stream read end (@ init)",
11071092
waitable: readWaitable,
1108-
otherEndWait: writeWait,
1109-
otherEndNotify: writeNotify,
11101093
}});
11111094
1112-
const readNotify = () => {{
1113-
if (this.#readWaitPromise === null) {{ return; }}
1114-
const resolve = this.#readWaitPromise.resolve;
1115-
this.#readWaitPromise = null;
1116-
resolve();
1117-
}};
1118-
const readWait = () => {{
1119-
if (this.#readWaitPromise === null) {{
1120-
this.#readWaitPromise = {promise_with_resolvers_fn}();
1121-
}}
1122-
return this.#readWaitPromise.promise;
1123-
}};
1124-
11251095
this.#writeEnd = new {write_end_class}({{
11261096
tableIdx,
11271097
elemMeta: this.#elemMeta,
11281098
pendingBufferMeta: this.#pendingBufferMeta,
11291099
target: "stream write end (@ init)",
11301100
waitable: writeWaitable,
1131-
otherEndWait: readWait,
1132-
otherEndNotify: readNotify,
11331101
}});
11341102
}}
11351103

crates/js-component-bindgen/src/intrinsics/p3/async_task.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,8 +1145,9 @@ impl AsyncTaskIntrinsic {
11451145
11461146
const ready = readyFn();
11471147
if (ready && {global_async_determinism} === 'random') {{
1148-
const coinFlip = {coin_flip_fn}();
1149-
if (coinFlip) {{ return true }}
1148+
// const coinFlip = {coin_flip_fn}();
1149+
// if (coinFlip) {{ return true }}
1150+
return true;
11501151
}}
11511152
11521153
const keepGoing = await this.immediateSuspend({{ cancellable, readyFn }});

0 commit comments

Comments
 (0)