Skip to content

Commit

Permalink
Merge pull request #198 from bearcove/upgrade-hyper
Browse files Browse the repository at this point in the history
feat: Upgrade to hyper 1.x + fix cancellation in test rig
  • Loading branch information
fasterthanlime authored Aug 12, 2024
2 parents 5807701 + 3ec4a3f commit 7387d6d
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 99 deletions.
65 changes: 37 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions crates/fluke-buffet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ pub fn start<F: Future>(task: F) -> F::Output {

let res = lset.run_until(task).await;
tracing::debug!("waiting for local set (cancellations, cleanups etc.)");
// let cleanup_timeout = std::time::Duration::from_millis(250);
let cleanup_timeout = std::time::Duration::from_secs(1);
let cleanup_timeout = std::time::Duration::from_millis(250);
if (tokio::time::timeout(cleanup_timeout, lset).await).is_err() {
tracing::debug!(
"🥲 timed out waiting for local set (async cancellations, cleanups etc.)"
Expand Down
3 changes: 3 additions & 0 deletions crates/fluke-buffet/src/net/net_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ impl TcpListener {
pub async fn bind(addr: SocketAddr) -> std::io::Result<Self> {
let addr: socket2::SockAddr = addr.into();
let socket = socket2::Socket::new(addr.domain(), socket2::Type::STREAM, None)?;
// FIXME: don't hardcode
socket.set_reuse_port(true)?;
socket.set_reuse_address(true)?;
socket.bind(&addr)?;
// FIXME: magic values
socket.listen(16)?;
Expand Down
10 changes: 3 additions & 7 deletions crates/fluke-hyper-testbed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@ publish = false
[dependencies]
bytes = "1.7.1"
futures = "0.3.30"
hyper = { version = "0.14.30", features = [
"client",
"server",
"http1",
"tcp",
"stream",
] }
http-body-util = "0.1.2"
hyper = { version = "1.4.1", features = ["client", "server", "http1"] }
hyper-util = { version = "0.1.7", features = ["server", "http1", "tokio"] }
tokio = { version = "1.39.2", features = ["full"] }
tokio-stream = "0.1.15"
tracing = "0.1.40"
73 changes: 44 additions & 29 deletions crates/fluke-hyper-testbed/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use std::{convert::Infallible, pin::Pin};
use http_body_util::{BodyExt, StreamBody};
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioIo;

use hyper_util::server::conn::auto;
use std::{convert::Infallible, fmt::Debug, pin::Pin};
use tokio::sync::mpsc;

use bytes::Bytes;
use futures::{Future, StreamExt};
use futures::Future;
use hyper::{
service::{make_service_fn, Service},
Body, Request, Response,
body::{Body, Frame},
service::Service,
Request, Response,
};
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
Expand All @@ -15,57 +22,59 @@ pub fn big_body() -> String {
"this is a big chunk".repeat(256).repeat(128)
}

impl Service<Request<Body>> for TestService {
type Response = Response<Body>;
type BoxBody<E> = Pin<Box<dyn Body<Data = Bytes, Error = E> + Send + Sync + 'static>>;

impl<B, E> Service<Request<B>> for TestService
where
B: Body<Data = Bytes, Error = E> + Send + Sync + Unpin + 'static,
E: Debug + Send + Sync + 'static,
{
type Response = Response<BoxBody<E>>;
type Error = Infallible;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&self, req: Request<B>) -> Self::Future {
Box::pin(async move {
let (parts, body) = req.into_parts();
println!("Handling {parts:?}");

let path = parts.uri.path();
match path {
"/echo-body" => {
let body: BoxBody<E> = Box::pin(body);
let res = Response::builder().body(body).unwrap();
Ok(res)
}
"/stream-big-body" => {
let (tx, rx) = tokio::sync::mpsc::channel::<Bytes>(1);
let rx = ReceiverStream::new(rx).map(Ok::<_, Infallible>);
let (tx, rx) = mpsc::channel::<Result<Frame<Bytes>, E>>(1);

tokio::spawn(async move {
let chunk = "this is a big chunk".repeat(256);
let chunk = Bytes::from(chunk);
for _ in 0..128 {
let _ = tx.send(chunk.clone()).await;
let frame = Frame::data(chunk.clone());
let _ = tx.send(Ok(frame)).await;
}
});

let res = Response::builder().body(Body::wrap_stream(rx)).unwrap();
let rx = ReceiverStream::new(rx);
let body: BoxBody<E> = Box::pin(StreamBody::new(rx));
let res = Response::builder().body(body).unwrap();
Ok(res)
}
_ => {
let parts = path.trim_start_matches('/').split('/').collect::<Vec<_>>();
let body: BoxBody<E> =
Box::pin(http_body_util::Empty::new().map_err(|_| unreachable!()));

if let ["status", code] = parts.as_slice() {
let code = code.parse::<u16>().unwrap();
let res = Response::builder()
.status(code)
.body(Body::empty())
.unwrap();
debug!("Replying with {res:?}");
let res = Response::builder().status(code).body(body).unwrap();
debug!("Replying with {:?} {:?}", res.status(), res.headers());
Ok(res)
} else {
let res = Response::builder().status(404).body(Body::empty()).unwrap();
let res = Response::builder().status(404).body(body).unwrap();
Ok(res)
}
}
Expand All @@ -76,11 +85,17 @@ impl Service<Request<Body>> for TestService {

#[tokio::main]
async fn main() {
let upstream = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap()).serve(make_service_fn(
|_addr| async move { Ok::<_, Infallible>(TestService) },
));
let upstream_addr = upstream.local_addr();
let ln = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let upstream_addr = ln.local_addr().unwrap();
println!("I listen on {upstream_addr}");

upstream.await.unwrap();
while let Ok((stream, _)) = ln.accept().await {
tokio::spawn(async move {
let mut builder = auto::Builder::new(TokioExecutor::new());
builder = builder.http1_only();
builder
.serve_connection(TokioIo::new(stream), TestService)
.await
});
}
}
31 changes: 0 additions & 31 deletions crates/fluke-io-uring-async/src/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,46 +76,15 @@ impl<C: cqueue::Entry> Drop for Op<C> {
drop(guard);

// submit cancel op
tracing::debug!("doing async cancel for op {index}");
let cancel = AsyncCancel::new(inner.index.try_into().unwrap()).build();
tracing::debug!("doing async cancel for op {index} (.build called)");
let mut cancel_op = get_ring().push(cancel);
tracing::debug!("doing async cancel for op {index} (.push called)");
let cancel_op_inner = cancel_op.inner.take().unwrap();
tracing::debug!(
"doing async cancel for op {index} (cancel op has index {})",
cancel_op_inner.index
);
tracing::debug!("doing async cancel for op {index} (.take called)");
std::mem::forget(cancel_op);
tracing::debug!("doing async cancel for op {index} (cancel_fut forgotten)");

struct NoisyDrop;

impl Drop for NoisyDrop {
fn drop(&mut self) {
tracing::debug!("dropping noisy drop");
}
}

let noisy_drop = NoisyDrop;
tracing::debug!("spawning noisy drop task");
tokio::task::spawn_local(async move {
tracing::debug!("inside noisy drop spawned task");
drop(noisy_drop);
tracing::debug!("inside noisy drop spawned task.. dropped!");
});
tracing::debug!("spawned noisy drop task");

tokio::task::spawn_local(async move {
tracing::debug!("cancelling op {index}");
cancel_op_inner.await;
tracing::debug!("cancelling op {index}.. cancel_fut returned!");
inner.await;
tracing::debug!("cancelling op {index}.. inner returned!");
});

tracing::debug!("doing async cancel for op {index} (task spawned)");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/httpwg-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async fn async_main(args: Args) -> eyre::Result<()> {
for arg in iter {
cmd.arg(arg);
}
#[cfg(unix)]
#[cfg(target_os = "linux")]
unsafe {
// avoid zombie children on unix: no matter how the test runner dies,
// the server will die with it.
Expand Down
Loading

0 comments on commit 7387d6d

Please sign in to comment.