Skip to content

Commit 92bfb73

Browse files
authored
Fix for #12903 (#12904)
* Fix for #12903 and tests * Rebased to upstream/main
1 parent c2e71eb commit 92bfb73

2 files changed

Lines changed: 140 additions & 40 deletions

File tree

crates/wasi-http/src/p3/body.rs

Lines changed: 50 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -484,57 +484,67 @@ where
484484
if self.body.is_end_stream() {
485485
break 'result Ok(None);
486486
} else {
487-
return Poll::Ready(Ok(StreamResult::Completed));
487+
// Destination has zero capacity but the body could still have data. Cannot return
488+
// `Completed` (nothing was produced — violates the StreamProducer contract). Fall
489+
// through to poll the body with `cap = None` so it buffers the frame.
490+
None
488491
}
489492
}
490493
None => None,
491494
};
492-
match Pin::new(&mut self.body).poll_frame(cx) {
493-
Poll::Ready(Some(Ok(frame))) => {
494-
match frame.into_data().map_err(http_body::Frame::into_trailers) {
495-
Ok(mut frame) => {
496-
// Libraries like `Reqwest` generate a 0-length frame after sensing end-of-stream,
497-
// so we have to check for the body's end-of-stream indicator here too
498-
if frame.len() == 0 && self.body.is_end_stream() {
499-
break 'result Ok(None);
500-
}
495+
loop {
496+
match Pin::new(&mut self.body).poll_frame(cx) {
497+
Poll::Ready(Some(Ok(frame))) => {
498+
match frame.into_data().map_err(http_body::Frame::into_trailers) {
499+
Ok(mut frame) => {
500+
if frame.len() == 0 {
501+
// Zero-length data frames are valid per the http_body::Body
502+
// trait contract and RFC 9113 §6.1 — skip and re-poll
503+
// directly rather than using `wake_by_ref()` + Pending, which
504+
// would just re-enter this function via the task queue
505+
if self.body.is_end_stream() {
506+
break 'result Ok(None);
507+
}
508+
continue;
509+
}
501510

502-
if let Some(cap) = cap {
503-
let n = frame.len();
504-
let cap = cap.into();
505-
if n > cap {
506-
// data frame does not fit in destination, fill it and buffer the rest
507-
dst.set_buffer(Cursor::new(frame.split_off(cap)));
508-
let mut dst = dst.as_direct(store, cap);
509-
dst.remaining().copy_from_slice(&frame);
510-
dst.mark_written(cap);
511+
if let Some(cap) = cap {
512+
let n = frame.len();
513+
let cap = cap.into();
514+
if n > cap {
515+
// data frame does not fit in destination, fill it and buffer the rest
516+
dst.set_buffer(Cursor::new(frame.split_off(cap)));
517+
let mut dst = dst.as_direct(store, cap);
518+
dst.remaining().copy_from_slice(&frame);
519+
dst.mark_written(cap);
520+
} else {
521+
// copy the whole frame into the destination
522+
let mut dst = dst.as_direct(store, n);
523+
dst.remaining()[..n].copy_from_slice(&frame);
524+
dst.mark_written(n);
525+
}
511526
} else {
512-
// copy the whole frame into the destination
513-
let mut dst = dst.as_direct(store, n);
514-
dst.remaining()[..n].copy_from_slice(&frame);
515-
dst.mark_written(n);
527+
dst.set_buffer(Cursor::new(frame));
516528
}
517-
} else {
518-
dst.set_buffer(Cursor::new(frame));
529+
return Poll::Ready(Ok(StreamResult::Completed));
519530
}
520-
return Poll::Ready(Ok(StreamResult::Completed));
521-
}
522-
Err(Ok(trailers)) => {
523-
let view = (self.getter)(store.data_mut());
524-
let trailers = FieldMap::new_immutable(trailers);
525-
let trailers = view
526-
.table
527-
.push(trailers)
528-
.context("failed to push trailers to table")?;
529-
break 'result Ok(Some(trailers));
531+
Err(Ok(trailers)) => {
532+
let view = (self.getter)(store.data_mut());
533+
let trailers = FieldMap::new_immutable(trailers);
534+
let trailers = view
535+
.table
536+
.push(trailers)
537+
.context("failed to push trailers to table")?;
538+
break 'result Ok(Some(trailers));
539+
}
540+
Err(Err(..)) => break 'result Err(ErrorCode::HttpProtocolError),
530541
}
531-
Err(Err(..)) => break 'result Err(ErrorCode::HttpProtocolError),
532542
}
543+
Poll::Ready(Some(Err(err))) => break 'result Err(err),
544+
Poll::Ready(None) => break 'result Ok(None),
545+
Poll::Pending if finish => return Poll::Ready(Ok(StreamResult::Cancelled)),
546+
Poll::Pending => return Poll::Pending,
533547
}
534-
Poll::Ready(Some(Err(err))) => break 'result Err(err),
535-
Poll::Ready(None) => break 'result Ok(None),
536-
Poll::Pending if finish => return Poll::Ready(Ok(StreamResult::Cancelled)),
537-
Poll::Pending => return Poll::Pending,
538548
}
539549
};
540550
self.close(res);

crates/wasi-http/tests/all/p3/mod.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,3 +731,93 @@ async fn p3_http_data_frame_at_end_of_stream() -> Result<()> {
731731
assert_eq!(collected_body, expected.as_slice());
732732
Ok(())
733733
}
734+
735+
/// Body wrapper that interleaves zero-length data frames with real data.
736+
///
737+
/// Zero-length data frames are valid per the `http_body::Body` trait contract and
738+
/// RFC 9113 §6.1. This wrapper injects `empty_per_frame` empty frames before each
739+
/// real frame from the inner body, testing that `HostBodyStreamProducer` tolerates them.
740+
struct BodyWithEmptyFrames {
741+
inner: http_body_util::StreamBody<
742+
futures::channel::mpsc::Receiver<Result<http_body::Frame<Bytes>, ErrorCode>>,
743+
>,
744+
/// Number of empty frames to inject before each real frame
745+
empty_per_frame: usize,
746+
/// Number of empty frames left to inject before the next real frame
747+
empty_remaining: usize,
748+
}
749+
750+
impl http_body::Body for BodyWithEmptyFrames {
751+
type Data = Bytes;
752+
type Error = ErrorCode;
753+
754+
fn poll_frame(
755+
mut self: Pin<&mut Self>,
756+
cx: &mut Context<'_>,
757+
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
758+
if self.empty_remaining > 0 {
759+
self.empty_remaining -= 1;
760+
return Poll::Ready(Some(Ok(http_body::Frame::data(Bytes::new()))));
761+
}
762+
let this = &mut *self;
763+
let result = Pin::new(&mut this.inner).poll_frame(cx);
764+
if matches!(&result, Poll::Ready(Some(Ok(_)))) {
765+
// Reset counter so empty frames are injected before the next real frame
766+
this.empty_remaining = this.empty_per_frame;
767+
}
768+
result
769+
}
770+
771+
fn is_end_stream(&self) -> bool {
772+
false
773+
}
774+
}
775+
776+
#[test_log::test(tokio::test(flavor = "multi_thread"))]
777+
async fn p3_http_empty_frames_interleaved() -> Result<()> {
778+
_ = env_logger::try_init();
779+
780+
// Verifies that zero-length data frames interleaved with real data do not cause
781+
// poll_produce to return Completed with 0 items produced.
782+
// Pattern: empty, empty, data("hello "), empty, empty, data("world")
783+
784+
let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(2);
785+
786+
let wrapped_body = BodyWithEmptyFrames {
787+
inner: http_body_util::StreamBody::new(body_rx),
788+
empty_per_frame: 2,
789+
empty_remaining: 2,
790+
};
791+
792+
let request = http::Request::builder()
793+
.uri("http://localhost/")
794+
.method(http::Method::GET);
795+
796+
let response = futures::join!(
797+
run_http(
798+
P3_HTTP_ECHO_COMPONENT,
799+
request.body(wrapped_body)?,
800+
oneshot::channel().0
801+
),
802+
async {
803+
body_tx
804+
.send(Ok(http_body::Frame::data(Bytes::from_static(b"hello "))))
805+
.await
806+
.unwrap();
807+
body_tx
808+
.send(Ok(http_body::Frame::data(Bytes::from_static(b"world"))))
809+
.await
810+
.unwrap();
811+
drop(body_tx);
812+
}
813+
)
814+
.0?
815+
.unwrap();
816+
817+
assert_eq!(response.status().as_u16(), 200);
818+
819+
let (_, collected_body) = response.into_parts();
820+
let collected_body = collected_body.to_bytes();
821+
assert_eq!(collected_body, b"hello world".as_slice());
822+
Ok(())
823+
}

0 commit comments

Comments
 (0)