From 0dfc6039245087d0472a8e5d958d4b1c705cd7dd Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Fri, 8 Dec 2023 16:09:45 -0500 Subject: [PATCH] Add support for constructing sdk body types from http-body 1.0 --- CHANGELOG.next.toml | 12 + rust-runtime/aws-smithy-types/Cargo.toml | 6 +- rust-runtime/aws-smithy-types/src/body.rs | 2 + .../src/body/http_body_1_x.rs | 288 ++++++++++++++++++ .../aws-smithy-types/src/byte_stream.rs | 2 + .../src/byte_stream/http_body_1_x.rs | 23 ++ 6 files changed, 332 insertions(+), 1 deletion(-) create mode 100644 rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs create mode 100644 rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index e1702849183..a0f239ce4e5 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -85,3 +85,15 @@ message = "[`Number`](https://docs.rs/aws-smithy-types/latest/aws_smithy_types/e references = ["smithy-rs#3294"] meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "all" } author = "rcoh" + +[[smithy-rs]] +message = "Add support for construction [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported." +references = ["smithy-rs#3300", "aws-sdk-rust#977"] +meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "all" } +author = "rcoh" + +[[aws-sdk-rust]] +message = "Add support for construction [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported." +references = ["smithy-rs#3300", "aws-sdk-rust#977"] +meta = { "breaking" = false, "tada" = true, "bug" = false } +author = "rcoh" diff --git a/rust-runtime/aws-smithy-types/Cargo.toml b/rust-runtime/aws-smithy-types/Cargo.toml index a46f9924fa4..cbe5ddbf9ae 100644 --- a/rust-runtime/aws-smithy-types/Cargo.toml +++ b/rust-runtime/aws-smithy-types/Cargo.toml @@ -13,9 +13,10 @@ repository = "https://github.com/smithy-lang/smithy-rs" [features] byte-stream-poll-next = [] http-body-0-4-x = ["dep:http-body-0-4"] +http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "http-body-0-4-x"] hyper-0-14-x = ["dep:hyper-0-14"] rt-tokio = [ - "dep:http-body-0-4", + "http-body-0-4-x", "dep:tokio-util", "dep:tokio", "tokio?/rt", @@ -32,7 +33,10 @@ base64-simd = "0.8" bytes = "1" bytes-utils = "0.1" http = "0.2.3" +http_1x = { package = "http", version = "1" } http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true } +http-body-1-0 = { package = "http-body", version = "1", optional = true } +http-body-util = { version = "0.1.0", optional = true } hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true } itoa = "1.0.0" num-integer = "0.1.44" diff --git a/rust-runtime/aws-smithy-types/src/body.rs b/rust-runtime/aws-smithy-types/src/body.rs index 33e5912a935..d604a7416b3 100644 --- a/rust-runtime/aws-smithy-types/src/body.rs +++ b/rust-runtime/aws-smithy-types/src/body.rs @@ -19,6 +19,8 @@ use std::task::{Context, Poll}; /// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`. #[cfg(feature = "http-body-0-4-x")] pub mod http_body_0_4_x; +#[cfg(feature = "http-body-1-x")] +pub mod http_body_1_x; /// A generic, boxed error that's `Send` and `Sync` pub type Error = Box; diff --git a/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs b/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs new file mode 100644 index 00000000000..8e81a14da9a --- /dev/null +++ b/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs @@ -0,0 +1,288 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream + +use std::pin::Pin; +use std::task::{ready, Context, Poll}; + +use bytes::Bytes; +use http_body_util::BodyExt; +use pin_project_lite::pin_project; + +use crate::body::{BoxBody, Error, Inner, SdkBody}; + +impl SdkBody { + /// Construct an `SdkBody` from a type that implements [`http_body_1_0::Body`](http_body_1_0::Body). + /// + /// _Note: This is only available with `http-body-1-0` enabled._ + pub fn from_body_1_0(body: T) -> Self + where + T: http_body_1_0::Body + Send + Sync + 'static, + E: Into + 'static, + { + Self { + inner: Inner::Dyn { + inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new( + Http1toHttp04::new(body.map_err(Into::into)), + )), + }, + rebuild: None, + bytes_contents: None, + } + } +} + +pin_project! { + struct Http1toHttp04 { + #[pin] + inner: B, + trailers: Option, + } +} + +impl Http1toHttp04 { + fn new(inner: B) -> Self { + Self { + inner, + trailers: None, + } + } +} + +impl http_body_0_4::Body for Http1toHttp04 +where + B: http_body_1_0::Body, +{ + type Data = B::Data; + type Error = B::Error; + + /// + /// + /// # Arguments + /// + /// * `cx`: + /// + /// returns: Poll as Body>::Data, as Body>::Error>>> + /// + /// # Examples + /// + /// ``` + /// + /// ``` + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + loop { + let this = self.as_mut().project(); + match ready!(this.inner.poll_frame(cx)) { + Some(Ok(frame)) => { + let frame = match frame.into_data() { + Ok(data) => return Poll::Ready(Some(Ok(data))), + Err(frame) => frame, + }; + // when we get a trailers frame, store the trailers for the next poll + if let Ok(trailers) = frame.into_trailers() { + this.trailers.replace(trailers); + return Poll::Ready(None); + }; + // if the frame type was unknown, discard it. the next one might be something + // useful + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => return Poll::Ready(None), + } + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + // all of the polling happens in poll_data, once we get to the trailers we've actually + // already read everything + let this = self.project(); + match this.trailers.take() { + Some(headers) => Poll::Ready(Ok(Some(convert_header_map(headers)))), + None => Poll::Ready(Ok(None)), + } + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } + + fn size_hint(&self) -> http_body_0_4::SizeHint { + let mut size_hint = http_body_0_4::SizeHint::new(); + let inner_hint = self.inner.size_hint(); + if let Some(exact) = inner_hint.exact() { + size_hint.set_exact(exact); + } else { + size_hint.set_lower(inner_hint.lower()); + if let Some(upper) = inner_hint.upper() { + size_hint.set_upper(upper); + } + } + size_hint + } +} + +fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap { + let mut map = http::HeaderMap::with_capacity(input.capacity()); + let mut mem: Option = None; + for (k, v) in input.into_iter() { + let name = k.or_else(|| mem.clone()).unwrap(); + map.append( + http::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"), + http::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"), + ); + mem = Some(name); + } + map +} + +#[cfg(test)] +mod test { + use crate::body::http_body_1_x::convert_header_map; + use crate::body::{Error, SdkBody}; + use crate::byte_stream::ByteStream; + use bytes::Bytes; + use http::header::{CONTENT_LENGTH as CL0, CONTENT_TYPE as CT0}; + use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1}; + use http_1x::{HeaderMap, HeaderName, HeaderValue}; + use http_body_1_0::Frame; + use std::collections::VecDeque; + use std::pin::Pin; + use std::task::{Context, Poll}; + + struct TestBody { + chunks: VecDeque, + } + + enum Chunk { + Data(&'static str), + Error(&'static str), + Trailers(HeaderMap), + } + + impl http_body_1_0::Body for TestBody { + type Data = Bytes; + type Error = Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + let next = self.chunks.pop_front(); + let mk = |v: Frame| Poll::Ready(Some(Ok(v))); + + match next { + Some(Chunk::Data(s)) => mk(Frame::data(Bytes::from_static(s.as_bytes()))), + Some(Chunk::Trailers(headers)) => mk(Frame::trailers(headers)), + Some(Chunk::Error(err)) => Poll::Ready(Some(Err(err.into()))), + None => Poll::Ready(None), + } + } + } + + fn trailers() -> HeaderMap { + let mut map = HeaderMap::new(); + map.insert( + HeaderName::from_static("x-test"), + HeaderValue::from_static("x-test-value"), + ); + map.append( + HeaderName::from_static("x-test"), + HeaderValue::from_static("x-test-value-2"), + ); + map.append( + HeaderName::from_static("y-test"), + HeaderValue::from_static("y-test-value-2"), + ); + map + } + + #[tokio::test] + async fn test_body_with_trailers() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Trailers(trailers()), + ] + .into(), + }; + let body = SdkBody::from_body_1_0(body); + let data = ByteStream::new(body); + assert_eq!(data.collect().await.unwrap().to_vec(), b"123456789"); + } + + #[tokio::test] + async fn test_read_trailers() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Trailers(trailers()), + ] + .into(), + }; + let mut body = SdkBody::from_body_1_0(body); + while let Some(_data) = http_body_0_4::Body::data(&mut body).await {} + assert_eq!( + http_body_0_4::Body::trailers(&mut body).await.unwrap(), + Some(convert_header_map(trailers())) + ); + } + + #[tokio::test] + async fn test_errors() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Error("errors!"), + ] + .into(), + }; + + let body = SdkBody::from_body_1_0(body); + let body = ByteStream::new(body); + body.collect().await.expect_err("body returned an error"); + } + #[tokio::test] + async fn test_no_trailers() { + let body = TestBody { + chunks: vec![Chunk::Data("123"), Chunk::Data("456"), Chunk::Data("789")].into(), + }; + + let body = SdkBody::from_body_1_0(body); + let body = ByteStream::new(body); + assert_eq!(body.collect().await.unwrap().to_vec(), b"123456789"); + } + + #[test] + fn test_convert_headers() { + let mut http1_headermap = http_1x::HeaderMap::new(); + http1_headermap.append(CT1, HeaderValue::from_static("a")); + http1_headermap.append(CT1, HeaderValue::from_static("b")); + http1_headermap.append(CT1, HeaderValue::from_static("c")); + + http1_headermap.insert(CL1, HeaderValue::from_static("1234")); + + let mut expect = http::HeaderMap::new(); + expect.append(CT0, http::HeaderValue::from_static("a")); + expect.append(CT0, http::HeaderValue::from_static("b")); + expect.append(CT0, http::HeaderValue::from_static("c")); + + expect.insert(CL0, http::HeaderValue::from_static("1234")); + + assert_eq!(convert_header_map(http1_headermap), expect); + } +} diff --git a/rust-runtime/aws-smithy-types/src/byte_stream.rs b/rust-runtime/aws-smithy-types/src/byte_stream.rs index 2721b1b6b21..3acb18b5625 100644 --- a/rust-runtime/aws-smithy-types/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-types/src/byte_stream.rs @@ -148,6 +148,8 @@ pub use self::bytestream_util::FsBuilder; /// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`. #[cfg(feature = "http-body-0-4-x")] pub mod http_body_0_4_x; +#[cfg(feature = "http-body-1-x")] +pub mod http_body_1_x; pin_project! { /// Stream of binary data diff --git a/rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs b/rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs new file mode 100644 index 00000000000..803de6bf78a --- /dev/null +++ b/rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs @@ -0,0 +1,23 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream + +use crate::body::SdkBody; +use crate::byte_stream::ByteStream; +use bytes::Bytes; + +impl ByteStream { + /// Construct a `ByteStream` from a type that implements [`http_body_0_4::Body`](http_body_0_4::Body). + /// + /// _Note: This is only available with `http-body-0-4-x` enabled._ + pub fn from_body_1_x(body: T) -> Self + where + T: http_body_1_0::Body + Send + Sync + 'static, + E: Into + 'static, + { + ByteStream::new(SdkBody::from_body_1_0(body)) + } +}