From 01871dcb66b9433109868f92846c55bb64d1024b Mon Sep 17 00:00:00 2001 From: Amos Wenger Date: Mon, 12 Aug 2024 21:45:41 +0200 Subject: [PATCH 1/4] Upgrade hyper + fix cancellation in test rig --- Cargo.lock | 104 +++++++++++++++++------ crates/fluke-buffet/src/lib.rs | 3 +- crates/fluke-buffet/src/net/net_uring.rs | 3 + crates/fluke-hyper-testbed/Cargo.toml | 6 +- crates/fluke-hyper-testbed/src/main.rs | 81 +++++++++++------- crates/fluke-io-uring-async/src/linux.rs | 31 ------- crates/httpwg/src/lib.rs | 23 ++++- 7 files changed, 157 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d160c03..bef05099 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.3.0" @@ -209,7 +215,7 @@ dependencies = [ "fluke-h2-parse", "fluke-hpack", "futures-util", - "http 1.1.0", + "http", "httparse", "httpwg", "httpwg-macros", @@ -234,7 +240,7 @@ dependencies = [ "color-eyre", "eyre", "fluke-io-uring-async", - "http 1.1.0", + "http", "io-uring", "libc", "memchr", @@ -259,7 +265,7 @@ dependencies = [ "eyre", "fluke", "futures-util", - "http 1.1.0", + "http", "httparse", "libc", "pretty-hex", @@ -311,7 +317,9 @@ version = "0.1.0" dependencies = [ "bytes", "futures", + "http-body-util", "hyper", + "hyper-util", "tokio", "tokio-stream", "tracing", @@ -334,7 +342,7 @@ version = "0.1.0" dependencies = [ "color-eyre", "fluke", - "http 1.1.0", + "http", "ktls", "pretty-hex", "rcgen", @@ -458,6 +466,25 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.14.3" @@ -478,9 +505,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "http" -version = "0.2.12" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -488,24 +515,25 @@ dependencies = [ ] [[package]] -name = "http" -version = "1.1.0" +name = "http-body" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "fnv", - "itoa", + "http", ] [[package]] -name = "http-body" -version = "0.4.6" +name = "http-body-util" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", - "http 0.2.12", + "futures-util", + "http", + "http-body", "pin-project-lite", ] @@ -569,27 +597,40 @@ dependencies = [ [[package]] name = "hyper" -version = "0.14.30" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", - "http 0.2.12", + "h2", + "http", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2", + "smallvec", "tokio", - "tower-service", - "tracing", "want", ] +[[package]] +name = "hyper-util" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", +] + [[package]] name = "indenter" version = "0.3.3" @@ -1288,6 +1329,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml_datetime" version = "0.6.5" @@ -1305,12 +1359,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "tower-service" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" - [[package]] name = "tracing" version = "0.1.40" diff --git a/crates/fluke-buffet/src/lib.rs b/crates/fluke-buffet/src/lib.rs index 7cef194f..2d72af71 100644 --- a/crates/fluke-buffet/src/lib.rs +++ b/crates/fluke-buffet/src/lib.rs @@ -54,8 +54,7 @@ pub fn start(task: F) -> F::Output { let res = lset.run_until(task).await; tracing::debug!("waiting for local set (cancellations, cleanups etc.)"); - // let cleanup_timeout = std::time::Duration::from_millis(250); - let cleanup_timeout = std::time::Duration::from_secs(1); + let cleanup_timeout = std::time::Duration::from_millis(250); if (tokio::time::timeout(cleanup_timeout, lset).await).is_err() { tracing::debug!( "🥲 timed out waiting for local set (async cancellations, cleanups etc.)" diff --git a/crates/fluke-buffet/src/net/net_uring.rs b/crates/fluke-buffet/src/net/net_uring.rs index 07898767..26e3a5b9 100644 --- a/crates/fluke-buffet/src/net/net_uring.rs +++ b/crates/fluke-buffet/src/net/net_uring.rs @@ -62,6 +62,9 @@ impl TcpListener { pub async fn bind(addr: SocketAddr) -> std::io::Result { let addr: socket2::SockAddr = addr.into(); let socket = socket2::Socket::new(addr.domain(), socket2::Type::STREAM, None)?; + // FIXME: don't hardcode + socket.set_reuse_port(true)?; + socket.set_reuse_address(true)?; socket.bind(&addr)?; // FIXME: magic values socket.listen(16)?; diff --git a/crates/fluke-hyper-testbed/Cargo.toml b/crates/fluke-hyper-testbed/Cargo.toml index cef904ba..1325d242 100644 --- a/crates/fluke-hyper-testbed/Cargo.toml +++ b/crates/fluke-hyper-testbed/Cargo.toml @@ -7,13 +7,13 @@ publish = false [dependencies] bytes = "1.7.1" futures = "0.3.30" -hyper = { version = "0.14.30", features = [ +http-body-util = "0.1.2" +hyper = { version = "1.4.1", features = [ "client", "server", "http1", - "tcp", - "stream", ] } +hyper-util = { version = "0.1.7", features = ["server", "http1", "tokio", "http2"] } tokio = { version = "1.39.2", features = ["full"] } tokio-stream = "0.1.15" tracing = "0.1.40" diff --git a/crates/fluke-hyper-testbed/src/main.rs b/crates/fluke-hyper-testbed/src/main.rs index 5bed26a9..d90747a1 100644 --- a/crates/fluke-hyper-testbed/src/main.rs +++ b/crates/fluke-hyper-testbed/src/main.rs @@ -1,10 +1,17 @@ +use http_body_util::{BodyExt, StreamBody}; +use hyper_util::rt::TokioExecutor; +use hyper_util::rt::TokioIo; + +use hyper_util::server::conn::auto; use std::{convert::Infallible, pin::Pin}; +use tokio::sync::mpsc; use bytes::Bytes; -use futures::{Future, StreamExt}; +use futures::Future; use hyper::{ - service::{make_service_fn, Service}, - Body, Request, Response, + body::{Body, Frame}, + service::Service, + Request, Response, }; use tokio_stream::wrappers::ReceiverStream; use tracing::debug; @@ -15,57 +22,67 @@ pub fn big_body() -> String { "this is a big chunk".repeat(256).repeat(128) } -impl Service> for TestService { - type Response = Response; +type BoxBody = Pin + Send + Sync + 'static>>; + +impl Service> for TestService +where + B: Body + Send + Unpin + 'static, + ::Error: std::fmt::Debug + Send + 'static, +{ + type Response = Response; type Error = Infallible; type Future = Pin> + Send + 'static>>; - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Ok(()).into() - } - - fn call(&mut self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { Box::pin(async move { - let (parts, body) = req.into_parts(); + let (parts, mut body) = req.into_parts(); println!("Handling {parts:?}"); let path = parts.uri.path(); match path { "/echo-body" => { + let (tx, rx) = mpsc::channel::, Infallible>>(1); + tokio::spawn(async move { + while let Some(frame) = body.frame().await { + let _ = tx.send(Ok(frame.unwrap())).await; + } + }); + + let rx = ReceiverStream::new(rx); + let body = StreamBody::new(rx); + let body: BoxBody = Box::pin(body); let res = Response::builder().body(body).unwrap(); Ok(res) } "/stream-big-body" => { - let (tx, rx) = tokio::sync::mpsc::channel::(1); - let rx = ReceiverStream::new(rx).map(Ok::<_, Infallible>); + let (tx, rx) = mpsc::channel::, Infallible>>(1); tokio::spawn(async move { let chunk = "this is a big chunk".repeat(256); let chunk = Bytes::from(chunk); for _ in 0..128 { - let _ = tx.send(chunk.clone()).await; + let frame = Frame::data(chunk.clone()); + let _ = tx.send(Ok(frame)).await; } }); - let res = Response::builder().body(Body::wrap_stream(rx)).unwrap(); + let rx = ReceiverStream::new(rx); + let body: BoxBody = Box::pin(StreamBody::new(rx)); + let res = Response::builder().body(body).unwrap(); Ok(res) } _ => { let parts = path.trim_start_matches('/').split('/').collect::>(); + let body: BoxBody = Box::pin(http_body_util::Empty::new()); + if let ["status", code] = parts.as_slice() { let code = code.parse::().unwrap(); - let res = Response::builder() - .status(code) - .body(Body::empty()) - .unwrap(); - debug!("Replying with {res:?}"); + let res = Response::builder().status(code).body(body).unwrap(); + debug!("Replying with {:?} {:?}", res.status(), res.headers()); Ok(res) } else { - let res = Response::builder().status(404).body(Body::empty()).unwrap(); + let res = Response::builder().status(404).body(body).unwrap(); Ok(res) } } @@ -76,11 +93,17 @@ impl Service> for TestService { #[tokio::main] async fn main() { - let upstream = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap()).serve(make_service_fn( - |_addr| async move { Ok::<_, Infallible>(TestService) }, - )); - let upstream_addr = upstream.local_addr(); + let ln = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let upstream_addr = ln.local_addr().unwrap(); println!("I listen on {upstream_addr}"); - upstream.await.unwrap(); + while let Ok((stream, _)) = ln.accept().await { + tokio::spawn(async move { + let mut builder = auto::Builder::new(TokioExecutor::new()); + builder = builder.http2_only(); + builder + .serve_connection(TokioIo::new(stream), TestService) + .await + }); + } } diff --git a/crates/fluke-io-uring-async/src/linux.rs b/crates/fluke-io-uring-async/src/linux.rs index 85d7f13d..2726f767 100644 --- a/crates/fluke-io-uring-async/src/linux.rs +++ b/crates/fluke-io-uring-async/src/linux.rs @@ -76,46 +76,15 @@ impl Drop for Op { drop(guard); // submit cancel op - tracing::debug!("doing async cancel for op {index}"); let cancel = AsyncCancel::new(inner.index.try_into().unwrap()).build(); - tracing::debug!("doing async cancel for op {index} (.build called)"); let mut cancel_op = get_ring().push(cancel); - tracing::debug!("doing async cancel for op {index} (.push called)"); let cancel_op_inner = cancel_op.inner.take().unwrap(); - tracing::debug!( - "doing async cancel for op {index} (cancel op has index {})", - cancel_op_inner.index - ); - tracing::debug!("doing async cancel for op {index} (.take called)"); std::mem::forget(cancel_op); - tracing::debug!("doing async cancel for op {index} (cancel_fut forgotten)"); - struct NoisyDrop; - - impl Drop for NoisyDrop { - fn drop(&mut self) { - tracing::debug!("dropping noisy drop"); - } - } - - let noisy_drop = NoisyDrop; - tracing::debug!("spawning noisy drop task"); tokio::task::spawn_local(async move { - tracing::debug!("inside noisy drop spawned task"); - drop(noisy_drop); - tracing::debug!("inside noisy drop spawned task.. dropped!"); - }); - tracing::debug!("spawned noisy drop task"); - - tokio::task::spawn_local(async move { - tracing::debug!("cancelling op {index}"); cancel_op_inner.await; - tracing::debug!("cancelling op {index}.. cancel_fut returned!"); inner.await; - tracing::debug!("cancelling op {index}.. inner returned!"); }); - - tracing::debug!("doing async cancel for op {index} (task spawned)"); } } } diff --git a/crates/httpwg/src/lib.rs b/crates/httpwg/src/lib.rs index a9320e66..003bcf2b 100644 --- a/crates/httpwg/src/lib.rs +++ b/crates/httpwg/src/lib.rs @@ -95,6 +95,10 @@ pub struct Conn { hpack_dec: fluke_hpack::Decoder<'static>, /// the peer's settings pub settings: Settings, + + // this field exists for the `Drop` impl + #[allow(dead_code)] + cancel_tx: tokio::sync::oneshot::Sender<()>, } pub enum Ev { @@ -351,7 +355,23 @@ impl Conn { Ok::<_, eyre::Report>(()) } }; - fluke_buffet::spawn(async move { recv_fut.await.unwrap() }); + + // cancel_tx is slapped as a field of `Conn`, which means when `Conn` is + // dropped, the receive loop will be cancelled — before the LocalSet is shut + // down. + let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>(); + + tokio::task::spawn_local(async move { + tokio::select! { + _ = cancel_rx => { + // Task cancelled + tracing::trace!("httpwg receive loop cancelled!"); + }, + result = recv_fut => { + result.unwrap(); + } + } + }); let mut settings: Settings = Default::default(); for (code, value) in default_settings().0 { @@ -370,6 +390,7 @@ impl Conn { max_frame_size: DEFAULT_FRAME_SIZE, ..Default::default() }, + cancel_tx, } } From 558f9a532dc53c77a52b450c8a52b875e55b0762 Mon Sep 17 00:00:00 2001 From: Amos Wenger Date: Mon, 12 Aug 2024 21:46:56 +0200 Subject: [PATCH 2/4] Make httpwg-cli build on macOS --- crates/httpwg-cli/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/httpwg-cli/src/main.rs b/crates/httpwg-cli/src/main.rs index 617cbedd..7adc7d4f 100644 --- a/crates/httpwg-cli/src/main.rs +++ b/crates/httpwg-cli/src/main.rs @@ -149,7 +149,7 @@ async fn async_main(args: Args) -> eyre::Result<()> { for arg in iter { cmd.arg(arg); } - #[cfg(unix)] + #[cfg(target_os = "linux")] unsafe { // avoid zombie children on unix: no matter how the test runner dies, // the server will die with it. From 5fd3670590a14b3c3967d0b29d740992d3ecd6fe Mon Sep 17 00:00:00 2001 From: Amos Wenger Date: Mon, 12 Aug 2024 21:59:58 +0200 Subject: [PATCH 3/4] Fix some hyper tests (testbed is h1, not h2) --- Cargo.lock | 39 -------------------------- crates/fluke-hyper-testbed/Cargo.toml | 8 ++---- crates/fluke-hyper-testbed/src/main.rs | 2 +- 3 files changed, 3 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bef05099..4d28c5aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,12 +26,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "atomic-waker" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" - [[package]] name = "autocfg" version = "1.3.0" @@ -466,25 +460,6 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" -[[package]] -name = "h2" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" -dependencies = [ - "atomic-waker", - "bytes", - "fnv", - "futures-core", - "futures-sink", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "hashbrown" version = "0.14.3" @@ -604,7 +579,6 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", "http", "http-body", "httparse", @@ -1329,19 +1303,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-util" -version = "0.7.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "pin-project-lite", - "tokio", -] - [[package]] name = "toml_datetime" version = "0.6.5" diff --git a/crates/fluke-hyper-testbed/Cargo.toml b/crates/fluke-hyper-testbed/Cargo.toml index 1325d242..5db2ee29 100644 --- a/crates/fluke-hyper-testbed/Cargo.toml +++ b/crates/fluke-hyper-testbed/Cargo.toml @@ -8,12 +8,8 @@ publish = false bytes = "1.7.1" futures = "0.3.30" http-body-util = "0.1.2" -hyper = { version = "1.4.1", features = [ - "client", - "server", - "http1", -] } -hyper-util = { version = "0.1.7", features = ["server", "http1", "tokio", "http2"] } +hyper = { version = "1.4.1", features = ["client", "server", "http1"] } +hyper-util = { version = "0.1.7", features = ["server", "http1", "tokio"] } tokio = { version = "1.39.2", features = ["full"] } tokio-stream = "0.1.15" tracing = "0.1.40" diff --git a/crates/fluke-hyper-testbed/src/main.rs b/crates/fluke-hyper-testbed/src/main.rs index d90747a1..b5427dd7 100644 --- a/crates/fluke-hyper-testbed/src/main.rs +++ b/crates/fluke-hyper-testbed/src/main.rs @@ -100,7 +100,7 @@ async fn main() { while let Ok((stream, _)) = ln.accept().await { tokio::spawn(async move { let mut builder = auto::Builder::new(TokioExecutor::new()); - builder = builder.http2_only(); + builder = builder.http1_only(); builder .serve_connection(TokioIo::new(stream), TestService) .await From 3ec4a3f5864dede3c9a536c7fd1372c5b244b1bc Mon Sep 17 00:00:00 2001 From: Amos Wenger Date: Mon, 12 Aug 2024 22:05:28 +0200 Subject: [PATCH 4/4] fix curl tests --- crates/fluke-hyper-testbed/src/main.rs | 32 ++++++++++---------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/crates/fluke-hyper-testbed/src/main.rs b/crates/fluke-hyper-testbed/src/main.rs index b5427dd7..12c28eb2 100644 --- a/crates/fluke-hyper-testbed/src/main.rs +++ b/crates/fluke-hyper-testbed/src/main.rs @@ -3,7 +3,7 @@ use hyper_util::rt::TokioExecutor; use hyper_util::rt::TokioIo; use hyper_util::server::conn::auto; -use std::{convert::Infallible, pin::Pin}; +use std::{convert::Infallible, fmt::Debug, pin::Pin}; use tokio::sync::mpsc; use bytes::Bytes; @@ -22,41 +22,32 @@ pub fn big_body() -> String { "this is a big chunk".repeat(256).repeat(128) } -type BoxBody = Pin + Send + Sync + 'static>>; +type BoxBody = Pin + Send + Sync + 'static>>; -impl Service> for TestService +impl Service> for TestService where - B: Body + Send + Unpin + 'static, - ::Error: std::fmt::Debug + Send + 'static, + B: Body + Send + Sync + Unpin + 'static, + E: Debug + Send + Sync + 'static, { - type Response = Response; + type Response = Response>; type Error = Infallible; type Future = Pin> + Send + 'static>>; fn call(&self, req: Request) -> Self::Future { Box::pin(async move { - let (parts, mut body) = req.into_parts(); + let (parts, body) = req.into_parts(); println!("Handling {parts:?}"); let path = parts.uri.path(); match path { "/echo-body" => { - let (tx, rx) = mpsc::channel::, Infallible>>(1); - tokio::spawn(async move { - while let Some(frame) = body.frame().await { - let _ = tx.send(Ok(frame.unwrap())).await; - } - }); - - let rx = ReceiverStream::new(rx); - let body = StreamBody::new(rx); - let body: BoxBody = Box::pin(body); + let body: BoxBody = Box::pin(body); let res = Response::builder().body(body).unwrap(); Ok(res) } "/stream-big-body" => { - let (tx, rx) = mpsc::channel::, Infallible>>(1); + let (tx, rx) = mpsc::channel::, E>>(1); tokio::spawn(async move { let chunk = "this is a big chunk".repeat(256); @@ -68,13 +59,14 @@ where }); let rx = ReceiverStream::new(rx); - let body: BoxBody = Box::pin(StreamBody::new(rx)); + let body: BoxBody = Box::pin(StreamBody::new(rx)); let res = Response::builder().body(body).unwrap(); Ok(res) } _ => { let parts = path.trim_start_matches('/').split('/').collect::>(); - let body: BoxBody = Box::pin(http_body_util::Empty::new()); + let body: BoxBody = + Box::pin(http_body_util::Empty::new().map_err(|_| unreachable!())); if let ["status", code] = parts.as_slice() { let code = code.parse::().unwrap();