Skip to content

Commit

Permalink
Add integration test for stall stream protection based on aws-sdk-rus…
Browse files Browse the repository at this point in the history
…t#1202 (#3874)

## Motivation and Context
A follow-up on #3871, responding to [the review
feedback](#3871 (review))

## Testing
- Also confirmed that reverting the change in the above PR (so that
`BinLabel::Pending` becomes the top of the list) failed the integration
test added to this PR, as expected.
```
2024-10-10T19:06:56.417686Z TRACE aws_smithy_runtime::client::http::body::minimum_throughput::http_body_0_4_x: received poll pending
2024-10-10T19:06:56.417694Z DEBUG aws_smithy_runtime::client::http::body::minimum_throughput::http_body_0_4_x: current throughput: 0 B/s is below minimum: 1 B/s
thread 'user_polls_pending_followed_by_data_for_every_bin_in_throughput_logs' panicked at aws-smithy-runtime/tests/stalled_stream_download.rs:252:10:
response MUST NOT timeout: ThroughputBelowMinimum { expected: Throughput { bytes_read: 1, per_time_elapsed: 1s }, actual: Throughput { bytes_read: 0, per_time_elapsed: 1s } }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    user_polls_pending_followed_by_data_for_every_bin_in_throughput_logs
```

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
ysaito1001 authored Oct 10, 2024
1 parent e6b154b commit 1873874
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 5 deletions.
6 changes: 3 additions & 3 deletions rust-runtime/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aws-smithy-runtime"
version = "1.7.2"
version = "1.7.3"
authors = ["AWS Rust SDK Team <[email protected]>", "Zelda Hessler <[email protected]>"]
description = "The new smithy runtime crate"
edition = "2021"
Expand Down
77 changes: 76 additions & 1 deletion rust-runtime/aws-smithy-runtime/tests/stalled_stream_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#![cfg(all(feature = "client", feature = "test-util"))]

use std::time::Duration;
use tokio::sync::mpsc::channel;
use tokio::sync::Barrier;

#[macro_use]
mod stalled_stream_common;
Expand Down Expand Up @@ -194,10 +196,66 @@ async fn user_downloads_data_too_slowly() {
result.expect("response MUST NOT timeout");
}

/// Scenario: Derived from the reproduction steps in https://github.com/awslabs/aws-sdk-rust/issues/1202.
/// Expected: MUST NOT timeout.
#[tokio::test]
async fn user_polls_pending_followed_by_data_for_every_bin_in_throughput_logs() {
let _logs = show_test_logs();

let (time, sleep) = tick_advance_time_and_sleep();
let (server, response_sender) = channel_server();
let op = operation(server, time.clone(), sleep);

let (tx_server, mut rx_server) = channel(1);
let (tx_client, rx_client) = channel(1);

let server = tokio::spawn(async move {
for _ in 1..100 {
// Block until a signal has been received
let _ = rx_server.recv().await;
if response_sender.send(NEAT_DATA).await.is_err() {
// The client has shut down due to a minimum throughput detection error
break;
}
}
drop(response_sender);
});

let _ticker = tokio::spawn({
async move {
// Each `Bin` has a time resolution of 100ms. In every iteration, the client will go first, yielding
// a `Poll::Pending` in the first half of the allotted time. The server will then take its turn in the
// second half to generate data, allowing the client to yield a `Poll::Ready` immediately after.
// This creates a consistent pattern in throughput logs: within each 100ms interval, a newly created `Bin`
// will be assigned a `BinLabel::Pending`, followed by an attempt to assign `BinLabel::TransferredBytes` to
// the same `Bin`.
loop {
tick!(time, Duration::from_millis(50));
// We don't `unwrap` here since it will eventually fail when the client shuts down due to the minimum
// throughput detection error.
let _ = tx_client.send(()).await;
tick!(time, Duration::from_millis(50));
// We don't `unwrap` here since it will eventually fail when the server exits due to the client shutting
// down due to a minimum throughput detection error.
let _ = tx_server.send(()).await;
}
}
});

let response_body = op.invoke(()).await.expect("initial success");
let result = tokio::spawn(consume_on_signal(rx_client, response_body));
server.await.unwrap();

result
.await
.expect("no panics")
.expect("response MUST NOT timeout");
}

use download_test_tools::*;
use tokio::sync::Barrier;
mod download_test_tools {
use crate::stalled_stream_common::*;
use tokio::sync::mpsc::Receiver;

fn response(body: SdkBody) -> HttpResponse {
HttpResponse::try_from(
Expand Down Expand Up @@ -307,4 +365,21 @@ mod download_test_tools {
}
Ok(())
}

/// A client that allows us to control when data is consumed by sending a signal to `rx`.
pub async fn consume_on_signal(mut rx: Receiver<()>, body: SdkBody) -> Result<(), BoxError> {
// Wait to start polling until a signal has been received
let _ = rx.recv().await;
pin_mut!(body);
while let Some(result) = poll_fn(|cx| body.as_mut().poll_data(cx)).await {
if let Err(err) = result {
return Err(err);
} else {
info!("consumed bytes from the response body");
// Block until a signal has been received
let _ = rx.recv().await;
}
}
Ok(())
}
}

0 comments on commit 1873874

Please sign in to comment.