Skip to content

Commit

Permalink
add io related abstractions to support s3 tm uploads (#3756)
Browse files Browse the repository at this point in the history
## Description

Adds a new `io` module to S3 transfer manager HLL. This is supporting
functionality for implementing uploads.

High level description of new components:

* **InputStream** - this is a new abstraction to replace `ByteStream`
for use in transfer manager uploads. The primary driver behind this is
that once we create a `ByteStream` we have very little control over
optimizing how it can be consumed: we lose track of whether it came from
a file, from memory, or from elsewhere. Internally this will of course
eventually map to a `ByteStream` when we call `UploadPart` or `PutObject`,
but we want to have control over how bytes are read from the stream (e.g.
being able to read a file concurrently).
* In the future we may add additional variants to support e.g. unknown
streams (e.g. `impl io::Read`) or giving users control over checksums
for each part, etc. This is an area of exploration still.
* **PartReader / ReadPart** - internal abstractions over `InputStream`
that workers will eventually consume. The idea is we know what
`InputStream` is and we convert it to something that reads "parts" for
multipart upload in the most efficient way possible for whatever the
underlying stream type is.
* In a subsequent PR, workers will share a single part reader and read
from it concurrently. Workers don't have to care about anything other than
asking for the next "part" of data and uploading it.
* **PathBodyBuilder / PathBody** - replacements for `FsBuilder` from
smithy runtime / SDK. The notable difference is that we don't currently
allow specifying an already open file or controlling buffer sizes.
* We can of course support an open file, but the implementation will
require additional locking to ensure bytes are read sequentially.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
aajtodd authored Jul 15, 2024
1 parent 523a558 commit b521339
Show file tree
Hide file tree
Showing 9 changed files with 828 additions and 6 deletions.
13 changes: 7 additions & 6 deletions aws/hll/aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ publish = false

[dependencies]
async-channel = "2.3.1"
async-trait = "0.1.80"
aws-sdk-s3 = { version = "1.36.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-http = "0.60.8"
aws-smithy-runtime-api = "1.7.0"
async-trait = "0.1.81"
aws-sdk-s3 = { version = "1.40.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-http = "0.60.9"
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.0"
aws-types = "1.3.1"
aws-types = "1.3.3"
bytes = "1"
# FIXME - upgrade to hyper 1.x
hyper = { version = "0.14.29", features = ["client"] }
Expand All @@ -24,8 +24,9 @@ tokio = { version = "1.38.0", features = ["rt-multi-thread", "io-util", "sync",
tracing = "0.1"

[dev-dependencies]
aws-config = { version = "1.5.1", features = ["behavior-version-latest"] }
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
aws-smithy-mocks-experimental = "0.2.1"
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
console-subscriber = "0.3.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tempfile = "3.10.1"
5 changes: 5 additions & 0 deletions aws/hll/aws-s3-transfer-manager/src/download/object_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct ObjectMetadata {
pub content_range: Option<String>,
pub content_type: Option<String>,
pub expires: Option<::aws_smithy_types::DateTime>,
pub expires_string: Option<String>,
pub website_redirect_location: Option<String>,
pub server_side_encryption: Option<aws_sdk_s3::types::ServerSideEncryption>,
pub metadata: Option<::std::collections::HashMap<String, String>>,
Expand Down Expand Up @@ -85,7 +86,9 @@ impl From<GetObjectOutput> for ObjectMetadata {
content_language: value.content_language,
content_range: value.content_range,
content_type: value.content_type,
#[allow(deprecated)]
expires: value.expires,
expires_string: value.expires_string,
website_redirect_location: value.website_redirect_location,
server_side_encryption: value.server_side_encryption,
metadata: value.metadata,
Expand Down Expand Up @@ -127,7 +130,9 @@ impl From<HeadObjectOutput> for ObjectMetadata {
content_language: value.content_language,
content_range: None,
content_type: value.content_type,
#[allow(deprecated)]
expires: value.expires,
expires_string: value.expires_string,
website_redirect_location: value.website_redirect_location,
server_side_encryption: value.server_side_encryption,
metadata: value.metadata,
Expand Down
79 changes: 79 additions & 0 deletions aws/hll/aws-s3-transfer-manager/src/io/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use std::error::Error as StdError;
use std::fmt;
use std::fmt::Formatter;
use std::io::{Error as StdIoError, ErrorKind as StdIoErrorKind};
use tokio::task::JoinError;

/// Crate-internal classification of the failures carried by [`Error`].
///
/// Each variant maps to one fixed `Display` message on [`Error`]; the two
/// tuple variants additionally keep the underlying error so it can be
/// returned from `std::error::Error::source`.
#[derive(Debug)]
pub(crate) enum ErrorKind {
/// The upper bound of a size hint (`SizeHint::upper`) was required but was `None`.
UpperBoundSizeHintRequired,
/// An offset was greater than the size of the file it refers to.
OffsetGreaterThanFileSize,
/// A tokio task failed; wraps the `JoinError` reported for that task.
TaskFailed(JoinError),
/// A standard library I/O operation failed; wraps the `std::io::Error`.
IoError(StdIoError),
}

/// An I/O related error occurred
///
/// This is an opaque wrapper: the specific failure is stored in a
/// crate-private kind and is not exposed as a public field. Values are
/// created through the `From` conversions and crate-internal constructors
/// defined in this module.
#[derive(Debug)]
pub struct Error {
// Crate-private discriminator (and payload, where applicable) for this error.
kind: ErrorKind,
}

impl Error {
pub(crate) fn upper_bound_size_hint_required() -> Error {
ErrorKind::UpperBoundSizeHintRequired.into()
}
}
impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Self {
Self { kind }
}
}

impl From<StdIoError> for Error {
fn from(err: StdIoError) -> Self {
ErrorKind::IoError(err).into()
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match &self.kind {
ErrorKind::UpperBoundSizeHintRequired => write!(
f,
"size hint upper bound (SizeHint::upper) is required but was None"
),
ErrorKind::OffsetGreaterThanFileSize => write!(
f,
"offset must be less than or equal to file size but was greater than"
),
ErrorKind::IoError(_) => write!(f, "I/O error"),
ErrorKind::TaskFailed(_) => write!(f, "task failed"),
}
}
}

impl StdError for Error {
    /// Returns the wrapped lower-level error, if this error carries one.
    ///
    /// Only the `IoError` and `TaskFailed` kinds hold an underlying error;
    /// every other kind yields `None`.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match &self.kind {
            ErrorKind::IoError(io_err) => Some(io_err),
            ErrorKind::TaskFailed(join_err) => Some(join_err),
            // Listed explicitly (no wildcard) so that adding a new kind
            // forces this match to be revisited.
            ErrorKind::UpperBoundSizeHintRequired | ErrorKind::OffsetGreaterThanFileSize => None,
        }
    }
}
impl From<Error> for StdIoError {
    /// Converts an [`Error`] into a `std::io::Error` of kind `Other`, with
    /// the original error stored as its inner payload.
    fn from(err: Error) -> Self {
        let io_kind = StdIoErrorKind::Other;
        Self::new(io_kind, err)
    }
}

impl From<JoinError> for Error {
fn from(value: JoinError) -> Self {
ErrorKind::TaskFailed(value).into()
}
}
17 changes: 17 additions & 0 deletions aws/hll/aws-s3-transfer-manager/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

/// Crate-internal part-reading support (described in the change that
/// introduced it as abstractions over `InputStream` for multipart uploads).
pub(crate) mod part_reader;
/// Private module that defines [`PathBodyBuilder`], re-exported below.
mod path_body;
/// Private module that defines [`InputStream`], re-exported below.
mod stream;

/// Error types related to I/O abstractions
pub mod error;
/// Private module that defines [`SizeHint`], re-exported below.
mod size_hint;

// re-exports
// These make the public types reachable directly from this module even
// though the modules that define them are private.
pub use self::path_body::PathBodyBuilder;
pub use self::size_hint::SizeHint;
pub use self::stream::InputStream;
Loading

0 comments on commit b521339

Please sign in to comment.