Skip to content

Commit

Permalink
Move SdkBody and bytestream into aws-smithy-types (#3026)
Browse files Browse the repository at this point in the history
## Motivation and Context
Takes care of the first part of
#3053 (the remaining part is
denoted as `TODO(runtimeCratesVersioningCleanup)` and until that and
#3033 are done the issue will
not be closed).

## Description
This PR moves from `aws-smithy-http` to `aws-smithy-types`:
- the `SdkBody` struct
- the `byte_stream` module

Within the origin crate, we leave "breadcrumbs" (i.e. reexports) for
existing customers of the items above so they are not broken by the
move.

We have just moved `SdkBody` to `aws-smithy-types` without renaming it
to see how it looks there. However, as
[TODO(naming)](https://github.com/awslabs/smithy-rs/pull/3026/files#diff-090c503b779024fdadb8ac97465c80438635df82e62c42b0d85b588a303d9a95R28)
says, we can choose to rename it to just `Body`. Curious to hear what
the reviewers think.

## Testing
Relied on the tests in CI.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: Russell Cohen <[email protected]>
  • Loading branch information
ysaito1001 and rcoh authored Oct 17, 2023
1 parent 3ab5a69 commit 0f38dae
Show file tree
Hide file tree
Showing 16 changed files with 88 additions and 58 deletions.
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-config/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ allowed_external_types = [
"aws_smithy_async::rt::sleep::SharedAsyncSleep",
"aws_smithy_async::time::SharedTimeSource",
"aws_smithy_async::time::TimeSource",
"aws_smithy_http::body::SdkBody",
"aws_smithy_types::body::SdkBody",
"aws_smithy_http::endpoint",
"aws_smithy_http::endpoint::error::InvalidEndpointError",
"aws_smithy_http::result::SdkError",
Expand Down
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-http/external-types.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allowed_external_types = [
"aws_smithy_http::body::Error",
"aws_smithy_types::body::Error",
"aws_smithy_types::config_bag::storable::Storable",
"aws_smithy_types::config_bag::storable::StoreReplace",
"aws_types::app_name::AppName",
Expand Down
1 change: 1 addition & 0 deletions aws/rust-runtime/aws-sig-auth/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ allowed_external_types = [
"aws_sigv4::http_request::sign::SignableBody",
"aws_sigv4::http_request::error::SigningError",
"aws_smithy_http::*",
"aws_smithy_types::body::SdkBody",
"aws_types::*",
"http::request::Request",
"aws_smithy_runtime_api::client::identity::Identity",
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-checksums/external-types.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allowed_external_types = [
"aws_smithy_http::*",
"aws_smithy_types::body::SdkBody",
"bytes::bytes::Bytes",
"http::header::map::HeaderMap",
"http::header::name::HeaderName",
Expand Down
15 changes: 2 additions & 13 deletions rust-runtime/aws-smithy-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ license = "Apache-2.0"
repository = "https://github.com/awslabs/smithy-rs"

[features]
rt-tokio = ["dep:tokio-util", "dep:tokio", "tokio?/rt", "tokio?/fs", "tokio?/io-util", "tokio-util?/io"]
event-stream = ["aws-smithy-eventstream"]
rt-tokio = ["aws-smithy-types/rt-tokio"]

[dependencies]
aws-smithy-eventstream = { path = "../aws-smithy-eventstream", optional = true }
Expand All @@ -27,30 +27,19 @@ pin-project-lite = "0.2.9"
pin-utils = "0.1.0"
tracing = "0.1"

# We are using hyper for our streaming body implementation, but this is an internal detail.
hyper = "0.14.26"

# ByteStream internals
# For an adapter to enable the `Stream` trait for `aws_smithy_types::byte_stream::ByteStream`
futures-core = "0.3.14"
tokio = { version = "1.23.1", optional = true }
tokio-util = { version = "0.7", optional = true }

[dev-dependencies]
async-stream = "0.3"
futures-util = { version = "0.3.16", default-features = false }
hyper = { version = "0.14.26", features = ["stream"] }
pretty_assertions = "1.3"
proptest = "1"
tokio = { version = "1.23.1", features = [
"macros",
"rt",
"rt-multi-thread",
"fs",
"io-util",
] }
tokio-stream = "0.1.5"
tempfile = "3.2.0"
tracing-test = "0.2.1"

[package.metadata.docs.rs]
all-features = true
Expand Down
13 changes: 1 addition & 12 deletions rust-runtime/aws-smithy-http/external-types.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
allowed_external_types = [
"aws_smithy_types::*",
"bytes::buf::buf_impl::Buf",
"bytes::bytes::Bytes",
"http::error::Error",
"http::header::map::HeaderMap",
Expand All @@ -12,20 +11,10 @@ allowed_external_types = [
"http::response::Builder",
"http::response::Response",
"http::uri::Uri",
"http::version::Version",
"http_body::Body",
"http_body::combinators::box_body::BoxBody",
"hyper::body::body::Body",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate Tokio `AsyncRead`
"tokio::io::async_read::AsyncRead",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Switch to AsyncIterator once standardized
# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Once tooling permits it, only allow the following types in the `event-stream` feature
"futures_core::stream::Stream",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate references to Tokio `File`
"tokio::fs::file::File",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Once tooling permits it, only allow the following types in the `event-stream` feature
"aws_smithy_eventstream::*",
]
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-http/src/event_stream/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use aws_smithy_eventstream::frame::{
use bytes::Buf;
use bytes::Bytes;
use bytes_utils::SegmentedBuf;
use hyper::body::HttpBody;
use http_body::Body;
use std::error::Error as StdError;
use std::fmt;
use std::marker::PhantomData;
Expand Down
1 change: 1 addition & 0 deletions rust-runtime/aws-smithy-http/src/futures_stream_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::task::{Context, Poll};
/// new-type to enable the trait when it is required.
///
/// This is meant to be used by codegen code, and users should not need to use it directly.
#[derive(Debug)]
pub struct FuturesStreamCompatByteStream(ByteStream);

impl FuturesStreamCompatByteStream {
Expand Down
8 changes: 5 additions & 3 deletions rust-runtime/aws-smithy-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
#![allow(clippy::derive_partial_eq_without_eq)]
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod body;
//TODO(runtimeCratesVersioningCleanup): Re-point those who use the following reexports to
// directly depend on `aws_smithy_types` and remove the reexports below.
pub use aws_smithy_types::body;
pub use aws_smithy_types::byte_stream;

pub mod endpoint;
// Marked as `doc(hidden)` because a type in the module is used both by this crate and by the code
// generator, but not by external users. Also, by the module being `doc(hidden)` instead of it being
Expand All @@ -45,7 +49,5 @@ pub mod result;
#[cfg(feature = "event-stream")]
pub mod event_stream;

pub mod byte_stream;

pub mod connection;
mod urlencode;
22 changes: 22 additions & 0 deletions rust-runtime/aws-smithy-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,30 @@ license = "Apache-2.0"
repository = "https://github.com/awslabs/smithy-rs"

[features]
rt-tokio = ["dep:tokio-util", "dep:tokio", "tokio?/rt", "tokio?/fs", "tokio?/io-util", "tokio-util?/io"]
test-util = []
serde-serialize = []
serde-deserialize = []

[dependencies]
base64-simd = "0.8"
bytes = "1"
bytes-utils = "0.1"
http = "0.2.3"
http-body = "0.4.4"
hyper = "0.14.26"
itoa = "1.0.0"
num-integer = "0.1.44"
pin-project-lite = "0.2.9"
pin-utils = "0.1.0"
ryu = "1.0.5"
time = { version = "0.3.4", features = ["parsing"] }

# ByteStream internals
futures-core = "0.3.14"
tokio = { version = "1.23.1", optional = true }
tokio-util = { version = "0.7", optional = true }

[dev-dependencies]
base64 = "0.13.0"
ciborium = { version = "0.2.1" }
Expand All @@ -31,6 +44,15 @@ proptest = "1"
rand = "0.8.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.23.1", features = [
"macros",
"rt",
"rt-multi-thread",
"fs",
"io-util",
] }
tokio-stream = "0.1.5"
tempfile = "3.2.0"

[package.metadata.docs.rs]
all-features = true
Expand Down
13 changes: 13 additions & 0 deletions rust-runtime/aws-smithy-types/external-types.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,15 @@
allowed_external_types = [
"bytes::bytes::Bytes",
"bytes::buf::buf_impl::Buf",

# TODO(https://github.com/awslabs/smithy-rs/issues/3033): Feature gate based on unstable versions
"http_body::Body",
"http_body::combinators::box_body::BoxBody",
"hyper::body::body::Body",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate Tokio `AsyncRead`
"tokio::io::async_read::AsyncRead",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate references to Tokio `File`
"tokio::fs::file::File",
]
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl SdkBody {
}
}

/// Update this `SdkBody` with `map`. **This function MUST NOT alert the data of the body.**
/// Update this `SdkBody` with `map`. **This function MUST NOT alter the data of the body.**
///
/// This function is useful for adding metadata like progress tracking to an [`SdkBody`] that
/// does not alter the actual byte data. If your mapper alters the contents of the body, use [`SdkBody::map`]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//!
//! ### Writing a ByteStream into a file:
//! ```no_run
//! use aws_smithy_http::byte_stream::ByteStream;
//! use aws_smithy_types::byte_stream::ByteStream;
//! use std::error::Error;
//! use tokio::fs::File;
//! use tokio::io::AsyncWriteExt;
Expand All @@ -34,7 +34,7 @@
//! ### Converting a ByteStream into Bytes
//! ```no_run
//! use bytes::Bytes;
//! use aws_smithy_http::byte_stream::ByteStream;
//! use aws_smithy_types::byte_stream::ByteStream;
//! use std::error::Error;
//! struct SynthesizeSpeechOutput {
//! audio_stream: ByteStream,
Expand All @@ -53,7 +53,7 @@
//!
//! ```no_run
//! use bytes::{Buf, Bytes};
//! use aws_smithy_http::byte_stream::ByteStream;
//! use aws_smithy_types::byte_stream::ByteStream;
//! use std::error::Error;
//! use tokio::fs::File;
//! use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -83,7 +83,7 @@
//! ```no_run
//! # #[cfg(feature = "rt-tokio")]
//! # {
//! use aws_smithy_http::byte_stream::ByteStream;
//! use aws_smithy_types::byte_stream::ByteStream;
//! use std::path::Path;
//! struct GetObjectInput {
//! body: ByteStream
Expand All @@ -104,7 +104,7 @@
//! ```no_run
//! # #[cfg(feature = "rt-tokio")]
//! # {
//! use aws_smithy_http::byte_stream::{ByteStream, Length};
//! use aws_smithy_types::byte_stream::{ByteStream, Length};
//! use std::path::Path;
//! struct GetObjectInput {
//! body: ByteStream
Expand Down Expand Up @@ -157,8 +157,8 @@ pin_project! {
/// [`.collect()`](crate::byte_stream::ByteStream::collect) reads the complete ByteStream into memory and stores it in `AggregatedBytes`,
/// a non-contiguous ByteBuffer.
/// ```no_run
/// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes};
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_types::byte_stream::{ByteStream, AggregatedBytes};
/// use aws_smithy_types::body::SdkBody;
/// use bytes::Buf;
/// async fn example() {
/// let stream = ByteStream::new(SdkBody::from("hello! This is some data"));
Expand All @@ -181,8 +181,8 @@ pin_project! {
/// # pub fn finish(&self) -> u64 { 6 }
/// # }
/// # }
/// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes, error::Error};
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_types::byte_stream::{ByteStream, AggregatedBytes, error::Error};
/// use aws_smithy_types::body::SdkBody;
///
/// async fn example() -> Result<(), Error> {
/// let mut stream = ByteStream::from(vec![1, 2, 3, 4, 5, 99]);
Expand All @@ -202,8 +202,8 @@ pin_project! {
/// It's possible to convert a `ByteStream` into a struct that implements [`tokio::io::AsyncRead`](tokio::io::AsyncRead).
/// Then, you can use pre-existing tools like [`tokio::io::BufReader`](tokio::io::BufReader):
/// ```no_run
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_types::byte_stream::ByteStream;
/// use aws_smithy_types::body::SdkBody;
/// use tokio::io::{AsyncBufReadExt, BufReader};
/// #[cfg(feature = "rt-tokio")]
/// async fn example() -> std::io::Result<()> {
Expand All @@ -224,7 +224,7 @@ pin_project! {
/// will be converted into `Bytes` enabling a cheap clone during retries.
/// ```no_run
/// use bytes::Bytes;
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_types::byte_stream::ByteStream;
/// let stream = ByteStream::from(vec![1,2,3]);
/// let stream = ByteStream::from(Bytes::from_static(b"hello!"));
/// ```
Expand All @@ -233,7 +233,7 @@ pin_project! {
/// ```no_run
/// #[cfg(feature = "tokio-rt")]
/// # {
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_types::byte_stream::ByteStream;
/// let stream = ByteStream::from_path("big_file.csv");
/// # }
/// ```
Expand All @@ -242,8 +242,8 @@ pin_project! {
/// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable
/// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable).
/// ```no_run
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_types::byte_stream::ByteStream;
/// use aws_smithy_types::body::SdkBody;
/// use bytes::Bytes;
/// let (mut tx, channel_body) = hyper::Body::channel();
/// // this will not be retryable because the SDK has no way to replay this stream
Expand Down Expand Up @@ -322,9 +322,9 @@ impl ByteStream {
/// over the network. If a contiguous slice is required, use `into_bytes()`.
/// ```no_run
/// use bytes::Bytes;
/// use aws_smithy_http::body;
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_http::byte_stream::{ByteStream, error::Error};
/// use aws_smithy_types::body;
/// use aws_smithy_types::body::SdkBody;
/// use aws_smithy_types::byte_stream::{ByteStream, error::Error};
/// async fn get_data() {
/// let stream = ByteStream::new(SdkBody::from("hello!"));
/// let data: Result<Bytes, Error> = stream.collect().await.map(|data| data.into_bytes());
Expand All @@ -339,7 +339,7 @@ impl ByteStream {
/// ```no_run
/// # #[cfg(feature = "rt-tokio")]
/// # {
/// use aws_smithy_http::byte_stream::{ByteStream, Length};
/// use aws_smithy_types::byte_stream::{ByteStream, Length};
///
/// async fn bytestream_from_file() -> ByteStream {
/// let bytestream = ByteStream::read_from()
Expand Down Expand Up @@ -379,7 +379,7 @@ impl ByteStream {
///
/// # Examples
/// ```no_run
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_types::byte_stream::ByteStream;
/// use std::path::Path;
/// async fn make_bytestream() -> ByteStream {
/// ByteStream::from_path("docs/rows.csv").await.expect("file should be readable")
Expand Down Expand Up @@ -412,7 +412,7 @@ impl ByteStream {
///
/// ```rust
/// use tokio::io::{BufReader, AsyncBufReadExt};
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_types::byte_stream::ByteStream;
///
/// # async fn dox(my_bytestream: ByteStream) -> std::io::Result<()> {
/// let mut lines = BufReader::new(my_bytestream.into_async_read()).lines();
Expand All @@ -423,9 +423,20 @@ impl ByteStream {
/// # }
/// ```
pub fn into_async_read(self) -> impl tokio::io::AsyncRead {
tokio_util::io::StreamReader::new(
crate::futures_stream_adapter::FuturesStreamCompatByteStream::new(self),
)
// The `Stream` trait is currently unstable so we can only use it in private.
// Here, we create a local struct just to enable the trait for `ByteStream` and pass it
// to `StreamReader`.
struct FuturesStreamCompatByteStream(ByteStream);
impl futures_core::stream::Stream for FuturesStreamCompatByteStream {
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_next(cx)
}
}
tokio_util::io::StreamReader::new(FuturesStreamCompatByteStream(self))
}

/// Given a function to modify an [`SdkBody`], run it on the `SdkBody` inside this `Bytestream`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl PathBody {
/// ```no_run
/// # #[cfg(feature = "rt-tokio")]
/// # {
/// use aws_smithy_http::byte_stream::{ByteStream, Length};
/// use aws_smithy_types::byte_stream::{ByteStream, Length};
/// use std::path::Path;
/// struct GetObjectInput {
/// body: ByteStream
Expand Down
Loading

0 comments on commit 0f38dae

Please sign in to comment.