Skip to content

Commit

Permalink
do graceful h2 shutdown after upstream closed
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed May 14, 2024
1 parent dae4d6a commit b5e40e2
Showing 1 changed file with 40 additions and 22 deletions.
62 changes: 40 additions & 22 deletions g3proxy/src/inspect/http/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* limitations under the License.
*/

use std::future::poll_fn;
use std::sync::Arc;

use async_recursion::async_recursion;
use h2::Reason;
use bytes::Bytes;
use h2::{server::Connection, Reason};
use slog::slog_info;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::Instant;

use g3_dpi::Protocol;
Expand Down Expand Up @@ -185,18 +186,12 @@ where
ups_r = &mut h2s_connection => {
return match ups_r {
Ok(_) => {
// cancel and wait the h2c connection to close
h2c.abrupt_shutdown(Reason::CANCEL);
// TODO add timeout
let _ = poll_fn(|ctx| h2c.poll_closed(ctx)).await;
server_graceful_shutdown(h2c).await;

Err(H2InterceptionError::UpstreamConnectionFinished)
}
Err(e) => {
// cancel and wait the h2c connection to close
h2c.abrupt_shutdown(Reason::CANCEL);
// TODO add timeout
let _ = poll_fn(|ctx| h2c.poll_closed(ctx)).await;
server_graceful_shutdown(h2c).await;

if let Some(e) = e.get_io() {
if e.kind() == std::io::ErrorKind::NotConnected {
Expand Down Expand Up @@ -248,10 +243,7 @@ where
idle_count += 1;

if idle_count > max_idle_count {
// cancel and wait the h2c connection to close
h2c.abrupt_shutdown(Reason::CANCEL);
// TODO add timeout
let _ = poll_fn(|ctx| h2c.poll_closed(ctx)).await;
server_abrupt_shutdown(h2c, Reason::ENHANCE_YOUR_CALM).await;

return Err(H2InterceptionError::Idle(idle_duration, idle_count));
}
Expand All @@ -260,19 +252,13 @@ where
}

if self.ctx.belongs_to_blocked_user() {
// cancel and wait the h2c connection to close
h2c.abrupt_shutdown(Reason::CANCEL);
// TODO add timeout
let _ = poll_fn(|ctx| h2c.poll_closed(ctx)).await;
server_abrupt_shutdown(h2c, Reason::CANCEL).await;

return Err(H2InterceptionError::CanceledAsUserBlocked);
}

if self.ctx.server_force_quit() {
// cancel and wait the h2c connection to close
h2c.abrupt_shutdown(Reason::CANCEL);
// TODO add timeout
let _ = poll_fn(|ctx| h2c.poll_closed(ctx)).await;
server_abrupt_shutdown(h2c, Reason::CANCEL).await;

return Err(H2InterceptionError::CanceledAsServerQuit)
}
Expand All @@ -285,3 +271,35 @@ where
}
}
}

async fn server_graceful_shutdown<T>(mut h2c: Connection<T, Bytes>)
where
T: AsyncRead + AsyncWrite + Unpin,
{
h2c.graceful_shutdown();

while let Some(r) = h2c.accept().await {
match r {
Ok((_req, mut send_rsp)) => {
send_rsp.send_reset(Reason::REFUSED_STREAM);
}
Err(_) => break,
}
}
}

async fn server_abrupt_shutdown<T>(mut h2c: Connection<T, Bytes>, reason: Reason)
where
T: AsyncRead + AsyncWrite + Unpin,
{
h2c.abrupt_shutdown(reason);

while let Some(r) = h2c.accept().await {
match r {
Ok((_req, mut send_rsp)) => {
send_rsp.send_reset(reason);
}
Err(_) => break,
}
}
}

0 comments on commit b5e40e2

Please sign in to comment.