# add basic CI and debugging instructions for S3 transfer manager (#3731)
## Description
* Update README and code with basic instructions on using `cargo flamegraph` and `tokio-console`
* Add some basic CI sanity tests against latest crate versions.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
aajtodd authored Jul 2, 2024
1 parent a7c825c commit 523a558
Showing 14 changed files with 161 additions and 52 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -187,6 +187,8 @@ jobs:
runner: smithy_ubuntu-latest_8-core
- action: check-aws-sdk-standalone-integration-tests
runner: ubuntu-latest
- action: check-aws-sdk-hll-s3-transfer-manager
runner: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
6 changes: 6 additions & 0 deletions aws/hll/aws-s3-transfer-manager/.cargo/config.toml
@@ -0,0 +1,6 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]

[profile.profiling]
inherits = "release"
debug = true
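
The `profiling` profile builds with release optimizations while keeping debug symbols, which profilers need for useful stack traces. A rough sketch of using it (the `cp` example is from this crate; the URI and paths are placeholders):

```sh
# Release optimizations + debug info, per the .cargo/config.toml above
AWS_PROFILE=<profile-name> cargo run --profile profiling --example cp -- s3://<my-bucket>/<my-key> /tmp/out.dat
```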
2 changes: 2 additions & 0 deletions aws/hll/aws-s3-transfer-manager/.gitignore
@@ -0,0 +1,2 @@
flamegraph.svg
profile.json
1 change: 1 addition & 0 deletions aws/hll/aws-s3-transfer-manager/Cargo.toml
@@ -27,4 +27,5 @@ tracing = "0.1"
aws-config = { version = "1.5.1", features = ["behavior-version-latest"] }
aws-smithy-mocks-experimental = "0.2.1"
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
console-subscriber = "0.3.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
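
`console-subscriber` only emits task instrumentation when the crate is compiled with the `tokio_unstable` cfg, which the `.cargo/config.toml` above supplies via `rustflags`. A minimal sketch of the equivalent one-off invocation:

```sh
# Equivalent to the rustflags entry in .cargo/config.toml
RUSTFLAGS="--cfg tokio_unstable" cargo build --example cp
```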
21 changes: 21 additions & 0 deletions aws/hll/aws-s3-transfer-manager/README.md
@@ -19,6 +19,8 @@ cargo test --lib download::worker::tests::test_distribute_work

### Examples

NOTE: You can use the `profiling` profile from `.cargo/config.toml` to build any example in release mode with debug info.

**Copy**

See all options:
@@ -34,3 +36,22 @@ AWS_PROFILE=<profile-name> RUST_LOG=trace cargo run --example cp s3://<my-bucket

NOTE: To run in release mode, add `--release`/`-r` to the command; see `cargo run -h`.
NOTE: `trace` may be too verbose; to see only this library's logs, use `RUST_LOG=aws_s3_transfer_manager=trace`

#### Flamegraphs

See [cargo-flamegraph](https://github.com/flamegraph-rs/flamegraph) for prerequisites and installation information.

Generate a flamegraph (by default the output is written to `flamegraph.svg`):

```sh
sudo AWS_PROFILE=<profile-name> RUST_LOG=aws_s3_transfer_manager=info cargo flamegraph --profile profiling --example cp -- s3://test-sdk-rust-aaron/mb-128.dat /tmp/mb-128.dat
```
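
On Linux, `cargo flamegraph` drives `perf`, which usually requires elevated privileges; that is why the command above runs under `sudo`. A common alternative (an assumption about your environment; Linux-only) is to relax the kernel's perf restrictions for the current boot instead:

```sh
# Allow unprivileged perf profiling until the next reboot
echo -1 | sudo tee /proc/sys/kernel/perf_event_paranoid
```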

#### Using tokio-console

Examples use [`console-subscriber`](https://crates.io/crates/console-subscriber), which allows you to run them with
[tokio-console](https://github.com/tokio-rs/console) to help debug task execution.

Follow the installation instructions for [tokio-console](https://github.com/tokio-rs/console), then run the
example while `tokio-console` is running.
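
A concrete workflow sketch (the console's default endpoint, `http://127.0.0.1:6669`, is assumed from tokio-console's documentation):

```sh
# Terminal 1: run an example; console_subscriber::init() in main exposes task data
AWS_PROFILE=<profile-name> cargo run --example cp -- s3://<my-bucket>/<my-key> /tmp/out.dat

# Terminal 2: attach the console UI to the default endpoint
cargo install --locked tokio-console   # one-time install
tokio-console
```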
76 changes: 67 additions & 9 deletions aws/hll/aws-s3-transfer-manager/examples/cp.rs
@@ -11,9 +11,12 @@ use aws_s3_transfer_manager::download::Downloader;

use aws_s3_transfer_manager::download::body::Body;
use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
use aws_types::SdkConfig;
use bytes::Buf;
use clap::{CommandFactory, Parser};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tracing::{debug_span, Instrument};

type BoxError = Box<dyn Error + Send + Sync>;

@@ -101,7 +104,7 @@ fn invalid_arg(message: &str) -> !

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt::init();
console_subscriber::init();
let args = dbg!(Args::parse());

use TransferUri::*;
@@ -114,6 +117,10 @@ async fn main() -> Result<(), Box<dyn Error>> {

let config = aws_config::from_env().load().await;

println!("warming up client...");
warmup(&config).await?;
println!("warming up complete");

let tm = Downloader::builder()
.sdk_config(config)
.concurrency(args.concurrency)
@@ -123,7 +130,7 @@
let (bucket, key) = args.source.expect_s3().parts();
let input = GetObjectInputBuilder::default().bucket(bucket).key(key);

let mut dest = fs::File::create(args.dest.expect_local()).await?;
let dest = fs::File::create(args.dest.expect_local()).await?;
println!("dest file opened, starting download");

let start = time::Instant::now();
@@ -132,14 +139,11 @@
// likely abstract this into performant utils for single file download. Higher level
// TM will handle its own thread pool for filesystem work
let mut handle = tm.download(input.into()).await?;
let mut body = mem::replace(&mut handle.body, Body::empty());
let body = mem::replace(&mut handle.body, Body::empty());

while let Some(chunk) = body.next().await {
let chunk = chunk.unwrap();
for segment in chunk.into_segments() {
dest.write_all(segment.as_ref()).await?;
}
}
write_body(body, dest)
.instrument(debug_span!("write-output"))
.await?;

let elapsed = start.elapsed();
let obj_size = handle.object_meta.total_size();
@@ -152,3 +156,57 @@

Ok(())
}

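// An alternative `write_body` that uses vectored writes, kept commented out for future experimentation: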
// async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Error>> {
// let b1: &[u8] = &mut [];
// let b2: &[u8] = &mut [];
// let b3: &[u8] = &mut [];
// let b4: &[u8] = &mut [];
// let b5: &[u8] = &mut [];
// let b6: &[u8] = &mut [];
// let b7: &[u8] = &mut [];
// let b8: &[u8] = &mut [];
// while let Some(chunk) = body.next().await {
// let mut chunk = chunk.unwrap();
// while chunk.has_remaining() {
// let mut dst = [
// IoSlice::new(b1),
// IoSlice::new(b2),
// IoSlice::new(b3),
// IoSlice::new(b4),
// IoSlice::new(b5),
// IoSlice::new(b6),
// IoSlice::new(b7),
// IoSlice::new(b8),
// ];
// let filled = chunk.chunks_vectored(&mut dst[..]);
// tracing::trace!("filled: {filled} io slices");
//
// let wc = dest.write_vectored(&dst[0..filled]).await?;
// tracing::trace!("wrote: {wc} bytes");
// chunk.advance(wc);
// }
// }
// Ok(())
// }

async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), Box<dyn Error>> {
while let Some(chunk) = body.next().await {
let chunk = chunk.unwrap();
tracing::trace!("recv'd chunk remaining={}", chunk.remaining());
let mut segment_cnt = 0;
for segment in chunk.into_segments() {
dest.write_all(segment.as_ref()).await?;
tracing::trace!("wrote segment size: {}", segment.remaining());
segment_cnt += 1;
}
tracing::trace!("chunk had {segment_cnt} segments");
}
Ok(())
}

async fn warmup(config: &SdkConfig) -> Result<(), Box<dyn Error>> {
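// Issue a cheap ListBuckets call so connection setup and credential resolution
// happen here rather than during the timed transfer.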
let s3 = aws_sdk_s3::Client::new(config);
s3.list_buckets().send().await?;
Ok(())
}
30 changes: 11 additions & 19 deletions aws/hll/aws-s3-transfer-manager/src/download.rs
@@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

/// Abstractions for response bodies and consuming data streams.
pub mod body;
mod context;
mod discovery;
@@ -16,11 +17,10 @@ use crate::download::discovery::{discover_obj, ObjectDiscovery};
use crate::download::handle::DownloadHandle;
use crate::download::worker::{distribute_work, download_chunks, ChunkResponse};
use crate::error::TransferError;
use crate::{MEBIBYTE, MIN_PART_SIZE};
use crate::MEBIBYTE;
use aws_sdk_s3::operation::get_object::builders::{GetObjectFluentBuilder, GetObjectInputBuilder};
use aws_types::SdkConfig;
use context::DownloadContext;
use std::{cmp, mem};
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tracing::Instrument;
@@ -53,7 +53,6 @@
#[derive(Debug, Clone)]
pub struct Builder {
target_part_size_bytes: u64,
checksum_validation_enabled: bool,
// TODO(design): should we instead consider an enum here that allows not only explicit values but also
// an "Auto" mode that lets us control the concurrency actually used based on overall transfer and part size?
concurrency: usize,
@@ -64,26 +63,16 @@ impl Builder {
fn new() -> Self {
Self {
target_part_size_bytes: 8 * MEBIBYTE,
checksum_validation_enabled: true,
concurrency: 8,
sdk_config: None,
}
}

/// Size of parts the object will be downloaded in, in bytes.
///
/// The minimum part size is 5 MiB and any value given less than that will be rounded up.
/// Default is 8 MiB.
pub fn target_part_size(mut self, size_bytes: u64) -> Self {
self.target_part_size_bytes = cmp::min(size_bytes, MIN_PART_SIZE);
self
}

/// Set whether checksum validation is enabled.
///
/// Default is true.
pub fn enable_checksum_validation(mut self, enabled: bool) -> Self {
self.checksum_validation_enabled = enabled;
self.target_part_size_bytes = size_bytes;
self
}

@@ -102,6 +91,7 @@
self
}

/// Consumes the builder and constructs a [Downloader]
pub fn build(self) -> Downloader {
self.into()
}
@@ -115,7 +105,6 @@
let client = aws_sdk_s3::Client::new(&sdk_config);
Self {
target_part_size_bytes: value.target_part_size_bytes,
checksum_validation_enabled: value.checksum_validation_enabled,
concurrency: value.concurrency,
client,
}
@@ -127,7 +116,6 @@
#[derive(Debug, Clone)]
pub struct Downloader {
target_part_size_bytes: u64,
checksum_validation_enabled: bool,
concurrency: usize,
client: aws_sdk_s3::client::Client,
}
@@ -187,6 +175,11 @@ impl Downloader {
let part_size = self.target_part_size_bytes;
let rem = discovery.remaining.clone();

// TODO(aws-sdk-rust#1159) - test semaphore based approach where we create all futures at once;
// the downside is controlling memory usage, as a large download may result in
// quite a few futures being created. If more performant, this could be enabled for
// objects under some size threshold.

tasks.spawn(distribute_work(rem, input, part_size, start_seq, work_tx));

for i in 0..self.concurrency {
@@ -204,7 +197,7 @@
// have the correct metadata w.r.t. content-length and maybe others for the whole object.
object_meta: discovery.meta,
body: Body::new(comp_rx),
tasks,
_tasks: tasks,
};

Ok(handle)
@@ -218,11 +211,10 @@
completed: &mpsc::Sender<Result<ChunkResponse, TransferError>>,
) -> u64 {
let mut start_seq = 0;
if let Some(initial_data) = mem::replace(&mut discovery.initial_chunk, None) {
if let Some(initial_data) = discovery.initial_chunk.take() {
let chunk = ChunkResponse {
seq: start_seq,
data: Some(initial_data),
object_meta: Some(discovery.meta.clone()),
};
completed.send(Ok(chunk)).await.expect("initial chunk");
start_seq = 1;
Expand Down
8 changes: 3 additions & 5 deletions aws/hll/aws-s3-transfer-manager/src/download/body.rs
@@ -40,6 +40,8 @@ impl Body {
}

/// Convert this body into an unordered stream of chunks.
// TODO(aws-sdk-rust#1159) - revisit if we actually need/use unordered data stream
#[allow(dead_code)]
pub(crate) fn unordered(self) -> UnorderedBody {
self.inner
}
@@ -177,11 +179,7 @@ mod tests {
use super::{Body, Sequencer};

fn chunk_resp(seq: u64, data: Option<AggregatedBytes>) -> ChunkResponse {
ChunkResponse {
seq,
data,
object_meta: None,
}
ChunkResponse { seq, data }
}

#[test]
10 changes: 2 additions & 8 deletions aws/hll/aws-s3-transfer-manager/src/download/discovery.rs
@@ -24,8 +24,6 @@ enum ObjectDiscoveryStrategy {
// Send a `HeadObject` request.
// The overall transfer is optionally constrained to the given range.
HeadObject(Option<ByteRange>),
// Send `GetObject` with `part_number` = 1
FirstPart,
// Send `GetObject` request using a ranged get.
// The overall transfer is optionally constrained to the given range.
RangedGet(Option<RangeInclusive<u64>>),
@@ -80,10 +78,6 @@ pub(super) async fn discover_obj(
ObjectDiscoveryStrategy::HeadObject(byte_range) => {
discover_obj_with_head(ctx, request, byte_range).await
}
ObjectDiscoveryStrategy::FirstPart => {
let r = request.input.clone().part_number(1);
discover_obj_with_get(ctx, r, None).await
}
ObjectDiscoveryStrategy::RangedGet(range) => {
let byte_range = match range.as_ref() {
Some(r) => ByteRange::Inclusive(
@@ -177,7 +171,7 @@ mod tests {
discover_obj, discover_obj_with_head, ObjectDiscoveryStrategy,
};
use crate::download::header::ByteRange;
use crate::MIN_PART_SIZE;
use crate::MEBIBYTE;
use aws_sdk_s3::operation::get_object::{GetObjectInput, GetObjectOutput};
use aws_sdk_s3::operation::head_object::HeadObjectOutput;
use aws_sdk_s3::Client;
@@ -222,7 +216,7 @@

let ctx = DownloadContext {
client,
target_part_size: MIN_PART_SIZE,
target_part_size: 5 * MEBIBYTE,
};
let request = GetObjectInput::builder()
.bucket("test-bucket")
2 changes: 1 addition & 1 deletion aws/hll/aws-s3-transfer-manager/src/download/handle.rs
@@ -17,7 +17,7 @@ pub struct DownloadHandle {
pub body: Body,

/// All child tasks spawned for this download
pub(crate) tasks: task::JoinSet<()>,
pub(crate) _tasks: task::JoinSet<()>,
}

impl DownloadHandle {
8 changes: 2 additions & 6 deletions aws/hll/aws-s3-transfer-manager/src/download/worker.rs
@@ -4,7 +4,6 @@
*/
use crate::download::context::DownloadContext;
use crate::download::header;
use crate::download::object_meta::ObjectMetadata;
use crate::error;
use crate::error::TransferError;
use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
@@ -39,12 +38,10 @@
pub(crate) seq: u64,
// chunk data
pub(crate) data: Option<AggregatedBytes>,
// object metadata
pub(crate) object_meta: Option<ObjectMetadata>,
}

/// Worker function that processes requests from the [requests] channel and
/// sends the result back on the [completed] channel.
/// Worker function that processes requests from the `requests` channel and
/// sends the result back on the `completed` channel.
pub(super) async fn download_chunks(
ctx: DownloadContext,
requests: async_channel::Receiver<ChunkRequest>,
@@ -89,7 +86,6 @@
Ok(ChunkResponse {
seq: request.seq,
data: Some(bytes),
object_meta: Some(ObjectMetadata::from(resp)),
})
}
