From b521339a285dc0a85c74d787f975e11ba8de5274 Mon Sep 17 00:00:00 2001
From: Aaron Todd
Date: Mon, 15 Jul 2024 15:17:43 -0400
Subject: [PATCH] add io related abstractions to support s3 tm uploads (#3756)

## Description

Adds a new `io` module to the S3 transfer manager HLL. This is supporting functionality for implementing uploads. High-level description of the new components (a usage sketch is appended after the diff):

* **InputStream** - a new abstraction that replaces `ByteStream` for transfer manager uploads. The primary driver is that once we create a `ByteStream` we have very little control over optimizing how it is consumed; we lose track of whether it came from a file, from memory, or from somewhere else. Internally this will still map to a `ByteStream` when we eventually call `UploadPart` or `PutObject`, but we retain control over how bytes are read from the stream (e.g. being able to read a file concurrently).
* In the future we may add additional variants to support unknown-length streams (e.g. `impl io::Read`) or to give users control over checksums for each part. This is still an area of exploration.
* **PartReader / ReadPart** - internal abstractions over `InputStream` that workers will eventually consume. The idea is that we know what the `InputStream` is and convert it into something that reads "parts" for a multipart upload in the most efficient way possible for the underlying stream type.
* In a subsequent PR, workers will share a single part reader and read from it concurrently. Workers don't have to care about anything other than asking for the next "part" of data and uploading it.
* **PathBodyBuilder / PathBody** - replacements for `FsBuilder` from the smithy runtime / SDK. A notable difference is that we don't currently allow specifying an already open file or controlling buffer sizes.
* We can of course support an open file but the implementation will require additional locking to ensure bytes are read sequentially ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --- aws/hll/aws-s3-transfer-manager/Cargo.toml | 13 +- .../src/download/object_meta.rs | 5 + .../aws-s3-transfer-manager/src/io/error.rs | 79 ++++ aws/hll/aws-s3-transfer-manager/src/io/mod.rs | 17 + .../src/io/part_reader.rs | 337 ++++++++++++++++++ .../src/io/path_body.rs | 209 +++++++++++ .../src/io/size_hint.rs | 41 +++ .../aws-s3-transfer-manager/src/io/stream.rs | 129 +++++++ aws/hll/aws-s3-transfer-manager/src/lib.rs | 4 + 9 files changed, 828 insertions(+), 6 deletions(-) create mode 100644 aws/hll/aws-s3-transfer-manager/src/io/error.rs create mode 100644 aws/hll/aws-s3-transfer-manager/src/io/mod.rs create mode 100644 aws/hll/aws-s3-transfer-manager/src/io/part_reader.rs create mode 100644 aws/hll/aws-s3-transfer-manager/src/io/path_body.rs create mode 100644 aws/hll/aws-s3-transfer-manager/src/io/size_hint.rs create mode 100644 aws/hll/aws-s3-transfer-manager/src/io/stream.rs diff --git a/aws/hll/aws-s3-transfer-manager/Cargo.toml b/aws/hll/aws-s3-transfer-manager/Cargo.toml index da61910b5f..d6570a123f 100644 --- a/aws/hll/aws-s3-transfer-manager/Cargo.toml +++ b/aws/hll/aws-s3-transfer-manager/Cargo.toml @@ -10,12 +10,12 @@ publish = false [dependencies] async-channel = "2.3.1" -async-trait = "0.1.80" -aws-sdk-s3 = { version = "1.36.0", features = ["behavior-version-latest", "test-util"] } -aws-smithy-http = "0.60.8" -aws-smithy-runtime-api = "1.7.0" +async-trait = "0.1.81" +aws-sdk-s3 = { version = "1.40.0", features = ["behavior-version-latest", "test-util"] } +aws-smithy-http = "0.60.9" +aws-smithy-runtime-api = "1.7.1" aws-smithy-types = "1.2.0" -aws-types = "1.3.1" +aws-types = "1.3.3" bytes = "1" # FIXME - upgrade to hyper 1.x hyper = { version = "0.14.29", features = ["client"] } @@ -24,8 +24,9 @@ tokio = { version = "1.38.0", features = ["rt-multi-thread", "io-util", "sync", tracing = "0.1" [dev-dependencies] -aws-config = { version = "1.5.1", features = ["behavior-version-latest"] } +aws-config = { version = "1.5.4", features = ["behavior-version-latest"] } aws-smithy-mocks-experimental = "0.2.1" clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] } console-subscriber = "0.3.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +tempfile = "3.10.1" diff --git a/aws/hll/aws-s3-transfer-manager/src/download/object_meta.rs b/aws/hll/aws-s3-transfer-manager/src/download/object_meta.rs index d5f570660f..e742406b5f 100644 --- a/aws/hll/aws-s3-transfer-manager/src/download/object_meta.rs +++ b/aws/hll/aws-s3-transfer-manager/src/download/object_meta.rs @@ -32,6 +32,7 @@ pub struct ObjectMetadata { pub content_range: Option, pub content_type: Option, pub expires: Option<::aws_smithy_types::DateTime>, + pub expires_string: Option, pub website_redirect_location: Option, pub server_side_encryption: Option, pub metadata: Option<::std::collections::HashMap>, @@ -85,7 +86,9 @@ impl From for ObjectMetadata { content_language: value.content_language, content_range: value.content_range, content_type: value.content_type, + #[allow(deprecated)] expires: value.expires, + expires_string: value.expires_string, website_redirect_location: value.website_redirect_location, server_side_encryption: value.server_side_encryption, metadata: value.metadata, @@ 
-127,7 +130,9 @@ impl From for ObjectMetadata { content_language: value.content_language, content_range: None, content_type: value.content_type, + #[allow(deprecated)] expires: value.expires, + expires_string: value.expires_string, website_redirect_location: value.website_redirect_location, server_side_encryption: value.server_side_encryption, metadata: value.metadata, diff --git a/aws/hll/aws-s3-transfer-manager/src/io/error.rs b/aws/hll/aws-s3-transfer-manager/src/io/error.rs new file mode 100644 index 0000000000..4b88133a33 --- /dev/null +++ b/aws/hll/aws-s3-transfer-manager/src/io/error.rs @@ -0,0 +1,79 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +use std::error::Error as StdError; +use std::fmt; +use std::fmt::Formatter; +use std::io::{Error as StdIoError, ErrorKind as StdIoErrorKind}; +use tokio::task::JoinError; + +#[derive(Debug)] +pub(crate) enum ErrorKind { + UpperBoundSizeHintRequired, + OffsetGreaterThanFileSize, + TaskFailed(JoinError), + IoError(StdIoError), +} + +/// An I/O related error occurred +#[derive(Debug)] +pub struct Error { + kind: ErrorKind, +} + +impl Error { + pub(crate) fn upper_bound_size_hint_required() -> Error { + ErrorKind::UpperBoundSizeHintRequired.into() + } +} +impl From for Error { + fn from(kind: ErrorKind) -> Self { + Self { kind } + } +} + +impl From for Error { + fn from(err: StdIoError) -> Self { + ErrorKind::IoError(err).into() + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match &self.kind { + ErrorKind::UpperBoundSizeHintRequired => write!( + f, + "size hint upper bound (SizeHint::upper) is required but was None" + ), + ErrorKind::OffsetGreaterThanFileSize => write!( + f, + "offset must be less than or equal to file size but was greater than" + ), + ErrorKind::IoError(_) => write!(f, "I/O error"), + ErrorKind::TaskFailed(_) => write!(f, "task failed"), + } + } +} + +impl StdError for Error { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + match &self.kind { + ErrorKind::UpperBoundSizeHintRequired => None, + ErrorKind::OffsetGreaterThanFileSize => None, + ErrorKind::IoError(err) => Some(err as _), + ErrorKind::TaskFailed(err) => Some(err as _), + } + } +} +impl From for StdIoError { + fn from(err: Error) -> Self { + StdIoError::new(StdIoErrorKind::Other, err) + } +} + +impl From for Error { + fn from(value: JoinError) -> Self { + ErrorKind::TaskFailed(value).into() + } +} diff --git a/aws/hll/aws-s3-transfer-manager/src/io/mod.rs b/aws/hll/aws-s3-transfer-manager/src/io/mod.rs new file mode 100644 index 0000000000..ebe9a60df7 --- /dev/null +++ b/aws/hll/aws-s3-transfer-manager/src/io/mod.rs @@ -0,0 +1,17 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +pub(crate) mod part_reader; +mod path_body; +mod stream; + +/// Error types related to I/O abstractions +pub mod error; +mod size_hint; + +// re-exports +pub use self::path_body::PathBodyBuilder; +pub use self::size_hint::SizeHint; +pub use self::stream::InputStream; diff --git a/aws/hll/aws-s3-transfer-manager/src/io/part_reader.rs b/aws/hll/aws-s3-transfer-manager/src/io/part_reader.rs new file mode 100644 index 0000000000..338ea9e49c --- /dev/null +++ b/aws/hll/aws-s3-transfer-manager/src/io/part_reader.rs @@ -0,0 +1,337 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ * SPDX-License-Identifier: Apache-2.0
+ */
+use std::cmp;
+use std::ops::DerefMut;
+use std::sync::Mutex;
+
+use bytes::{Buf, Bytes, BytesMut};
+
+use crate::io::error::Error;
+use crate::io::path_body::PathBody;
+use crate::io::stream::RawInputStream;
+use crate::io::InputStream;
+use crate::MEBIBYTE;
+
+/// Builder for creating a `ReadPart` implementation.
+#[derive(Debug)]
+pub(crate) struct Builder {
+    stream: Option<RawInputStream>,
+    part_size: usize,
+}
+
+impl Builder {
+    pub(crate) fn new() -> Self {
+        Self {
+            stream: None,
+            part_size: 5 * MEBIBYTE as usize,
+        }
+    }
+
+    /// Set the input stream to read from.
+    pub(crate) fn stream(mut self, stream: InputStream) -> Self {
+        self.stream = Some(stream.inner);
+        self
+    }
+
+    /// Set the target part size that should be used when reading data.
+    ///
+    /// All parts except for possibly the last one should be of this size.
+    pub(crate) fn part_size(mut self, part_size: usize) -> Self {
+        self.part_size = part_size;
+        self
+    }
+
+    pub(crate) fn build(self) -> impl ReadPart {
+        let stream = self.stream.expect("input stream set");
+        match stream {
+            RawInputStream::Buf(buf) => {
+                PartReader::Bytes(BytesPartReader::new(buf, self.part_size))
+            }
+            RawInputStream::Fs(path_body) => {
+                PartReader::Fs(PathBodyPartReader::new(path_body, self.part_size))
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+enum PartReader {
+    Bytes(BytesPartReader),
+    Fs(PathBodyPartReader),
+}
+
+impl ReadPart for PartReader {
+    async fn next_part(&self) -> Result<Option<PartData>, Error> {
+        match self {
+            PartReader::Bytes(bytes) => bytes.next_part().await,
+            PartReader::Fs(path_body) => path_body.next_part().await,
+        }
+    }
+}
+
+/// Data for a single part
+pub(crate) struct PartData {
+    // 1-indexed
+    pub(crate) part_number: u64,
+    pub(crate) data: Bytes,
+}
+
+/// The `ReadPart` trait allows for reading data from an `InputStream` and packaging the raw
+/// data into `PartData` which carries additional metadata needed for uploading a part.
+pub(crate) trait ReadPart {
+    /// Request the next "part" of data.
+    ///
+    /// When there is no more data readers should return `Ok(None)`.
+    /// NOTE: Implementations are allowed to return data in any order and consumers are
+    /// expected to order data by the part number.
+    fn next_part(
+        &self,
+    ) -> impl std::future::Future<Output = Result<Option<PartData>, Error>> + Send;
+}
+
+#[derive(Debug)]
+struct PartReaderState {
+    // current start offset
+    offset: u64,
+    // current part number
+    part_number: u64,
+    // total number of bytes remaining to be read
+    remaining: u64,
+}
+
+impl PartReaderState {
+    /// Create a new `PartReaderState`
+    fn new(content_length: u64) -> Self {
+        Self {
+            offset: 0,
+            part_number: 1,
+            remaining: content_length,
+        }
+    }
+
+    /// Set the initial offset to start reading from
+    fn with_offset(self, offset: u64) -> Self {
+        Self { offset, ..self }
+    }
+}
+
+/// [ReadPart] implementation for in-memory input streams.
+#[derive(Debug)]
+struct BytesPartReader {
+    buf: Bytes,
+    part_size: usize,
+    state: Mutex<PartReaderState>,
+}
+
+impl BytesPartReader {
+    fn new(buf: Bytes, part_size: usize) -> Self {
+        let content_length = buf.remaining() as u64;
+        Self {
+            buf,
+            part_size,
+            state: Mutex::new(PartReaderState::new(content_length)), // std Mutex
+        }
+    }
+}
+
+impl ReadPart for BytesPartReader {
+    async fn next_part(&self) -> Result<Option<PartData>, Error> {
+        let mut state = self.state.lock().expect("lock valid");
+        if state.remaining == 0 {
+            return Ok(None);
+        }
+
+        let start = state.offset as usize;
+        let end = cmp::min(start + self.part_size, self.buf.len());
+        let data = self.buf.slice(start..end);
+        let part_number = state.part_number;
+        state.part_number += 1;
+        state.offset += data.len() as u64;
+        state.remaining -= data.len() as u64;
+        let part = PartData { data, part_number };
+        Ok(Some(part))
+    }
+}
+
+/// [ReadPart] implementation for path based input streams
+#[derive(Debug)]
+struct PathBodyPartReader {
+    body: PathBody,
+    part_size: usize,
+    state: Mutex<PartReaderState>, // std Mutex
+}
+
+impl PathBodyPartReader {
+    fn new(body: PathBody, part_size: usize) -> Self {
+        let offset = body.offset;
+        let content_length = body.length;
+        Self {
+            body,
+            part_size,
+            state: Mutex::new(PartReaderState::new(content_length).with_offset(offset)), // std Mutex
+        }
+    }
+}
+
+impl ReadPart for PathBodyPartReader {
+    async fn next_part(&self) -> Result<Option<PartData>, Error> {
+        let (offset, part_number, part_size) = {
+            let mut state = self.state.lock().expect("lock valid");
+            if state.remaining == 0 {
+                return Ok(None);
+            }
+            let offset = state.offset;
+            let part_number = state.part_number;
+
+            let part_size = cmp::min(self.part_size as u64, state.remaining);
+            state.offset += part_size;
+            state.part_number += 1;
+            state.remaining -= part_size;
+
+            (offset, part_number, part_size)
+        };
+        let path = self.body.path.clone();
+        let handle = tokio::task::spawn_blocking(move || {
+            // TODO(aws-sdk-rust#1159) - replace allocation with memory pool
+            let mut dst = BytesMut::with_capacity(part_size as usize);
+            // we need to set the length so that the raw &[u8] slice has the correct
+            // size, we are guaranteed to read exactly part_size data from file on success
+            // FIXME(aws-sdk-rust#1159) - can we get rid of this use of unsafe?
+ unsafe { dst.set_len(dst.capacity()) } + file_util::read_file_chunk_sync(dst.deref_mut(), path, offset)?; + let data = dst.freeze(); + Ok::(PartData { data, part_number }) + }); + + handle.await?.map(Some) + } +} + +mod file_util { + #[cfg(unix)] + pub(super) use unix::read_file_chunk_sync; + #[cfg(windows)] + pub(super) use windows::read_file_chunk_sync; + + #[cfg(unix)] + mod unix { + use std::fs::File; + use std::io; + use std::os::unix::fs::FileExt; + use std::path::Path; + + pub(crate) fn read_file_chunk_sync( + dst: &mut [u8], + path: impl AsRef, + offset: u64, + ) -> Result<(), io::Error> { + let file = File::open(path)?; + file.read_exact_at(dst, offset) + } + } + + #[cfg(windows)] + mod windows { + use std::fs::File; + use std::io; + use std::io::{Read, Seek, SeekFrom}; + use std::path::Path; + + pub(crate) fn read_file_chunk_sync( + dst: &mut [u8], + path: impl AsRef, + offset: u64, + ) -> Result<(), io::Error> { + let mut file = File::open(path)?; + file.seek(SeekFrom::Start(offset))?; + file.read_exact(dst) + } + } +} + +#[cfg(test)] +mod test { + use std::io::Write; + + use bytes::{Buf, Bytes}; + use tempfile::NamedTempFile; + + use crate::io::part_reader::{PartData, Builder, ReadPart}; + use crate::io::InputStream; + + async fn collect_parts(reader: impl ReadPart) -> Vec { + let mut parts = Vec::new(); + let mut expected_part_number = 1; + while let Some(part) = reader.next_part().await.unwrap() { + assert_eq!(expected_part_number, part.part_number); + expected_part_number += 1; + parts.push(part); + } + parts + } + + #[tokio::test] + async fn test_bytes_part_reader() { + let data = Bytes::from("a lep is a ball, a tay is a hammer, a flix is a comb"); + let stream = InputStream::from(data.clone()); + let expected = data.chunks(5).collect::>(); + let reader = Builder::new().part_size(5).stream(stream).build(); + let parts = collect_parts(reader).await; + let actual = parts.iter().map(|p| p.data.chunk()).collect::>(); + + assert_eq!(expected, actual); + } + + async fn path_reader_test(limit: Option, offset: Option) { + let part_size = 5; + let mut tmp = NamedTempFile::new().unwrap(); + let mut data = Bytes::from("a lep is a ball, a tay is a hammer, a flix is a comb"); + tmp.write_all(data.chunk()).unwrap(); + + let mut builder = InputStream::read_from().path(tmp.path()); + if let Some(limit) = limit { + data.truncate(limit); + builder = builder.length((limit - offset.unwrap_or_default()) as u64); + } + + if let Some(offset) = offset { + data.advance(offset); + builder = builder.offset(offset as u64); + } + + let expected = data.chunks(part_size).collect::>(); + + let stream = builder.build().unwrap(); + let reader = Builder::new() + .part_size(part_size) + .stream(stream) + .build(); + + let parts = collect_parts(reader).await; + let actual = parts.iter().map(|p| p.data.chunk()).collect::>(); + + assert_eq!(expected, actual); + } + + #[tokio::test] + async fn test_path_part_reader() { + path_reader_test(None, None).await; + } + + #[tokio::test] + async fn test_path_part_reader_with_offset() { + path_reader_test(None, Some(8)).await; + } + + #[tokio::test] + async fn test_path_part_reader_with_explicit_length() { + path_reader_test(Some(12), None).await; + } + + #[tokio::test] + async fn test_path_part_reader_with_length_and_offset() { + path_reader_test(Some(23), Some(4)).await; + } +} diff --git a/aws/hll/aws-s3-transfer-manager/src/io/path_body.rs b/aws/hll/aws-s3-transfer-manager/src/io/path_body.rs new file mode 100644 index 0000000000..0d64d0f6a7 --- /dev/null +++ 
b/aws/hll/aws-s3-transfer-manager/src/io/path_body.rs @@ -0,0 +1,209 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use crate::io::error::{Error, ErrorKind}; +use crate::io::stream::{InputStream, RawInputStream}; +use std::fs; +use std::path::PathBuf; + +/// Input stream designed to wrap file based input. +#[derive(Debug)] +pub(super) struct PathBody { + // The path to the file + pub(super) path: PathBuf, + // The total number of bytes to read + pub(super) length: u64, + // The byte-offset to start reading from + pub(super) offset: u64, +} + +/// Builder for creating [`InputStream`](InputStream) from a file/path. +/// +/// ```no_run +/// # { +/// use aws_s3_transfer_manager::io::InputStream; +/// use std::path::Path; +/// +/// async fn input_stream_from_file() -> InputStream { +/// let stream = InputStream::read_from() +/// .path("docs/some-large-file.csv") +/// // Specify the length of the file used (skips an additional call to retrieve the size) +/// .build() +/// .expect("valid path"); +/// stream +/// } +/// # } +/// ``` +#[derive(Debug, Default)] +pub struct PathBodyBuilder { + path: Option, + length: Option, + offset: Option, +} + +impl PathBodyBuilder { + /// Create a new [`PathBodyBuilder`]. + /// + /// You must call [`path`](PathBodyBuilder::path) to specify what to read from. + pub fn new() -> Self { + Self::default() + } + + /// Sets the path to read from. + pub fn path(mut self, path: impl AsRef) -> Self { + self.path = Some(path.as_ref().to_path_buf()); + self + } + + /// Specify the offset to start reading from (in bytes) + /// + /// When used in conjunction with [`length`](PathBodyBuilder::length), allows for reading a single "chunk" of a file. + pub fn offset(mut self, offset: u64) -> Self { + self.offset = Some(offset); + self + } + + /// Specify the length to read (in bytes). + /// + /// By pre-specifying the length, this API skips an additional call to retrieve the size from file-system metadata. + /// + /// When used in conjunction with [`offset`](PathBodyBuilder::offset), allows for reading a single "chunk" of a file. + /// + ///
+ /// Setting the length manually will trigger no validation related to any offset provided or the actual size of + /// the file. This is an advanced setting mainly used to avoid an additional syscall if you know the + /// size of the file already. + ///
+ pub fn length(mut self, length: u64) -> Self { + self.length = Some(length); + self + } + + /// Returns a [`InputStream`] from this builder. + pub fn build(self) -> Result { + let path = self.path.expect("path set"); + let offset = self.offset.unwrap_or_default(); + + let length = match self.length { + None => { + // TODO(aws-sdk-rust#1159, design) - evaluate if we want build() to be async and to use tokio for stat() call (bytestream FsBuilder::build() is async) + let metadata = fs::metadata(path.clone())?; + let file_size = metadata.len(); + + if offset >= file_size { + return Err(ErrorKind::OffsetGreaterThanFileSize.into()); + } + + file_size - offset + } + Some(explicit) => explicit, + }; + + let body = PathBody { + path, + length, + offset, + }; + + let stream = InputStream { + inner: RawInputStream::Fs(body), + }; + + Ok(stream) + } +} + +#[cfg(test)] +mod test { + use std::io::Write; + use tempfile::NamedTempFile; + + use crate::io::{path_body::PathBodyBuilder, InputStream}; + + use super::PathBody; + + fn path_body(stream: &InputStream) -> &PathBody { + match &stream.inner { + crate::io::stream::RawInputStream::Buf(_) => panic!("unexpected inner body"), + crate::io::stream::RawInputStream::Fs(path_body) => path_body, + } + } + + #[test] + fn test_from_path() { + let mut tmp = NamedTempFile::new().unwrap(); + let content = "hello path body"; + tmp.write_all(content.as_bytes()).unwrap(); + + let stream = PathBodyBuilder::new().path(tmp.path()).build().unwrap(); + let body = path_body(&stream); + assert_eq!(0, body.offset); + assert_eq!(content.as_bytes().len() as u64, body.length); + } + + #[test] + fn test_explicit_content_length() { + let mut tmp = NamedTempFile::new().unwrap(); + + let stream = PathBodyBuilder::new() + .path(tmp.path()) + .length(64) + .build() + .unwrap(); + + let body = path_body(&stream); + assert_eq!(0, body.offset); + // we don't validate this + assert_eq!(64, body.length); + } + + #[test] + fn test_length_with_offset() { + let mut tmp = NamedTempFile::new().unwrap(); + let content = "hello path body"; + tmp.write_all(content.as_bytes()).unwrap(); + let offset = 5; + + let stream = PathBodyBuilder::new() + .path(tmp.path()) + .offset(offset) + .build() + .unwrap(); + + let body = path_body(&stream); + assert_eq!(offset, body.offset); + assert_eq!(content.len() as u64 - offset, body.length); + } + + #[test] + fn test_explicit_content_length_and_offset() { + let mut tmp = NamedTempFile::new().unwrap(); + + let stream = PathBodyBuilder::new() + .path(tmp.path()) + .length(64) + .offset(12) + .build() + .unwrap(); + + let body = path_body(&stream); + assert_eq!(12, body.offset); + assert_eq!(64, body.length); + } + + #[should_panic] + #[test] + fn test_invalid_offset() { + let mut tmp = NamedTempFile::new().unwrap(); + let content = "hello path body"; + tmp.write_all(content.as_bytes()).unwrap(); + + let stream = PathBodyBuilder::new() + .path(tmp.path()) + .offset(22) + .build() + .unwrap(); + } +} diff --git a/aws/hll/aws-s3-transfer-manager/src/io/size_hint.rs b/aws/hll/aws-s3-transfer-manager/src/io/size_hint.rs new file mode 100644 index 0000000000..430961734c --- /dev/null +++ b/aws/hll/aws-s3-transfer-manager/src/io/size_hint.rs @@ -0,0 +1,41 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+/// A body size hint
+#[derive(Debug, Clone, Default)]
+pub struct SizeHint {
+    lower: u64,
+    upper: Option<u64>,
+}
+
+impl SizeHint {
+    /// Set an exact size hint with upper and lower set to `size` bytes.
+    pub fn exact(size: u64) -> Self {
+        Self {
+            lower: size,
+            upper: Some(size),
+        }
+    }
+
+    /// Set the lower bound on the body size
+    pub fn with_lower(self, lower: u64) -> Self {
+        Self { lower, ..self }
+    }
+
+    /// Set the upper bound on the body size
+    pub fn with_upper(self, upper: Option<u64>) -> Self {
+        Self { upper, ..self }
+    }
+
+    /// Get the lower bound of the body size
+    pub fn lower(&self) -> u64 {
+        self.lower
+    }
+
+    /// Get the upper bound of the body size if known.
+    pub fn upper(&self) -> Option<u64> {
+        self.upper
+    }
+}
diff --git a/aws/hll/aws-s3-transfer-manager/src/io/stream.rs b/aws/hll/aws-s3-transfer-manager/src/io/stream.rs
new file mode 100644
index 0000000000..c346d8294f
--- /dev/null
+++ b/aws/hll/aws-s3-transfer-manager/src/io/stream.rs
@@ -0,0 +1,129 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+use std::default::Default;
+use std::path::Path;
+
+use bytes::{Buf, Bytes};
+
+use crate::io::error::Error;
+use crate::io::path_body::PathBody;
+use crate::io::path_body::PathBodyBuilder;
+use crate::io::size_hint::SizeHint;
+
+/// Source of binary data.
+///
+/// `InputStream` wraps a stream of data for ease of use.
+#[derive(Debug)]
+pub struct InputStream {
+    pub(super) inner: RawInputStream,
+}
+
+impl InputStream {
+    /// Create a new `InputStream` from a static byte slice
+    pub fn from_static(bytes: &'static [u8]) -> Self {
+        let inner = RawInputStream::Buf(bytes.into());
+        Self { inner }
+    }
+
+    /// Return the bounds on the remaining length of the `InputStream`
+    pub fn size_hint(&self) -> SizeHint {
+        self.inner.size_hint()
+    }
+
+    /// Returns a [`PathBodyBuilder`], allowing you to build an `InputStream` with
+    /// full control over how the file is read (e.g. specifying the length of
+    /// the file or the starting offset to read from).
+    ///
+    /// ```no_run
+    /// # {
+    /// use aws_s3_transfer_manager::io::InputStream;
+    ///
+    /// async fn input_stream_from_file() -> InputStream {
+    ///     let stream = InputStream::read_from()
+    ///         .path("docs/some-large-file.csv")
+    ///         // Specify the length of the file used (skips an additional call to retrieve the size)
+    ///         .length(123_456)
+    ///         .build()
+    ///         .expect("valid path");
+    ///     stream
+    /// }
+    /// # }
+    /// ```
+    pub fn read_from() -> PathBodyBuilder {
+        PathBodyBuilder::new()
+    }
+
+    /// Create a new `InputStream` that reads data from a given `path`.
+    ///
+    /// ## Warning
+    /// The contents of the file MUST not change. The length & checksum of the file
+    /// will be cached. If the contents of the file change, the operation will almost certainly fail.
+    ///
+    /// Furthermore, a partial write MAY seek in the file and resume from the previous location.
+    ///
+    /// # Examples
+    /// ```no_run
+    /// use aws_s3_transfer_manager::io::InputStream;
+    /// use std::path::Path;
+    /// async fn make_stream() -> InputStream {
+    ///     InputStream::from_path("docs/rows.csv").expect("file should be readable")
+    /// }
+    /// ```
+    pub fn from_path(path: impl AsRef<Path>) -> Result<Self, Error> {
+        Self::read_from().path(path).build()
+    }
+}
+
+#[derive(Debug)]
+pub(super) enum RawInputStream {
+    /// In-memory buffer to read from
+    Buf(Bytes),
+    /// File based input
+    Fs(PathBody),
+}
+
+impl RawInputStream {
+    pub(super) fn size_hint(&self) -> SizeHint {
+        match self {
+            RawInputStream::Buf(bytes) => SizeHint::exact(bytes.remaining() as u64),
+            RawInputStream::Fs(path_body) => SizeHint::exact(path_body.length),
+        }
+    }
+}
+
+impl Default for InputStream {
+    fn default() -> Self {
+        Self {
+            inner: RawInputStream::Buf(Bytes::default()),
+        }
+    }
+}
+
+impl From<Bytes> for InputStream {
+    fn from(value: Bytes) -> Self {
+        Self {
+            inner: RawInputStream::Buf(value),
+        }
+    }
+}
+
+impl From<Vec<u8>> for InputStream {
+    fn from(value: Vec<u8>) -> Self {
+        Self::from(Bytes::from(value))
+    }
+}
+
+impl From<&'static [u8]> for InputStream {
+    fn from(slice: &'static [u8]) -> InputStream {
+        Self::from(Bytes::from_static(slice))
+    }
+}
+
+impl From<&'static str> for InputStream {
+    fn from(slice: &'static str) -> InputStream {
+        Self::from(Bytes::from_static(slice.as_bytes()))
+    }
+}
diff --git a/aws/hll/aws-s3-transfer-manager/src/lib.rs b/aws/hll/aws-s3-transfer-manager/src/lib.rs
index 5816229b23..d4e327745d 100644
--- a/aws/hll/aws-s3-transfer-manager/src/lib.rs
+++ b/aws/hll/aws-s3-transfer-manager/src/lib.rs
@@ -27,3 +27,7 @@ pub mod download;
 
 /// Error types emitted by `aws-s3-transfer-manager`
 pub mod error;
+
+/// Types and helpers for I/O
+#[allow(unused)] // FIXME(aws-sdk-rust#1159) - remove when consumed internally by other modules
+pub mod io;
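
----

**Usage sketch (public API).** A minimal sketch of how the public pieces added here (`InputStream`, the `PathBodyBuilder` returned by `read_from()`, and `SizeHint`) compose. The file path and byte counts are made up for illustration; everything else is the API introduced in this patch. Because `length` is set explicitly, `build()` skips the metadata call and performs no validation against the real file.

```rust
use aws_s3_transfer_manager::io::InputStream;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // In-memory sources: the size hint is exact.
    let from_bytes = InputStream::from("a lep is a ball");
    assert_eq!(from_bytes.size_hint().upper(), Some(15));

    // File-based source: read a single 5 MiB "chunk" starting at an 8 MiB offset.
    // The path and the numbers are illustrative only.
    let chunk = InputStream::read_from()
        .path("docs/some-large-file.csv")
        .offset(8 * 1024 * 1024)
        // setting the length explicitly skips the stat() call (and any validation)
        .length(5 * 1024 * 1024)
        .build()?;
    assert_eq!(chunk.size_hint().upper(), Some(5 * 1024 * 1024));

    Ok(())
}
```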
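**Internal consumption sketch.** The description above says a follow-up PR will have workers share a single part reader and pull parts concurrently; the sketch below shows how that wiring could look. It is written as if it lived inside the crate, since `part_reader::Builder`, `ReadPart`, and `PartData` are `pub(crate)`, and it is not part of this patch: the worker count, the `upload_all_parts` function, and the `upload_part` stub are placeholders, not APIs from the change.

```rust
use std::sync::Arc;

use bytes::Bytes;

use crate::io::part_reader::{Builder as PartReaderBuilder, ReadPart};
use crate::io::InputStream;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Placeholder for the eventual S3 `UploadPart` call made by each worker.
async fn upload_part(_part_number: u64, _data: Bytes) -> Result<(), BoxError> {
    Ok(())
}

/// Share one part reader across a fixed number of workers. Each worker just asks for the
/// next part until the reader is exhausted (`Ok(None)`). Parts may be handed out in any
/// order, so `part_number` is what ties a body back to its slot in the multipart upload.
async fn upload_all_parts(stream: InputStream, part_size: usize) -> Result<(), BoxError> {
    let reader = Arc::new(
        PartReaderBuilder::new()
            .stream(stream)
            .part_size(part_size)
            .build(),
    );

    let mut workers = Vec::new();
    for _ in 0..4 {
        // the worker count here is arbitrary
        let reader = reader.clone();
        workers.push(tokio::spawn(async move {
            while let Some(part) = reader.next_part().await? {
                upload_part(part.part_number, part.data).await?;
            }
            Ok::<_, BoxError>(())
        }));
    }

    for handle in workers {
        handle.await??;
    }
    Ok(())
}
```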