File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -17,10 +17,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
1717 . ok_or_else ( || "response expected to have Content-Type header" ) ?;
1818 assert_eq ! ( content_type, "application/json; charset=utf-8" ) ;
1919
20- // Would much prefer read_to_end here:
21- let mut body_buf = vec ! [ 0 ; 4096 ] ;
22- let body_len = response. body ( ) . read ( & mut body_buf) . await ?;
23- body_buf. truncate ( body_len) ;
20+ let mut body_buf = Vec :: new ( ) ;
21+ let _body_len = response. body ( ) . read_to_end ( & mut body_buf) . await ?;
2422
2523 let val: serde_json:: Value = serde_json:: from_slice ( & body_buf) ?;
2624 let body_url = val
Original file line number Diff line number Diff line change @@ -101,8 +101,8 @@ pub struct IncomingBody {
101101 // How many bytes have we already read from the buf?
102102 buf_offset : usize ,
103103
104- // IMPORTANT: the order of these fields here matters. `incoming_body ` must
105- // be dropped before `body_stream `.
104+ // IMPORTANT: the order of these fields here matters. `body_stream ` must
105+ // be dropped before `_incoming_body `.
106106 body_stream : InputStream ,
107107 _incoming_body : WasiIncomingBody ,
108108}
@@ -117,12 +117,16 @@ impl AsyncRead for IncomingBody {
117117 Reactor :: current ( ) . wait_for ( pollable) . await ;
118118
119119 // Read the bytes from the body stream
120- let buf = self . body_stream . read ( CHUNK_SIZE ) . map_err ( |err| match err {
121- StreamError :: LastOperationFailed ( err) => {
122- std:: io:: Error :: other ( format ! ( "{}" , err. to_debug_string( ) ) )
120+ let buf = match self . body_stream . read ( CHUNK_SIZE ) {
121+ Ok ( buf) => buf,
122+ Err ( StreamError :: Closed ) => return Ok ( 0 ) ,
123+ Err ( StreamError :: LastOperationFailed ( err) ) => {
124+ return Err ( std:: io:: Error :: other ( format ! (
125+ "last operation failed: {}" ,
126+ err. to_debug_string( )
127+ ) ) )
123128 }
124- StreamError :: Closed => std:: io:: Error :: other ( "Connection closed" ) ,
125- } ) ?;
129+ } ;
126130 self . buf . insert ( buf)
127131 }
128132 } ;
Original file line number Diff line number Diff line change 1212 if bytes_read == 0 {
1313 break ' read Ok ( ( ) ) ;
1414 }
15- let mut slice = & buf[ 0 ..bytes_read] ;
16-
17- ' write: loop {
18- let bytes_written = writer. write ( slice) . await ?;
19- slice = & slice[ bytes_written..] ;
20- if slice. is_empty ( ) {
21- break ' write;
22- }
23- }
15+ writer. write_all ( & buf[ 0 ..bytes_read] ) . await ?;
2416 }
2517}
Original file line number Diff line number Diff line change 11use crate :: io;
22
3+ const CHUNK_SIZE : usize = 2048 ;
4+
35/// Read bytes from a source.
46pub trait AsyncRead {
57 async fn read ( & mut self , buf : & mut [ u8 ] ) -> io:: Result < usize > ;
8+ async fn read_to_end ( & mut self , buf : & mut Vec < u8 > ) -> io:: Result < usize > {
9+ // total bytes written to buf
10+ let mut n = 0 ;
11+
12+ loop {
13+ // grow buf if empty
14+ if buf. len ( ) == n {
15+ buf. resize ( n + CHUNK_SIZE , 0u8 ) ;
16+ }
17+
18+ let len = self . read ( & mut buf[ n..] ) . await ?;
19+ if len == 0 {
20+ buf. truncate ( n) ;
21+ return Ok ( n) ;
22+ }
23+
24+ n += len;
25+ }
26+ }
627}
Original file line number Diff line number Diff line change @@ -5,4 +5,15 @@ pub trait AsyncWrite {
55 // Required methods
66 async fn write ( & mut self , buf : & [ u8 ] ) -> io:: Result < usize > ;
77 async fn flush ( & mut self ) -> io:: Result < ( ) > ;
8+
9+ async fn write_all ( & mut self , buf : & [ u8 ] ) -> io:: Result < ( ) > {
10+ let mut to_write = & buf[ 0 ..] ;
11+ loop {
12+ let bytes_written = self . write ( to_write) . await ?;
13+ to_write = & to_write[ bytes_written..] ;
14+ if to_write. is_empty ( ) {
15+ return Ok ( ( ) ) ;
16+ }
17+ }
18+ }
819}
Original file line number Diff line number Diff line change @@ -31,7 +31,11 @@ impl TcpStream {
3131impl AsyncRead for TcpStream {
3232 async fn read ( & mut self , buf : & mut [ u8 ] ) -> io:: Result < usize > {
3333 Reactor :: current ( ) . wait_for ( self . input . subscribe ( ) ) . await ;
34- let slice = self . input . read ( buf. len ( ) as u64 ) . map_err ( to_io_err) ?;
34+ let slice = match self . input . read ( buf. len ( ) as u64 ) {
35+ Ok ( slice) => slice,
36+ Err ( StreamError :: Closed ) => return Ok ( 0 ) ,
37+ Err ( e) => return Err ( to_io_err ( e) ) ,
38+ } ;
3539 let bytes_read = slice. len ( ) ;
3640 buf[ ..bytes_read] . clone_from_slice ( & slice) ;
3741 Ok ( bytes_read)
@@ -41,7 +45,11 @@ impl AsyncRead for TcpStream {
4145impl AsyncRead for & TcpStream {
4246 async fn read ( & mut self , buf : & mut [ u8 ] ) -> io:: Result < usize > {
4347 Reactor :: current ( ) . wait_for ( self . input . subscribe ( ) ) . await ;
44- let slice = self . input . read ( buf. len ( ) as u64 ) . map_err ( to_io_err) ?;
48+ let slice = match self . input . read ( buf. len ( ) as u64 ) {
49+ Ok ( slice) => slice,
50+ Err ( StreamError :: Closed ) => return Ok ( 0 ) ,
51+ Err ( e) => return Err ( to_io_err ( e) ) ,
52+ } ;
4553 let bytes_read = slice. len ( ) ;
4654 buf[ ..bytes_read] . clone_from_slice ( & slice) ;
4755 Ok ( bytes_read)
You can’t perform that action at this time.
0 commit comments