From c13a17dcf9f2908c347f320b68ff046729843eb4 Mon Sep 17 00:00:00 2001 From: Tom Fay Date: Tue, 1 Dec 2020 20:05:19 +0000 Subject: [PATCH 1/3] add fn for stream post that returns json values use when pulling docker images, to fix bug where shiplift would error if multiple JSON values were returned in a single HTTP chunk --- src/lib.rs | 51 ++++++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c810ea50..4dbc3e1c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -164,25 +164,12 @@ impl<'a> Images<'a> { tarball::dir(&mut bytes, &opts.path[..])?; - let chunk_stream = self.docker.stream_post( + let value_stream = self.docker.stream_post_into_values( path.join("?"), Some((Body::from(bytes), tar())), None::>, ); - let value_stream = chunk_stream - .and_then(|chunk| async move { - let stream = futures_util::stream::iter( - serde_json::Deserializer::from_slice(&chunk) - .into_iter() - .collect::>(), - ) - .map_err(Error::from); - - Ok(stream) - }) - .try_flatten(); - Ok(value_stream) } .try_flatten_stream(), @@ -237,11 +224,7 @@ impl<'a> Images<'a> { Box::pin( self.docker - .stream_post(path.join("?"), None, headers) - .and_then(move |chunk| { - // todo: give this a proper enum type - futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from)) - }), + .stream_post_into_values(path.join("?"), None, headers), ) } @@ -272,16 +255,11 @@ impl<'a> Images<'a> { tarball.read_to_end(&mut bytes)?; - let chunk_stream = self.docker.stream_post( + let value_stream = self.docker.stream_post_into_values( "/images/load", Some((Body::from(bytes), tar())), None::>, ); - - let value_stream = chunk_stream.and_then(|chunk| async move { - serde_json::from_slice(&chunk).map_err(Error::from) - }); - Ok(value_stream) } .try_flatten_stream(), @@ -1166,6 +1144,29 @@ impl Docker { .stream_chunks(Method::POST, endpoint, body, headers) } + fn stream_post_into_values<'a, H>( + &'a self, + endpoint: impl AsRef + 'a, + body: Option<(Body, Mime)>, + headers: Option, + ) -> impl Stream> + 'a + where + H: IntoIterator + 'a, + { + self.stream_post(endpoint, body, headers) + .and_then(|chunk| async move { + let stream = futures_util::stream::iter( + serde_json::Deserializer::from_slice(&chunk) + .into_iter() + .collect::>(), + ) + .map_err(Error::from); + + Ok(stream) + }) + .try_flatten() + } + fn stream_get<'a>( &'a self, endpoint: impl AsRef + Unpin + 'a, From 0fd09fbd7321ad757a547fe583e0b1d942b97f99 Mon Sep 17 00:00:00 2001 From: Tom Fay Date: Wed, 2 Dec 2020 08:42:19 +0000 Subject: [PATCH 2/3] fix unnecessary lazy evaluation --- src/transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport.rs b/src/transport.rs index 5a48f629..3ad3b9e6 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -122,7 +122,7 @@ impl Transport { message: Self::get_error_message(&message_body).unwrap_or_else(|| { status .canonical_reason() - .unwrap_or_else(|| "unknown error code") + .unwrap_or("unknown error code") .to_owned() }), }) From 25b26df0fb778ffc1481a7088d0b6fbb9ad6770b Mon Sep 17 00:00:00 2001 From: Tom Fay Date: Wed, 2 Dec 2020 16:50:17 +0000 Subject: [PATCH 3/3] add comments to stream post requests --- src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 4dbc3e1c..5ca8a82f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1131,6 +1131,9 @@ impl Docker { Ok(serde_json::from_str::(&string)?) } + /// Send a streaming post request. + /// + /// Use stream_post_into_values if the endpoint returns JSON values fn stream_post<'a, H>( &'a self, endpoint: impl AsRef + 'a, @@ -1144,6 +1147,9 @@ impl Docker { .stream_chunks(Method::POST, endpoint, body, headers) } + /// Send a streaming post request that returns a stream of JSON values + /// + /// Assumes that each received chunk contains one or more JSON values fn stream_post_into_values<'a, H>( &'a self, endpoint: impl AsRef + 'a,