Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunks #2

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,25 +164,12 @@ impl<'a> Images<'a> {

tarball::dir(&mut bytes, &opts.path[..])?;

let chunk_stream = self.docker.stream_post(
let value_stream = self.docker.stream_post_into_values(
path.join("?"),
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
);

let value_stream = chunk_stream
.and_then(|chunk| async move {
let stream = futures_util::stream::iter(
serde_json::Deserializer::from_slice(&chunk)
.into_iter()
.collect::<Vec<_>>(),
)
.map_err(Error::from);

Ok(stream)
})
.try_flatten();

Ok(value_stream)
}
.try_flatten_stream(),
Expand Down Expand Up @@ -237,11 +224,7 @@ impl<'a> Images<'a> {

Box::pin(
self.docker
.stream_post(path.join("?"), None, headers)
.and_then(move |chunk| {
// todo: give this a proper enum type
futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from))
}),
.stream_post_into_values(path.join("?"), None, headers),
)
}

Expand Down Expand Up @@ -272,16 +255,11 @@ impl<'a> Images<'a> {

tarball.read_to_end(&mut bytes)?;

let chunk_stream = self.docker.stream_post(
let value_stream = self.docker.stream_post_into_values(
"/images/load",
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
);

let value_stream = chunk_stream.and_then(|chunk| async move {
serde_json::from_slice(&chunk).map_err(Error::from)
});

Ok(value_stream)
}
.try_flatten_stream(),
Expand Down Expand Up @@ -1153,6 +1131,9 @@ impl Docker {
Ok(serde_json::from_str::<T>(&string)?)
}

/// Send a streaming post request.
///
/// Use stream_post_into_values if the endpoint returns JSON values
fn stream_post<'a, H>(
&'a self,
endpoint: impl AsRef<str> + 'a,
Expand All @@ -1166,6 +1147,32 @@ impl Docker {
.stream_chunks(Method::POST, endpoint, body, headers)
}

/// Send a streaming post request that returns a stream of JSON values
///
/// Assumes that each received chunk contains one or more JSON values
fn stream_post_into_values<'a, H>(
tofay marked this conversation as resolved.
Show resolved Hide resolved
&'a self,
endpoint: impl AsRef<str> + 'a,
body: Option<(Body, Mime)>,
headers: Option<H>,
) -> impl Stream<Item = Result<Value>> + 'a
where
H: IntoIterator<Item = (&'static str, String)> + 'a,
{
self.stream_post(endpoint, body, headers)
tofay marked this conversation as resolved.
Show resolved Hide resolved
.and_then(|chunk| async move {
let stream = futures_util::stream::iter(
serde_json::Deserializer::from_slice(&chunk)
.into_iter()
.collect::<Vec<_>>(),
)
bossmc marked this conversation as resolved.
Show resolved Hide resolved
.map_err(Error::from);

Ok(stream)
})
.try_flatten()
}

fn stream_get<'a>(
&'a self,
endpoint: impl AsRef<str> + Unpin + 'a,
Expand Down
2 changes: 1 addition & 1 deletion src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Transport {
message: Self::get_error_message(&message_body).unwrap_or_else(|| {
status
.canonical_reason()
.unwrap_or_else(|| "unknown error code")
.unwrap_or("unknown error code")
.to_owned()
}),
})
Expand Down