Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for constructing sdk body types from http-body 1.0 #3300

Merged
merged 5 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,15 @@ message = "[`Number`](https://docs.rs/aws-smithy-types/latest/aws_smithy_types/e
references = ["smithy-rs#3294"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "all" }
author = "rcoh"

[[smithy-rs]]
message = "Add support for constructing [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported."
references = ["smithy-rs#3300", "aws-sdk-rust#977"]
meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "all" }
author = "rcoh"

[[aws-sdk-rust]]
message = "Add support for constructing [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported."
references = ["smithy-rs#3300", "aws-sdk-rust#977"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "rcoh"
4 changes: 4 additions & 0 deletions rust-runtime/aws-smithy-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ repository = "https://github.com/smithy-lang/smithy-rs"
[features]
byte-stream-poll-next = []
http-body-0-4-x = ["dep:http-body-0-4"]
http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x"]
hyper-0-14-x = ["dep:hyper-0-14"]
rt-tokio = [
"dep:http-body-0-4",
Expand All @@ -32,7 +33,10 @@ base64-simd = "0.8"
bytes = "1"
bytes-utils = "0.1"
http = "0.2.3"
http-1x = { package = "http", version = "1", optional = true }
http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true }
http-body-1-0 = { package = "http-body", version = "1", optional = true }
http-body-util = { version = "0.1.0", optional = true }
hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true }
itoa = "1.0.0"
num-integer = "0.1.44"
Expand Down
3 changes: 3 additions & 0 deletions rust-runtime/aws-smithy-types/additional-ci
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ cargo tree -d --edges normal --all-features

echo "### Checking whether the features are properly feature-gated"
! cargo tree -e no-dev | grep serde

echo "### Checking feature powerset"
cargo hack check --feature-powerset --exclude-all-features
31 changes: 30 additions & 1 deletion rust-runtime/aws-smithy-types/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::task::{Context, Poll};
/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
#[cfg(feature = "http-body-0-4-x")]
pub mod http_body_0_4_x;
#[cfg(feature = "http-body-1-x")]
pub mod http_body_1_x;

/// A generic, boxed error that's `Send` and `Sync`
pub type Error = Box<dyn StdError + Send + Sync>;
Expand Down Expand Up @@ -55,7 +57,13 @@ impl Debug for SdkBody {

/// A boxed generic HTTP body that, when consumed, will result in [`Bytes`] or an [`Error`].
enum BoxBody {
#[cfg(feature = "http-body-0-4-x")]
// This is enabled by the **dependency**, not the feature. This allows us to construct it
// whenever we have the dependency and keep the APIs private
#[cfg(any(
feature = "http-body-0-4-x",
feature = "http-body-1-x",
feature = "rt-tokio"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does rt-tokio require this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rt-tokio is what enables PathBody, path body uses it

))]
HttpBody04(http_body_0_4::combinators::BoxBody<Bytes, Error>),
}

Expand Down Expand Up @@ -162,6 +170,27 @@ impl SdkBody {
}
}

#[cfg(any(
feature = "http-body-0-4-x",
feature = "http-body-1-x",
feature = "rt-tokio"
))]
pub(crate) fn from_body_0_4_internal<T, E>(body: T) -> Self
where
T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
E: Into<Error> + 'static,
{
Self {
inner: Inner::Dyn {
inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new(
body.map_err(Into::into),
)),
},
rebuild: None,
bytes_contents: None,
}
}

