11//! HTTP body types
22
3- use crate :: io:: { AsyncInputStream , AsyncRead , Cursor , Empty } ;
3+ use crate :: http:: fields:: header_map_from_wasi;
4+ use crate :: io:: { AsyncInputStream , AsyncOutputStream , AsyncRead , AsyncWrite , Cursor , Empty } ;
5+ use crate :: runtime:: AsyncPollable ;
46use core:: fmt;
57use http:: header:: { CONTENT_LENGTH , TRANSFER_ENCODING } ;
68use wasi:: http:: types:: IncomingBody as WasiIncomingBody ;
@@ -116,9 +118,9 @@ impl Body for Empty {
116118pub struct IncomingBody {
117119 kind : BodyKind ,
118120 // IMPORTANT: the order of these fields here matters. `body_stream` must
119- // be dropped before `_incoming_body `.
121+ // be dropped before `incoming_body `.
120122 body_stream : AsyncInputStream ,
121- _incoming_body : WasiIncomingBody ,
123+ incoming_body : WasiIncomingBody ,
122124}
123125
124126impl IncomingBody {
@@ -130,9 +132,29 @@ impl IncomingBody {
130132 Self {
131133 kind,
132134 body_stream,
133- _incoming_body : incoming_body,
135+ incoming_body,
134136 }
135137 }
138+
139+ /// Consume this `IncomingBody` and return the trailers, if present.
140+ pub async fn finish ( self ) -> Result < Option < HeaderMap > , Error > {
141+ // The stream is a child resource of the `IncomingBody`, so ensure that
142+ // it's dropped first.
143+ drop ( self . body_stream ) ;
144+
145+ let trailers = WasiIncomingBody :: finish ( self . incoming_body ) ;
146+
147+ AsyncPollable :: new ( trailers. subscribe ( ) ) . wait_for ( ) . await ;
148+
149+ let trailers = trailers. get ( ) . unwrap ( ) . unwrap ( ) ?;
150+
151+ let trailers = match trailers {
152+ None => None ,
153+ Some ( trailers) => Some ( header_map_from_wasi ( trailers) ?) ,
154+ } ;
155+
156+ Ok ( trailers)
157+ }
136158}
137159
138160impl AsyncRead for IncomingBody {
@@ -177,3 +199,79 @@ impl From<InvalidContentLength> for Error {
177199 ErrorVariant :: Other ( e. to_string ( ) ) . into ( )
178200 }
179201}
202+
203+ /// The output stream for the body, implementing [`AsyncWrite`]. Call
204+ /// [`Responder::start_response`] to obtain one. Once the body is complete,
205+ /// it must be declared finished, using [`OutgoingBody::finish`].
206+ #[ must_use]
207+ pub struct OutgoingBody {
208+ // IMPORTANT: the order of these fields here matters. `stream` must
209+ // be dropped before `body`.
210+ stream : AsyncOutputStream ,
211+ body : wasi:: http:: types:: OutgoingBody ,
212+ dontdrop : DontDropOutgoingBody ,
213+ }
214+
215+ impl OutgoingBody {
216+ pub ( crate ) fn new ( stream : AsyncOutputStream , body : wasi:: http:: types:: OutgoingBody ) -> Self {
217+ Self {
218+ stream,
219+ body,
220+ dontdrop : DontDropOutgoingBody ,
221+ }
222+ }
223+
224+ pub ( crate ) fn consume ( self ) -> ( AsyncOutputStream , wasi:: http:: types:: OutgoingBody ) {
225+ let Self {
226+ stream,
227+ body,
228+ dontdrop,
229+ } = self ;
230+
231+ std:: mem:: forget ( dontdrop) ;
232+
233+ ( stream, body)
234+ }
235+
236+ /// Return a reference to the underlying `AsyncOutputStream`.
237+ ///
238+ /// This usually isn't needed, as `OutgoingBody` implements `AsyncWrite`
239+ /// too, however it is useful for code that expects to work with
240+ /// `AsyncOutputStream` specifically.
241+ pub fn stream ( & mut self ) -> & mut AsyncOutputStream {
242+ & mut self . stream
243+ }
244+ }
245+
246+ impl AsyncWrite for OutgoingBody {
247+ async fn write ( & mut self , buf : & [ u8 ] ) -> crate :: io:: Result < usize > {
248+ self . stream . write ( buf) . await
249+ }
250+
251+ async fn flush ( & mut self ) -> crate :: io:: Result < ( ) > {
252+ self . stream . flush ( ) . await
253+ }
254+
255+ fn as_async_output_stream ( & self ) -> Option < & AsyncOutputStream > {
256+ Some ( & self . stream )
257+ }
258+ }
259+
260+ /// A utility to ensure that `OutgoingBody` is either finished or failed, and
261+ /// not implicitly dropped.
262+ struct DontDropOutgoingBody ;
263+
264+ impl Drop for DontDropOutgoingBody {
265+ fn drop ( & mut self ) {
266+ unreachable ! ( "`OutgoingBody::drop` called; `OutgoingBody`s should be consumed with `finish` or `fail`." ) ;
267+ }
268+ }
269+
270+ /// A placeholder for use as the type parameter to [`Response`] to indicate
271+ /// that the body has not yet started. This is used with
272+ /// [`Responder::start_response`], which has a `Response<BodyForthcoming>`
273+ /// argument.
274+ ///
275+ /// To instead start the response and obtain the output stream for the body,
276+ /// use [`Responder::respond`].
277+ pub struct BodyForthcoming ;
0 commit comments