diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 63c5e22a2f..0b5eba9b9b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -198,12 +198,12 @@ jobs: include: - os: windows-latest # s2n-tls and s2n-quic-dc don't currently build on windows - exclude: --exclude s2n-quic-tls --exclude s2n-quic-dc + exclude: --exclude s2n-quic-tls --exclude s2n-quic-dc --exclude s2n-quic-dc-benches - rust: stable os: ubuntu-latest target: aarch64-unknown-linux-gnu # s2n-quic-dc doesn't currently build on aarch64 - exclude: --exclude s2n-quic-dc + exclude: --exclude s2n-quic-dc --exclude s2n-quic-dc-benches - rust: stable os: ubuntu-latest target: i686-unknown-linux-gnu @@ -217,7 +217,7 @@ jobs: target: native env: S2N_QUIC_PLATFORM_FEATURES_OVERRIDE="" # s2n-quic-dc requires platform features - exclude: --exclude s2n-quic-dc + exclude: --exclude s2n-quic-dc --exclude s2n-quic-dc-benches - rust: stable os: ubuntu-latest target: native diff --git a/dc/s2n-quic-dc-benches/Cargo.toml b/dc/s2n-quic-dc-benches/Cargo.toml new file mode 100644 index 0000000000..2d4e502252 --- /dev/null +++ b/dc/s2n-quic-dc-benches/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "s2n-quic-dc-benches" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +aws-lc-rs = "1" +criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } +s2n-codec = { path = "../../common/s2n-codec" } +s2n-quic-dc = { path = "../s2n-quic-dc", features = ["testing"] } +tokio = { version = "1", features = ["full"] } + +[[bench]] +name = "bench" +harness = false diff --git a/dc/s2n-quic-dc/benches/benches/bench.rs b/dc/s2n-quic-dc-benches/benches/bench.rs similarity index 75% rename from dc/s2n-quic-dc/benches/benches/bench.rs rename to dc/s2n-quic-dc-benches/benches/bench.rs index 74ba0a4441..696435a462 100644 --- a/dc/s2n-quic-dc/benches/benches/bench.rs +++ b/dc/s2n-quic-dc-benches/benches/bench.rs @@ -3,5 +3,5 @@ use criterion::{criterion_group, criterion_main}; -criterion_group!(benches, ::benches::benchmarks); +criterion_group!(benches, s2n_quic_dc_benches::benchmarks); criterion_main!(benches); diff --git a/dc/s2n-quic-dc/benches/src/crypto.rs b/dc/s2n-quic-dc-benches/src/crypto.rs similarity index 100% rename from dc/s2n-quic-dc/benches/src/crypto.rs rename to dc/s2n-quic-dc-benches/src/crypto.rs diff --git a/dc/s2n-quic-dc/benches/src/crypto/encrypt.rs b/dc/s2n-quic-dc-benches/src/crypto/encrypt.rs similarity index 83% rename from dc/s2n-quic-dc/benches/src/crypto/encrypt.rs rename to dc/s2n-quic-dc-benches/src/crypto/encrypt.rs index 18baee88fb..8383fffb87 100644 --- a/dc/s2n-quic-dc/benches/src/crypto/encrypt.rs +++ b/dc/s2n-quic-dc-benches/src/crypto/encrypt.rs @@ -33,12 +33,7 @@ pub fn benchmarks(c: &mut Criterion) { let mut payload = black_box(payload.to_vec()); let mut packet_number = 0u32; b.iter(move || { - let _ = black_box(awslc::encrypt( - &key, - &mut packet_number, - header, - &mut payload, - )); + awslc::encrypt(&key, &mut packet_number, header, &mut payload); }); }, ); @@ -51,12 +46,7 @@ pub fn benchmarks(c: &mut Criterion) { let mut packet_number = 0u32; b.iter(move || { let key = black_box(awslc::key(algo)); - let _ = black_box(awslc::encrypt( - &key, - &mut packet_number, - header, - &mut payload, - )); + awslc::encrypt(&key, &mut packet_number, header, &mut payload); }); }, ); diff --git a/dc/s2n-quic-dc/benches/src/crypto/hkdf.rs b/dc/s2n-quic-dc-benches/src/crypto/hkdf.rs similarity index 92% rename from dc/s2n-quic-dc/benches/src/crypto/hkdf.rs rename to dc/s2n-quic-dc-benches/src/crypto/hkdf.rs index 1f2c11b41a..e0d5d9ecab 100644 --- a/dc/s2n-quic-dc/benches/src/crypto/hkdf.rs +++ b/dc/s2n-quic-dc-benches/src/crypto/hkdf.rs @@ -36,7 +36,7 @@ fn psk(c: &mut Criterion) { let label = black_box(vec![42u8; label_len]); let mut out = black_box(vec![0u8; key_len]); b.iter(move || { - let _ = black_box(awslc::derive_psk(&prk, &label, &mut out)); + awslc::derive_psk(&prk, &label, &mut out); }); }, ); @@ -52,7 +52,7 @@ fn psk(c: &mut Criterion) { let mut out = black_box(vec![0u8; key_len]); b.iter(move || { let prk = black_box(awslc::prk(&key, alg)); - let _ = black_box(awslc::derive_psk(&prk, &label, &mut out)); + awslc::derive_psk(&prk, &label, &mut out); }); }, ); @@ -75,7 +75,7 @@ mod awslc { let out_len = out.len(); let out_len = OutLen(out_len); - prk.expand(&[&label], out_len) + prk.expand(&[label], out_len) .unwrap() .fill(&mut out[..out_len.0]) .unwrap(); diff --git a/dc/s2n-quic-dc/benches/src/crypto/hmac.rs b/dc/s2n-quic-dc-benches/src/crypto/hmac.rs similarity index 99% rename from dc/s2n-quic-dc/benches/src/crypto/hmac.rs rename to dc/s2n-quic-dc-benches/src/crypto/hmac.rs index 689abc4ca9..8875b82595 100644 --- a/dc/s2n-quic-dc/benches/src/crypto/hmac.rs +++ b/dc/s2n-quic-dc-benches/src/crypto/hmac.rs @@ -25,7 +25,7 @@ fn init(c: &mut Criterion) { aws_lc_rs::rand::fill(&mut key).unwrap(); let key = black_box(&key); b.iter(move || { - let _ = black_box(aws_lc_rs::hmac::Key::new(alg, &key)); + let _ = black_box(aws_lc_rs::hmac::Key::new(alg, key)); }); }); } diff --git a/dc/s2n-quic-dc/benches/src/datagram.rs b/dc/s2n-quic-dc-benches/src/datagram.rs similarity index 100% rename from dc/s2n-quic-dc/benches/src/datagram.rs rename to dc/s2n-quic-dc-benches/src/datagram.rs diff --git a/dc/s2n-quic-dc/benches/src/datagram/recv.rs b/dc/s2n-quic-dc-benches/src/datagram/recv.rs similarity index 100% rename from dc/s2n-quic-dc/benches/src/datagram/recv.rs rename to dc/s2n-quic-dc-benches/src/datagram/recv.rs diff --git a/dc/s2n-quic-dc/benches/src/datagram/send.rs b/dc/s2n-quic-dc-benches/src/datagram/send.rs similarity index 100% rename from dc/s2n-quic-dc/benches/src/datagram/send.rs rename to dc/s2n-quic-dc-benches/src/datagram/send.rs diff --git a/dc/s2n-quic-dc/benches/src/lib.rs b/dc/s2n-quic-dc-benches/src/lib.rs similarity index 85% rename from dc/s2n-quic-dc/benches/src/lib.rs rename to dc/s2n-quic-dc-benches/src/lib.rs index 09ded95a8b..36a47f8356 100644 --- a/dc/s2n-quic-dc/benches/src/lib.rs +++ b/dc/s2n-quic-dc-benches/src/lib.rs @@ -5,8 +5,10 @@ use criterion::Criterion; pub mod crypto; pub mod datagram; +pub mod streams; pub fn benchmarks(c: &mut Criterion) { crypto::benchmarks(c); datagram::benchmarks(c); + streams::benchmarks(c); } diff --git a/dc/s2n-quic-dc-benches/src/streams.rs b/dc/s2n-quic-dc-benches/src/streams.rs new file mode 100644 index 0000000000..69ae425117 --- /dev/null +++ b/dc/s2n-quic-dc-benches/src/streams.rs @@ -0,0 +1,95 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use criterion::Criterion; +use s2n_quic_dc::stream::{self, server::tokio::accept, socket::Protocol}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + net::{TcpListener, TcpStream}, +}; + +async fn copy_data( + input: &'static [u8], + a: impl AsyncWrite + Send + 'static, + b: impl AsyncRead + Send + 'static, +) { + let a = tokio::spawn(async move { + tokio::pin!(a); + for _ in 0..30 { + a.write_all(input).await.unwrap(); + } + a.shutdown().await.unwrap(); + }); + + let b = tokio::spawn(async move { + tokio::pin!(b); + let mut void = vec![0; 1024 * 1024]; + while b.read(&mut void[..]).await.unwrap() != 0 { + // Read until EOF + } + }); + + tokio::try_join!(a, b).unwrap(); +} + +fn pair( + protocol: Protocol, + accept_flavor: accept::Flavor, +) -> (stream::testing::Client, stream::testing::Server) { + let client = stream::testing::Client::default(); + let server = stream::testing::Server::new(protocol, accept_flavor); + client.handshake_with(&server).unwrap(); + (client, server) +} + +pub fn benchmarks(c: &mut Criterion) { + let mut group = c.benchmark_group("streams/throughput"); + + group.throughput(criterion::Throughput::Bytes(1024 * 1024 * 30)); + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let buffer = &*vec![0x0; 1024 * 1024].leak(); + + group.bench_function(criterion::BenchmarkId::new("duplex", ""), |b| { + b.to_async(&rt).iter(move || async move { + let (a, b) = tokio::io::duplex(1024 * 1024); + copy_data(buffer, a, b).await; + }); + }); + + group.bench_function(criterion::BenchmarkId::new("tcp", ""), |b| { + b.to_async(&rt).iter(move || async move { + let server = TcpListener::bind("localhost:0").await.unwrap(); + let server_addr = server.local_addr().unwrap(); + let (a, b) = tokio::join!(TcpStream::connect(server_addr), async move { + server.accept().await.unwrap().0 + }); + copy_data(buffer, a.unwrap(), b).await; + }); + }); + + for protocol in [Protocol::Udp, Protocol::Tcp] { + let _rt = rt.enter(); + let (client, server) = pair(protocol, accept::Flavor::Fifo); + let name = format!("{protocol:?}").to_lowercase(); + group.bench_function(criterion::BenchmarkId::new("dcquic", name), |b| { + b.to_async(&rt).iter(|| { + let client = &client; + let server = &server; + async move { + let (a, b) = + tokio::join!(async { client.connect_to(server).await.unwrap() }, async { + let (b, _addr) = server.accept().await.unwrap(); + b + }); + + copy_data(buffer, a, b).await; + } + }); + }); + } +} diff --git a/dc/s2n-quic-dc/Cargo.toml b/dc/s2n-quic-dc/Cargo.toml index 5857bd6b93..f4fb3ee53d 100644 --- a/dc/s2n-quic-dc/Cargo.toml +++ b/dc/s2n-quic-dc/Cargo.toml @@ -12,7 +12,7 @@ exclude = ["corpus.tar.gz"] [features] default = ["tokio"] -testing = ["bolero-generator", "s2n-quic-core/testing", "tracing-subscriber"] +testing = ["bolero-generator", "s2n-quic-core/testing", "s2n-quic-platform/testing", "tracing-subscriber"] tokio = ["tokio/io-util", "tokio/net", "tokio/rt-multi-thread", "tokio/time"] [dependencies] @@ -51,6 +51,7 @@ bolero-generator = "0.11" insta = "1" s2n-codec = { path = "../../common/s2n-codec", features = ["testing"] } s2n-quic-core = { path = "../../quic/s2n-quic-core", features = ["testing"] } +s2n-quic-platform = { path = "../../quic/s2n-quic-platform", features = ["testing"] } tokio = { version = "1", features = ["full"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } @@ -58,4 +59,5 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } level = "warn" check-cfg = [ 'cfg(kani)', + 'cfg(todo)', ] diff --git a/dc/s2n-quic-dc/benches/Cargo.toml b/dc/s2n-quic-dc/benches/Cargo.toml deleted file mode 100644 index bdef01d857..0000000000 --- a/dc/s2n-quic-dc/benches/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "benches" -version = "0.1.0" -edition = "2021" -publish = false - -[dependencies] -aws-lc-rs = "1" -criterion = { version = "0.5", features = ["html_reports"] } -s2n-codec = { path = "../../../common/s2n-codec" } -s2n-quic-dc = { path = "../../s2n-quic-dc", features = ["testing"] } - -[[bench]] -name = "bench" -harness = false - -[workspace] -members = ["."] - -[profile.release] -lto = true -codegen-units = 1 -incremental = false - -[profile.bench] -lto = true -codegen-units = 1 -incremental = false diff --git a/dc/s2n-quic-dc/src/path/secret/map.rs b/dc/s2n-quic-dc/src/path/secret/map.rs index b2c6b02847..19310e936d 100644 --- a/dc/s2n-quic-dc/src/path/secret/map.rs +++ b/dc/s2n-quic-dc/src/path/secret/map.rs @@ -19,6 +19,9 @@ mod state; mod status; mod store; +#[cfg(any(test, feature = "testing"))] +pub mod testing; + #[cfg(test)] mod event_tests; @@ -213,8 +216,8 @@ impl Map { self.store.test_insert(entry); } - #[cfg(test)] - fn test_insert_pair( + #[cfg(any(test, feature = "testing"))] + pub(crate) fn test_insert_pair( &self, local_addr: SocketAddr, peer: &Self, diff --git a/dc/s2n-quic-dc/src/path/secret/map/testing.rs b/dc/s2n-quic-dc/src/path/secret/map/testing.rs new file mode 100644 index 0000000000..07e6720a82 --- /dev/null +++ b/dc/s2n-quic-dc/src/path/secret/map/testing.rs @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{event, path::secret}; + +pub fn new(capacity: usize) -> secret::Map { + crate::testing::init_tracing(); + + let subscriber = event::tracing::Subscriber::default(); + + let signer = secret::stateless_reset::Signer::random(); + + if s2n_quic_platform::io::testing::is_in_env() { + secret::Map::new( + signer, + capacity, + s2n_quic_platform::io::testing::time::Clock::default(), + subscriber, + ) + } else { + secret::Map::new( + signer, + capacity, + s2n_quic_core::time::StdClock::default(), + subscriber, + ) + } +} diff --git a/dc/s2n-quic-dc/src/stream.rs b/dc/s2n-quic-dc/src/stream.rs index 1cd9465841..2ad810d6e4 100644 --- a/dc/s2n-quic-dc/src/stream.rs +++ b/dc/s2n-quic-dc/src/stream.rs @@ -26,6 +26,9 @@ pub mod server; pub mod shared; pub mod socket; +#[cfg(any(test, feature = "testing"))] +pub mod testing; + bitflags::bitflags! { #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct TransportFeatures: u8 { diff --git a/dc/s2n-quic-dc/src/stream/application.rs b/dc/s2n-quic-dc/src/stream/application.rs index 32171c8157..846f2e2f90 100644 --- a/dc/s2n-quic-dc/src/stream/application.rs +++ b/dc/s2n-quic-dc/src/stream/application.rs @@ -10,7 +10,7 @@ use crate::{ socket, }, }; -use core::fmt; +use core::{fmt, time::Duration}; use s2n_quic_core::{buffer, time::Timestamp}; use std::{io, net::SocketAddr}; @@ -25,11 +25,11 @@ pub struct Builder { impl Builder { /// Builds the stream and emits an event indicating that the stream was built #[inline] - pub(crate) fn build(self, publisher: &Pub) -> io::Result + pub(crate) fn build(self, publisher: &Pub) -> io::Result<(Stream, Duration)> where Pub: event::EndpointPublisher, { - { + let sojourn_time = { let remote_address = self.shared.read_remote_addr(); let remote_address = &remote_address; let credential_id = &*self.shared.credentials().id; @@ -43,9 +43,12 @@ impl Builder { stream_id, sojourn_time, }); - } + + sojourn_time + }; self.build_without_event() + .map(|stream| (stream, sojourn_time)) } #[inline] diff --git a/dc/s2n-quic-dc/src/stream/send/tests.rs b/dc/s2n-quic-dc/src/stream/send/tests.rs index cf1406c942..4d7ab31f3a 100644 --- a/dc/s2n-quic-dc/src/stream/send/tests.rs +++ b/dc/s2n-quic-dc/src/stream/send/tests.rs @@ -1,2 +1,263 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 + +use crate::stream::{socket::Protocol, testing}; +use core::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tracing::Instrument as _; + +fn pair(protocol: Protocol) -> (testing::Client, testing::Server) { + let client = testing::Client::default(); + let server = testing::Server::new(protocol, Default::default()); + (client, server) +} + +bitflags::bitflags!( + #[derive(Clone, Copy, Debug, Default)] + struct TestFeatures: u16 { + const EXPLICIT_SHUTDOWN = 1 << 0; + const FLOW_LIMITED = 1 << 1; + const SEND_LIMITED = 1 << 2; + const RECV_LIMITED = 1 << 3; + } +); + +async fn run(protocol: Protocol, buffer_len: usize, iterations: usize, features: TestFeatures) { + let (client, server) = pair(protocol); + let server_handle = server.handle(); + + let (server_response, client_response) = tokio::sync::oneshot::channel(); + + tokio::spawn( + async move { + let mut server_response = Some(server_response); + loop { + let (mut stream, _peer_addr) = server.accept().await.unwrap(); + let server_response = server_response.take().unwrap(); + + let total = Arc::new(AtomicUsize::new(0)); + + tokio::spawn({ + let total = total.clone(); + async move { + let mut prev = 0; + loop { + tokio::time::sleep(core::time::Duration::from_secs(1)).await; + let total = total.load(Ordering::Relaxed); + let gbps = (total - prev) as f64 * 8e-9; + prev = total; + println!("total={total} gbps={gbps:.2}"); + } + } + }); + + tokio::spawn( + async move { + let mut data = vec![0; 1 << 17]; + loop { + let len = stream.read(&mut data).await.unwrap(); + if len == 0 { + break; + } + total.fetch_add(len, Ordering::Relaxed); + if features.contains(TestFeatures::RECV_LIMITED) { + tokio::time::sleep(core::time::Duration::from_millis(1)).await; + } + } + let _ = server_response.send(total.load(Ordering::Relaxed)); + } + .instrument(tracing::debug_span!("stream")), + ); + } + } + .instrument(tracing::debug_span!("server")), + ); + + let expected = buffer_len * iterations; + println!("expected={expected}"); + + tokio::spawn( + async move { + let mut stream = client.connect_to(&server_handle).await.unwrap(); + let mut total = 0; + let buffer = vec![0; buffer_len]; + for _ in 0..iterations { + stream.write_all(&buffer).await.unwrap(); + total += buffer.len(); + if features.contains(TestFeatures::SEND_LIMITED) { + tokio::time::sleep(core::time::Duration::from_millis(1)).await; + } + } + assert_eq!(total, expected); + + if features.contains(TestFeatures::EXPLICIT_SHUTDOWN) { + let _ = stream.shutdown().await; + } + } + .instrument(tracing::debug_span!("application")), + ); + + let actual = client_response.await.unwrap(); + assert_eq!(expected, actual); + + // TODO make sure the worker shut down correctly + //worker.await.unwrap(); +} + +macro_rules! suite { + ($flavor:literal, $name:ident) => { + mod $name { + use super::{TestFeatures as F, *}; + + fn large_times() -> usize { + std::env::var("S2N_QUIC_DC_LARGE_TIMES") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(100) + } + + suite!($flavor, empty, 0, 0, F::default()); + + suite!($flavor, write_1k, 1000); + suite!($flavor, write_10k, 10_000); + suite!($flavor, write_100k, 100_000); + suite!($flavor, write_100k_10_times, 100_000, 10); + suite!($flavor, write_100k_x_times, 100_000, large_times()); + } + }; + ($flavor:literal, $name:ident, $size:expr) => { + suite!($flavor, $name, $size, 1); + }; + ($flavor:literal, $name:ident, $size:expr, $times:expr, $features:expr) => { + mod $name { + use super::*; + + #[tokio::test(flavor = $flavor)] + async fn drop_test() { + run(PROTOCOL, $size, $times, $features).await; + } + + #[tokio::test(flavor = $flavor)] + async fn shutdown_test() { + run(PROTOCOL, $size, $times, $features | F::EXPLICIT_SHUTDOWN).await; + } + } + }; + ($flavor:literal, $name:ident, $size:expr, $times:expr) => { + mod $name { + use super::*; + + suite!($flavor, send_limited, $size, $times, F::SEND_LIMITED); + suite!($flavor, recv_limited, $size, $times, F::RECV_LIMITED); + suite!($flavor, flow_limited, $size, $times, F::FLOW_LIMITED); + suite!($flavor, congestion_limited, $size, $times, F::default()); + } + }; +} + +macro_rules! negative_suite { + () => { + mod negative { + use super::*; + + #[tokio::test] + async fn unresponsive_reader_test() { + let (client, server) = pair(PROTOCOL); + let server_handle = server.handle(); + + tokio::spawn( + async move { + loop { + let (stream, _peer_addr) = server.accept().await.unwrap(); + + tokio::spawn( + async move { + let () = core::future::pending().await; + drop(stream); + } + .instrument(tracing::debug_span!("stream")), + ); + } + } + .instrument(tracing::debug_span!("server")), + ); + + let application = tokio::spawn( + async move { + let mut stream = client.connect_to(&server_handle).await.unwrap(); + stream.write_all(b"hello!").await?; + stream.shutdown().await + } + .instrument(tracing::debug_span!("application")), + ); + + // the application should succeed, even if the server didn't respond + application.await.unwrap().unwrap(); + } + + #[tokio::test] + async fn panicking_writer_test() { + let (client, server) = pair(PROTOCOL); + let server_handle = server.handle(); + + let (server_response, client_response) = tokio::sync::oneshot::channel(); + + tokio::spawn( + async move { + let mut server_response = Some(server_response); + loop { + let (mut stream, _peer_addr) = server.accept().await.unwrap(); + let server_response = server_response.take().unwrap(); + + tokio::spawn( + async move { + let mut buffer = vec![]; + let _ = + server_response.send(stream.read_to_end(&mut buffer).await); + } + .instrument(tracing::debug_span!("stream")), + ); + } + } + .instrument(tracing::debug_span!("server")), + ); + + tokio::spawn( + async move { + let mut stream = client.connect_to(&server_handle).await.unwrap(); + let _ = stream.write_all(b"hello!").await; + panic!("the application panicked (as expected)!"); + } + .instrument(tracing::debug_span!("application")), + ); + + match client_response.await { + Ok(Err(_)) => {} + other => { + panic!("unexpected result {other:?}"); + } + } + } + } + }; +} + +mod tcp { + use super::*; + const PROTOCOL: Protocol = Protocol::Tcp; + + suite!("current_thread", current_thread); + suite!("multi_thread", multi_thread); + negative_suite!(); +} + +#[cfg(todo)] // These tests are currently flaky +mod udp { + use super::*; + const PROTOCOL: Protocol = Protocol::Udp; + + suite!("current_thread", current_thread); + suite!("multi_thread", multi_thread); + negative_suite!(); +} diff --git a/dc/s2n-quic-dc/src/stream/server/tokio/accept.rs b/dc/s2n-quic-dc/src/stream/server/tokio/accept.rs index 14495d7be0..a09d9a006e 100644 --- a/dc/s2n-quic-dc/src/stream/server/tokio/accept.rs +++ b/dc/s2n-quic-dc/src/stream/server/tokio/accept.rs @@ -26,7 +26,16 @@ pub type Sender = channel::Sender; pub type Receiver = channel::Receiver; #[inline] -pub async fn accept(streams: &Receiver, subscriber: &Sub) -> io::Result<(Stream, SocketAddr)> +pub fn channel(capacity: usize) -> (Sender, Receiver) { + channel::new(capacity) +} + +#[inline] +pub async fn accept( + streams: &Receiver, + stats: &stats::Sender, + subscriber: &Sub, +) -> io::Result<(Stream, SocketAddr)> where Sub: Subscriber, { @@ -46,7 +55,9 @@ where ); // build the stream inside the application context - let stream = stream.build(&publisher)?; + let (stream, sojourn_time) = stream.build(&publisher)?; + stats.send(sojourn_time); + let remote_addr = stream.peer_addr()?; Ok((stream, remote_addr)) diff --git a/dc/s2n-quic-dc/src/stream/testing.rs b/dc/s2n-quic-dc/src/stream/testing.rs new file mode 100644 index 0000000000..cf0f90355e --- /dev/null +++ b/dc/s2n-quic-dc/src/stream/testing.rs @@ -0,0 +1,270 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{server::tokio::stats, socket::Protocol}; +use crate::{ + event, + path::secret, + stream::{ + application::Stream, + client::tokio as client, + environment::{tokio as env, Environment as _}, + server::tokio::{self as server, accept}, + }, +}; +use std::{io, net::SocketAddr}; +use tracing::Instrument; + +pub struct Client { + map: secret::Map, + env: env::Environment, +} + +impl Default for Client { + fn default() -> Self { + let _span = tracing::info_span!("client").entered(); + let map = secret::map::testing::new(16); + Self { + map, + env: Default::default(), + } + } +} + +impl Client { + pub fn handshake_with>( + &self, + server: &S, + ) -> io::Result { + let server = server.as_ref(); + if self.map.contains(server.local_addr) { + Ok(secret::HandshakeKind::Cached) + } else { + let local_addr = "127.0.0.1:1337".parse().unwrap(); + self.map + .test_insert_pair(local_addr, &server.map, server.local_addr); + Ok(secret::HandshakeKind::Fresh) + } + } + + pub async fn connect_to>(&self, server: &S) -> io::Result { + let server = server.as_ref(); + let handshake = async { self.handshake_with(server) }; + + match server.protocol { + Protocol::Tcp => { + client::connect_tcp( + server.local_addr, + handshake, + server.local_addr, + &self.env, + &self.map, + ) + .await + } + Protocol::Udp => { + client::connect_udp( + server.local_addr, + handshake, + server.local_addr, + &self.env, + &self.map, + ) + .await + } + Protocol::Other(name) => { + todo!("protocol {name:?} not implemented") + } + } + } +} + +#[derive(Clone)] +pub struct ServerHandle { + map: secret::Map, + protocol: Protocol, + local_addr: SocketAddr, +} + +impl AsRef for ServerHandle { + fn as_ref(&self) -> &ServerHandle { + self + } +} + +pub struct Server { + handle: ServerHandle, + receiver: accept::Receiver, + stats: stats::Sender, + #[allow(dead_code)] + drop_handle: drop_handle::Sender, +} + +impl Default for Server { + fn default() -> Self { + Self::new_udp(accept::Flavor::Fifo) + } +} + +impl AsRef for Server { + fn as_ref(&self) -> &ServerHandle { + &self.handle + } +} + +impl Server { + pub fn new_tcp(accept_flavor: accept::Flavor) -> Self { + Self::new(Protocol::Tcp, accept_flavor) + } + + pub fn new_udp(accept_flavor: accept::Flavor) -> Self { + Self::new(Protocol::Udp, accept_flavor) + } + + pub fn new(protocol: Protocol, accept_flavor: accept::Flavor) -> Self { + if s2n_quic_platform::io::testing::is_in_env() { + todo!() + } else { + Self::new_tokio(protocol, accept_flavor) + } + } + + fn new_tokio(protocol: Protocol, accept_flavor: accept::Flavor) -> Self { + let _span = tracing::info_span!("server").entered(); + let map = secret::map::testing::new(16); + let (sender, receiver) = accept::channel(16); + + let options = crate::socket::Options::new("127.0.0.1:0".parse().unwrap()); + + let env = env::Builder::default().build().unwrap(); + + let subscriber = event::tracing::Subscriber::default(); + let (drop_handle_sender, drop_handle_receiver) = drop_handle::new(); + + let local_addr = match protocol { + Protocol::Tcp => { + let socket = options.build_tcp_listener().unwrap(); + let local_addr = socket.local_addr().unwrap(); + let socket = tokio::net::TcpListener::from_std(socket).unwrap(); + + let acceptor = server::tcp::Acceptor::new( + 0, + socket, + &sender, + &env, + &map, + 16, + accept_flavor, + subscriber, + ); + let acceptor = drop_handle_receiver.wrap(acceptor.run()); + let acceptor = acceptor.instrument(tracing::info_span!("tcp")); + tokio::task::spawn(acceptor); + + local_addr + } + Protocol::Udp => { + let socket = options.build_udp().unwrap(); + let local_addr = socket.local_addr().unwrap(); + + let socket = tokio::io::unix::AsyncFd::new(socket).unwrap(); + + let acceptor = server::udp::Acceptor::new( + 0, + socket, + &sender, + &env, + &map, + accept_flavor, + subscriber, + ); + let acceptor = drop_handle_receiver.wrap(acceptor.run()); + let acceptor = acceptor.instrument(tracing::info_span!("udp")); + tokio::task::spawn(acceptor); + + local_addr + } + Protocol::Other(name) => { + todo!("protocol {name:?} not implemented") + } + }; + + let (stats_sender, stats_worker, stats) = stats::channel(); + + { + let task = stats_worker.run(env.clock().clone()); + let task = task.instrument(tracing::info_span!("stats")); + let task = drop_handle_receiver.wrap(task); + tokio::task::spawn(task); + } + + if matches!(accept_flavor, accept::Flavor::Lifo) { + let channel = receiver.downgrade(); + let task = accept::Pruner::default().run( + env, + channel, + stats, + event::tracing::Subscriber::default(), + ); + let task = task.instrument(tracing::info_span!("pruner")); + let task = drop_handle_receiver.wrap(task); + tokio::task::spawn(task); + } + + let handle = ServerHandle { + map, + protocol, + local_addr, + }; + + Self { + handle, + receiver, + stats: stats_sender, + drop_handle: drop_handle_sender, + } + } + + pub fn handle(&self) -> ServerHandle { + self.handle.clone() + } + + pub async fn accept(&self) -> io::Result<(Stream, SocketAddr)> { + accept::accept( + &self.receiver, + &self.stats, + &event::tracing::Subscriber::default(), + ) + .await + } +} + +mod drop_handle { + use core::future::Future; + use tokio::sync::watch; + + pub fn new() -> (Sender, Receiver) { + let (sender, receiver) = watch::channel(()); + (Sender(sender), Receiver(receiver)) + } + + #[derive(Clone)] + pub struct Receiver(watch::Receiver<()>); + + impl Receiver { + pub fn wrap(&self, other: F) -> impl Future + where + F: Future, + { + let mut watch = self.0.clone(); + async move { + tokio::select! { + _ = other => {} + _ = watch.changed() => {} + } + } + } + } + + pub struct Sender(#[allow(dead_code)] watch::Sender<()>); +} diff --git a/dc/s2n-quic-dc/src/testing.rs b/dc/s2n-quic-dc/src/testing.rs index 93e057eceb..59de185367 100644 --- a/dc/s2n-quic-dc/src/testing.rs +++ b/dc/s2n-quic-dc/src/testing.rs @@ -20,8 +20,14 @@ pub fn init_tracing() { //.with_ansi(false) .compact(); // Use a less verbose output format. + let default_level = if cfg!(debug_assertions) { + tracing::Level::DEBUG + } else { + tracing::Level::WARN + }; + let env_filter = tracing_subscriber::EnvFilter::builder() - .with_default_directive(tracing::Level::DEBUG.into()) + .with_default_directive(default_level.into()) .with_env_var("S2N_LOG") .from_env() .unwrap(); diff --git a/quic/s2n-quic-platform/src/io/testing/time.rs b/quic/s2n-quic-platform/src/io/testing/time.rs index 80f156da96..9f257c06d0 100644 --- a/quic/s2n-quic-platform/src/io/testing/time.rs +++ b/quic/s2n-quic-platform/src/io/testing/time.rs @@ -24,7 +24,7 @@ pub fn delay_until(deadline: Timestamp) -> Timer { } #[derive(Debug, Default)] -pub(crate) struct Clock(()); +pub struct Clock(()); impl clock::Clock for Clock { fn get_time(&self) -> Timestamp {