#[cfg(feature = "http-body-0-4-x")]
pub(crate) fn poll_next_trailers(
self: Pin<&mut Self>,
Expand Down
16 changes: 5 additions & 11 deletions rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

use crate::body::{BoxBody, Error, Inner, SdkBody};
use bytes::Bytes;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;

use crate::body::{Error, SdkBody};

impl SdkBody {
/// Construct an `SdkBody` from a type that implements [`http_body_0_4::Body<Data = Bytes>`](http_body_0_4::Body).
///
Expand All @@ -17,15 +19,7 @@ impl SdkBody {
T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
E: Into<Error> + 'static,
{
Self {
inner: Inner::Dyn {
inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new(
body.map_err(Into::into),
)),
},
rebuild: None,
bytes_contents: None,
}
SdkBody::from_body_0_4_internal(body)
}
}

Expand Down
267 changes: 267 additions & 0 deletions rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream

use std::pin::Pin;
use std::task::{ready, Context, Poll};

use bytes::Bytes;
use http_body_util::BodyExt;
use pin_project_lite::pin_project;

use crate::body::{Error, SdkBody};

impl SdkBody {
/// Construct an `SdkBody` from a type that implements [`http_body_1_0::Body<Data = Bytes>`](http_body_1_0::Body).
pub fn from_body_1_x<T, E>(body: T) -> Self
where
T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
E: Into<Error> + 'static,
{
SdkBody::from_body_0_4_internal(Http1toHttp04::new(body.map_err(Into::into)))
}
}

pin_project! {
struct Http1toHttp04<B> {
#[pin]
inner: B,
trailers: Option<http_1x::HeaderMap>,
}
}

impl<B> Http1toHttp04<B> {
fn new(inner: B) -> Self {
Self {
inner,
trailers: None,
}
}
}

impl<B> http_body_0_4::Body for Http1toHttp04<B>
where
B: http_body_1_0::Body,
{
type Data = B::Data;
type Error = B::Error;

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
loop {
let this = self.as_mut().project();
match ready!(this.inner.poll_frame(cx)) {
Some(Ok(frame)) => {
let frame = match frame.into_data() {
Ok(data) => return Poll::Ready(Some(Ok(data))),
Err(frame) => frame,
};
// when we get a trailers frame, store the trailers for the next poll
if let Ok(trailers) = frame.into_trailers() {
this.trailers.replace(trailers);
return Poll::Ready(None);
};
// if the frame type was unknown, discard it. the next one might be something
// useful
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None),
}
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
// all of the polling happens in poll_data, once we get to the trailers we've actually
// already read everything
let this = self.project();
match this.trailers.take() {
Some(headers) => Poll::Ready(Ok(Some(convert_header_map(headers)))),
None => Poll::Ready(Ok(None)),
}
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

fn size_hint(&self) -> http_body_0_4::SizeHint {
let mut size_hint = http_body_0_4::SizeHint::new();
let inner_hint = self.inner.size_hint();
if let Some(exact) = inner_hint.exact() {
size_hint.set_exact(exact);
} else {
size_hint.set_lower(inner_hint.lower());
if let Some(upper) = inner_hint.upper() {
size_hint.set_upper(upper);
}
}
size_hint
}
}

fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap {
let mut map = http::HeaderMap::with_capacity(input.capacity());
let mut mem: Option<http_1x::HeaderName> = None;
for (k, v) in input.into_iter() {
let name = k.or_else(|| mem.clone()).unwrap();
map.append(
http::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
http::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
);
mem = Some(name);
}
map
}

#[cfg(test)]
mod test {
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use http::header::{CONTENT_LENGTH as CL0, CONTENT_TYPE as CT0};
use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1};
use http_1x::{HeaderMap, HeaderName, HeaderValue};
use http_body_1_0::Frame;

use crate::body::http_body_1_x::convert_header_map;
use crate::body::{Error, SdkBody};
use crate::byte_stream::ByteStream;

struct TestBody {
chunks: VecDeque<Chunk>,
}

enum Chunk {
Data(&'static str),
Error(&'static str),
Trailers(HeaderMap),
}

impl http_body_1_0::Body for TestBody {
type Data = Bytes;
type Error = Error;

fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let next = self.chunks.pop_front();
let mk = |v: Frame<Bytes>| Poll::Ready(Some(Ok(v)));

match next {
Some(Chunk::Data(s)) => mk(Frame::data(Bytes::from_static(s.as_bytes()))),
Some(Chunk::Trailers(headers)) => mk(Frame::trailers(headers)),
Some(Chunk::Error(err)) => Poll::Ready(Some(Err(err.into()))),
None => Poll::Ready(None),
}
}
}

fn trailers() -> HeaderMap {
let mut map = HeaderMap::new();
map.insert(
HeaderName::from_static("x-test"),
HeaderValue::from_static("x-test-value"),
);
map.append(
HeaderName::from_static("x-test"),
HeaderValue::from_static("x-test-value-2"),
);
map.append(
HeaderName::from_static("y-test"),
HeaderValue::from_static("y-test-value-2"),
);
map
}

#[tokio::test]
async fn test_body_with_trailers() {
let body = TestBody {
chunks: vec![
Chunk::Data("123"),
Chunk::Data("456"),
Chunk::Data("789"),
Chunk::Trailers(trailers()),
]
.into(),
};
let body = SdkBody::from_body_1_x(body);
let data = ByteStream::new(body);
assert_eq!(data.collect().await.unwrap().to_vec(), b"123456789");
}

#[tokio::test]
async fn test_read_trailers() {
let body = TestBody {
chunks: vec![
Chunk::Data("123"),
Chunk::Data("456"),
Chunk::Data("789"),
Chunk::Trailers(trailers()),
]
.into(),
};
let mut body = SdkBody::from_body_1_x(body);
while let Some(_data) = http_body_0_4::Body::data(&mut body).await {}
assert_eq!(
http_body_0_4::Body::trailers(&mut body).await.unwrap(),
Some(convert_header_map(trailers()))
);
}

#[tokio::test]
async fn test_errors() {
let body = TestBody {
chunks: vec![
Chunk::Data("123"),
Chunk::Data("456"),
Chunk::Data("789"),
Chunk::Error("errors!"),
]
.into(),
};

let body = SdkBody::from_body_1_x(body);
let body = ByteStream::new(body);
body.collect().await.expect_err("body returned an error");
}
#[tokio::test]
async fn test_no_trailers() {
let body = TestBody {
chunks: vec![Chunk::Data("123"), Chunk::Data("456"), Chunk::Data("789")].into(),
};

let body = SdkBody::from_body_1_x(body);
let body = ByteStream::new(body);
assert_eq!(body.collect().await.unwrap().to_vec(), b"123456789");
}

#[test]
fn test_convert_headers() {
let mut http1_headermap = http_1x::HeaderMap::new();
http1_headermap.append(CT1, HeaderValue::from_static("a"));
http1_headermap.append(CT1, HeaderValue::from_static("b"));
http1_headermap.append(CT1, HeaderValue::from_static("c"));

http1_headermap.insert(CL1, HeaderValue::from_static("1234"));

let mut expect = http::HeaderMap::new();
expect.append(CT0, http::HeaderValue::from_static("a"));
expect.append(CT0, http::HeaderValue::from_static("b"));
expect.append(CT0, http::HeaderValue::from_static("c"));

expect.insert(CL0, http::HeaderValue::from_static("1234"));

assert_eq!(convert_header_map(http1_headermap), expect);
}
}
Loading