1- use super :: { body:: IncomingBody , Body , Error , Request , Response , Result } ;
1+ use super :: {
2+ body:: { BodyForthcoming , IncomingBody , OutgoingBody } ,
3+ fields:: header_map_to_wasi,
4+ Body , Error , HeaderMap , Request , Response , Result ,
5+ } ;
26use crate :: http:: request:: try_into_outgoing;
37use crate :: http:: response:: try_from_incoming;
48use crate :: io:: { self , AsyncOutputStream , AsyncPollable } ;
59use crate :: time:: Duration ;
6- use wasi:: http:: types:: { OutgoingBody , RequestOptions as WasiRequestOptions } ;
10+ use wasi:: http:: types:: {
11+ FutureIncomingResponse as WasiFutureIncomingResponse , OutgoingBody as WasiOutgoingBody ,
12+ RequestOptions as WasiRequestOptions ,
13+ } ;
714
815/// An HTTP client.
916// Empty for now, but permits adding support for RequestOptions soon:
@@ -19,22 +26,27 @@ impl Client {
1926 }
2027
2128 /// Send an HTTP request.
29+ ///
30+ /// TODO: Should this automatically add a "Content-Length" header if the
31+ /// body size is known?
32+ ///
33+ /// To respond with trailers, use [`Client::start_request`] instead.
2234 pub async fn send < B : Body > ( & self , req : Request < B > ) -> Result < Response < IncomingBody > > {
2335 // We don't use `body::OutputBody` here because we can report I/O
2436 // errors from the `copy` directly.
2537 let ( wasi_req, body) = try_into_outgoing ( req) ?;
2638 let wasi_body = wasi_req. body ( ) . unwrap ( ) ;
27- let body_stream = wasi_body. write ( ) . unwrap ( ) ;
39+ let wasi_stream = wasi_body. write ( ) . unwrap ( ) ;
2840
2941 // 1. Start sending the request head
3042 let res = wasi:: http:: outgoing_handler:: handle ( wasi_req, self . wasi_options ( ) ?) . unwrap ( ) ;
3143
3244 // 2. Start sending the request body
33- io:: copy ( body, AsyncOutputStream :: new ( body_stream ) ) . await ?;
45+ io:: copy ( body, AsyncOutputStream :: new ( wasi_stream ) ) . await ?;
3446
3547 // 3. Finish sending the request body
3648 let trailers = None ;
37- OutgoingBody :: finish ( wasi_body, trailers) . unwrap ( ) ;
49+ WasiOutgoingBody :: finish ( wasi_body, trailers) . unwrap ( ) ;
3850
3951 // 4. Receive the response
4052 AsyncPollable :: new ( res. subscribe ( ) ) . wait_for ( ) . await ;
@@ -46,6 +58,55 @@ impl Client {
4658 try_from_incoming ( res)
4759 }
4860
61+ /// Start sending an HTTP request, and return an `OutgoingBody` stream to
62+ /// write the body to.
63+ ///
64+ /// The returned `OutgoingBody` must be consumed by [`Client::finish`] or
65+ /// [`Client::fail`].
66+ pub async fn start_request (
67+ & self ,
68+ req : Request < BodyForthcoming > ,
69+ ) -> Result < ( OutgoingBody , FutureIncomingResponse ) > {
70+ let ( wasi_req, _body_forthcoming) = try_into_outgoing ( req) ?;
71+ let wasi_body = wasi_req. body ( ) . unwrap ( ) ;
72+ let wasi_stream = wasi_body. write ( ) . unwrap ( ) ;
73+
74+ // Start sending the request head.
75+ let res = wasi:: http:: outgoing_handler:: handle ( wasi_req, self . wasi_options ( ) ?) . unwrap ( ) ;
76+
77+ let outgoing_body = OutgoingBody :: new ( AsyncOutputStream :: new ( wasi_stream) , wasi_body) ;
78+
79+ Ok ( ( outgoing_body, FutureIncomingResponse ( res) ) )
80+ }
81+
82+ /// Finish the body, optionally with trailers.
83+ ///
84+ /// This is used with [`Client::start_request`].
85+ pub fn finish ( body : OutgoingBody , trailers : Option < HeaderMap > ) -> Result < ( ) > {
86+ let ( stream, body) = body. consume ( ) ;
87+
88+ // The stream is a child resource of the `OutgoingBody`, so ensure that
89+ // it's dropped first.
90+ drop ( stream) ;
91+
92+ let wasi_trailers = match trailers {
93+ Some ( trailers) => Some ( header_map_to_wasi ( & trailers) ?) ,
94+ None => None ,
95+ } ;
96+
97+ wasi:: http:: types:: OutgoingBody :: finish ( body, wasi_trailers)
98+ . expect ( "body length did not match Content-Length header value" ) ;
99+ Ok ( ( ) )
100+ }
101+
102+ /// Consume the `OutgoingBody` and indicate that the body was not
103+ /// completed.
104+ ///
105+ /// This is used with [`Client::start_request`].
106+ pub fn fail ( body : OutgoingBody ) {
107+ let ( _stream, _body) = body. consume ( ) ;
108+ }
109+
49110 /// Set timeout on connecting to HTTP server
50111 pub fn set_connect_timeout ( & mut self , d : impl Into < Duration > ) {
51112 self . options_mut ( ) . connect_timeout = Some ( d. into ( ) ) ;
@@ -76,6 +137,25 @@ impl Client {
76137 }
77138}
78139
140+ /// Returned from [`Client::start_request`], this represents a handle to a
141+ /// response which has not arrived yet. Call [`FutureIncomingResponse::get`]
142+ /// to wait for the response.
143+ pub struct FutureIncomingResponse ( WasiFutureIncomingResponse ) ;
144+
145+ impl FutureIncomingResponse {
146+ /// Consume this `FutureIncomingResponse`, wait, and return the `Response`.
147+ pub async fn get ( self ) -> Result < Response < IncomingBody > > {
148+ // Wait for the response.
149+ AsyncPollable :: new ( self . 0 . subscribe ( ) ) . wait_for ( ) . await ;
150+
151+ // NOTE: the first `unwrap` is to ensure readiness, the second `unwrap`
152+ // is to trap if we try and get the response more than once. The final
153+ // `?` is to raise the actual error if there is one.
154+ let res = self . 0 . get ( ) . unwrap ( ) . unwrap ( ) ?;
155+ try_from_incoming ( res)
156+ }
157+ }
158+
79159#[ derive( Default , Debug ) ]
80160struct RequestOptions {
81161 connect_timeout : Option < Duration > ,
0 commit comments