Skip to content

Commit

Permalink
fix graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
bobozhengsir committed Oct 25, 2023
1 parent eaa78a3 commit 0f4c92e
Showing 1 changed file with 28 additions and 37 deletions.
65 changes: 28 additions & 37 deletions volo-http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,45 @@ where
let mut incoming = mk_incoming.make_incoming().await?;
info!("[VOLO-HTTP] server start at: {:?}", incoming);

let conn_cnt = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let gconn_cnt = conn_cnt.clone();
let (exit_notify, exit_flag, exit_mark) = (
Arc::new(Notify::const_new()),
Arc::new(parking_lot::RwLock::new(false)),
Arc::new(std::sync::atomic::AtomicBool::default()),
);
let (tx, rx) = tokio::sync::watch::channel(());
let exit_mark = Arc::new(std::sync::atomic::AtomicBool::default());

let exit_flag_inner = exit_flag.clone();
let exit_mark_inner = exit_mark.clone();
let rx_inner = rx.clone();

let service = self;
let handler = tokio::spawn(async move {
let exit_flag = exit_flag_inner.clone();
let exit_mark = exit_mark_inner.clone();
loop {
if *exit_flag.read() {
if exit_mark.load(Ordering::Relaxed) {
break Ok(());
}
match incoming.accept().await {
Ok(Some(conn)) => {
let peer = conn.info.peer_addr.clone().unwrap();
trace!("[VOLO] accept connection from: {:?}", peer);

conn_cnt.fetch_add(1, Ordering::Relaxed);

let s = service.clone();
let mut watch = rx_inner.clone();
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(conn, MotoreService { peer, inner: s.app })
.await
{
warn!("error serving connection: {:?}", err);
let mut http_conn = http1::Builder::new()
.serve_connection(conn, MotoreService { peer, inner: s.app });
tokio::select! {
_ = watch.changed() => {
tracing::trace!("[VOLO] closing a pending connection");
// Graceful shutdown.
hyper::server::conn::http1::Connection::graceful_shutdown(Pin::new(&mut http_conn));
// Continue to poll this connection until shutdown can finish.
let result = http_conn.await;
if let Err(err) = result {
tracing::debug!("[VOLO] connection error: {:?}", err);
}
}
result = &mut http_conn => {
if let Err(err) = result {
tracing::debug!("[VOLO] connection error: {:?}", err);
}
},
}
});
}
Expand Down Expand Up @@ -127,27 +135,10 @@ where

// received signal, graceful shutdown now
info!("[VOLO] received signal, gracefully exiting now");
*exit_flag.write() = true;
exit_mark.store(true, Ordering::Relaxed);

// Now we won't accept new connections.
// And we want to send crrst reply to the peers in the short future.
if gconn_cnt.load(Ordering::Relaxed) != 0 {
tokio::time::sleep(Duration::from_secs(2)).await;
}
exit_notify.notify_waiters();

// wait for all connections to be closed
for _ in 0..28 {
if gconn_cnt.load(Ordering::Relaxed) == 0 {
break;
}
trace!(
"[VOLO] gracefully exiting, remaining connection count: {}",
gconn_cnt.load(Ordering::Relaxed)
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
drop(rx);
let _ = tx.send(());
tx.closed().await;
Ok(())
}
}

0 comments on commit 0f4c92e

Please sign in to comment.