Skip to content

Commit

Permalink
Add support for constructing sdk body types from http-body 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
rcoh committed Dec 8, 2023
1 parent b09b02f commit 0dfc603
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 1 deletion.
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,15 @@ message = "[`Number`](https://docs.rs/aws-smithy-types/latest/aws_smithy_types/e
references = ["smithy-rs#3294"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "all" }
author = "rcoh"

[[smithy-rs]]
message = "Add support for construction [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported."
references = ["smithy-rs#3300", "aws-sdk-rust#977"]
meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "all" }
author = "rcoh"

[[aws-sdk-rust]]
message = "Add support for construction [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported."
references = ["smithy-rs#3300", "aws-sdk-rust#977"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "rcoh"
6 changes: 5 additions & 1 deletion rust-runtime/aws-smithy-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ repository = "https://github.com/smithy-lang/smithy-rs"
[features]
byte-stream-poll-next = []
http-body-0-4-x = ["dep:http-body-0-4"]
http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "http-body-0-4-x"]
hyper-0-14-x = ["dep:hyper-0-14"]
rt-tokio = [
"dep:http-body-0-4",
"http-body-0-4-x",
"dep:tokio-util",
"dep:tokio",
"tokio?/rt",
Expand All @@ -32,7 +33,10 @@ base64-simd = "0.8"
bytes = "1"
bytes-utils = "0.1"
http = "0.2.3"
http_1x = { package = "http", version = "1" }
http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true }
http-body-1-0 = { package = "http-body", version = "1", optional = true }
http-body-util = { version = "0.1.0", optional = true }
hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true }
itoa = "1.0.0"
num-integer = "0.1.44"
Expand Down
2 changes: 2 additions & 0 deletions rust-runtime/aws-smithy-types/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::task::{Context, Poll};
/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
#[cfg(feature = "http-body-0-4-x")]
pub mod http_body_0_4_x;
#[cfg(feature = "http-body-1-x")]
pub mod http_body_1_x;

/// A generic, boxed error that's `Send` and `Sync`
pub type Error = Box<dyn StdError + Send + Sync>;
Expand Down
288 changes: 288 additions & 0 deletions rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use bytes::Bytes;
use http_body_util::BodyExt;
use pin_project_lite::pin_project;

use crate::body::{BoxBody, Error, Inner, SdkBody};

impl SdkBody {
/// Construct an `SdkBody` from a type that implements [`http_body_1_0::Body<Data = Bytes>`](http_body_1_0::Body).
///
/// _Note: This is only available with `http-body-1-0` enabled._
pub fn from_body_1_0<T, E>(body: T) -> Self
where
T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
E: Into<Error> + 'static,
{
Self {
inner: Inner::Dyn {
inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new(
Http1toHttp04::new(body.map_err(Into::into)),
)),
},
rebuild: None,
bytes_contents: None,
}
}
}

pin_project! {
struct Http1toHttp04<B> {
#[pin]
inner: B,
trailers: Option<http_1x::HeaderMap>,
}
}

impl<B> Http1toHttp04<B> {
fn new(inner: B) -> Self {
Self {
inner,
trailers: None,
}
}
}

