Skip to content

Commit

Permalink
disable stalled stream protection on empty bodies and after read comp…
Browse files Browse the repository at this point in the history
…lete (#3644)

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here -->
* awslabs/aws-sdk-rust#1141
* awslabs/aws-sdk-rust#1146
* awslabs/aws-sdk-rust#1148


## Description
* Disables stalled stream upload protection for requests with an
empty/zero length body.
* Disables stalled stream upload throughput checking once the request
body has been read and handed off to the HTTP layer.


## Testing
Additional integration tests added covering empty bodies and completed
uploads.

Tested SQS issue against latest runtime and can see it works now. The S3
`CopyObject` issue is related to downloads and will need a different
solution.

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: Zelda Hessler <[email protected]>
Co-authored-by: ysaito1001 <[email protected]>
  • Loading branch information
3 people authored May 17, 2024
1 parent d755bd2 commit f0ddc66
Show file tree
Hide file tree
Showing 15 changed files with 339 additions and 49 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ references = ["aws-sdk-rust#1079"]
meta = { "breaking" = false, "bug" = true, "tada" = false }
author = "rcoh"

[[aws-sdk-rust]]
message = "Fixes stalled upload stream protection to not apply to empty request bodies and to stop checking for violations once the request body has been read."
references = ["aws-sdk-rust#1141", "aws-sdk-rust#1146", "aws-sdk-rust#1148"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
authors = ["aajtodd", "Velfi"]

[[smithy-rs]]
message = "Fixes stalled upload stream protection to not apply to empty request bodies and to stop checking for violations once the request body has been read."
references = ["aws-sdk-rust#1141", "aws-sdk-rust#1146", "aws-sdk-rust#1148"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
authors = ["aajtodd", "Velfi"]

[[aws-sdk-rust]]
message = "Updating the documentation for the `app_name` method on `ConfigLoader` to indicate the order of precedence for the sources of the `AppName`."
references = ["smithy-rs#3645"]
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-runtime-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aws-smithy-runtime-api"
version = "1.6.0"
version = "1.6.1"
authors = ["AWS Rust SDK Team <[email protected]>", "Zelda Hessler <[email protected]>"]
description = "Smithy runtime types."
edition = "2021"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
use aws_smithy_types::config_bag::{Storable, StoreReplace};
use std::time::Duration;

const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(5);
/// The default grace period for stalled stream protection.
///
/// When a stream stalls for longer than this grace period, the stream will
/// return an error.
pub const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(20);

/// Configuration for stalled stream protection.
///
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aws-smithy-runtime"
version = "1.5.2"
version = "1.5.3"
authors = ["AWS Rust SDK Team <[email protected]>", "Zelda Hessler <[email protected]>"]
description = "The new smithy runtime crate"
edition = "2021"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ impl UploadThroughput {
self.logs.lock().unwrap().push_bytes_transferred(now, bytes);
}

/// Marks the tracked stream as complete by locking `self.logs` and delegating to
/// its `mark_complete()`, returning that call's result unchanged.
///
/// NOTE(review): `self.logs` is presumably a `Mutex<ThroughputLogs>`; if so, the
/// returned `bool` is `true` only for the first call that marks the stream
/// complete -- confirm against the field's declaration.
///
/// # Panics
///
/// Panics if acquiring the lock fails (the `unwrap()` on `lock()`), e.g. a
/// poisoned mutex, assuming a `std::sync::Mutex`.
pub(crate) fn mark_complete(&self) -> bool {
self.logs.lock().unwrap().mark_complete()
}

/// Produces a [`ThroughputReport`] for the time `now` by locking `self.logs` and
/// delegating to its `report(now)`.
///
/// NOTE(review): `self.logs` is presumably a `Mutex<ThroughputLogs>` -- confirm
/// against the field's declaration.
///
/// # Panics
///
/// Panics if acquiring the lock fails (the `unwrap()` on `lock()`), e.g. a
/// poisoned mutex, assuming a `std::sync::Mutex`.
pub(crate) fn report(&self, now: SystemTime) -> ThroughputReport {
self.logs.lock().unwrap().report(now)
}
Expand Down Expand Up @@ -177,6 +181,8 @@ trait UploadReport {
impl UploadReport for ThroughputReport {
fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput) {
let throughput = match self {
// stream has been exhausted, stop tracking violations
ThroughputReport::Complete => return (false, ZERO_THROUGHPUT),
// If the report is incomplete, then we don't have enough data yet to
// decide if minimum throughput was violated.
ThroughputReport::Incomplete => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ trait DownloadReport {
impl DownloadReport for ThroughputReport {
fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput) {
let throughput = match self {
ThroughputReport::Complete => return (false, ZERO_THROUGHPUT),
// If the report is incomplete, then we don't have enough data yet to
// decide if minimum throughput was violated.
ThroughputReport::Incomplete => {
Expand Down Expand Up @@ -175,6 +176,18 @@ where
tracing::trace!("received data: {}", bytes.len());
this.throughput
.push_bytes_transferred(now, bytes.len() as u64);

// hyper will optimistically stop polling when end of stream is reported
// (e.g. when content-length amount of data has been consumed) which means
// we may never get to `Poll::Ready(None)`. Check for same condition and
// attempt to stop checking throughput violations _now_ as we may never
// get polled again. The caveat here is that it depends on `Body` implementations
// implementing `is_end_stream()` correctly. Users can also disable SSP as an
// alternative for such fringe use cases.
if self.is_end_stream() {
tracing::trace!("stream reported end of stream before Poll::Ready(None) reached; marking stream complete");
self.throughput.mark_complete();
}
Poll::Ready(Some(Ok(bytes)))
}
Poll::Pending => {
Expand All @@ -183,7 +196,12 @@ where
Poll::Pending
}
// If we've read all the data or an error occurred, then return that result.
res => res,
res => {
if this.throughput.mark_complete() {
tracing::trace!("stream completed: {:?}", res);
}
res
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
*/

use super::Throughput;
use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
use aws_smithy_runtime_api::client::stalled_stream_protection::{
StalledStreamProtectionConfig, DEFAULT_GRACE_PERIOD,
};
use std::time::Duration;

/// A collection of options for configuring a [`MinimumThroughputBody`](super::MinimumThroughputBody).
/// A collection of options for configuring a [`MinimumThroughputBody`](super::MinimumThroughputDownloadBody).
#[derive(Debug, Clone)]
pub struct MinimumThroughputBodyOptions {
/// The minimum throughput that is acceptable.
Expand Down Expand Up @@ -69,6 +71,13 @@ impl MinimumThroughputBodyOptions {
}
}

/// Default minimum throughput: 1 byte read per 1 second elapsed.
///
/// NOTE(review): presumably the fallback used when no minimum throughput is
/// configured explicitly -- confirm against the `Default` impl and builder.
const DEFAULT_MINIMUM_THROUGHPUT: Throughput = Throughput {
bytes_read: 1,
per_time_elapsed: Duration::from_secs(1),
};

const DEFAULT_CHECK_WINDOW: Duration = Duration::from_secs(1);

impl Default for MinimumThroughputBodyOptions {
fn default() -> Self {
Self {
Expand All @@ -87,14 +96,6 @@ pub struct MinimumThroughputBodyOptionsBuilder {
grace_period: Option<Duration>,
}

const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(0);
const DEFAULT_MINIMUM_THROUGHPUT: Throughput = Throughput {
bytes_read: 1,
per_time_elapsed: Duration::from_secs(1),
};

const DEFAULT_CHECK_WINDOW: Duration = Duration::from_secs(1);

impl MinimumThroughputBodyOptionsBuilder {
/// Create a new `MinimumThroughputBodyOptionsBuilder`.
pub fn new() -> Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ pub(crate) enum ThroughputReport {
Pending,
/// The stream transferred this amount of throughput during the time window.
Transferred(Throughput),
/// The stream has completed, no more data is expected.
Complete,
}

const BIN_COUNT: usize = 10;
Expand All @@ -285,6 +287,7 @@ pub(super) struct ThroughputLogs {
resolution: Duration,
current_tail: SystemTime,
buffer: LogBuffer<BIN_COUNT>,
stream_complete: bool,
}

impl ThroughputLogs {
Expand All @@ -302,6 +305,7 @@ impl ThroughputLogs {
resolution,
current_tail: now,
buffer: LogBuffer::new(),
stream_complete: false,
}
}

Expand Down Expand Up @@ -343,8 +347,24 @@ impl ThroughputLogs {
assert!(self.current_tail >= now);
}

/// Flags the stream as finished, meaning no further data is expected.
///
/// Returns `true` only for the call that actually flips the flag. Every later
/// call leaves the state untouched and returns `false`, which makes this
/// operation idempotent.
///
/// Once the flag is set, [report](#method.report) will always return
/// [ThroughputReport::Complete].
pub(super) fn mark_complete(&mut self) -> bool {
    // Swap in `true` and inspect what was there before: a previous `false`
    // means this call is the one that completed the stream.
    let was_complete = std::mem::replace(&mut self.stream_complete, true);
    !was_complete
}

/// Generates an overall report of the time window.
pub(super) fn report(&mut self, now: SystemTime) -> ThroughputReport {
if self.stream_complete {
return ThroughputReport::Complete;
}

self.catch_up(now);
self.buffer.fill_gaps();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ impl Intercept for StalledStreamProtectionInterceptor {
) -> Result<(), BoxError> {
if let Some(sspcfg) = cfg.load::<StalledStreamProtectionConfig>().cloned() {
if sspcfg.upload_enabled() {
if let Some(0) = context.request().body().content_length() {
tracing::trace!(
"skipping stalled stream protection for zero length request body"
);
return Ok(());
}
let (_async_sleep, time_source) = get_runtime_component_deps(runtime_components)?;
let now = time_source.now();

Expand Down
12 changes: 10 additions & 2 deletions rust-runtime/aws-smithy-runtime/tests/stalled_stream_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,24 @@ async fn download_stalls() {
let (time, sleep) = tick_advance_time_and_sleep();
let (server, response_sender) = channel_server();
let op = operation(server, time.clone(), sleep);
let barrier = Arc::new(Barrier::new(2));

let c = barrier.clone();
let server = tokio::spawn(async move {
for _ in 1..10 {
c.wait().await;
for i in 1..10 {
tracing::debug!("send {i}");
response_sender.send(NEAT_DATA).await.unwrap();
tick!(time, Duration::from_secs(1));
}
tick!(time, Duration::from_secs(10));
});

let response_body = op.invoke(()).await.expect("initial success");
let result = tokio::spawn(eagerly_consume(response_body));
let result = tokio::spawn(async move {
barrier.wait().await;
eagerly_consume(response_body).await
});
server.await.unwrap();

let err = result
Expand Down Expand Up @@ -188,6 +195,7 @@ async fn user_downloads_data_too_slowly() {
}

use download_test_tools::*;
use tokio::sync::Barrier;
mod download_test_tools {
use crate::stalled_stream_common::*;

Expand Down
Loading

0 comments on commit f0ddc66

Please sign in to comment.