impl<B> http_body_0_4::Body for Http1toHttp04<B>
where
B: http_body_1_0::Body,
{
type Data = B::Data;
type Error = B::Error;

///
///
/// # Arguments
///
/// * `cx`:
///
/// returns: Poll<Option<Result<<Http1toHttp04<B> as Body>::Data, <Http1toHttp04<B> as Body>::Error>>>
///
/// # Examples
///
/// ```
///
/// ```
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
loop {
let this = self.as_mut().project();
match ready!(this.inner.poll_frame(cx)) {
Some(Ok(frame)) => {
let frame = match frame.into_data() {
Ok(data) => return Poll::Ready(Some(Ok(data))),
Err(frame) => frame,
};
// when we get a trailers frame, store the trailers for the next poll
if let Ok(trailers) = frame.into_trailers() {
this.trailers.replace(trailers);
return Poll::Ready(None);
};
// if the frame type was unknown, discard it. the next one might be something
// useful
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None),
}
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
// all of the polling happens in poll_data, once we get to the trailers we've actually
// already read everything
let this = self.project();
match this.trailers.take() {
Some(headers) => Poll::Ready(Ok(Some(convert_header_map(headers)))),
None => Poll::Ready(Ok(None)),
}
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

fn size_hint(&self) -> http_body_0_4::SizeHint {
let mut size_hint = http_body_0_4::SizeHint::new();
let inner_hint = self.inner.size_hint();
if let Some(exact) = inner_hint.exact() {
size_hint.set_exact(exact);
} else {
size_hint.set_lower(inner_hint.lower());
if let Some(upper) = inner_hint.upper() {
size_hint.set_upper(upper);
}
}
size_hint
}
}

fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap {
let mut map = http::HeaderMap::with_capacity(input.capacity());
let mut mem: Option<http_1x::HeaderName> = None;
for (k, v) in input.into_iter() {
let name = k.or_else(|| mem.clone()).unwrap();
map.append(
http::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
http::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
);
mem = Some(name);
}
map
}

#[cfg(test)]
mod test {
use crate::body::http_body_1_x::convert_header_map;
use crate::body::{Error, SdkBody};
use crate::byte_stream::ByteStream;
use bytes::Bytes;
use http::header::{CONTENT_LENGTH as CL0, CONTENT_TYPE as CT0};
use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1};
use http_1x::{HeaderMap, HeaderName, HeaderValue};
use http_body_1_0::Frame;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

struct TestBody {
chunks: VecDeque<Chunk>,
}

enum Chunk {
Data(&'static str),
Error(&'static str),
Trailers(HeaderMap),
}

impl http_body_1_0::Body for TestBody {
type Data = Bytes;
type Error = Error;

fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let next = self.chunks.pop_front();
let mk = |v: Frame<Bytes>| Poll::Ready(Some(Ok(v)));

match next {
Some(Chunk::Data(s)) => mk(Frame::data(Bytes::from_static(s.as_bytes()))),
Some(Chunk::Trailers(headers)) => mk(Frame::trailers(headers)),
Some(Chunk::Error(err)) => Poll::Ready(Some(Err(err.into()))),
None => Poll::Ready(None),
}
}
}

fn trailers() -> HeaderMap {
let mut map = HeaderMap::new();
map.insert(
HeaderName::from_static("x-test"),
HeaderValue::from_static("x-test-value"),
);
map.append(
HeaderName::from_static("x-test"),
HeaderValue::from_static("x-test-value-2"),
);
map.append(
HeaderName::from_static("y-test"),
HeaderValue::from_static("y-test-value-2"),
);
map
}

#[tokio::test]
async fn test_body_with_trailers() {
let body = TestBody {
chunks: vec![
Chunk::Data("123"),
Chunk::Data("456"),
Chunk::Data("789"),
Chunk::Trailers(trailers()),
]
.into(),
};
let body = SdkBody::from_body_1_0(body);
let data = ByteStream::new(body);
assert_eq!(data.collect().await.unwrap().to_vec(), b"123456789");
}

#[tokio::test]
async fn test_read_trailers() {
let body = TestBody {
chunks: vec![
Chunk::Data("123"),
Chunk::Data("456"),
Chunk::Data("789"),
Chunk::Trailers(trailers()),
]
.into(),
};
let mut body = SdkBody::from_body_1_0(body);
while let Some(_data) = http_body_0_4::Body::data(&mut body).await {}
assert_eq!(
http_body_0_4::Body::trailers(&mut body).await.unwrap(),
Some(convert_header_map(trailers()))
);
}

#[tokio::test]
async fn test_errors() {
let body = TestBody {
chunks: vec![
Chunk::Data("123"),
Chunk::Data("456"),
Chunk::Data("789"),
Chunk::Error("errors!"),
]
.into(),
};

let body = SdkBody::from_body_1_0(body);
let body = ByteStream::new(body);
body.collect().await.expect_err("body returned an error");
}
#[tokio::test]
async fn test_no_trailers() {
let body = TestBody {
chunks: vec![Chunk::Data("123"), Chunk::Data("456"), Chunk::Data("789")].into(),
};

let body = SdkBody::from_body_1_0(body);
let body = ByteStream::new(body);
assert_eq!(body.collect().await.unwrap().to_vec(), b"123456789");
}

#[test]
fn test_convert_headers() {
let mut http1_headermap = http_1x::HeaderMap::new();
http1_headermap.append(CT1, HeaderValue::from_static("a"));
http1_headermap.append(CT1, HeaderValue::from_static("b"));
http1_headermap.append(CT1, HeaderValue::from_static("c"));

http1_headermap.insert(CL1, HeaderValue::from_static("1234"));

let mut expect = http::HeaderMap::new();
expect.append(CT0, http::HeaderValue::from_static("a"));
expect.append(CT0, http::HeaderValue::from_static("b"));
expect.append(CT0, http::HeaderValue::from_static("c"));

expect.insert(CL0, http::HeaderValue::from_static("1234"));

assert_eq!(convert_header_map(http1_headermap), expect);
}
}
2 changes: 2 additions & 0 deletions rust-runtime/aws-smithy-types/src/byte_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ pub use self::bytestream_util::FsBuilder;
/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
#[cfg(feature = "http-body-0-4-x")]
pub mod http_body_0_4_x;
#[cfg(feature = "http-body-1-x")]
pub mod http_body_1_x;

pin_project! {
/// Stream of binary data
Expand Down
23 changes: 23 additions & 0 deletions rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream
use crate::body::SdkBody;
use crate::byte_stream::ByteStream;
use bytes::Bytes;

impl ByteStream {
/// Construct a `ByteStream` from a type that implements [`http_body_0_4::Body<Data = Bytes>`](http_body_0_4::Body).
///
/// _Note: This is only available with `http-body-0-4-x` enabled._
pub fn from_body_1_x<T, E>(body: T) -> Self
where
T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
E: Into<crate::body::Error> + 'static,
{
ByteStream::new(SdkBody::from_body_1_0(body))
}
}

0 comments on commit 0dfc603

Please sign in to comment.