diff --git a/bongonet-core/src/apps/http_app.rs b/bongonet-core/src/apps/http_app.rs
index 8b13789..2e9a5bd 100644
--- a/bongonet-core/src/apps/http_app.rs
+++ b/bongonet-core/src/apps/http_app.rs
@@ -1 +1,211 @@
+// Copyright 2024 Khulnasoft, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//! The abstraction and implementation interface for service application logic
+
+use async_trait::async_trait;
+use bongonet_http::ResponseHeader;
+use http::Response;
+use log::{debug, error, trace};
+use std::sync::Arc;
+
+use crate::apps::HttpServerApp;
+use crate::modules::http::{HttpModules, ModuleBuilder};
+use crate::protocols::http::HttpTask;
+use crate::protocols::http::ServerSession;
+use crate::protocols::Stream;
+use crate::server::ShutdownWatch;
+
+/// This trait defines how to map a request to a response
+#[async_trait]
+pub trait ServeHttp {
+    /// Define the mapping from a request to a response.
+    /// Note that the request header is already read, but the implementation needs to read the
+    /// request body if any.
+    ///
+    /// # Limitation
+    /// In this API, the entire response has to be generated before the end of this call.
+    /// So it is not suitable for streaming responses or interactive communications.
+    /// Users need to implement their own [`super::HttpServerApp`] for those use cases.
+    async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
+}
+
+// TODO: remove this in favor of HttpServer?
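// --- Editor's sketch (illustrative, not part of this diff) --------------------------------
// A minimal `ServeHttp` implementor, showing how the trait above is meant to be used.
// `EchoApp` is a hypothetical type invented for this sketch; it relies only on the imports
// already at the top of this file and on the `Response<Vec<u8>>` return type of the trait.
struct EchoApp;

#[async_trait]
impl ServeHttp for EchoApp {
    async fn response(&self, _session: &mut ServerSession) -> Response<Vec<u8>> {
        // the whole body must be ready before returning; see the Limitation note above
        let body = b"hello world\n".to_vec();
        Response::builder()
            .status(200)
            .header(http::header::CONTENT_TYPE, "text/plain")
            .header(http::header::CONTENT_LENGTH, body.len())
            .body(body)
            .expect("static response builds")
    }
}
// -------------------------------------------------------------------------------------------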
+#[async_trait]
+impl<SV> HttpServerApp for SV
+where
+    SV: ServeHttp + Send + Sync,
+{
+    async fn process_new_http(
+        self: &Arc<Self>,
+        mut http: ServerSession,
+        shutdown: &ShutdownWatch,
+    ) -> Option<Stream> {
+        match http.read_request().await {
+            Ok(res) => match res {
+                false => {
+                    debug!("Failed to read request header");
+                    return None;
+                }
+                true => {
+                    debug!("Successfully get a new request");
+                }
+            },
+            Err(e) => {
+                error!("HTTP server fails to read from downstream: {e}");
+                return None;
+            }
+        }
+        trace!("{:?}", http.req_header());
+        if *shutdown.borrow() {
+            http.set_keepalive(None);
+        } else {
+            http.set_keepalive(Some(60));
+        }
+        let new_response = self.response(&mut http).await;
+        let (parts, body) = new_response.into_parts();
+        let resp_header: ResponseHeader = parts.into();
+        match http.write_response_header(Box::new(resp_header)).await {
+            Ok(()) => {
+                debug!("HTTP response header done.");
+            }
+            Err(e) => {
+                error!(
+                    "HTTP server fails to write to downstream: {e}, {}",
+                    http.request_summary()
+                );
+            }
+        }
+        if !body.is_empty() {
+            // TODO: check if chunked encoding is needed
+            match http.write_response_body(body.into(), true).await {
+                Ok(_) => debug!("HTTP response written."),
+                Err(e) => error!(
+                    "HTTP server fails to write to downstream: {e}, {}",
+                    http.request_summary()
+                ),
+            }
+        }
+        match http.finish().await {
+            Ok(c) => c,
+            Err(e) => {
+                error!("HTTP server fails to finish the request: {e}");
+                None
+            }
+        }
+    }
+}
+
+/// A helper struct for an HTTP server with HTTP modules embedded
+pub struct HttpServer<SV> {
+    app: SV,
+    modules: HttpModules,
+}
+
+impl<SV> HttpServer<SV> {
+    /// Create a new [HttpServer] with the given app which implements [ServeHttp]
+    pub fn new_app(app: SV) -> Self {
+        HttpServer {
+            app,
+            modules: HttpModules::new(),
+        }
+    }
+
+    /// Add a [ModuleBuilder] to this [HttpServer]
+    pub fn add_module(&mut self, module: ModuleBuilder) {
+        self.modules.add_module(module)
+    }
+}
+
+#[async_trait]
+impl<SV> HttpServerApp for HttpServer<SV>
+where
+    SV: ServeHttp + Send + Sync,
+{
+    async fn process_new_http(
+        self: &Arc<Self>,
+        mut http: ServerSession,
+        shutdown: &ShutdownWatch,
+    ) -> Option<Stream> {
+        match http.read_request().await {
+            Ok(res) => match res {
+                false => {
+                    debug!("Failed to read request header");
+                    return None;
+                }
+                true => {
+                    debug!("Successfully get a new request");
+                }
+            },
+            Err(e) => {
+                error!("HTTP server fails to read from downstream: {e}");
+                return None;
+            }
+        }
+        trace!("{:?}", http.req_header());
+        if *shutdown.borrow() {
+            http.set_keepalive(None);
+        } else {
+            http.set_keepalive(Some(60));
+        }
+        let mut module_ctx = self.modules.build_ctx();
+        let req = http.req_header_mut();
+        module_ctx.request_header_filter(req).await.ok()?;
+        let new_response = self.app.response(&mut http).await;
+        let (parts, body) = new_response.into_parts();
+        let mut resp_header: ResponseHeader = parts.into();
+        module_ctx
+            .response_header_filter(&mut resp_header, body.is_empty())
+            .await
+            .ok()?;
+
+        let task = HttpTask::Header(Box::new(resp_header), body.is_empty());
+        trace!("{task:?}");
+
+        match http.response_duplex_vec(vec![task]).await {
+            Ok(_) => {
+                debug!("HTTP response header done.");
+            }
+            Err(e) => {
+                error!(
+                    "HTTP server fails to write to downstream: {e}, {}",
+                    http.request_summary()
+                );
+            }
+        }
+
+        let mut body = Some(body.into());
+        module_ctx.response_body_filter(&mut body, true).ok()?;
+
+        let task = HttpTask::Body(body, true);
+
+        trace!("{task:?}");
+
+        // TODO: check if chunked encoding is needed
+        match http.response_duplex_vec(vec![task]).await {
+            Ok(_) => debug!("HTTP response written."),
+            Err(e) => error!(
+                "HTTP server fails to write to downstream: {e}, {}",
+                http.request_summary()
+            ),
+        }
+        match http.finish().await {
+            Ok(c) => c,
+            Err(e) => {
+                error!("HTTP server fails to finish the request: {e}");
+                None
+            }
+        }
+    }
+}
diff --git a/bongonet-proxy/Cargo.toml b/bongonet-proxy/Cargo.toml
index ce9b61e..35f82db 100644
--- a/bongonet-proxy/Cargo.toml
+++ b/bongonet-proxy/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "bongonet-proxy"
 version = "0.3.0"
-authors = ["KhulnaSoft DevOps "]
+authors = ["Yuchen Wu "]
 license = "Apache-2.0"
 edition = "2021"
 repository = "https://github.com/khulnasoft/bongonet"
@@ -64,4 +64,4 @@ boringssl = ["bongonet-core/boringssl", "bongonet-cache/boringssl"]
 rustdoc-args = ["--cfg", "doc_async_trait"]
 
 [lints.rust]
-unexpected_cfgs = { level = "warn", check-cfg = ['cfg(doc_async_trait)'] }
+unexpected_cfgs = { level = "warn", check-cfg = ['cfg(doc_async_trait)'] }
\ No newline at end of file
diff --git a/bongonet-proxy/examples/gateway.rs b/bongonet-proxy/examples/gateway.rs
index b12d8f0..6b73087 100644
--- a/bongonet-proxy/examples/gateway.rs
+++ b/bongonet-proxy/examples/gateway.rs
@@ -121,7 +121,7 @@ fn main() {
     let mut my_proxy = bongonet_proxy::http_proxy_service(
         &my_server.configuration,
         MyGateway {
-            req_metric: register_int_counter!("reg_counter", "Number of requests").unwrap(),
+            req_metric: register_int_counter!("req_counter", "Number of requests").unwrap(),
         },
     );
     my_proxy.add_tcp("0.0.0.0:6191");
diff --git a/bongonet-proxy/examples/grpc_web_module.rs b/bongonet-proxy/examples/grpc_web_module.rs
new file mode 100644
index 0000000..2601aa7
--- /dev/null
+++ b/bongonet-proxy/examples/grpc_web_module.rs
@@ -0,0 +1,90 @@
+// Copyright 2024 Khulnasoft, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
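// --- Editor's sketch (illustrative, not part of this diff) --------------------------------
// How the `HttpServer` wrapper added in bongonet-core/src/apps/http_app.rs above might be
// wired into a listening service. `EchoApp` is the hypothetical `ServeHttp` implementor from
// the earlier sketch, and the service name and port are arbitrary; `GrpcWeb` is the module
// builder this example file imports below.
use bongonet_core::apps::http_app::HttpServer;
use bongonet_core::services::listening::Service;

fn echo_service() -> Service<HttpServer<EchoApp>> {
    let mut app = HttpServer::new_app(EchoApp);
    // any ModuleBuilder can be attached here
    app.add_module(Box::new(GrpcWeb));
    let mut service = Service::new("Echo Service".to_string(), app);
    service.add_tcp("0.0.0.0:6190");
    service
}
// -------------------------------------------------------------------------------------------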
+
+use async_trait::async_trait;
+use clap::Parser;
+
+use bongonet_core::server::Server;
+use bongonet_core::upstreams::peer::HttpPeer;
+use bongonet_core::Result;
+use bongonet_core::{
+    modules::http::{
+        grpc_web::{GrpcWeb, GrpcWebBridge},
+        HttpModules,
+    },
+    prelude::Opt,
+};
+use bongonet_proxy::{ProxyHttp, Session};
+
+/// This example shows how to use the gRPC-web bridge module
+
+pub struct GrpcWebBridgeProxy;
+
+#[async_trait]
+impl ProxyHttp for GrpcWebBridgeProxy {
+    type CTX = ();
+    fn new_ctx(&self) -> Self::CTX {}
+
+    fn init_downstream_modules(&self, modules: &mut HttpModules) {
+        // Add the gRPC web module
+        modules.add_module(Box::new(GrpcWeb))
+    }
+
+    async fn early_request_filter(
+        &self,
+        session: &mut Session,
+        _ctx: &mut Self::CTX,
+    ) -> Result<()> {
+        let grpc = session
+            .downstream_modules_ctx
+            .get_mut::<GrpcWebBridge>()
+            .expect("GrpcWebBridge module added");
+
+        // initialize the gRPC module for this request
+        grpc.init();
+        Ok(())
+    }
+
+    async fn upstream_peer(
+        &self,
+        _session: &mut Session,
+        _ctx: &mut Self::CTX,
+    ) -> Result<Box<HttpPeer>> {
+        // this needs to be your gRPC server
+        let grpc_peer = Box::new(HttpPeer::new(
+            ("1.1.1.1", 443),
+            true,
+            "one.one.one.one".to_string(),
+        ));
+        Ok(grpc_peer)
+    }
+}
+
+// RUST_LOG=INFO cargo run --example grpc_web_module
+
+fn main() {
+    env_logger::init();
+
+    // read command line arguments
+    let opt = Opt::parse();
+    let mut my_server = Server::new(Some(opt)).unwrap();
+    my_server.bootstrap();
+
+    let mut my_proxy =
+        bongonet_proxy::http_proxy_service(&my_server.configuration, GrpcWebBridgeProxy);
+    my_proxy.add_tcp("0.0.0.0:6194");
+
+    my_server.add_service(my_proxy);
+    my_server.run_forever();
+}
diff --git a/bongonet-proxy/examples/load_balancer.rs b/bongonet-proxy/examples/load_balancer.rs
index 8b13789..140d318 100644
--- a/bongonet-proxy/examples/load_balancer.rs
+++ b/bongonet-proxy/examples/load_balancer.rs
@@ -1 +1,96 @@
+// Copyright 2024 Khulnasoft, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
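// --- Editor's sketch (illustrative, not part of this diff) --------------------------------
// The `HttpPeer::new` calls in the gRPC-web example above and in the load balancer below
// take (address, use_tls, sni). For "your gRPC server" a local plaintext backend is more
// typical than 1.1.1.1:443; the address and port below are made-up placeholders.
fn local_grpc_peer() -> Box<HttpPeer> {
    // TLS disabled, so the SNI string is unused and left empty
    Box::new(HttpPeer::new(("127.0.0.1", 50051), false, String::new()))
}
// -------------------------------------------------------------------------------------------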
+use async_trait::async_trait;
+use clap::Parser;
+use log::info;
+use bongonet_core::services::background::background_service;
+use std::{sync::Arc, time::Duration};
+
+use bongonet_core::server::configuration::Opt;
+use bongonet_core::server::Server;
+use bongonet_core::upstreams::peer::HttpPeer;
+use bongonet_core::Result;
+use bongonet_load_balancing::{health_check, selection::RoundRobin, LoadBalancer};
+use bongonet_proxy::{ProxyHttp, Session};
+
+pub struct LB(Arc<LoadBalancer<RoundRobin>>);
+
+#[async_trait]
+impl ProxyHttp for LB {
+    type CTX = ();
+    fn new_ctx(&self) -> Self::CTX {}
+
+    async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
+        let upstream = self
+            .0
+            .select(b"", 256) // hash doesn't matter
+            .unwrap();
+
+        info!("upstream peer is: {:?}", upstream);
+
+        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
+        Ok(peer)
+    }
+
+    async fn upstream_request_filter(
+        &self,
+        _session: &mut Session,
+        upstream_request: &mut bongonet_http::RequestHeader,
+        _ctx: &mut Self::CTX,
+    ) -> Result<()> {
+        upstream_request
+            .insert_header("Host", "one.one.one.one")
+            .unwrap();
+        Ok(())
+    }
+}
+
+// RUST_LOG=INFO cargo run --example load_balancer
+fn main() {
+    env_logger::init();
+
+    // read command line arguments
+    let opt = Opt::parse();
+    let mut my_server = Server::new(Some(opt)).unwrap();
+    my_server.bootstrap();
+
+    // "127.0.0.1:343" is just a bad server
+    let mut upstreams =
+        LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap();
+
+    // We add a health check in the background so that the bad server is never selected.
+    let hc = health_check::TcpHealthCheck::new();
+    upstreams.set_health_check(hc);
+    upstreams.health_check_frequency = Some(Duration::from_secs(1));
+
+    let background = background_service("health check", upstreams);
+
+    let upstreams = background.task();
+
+    let mut lb = bongonet_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));
+    lb.add_tcp("0.0.0.0:6188");
+
+    let cert_path = format!("{}/tests/keys/server.crt", env!("CARGO_MANIFEST_DIR"));
+    let key_path = format!("{}/tests/keys/key.pem", env!("CARGO_MANIFEST_DIR"));
+
+    let mut tls_settings =
+        bongonet_core::listeners::TlsSettings::intermediate(&cert_path, &key_path).unwrap();
+    tls_settings.enable_h2();
+    lb.add_tls_with_settings("0.0.0.0:6189", None, tls_settings);
+
+    my_server.add_service(lb);
+    my_server.add_service(background);
+    my_server.run_forever();
+}
diff --git a/bongonet-proxy/examples/rate_limiter.rs b/bongonet-proxy/examples/rate_limiter.rs
index aec1828..884ad10 100644
--- a/bongonet-proxy/examples/rate_limiter.rs
+++ b/bongonet-proxy/examples/rate_limiter.rs
@@ -1,11 +1,11 @@
 use async_trait::async_trait;
+use once_cell::sync::Lazy;
 use bongonet_core::prelude::*;
 use bongonet_http::{RequestHeader, ResponseHeader};
 use bongonet_limits::rate::Rate;
 use bongonet_load_balancing::prelude::{RoundRobin, TcpHealthCheck};
 use bongonet_load_balancing::LoadBalancer;
 use bongonet_proxy::{http_proxy_service, ProxyHttp, Session};
-use once_cell::sync::Lazy;
 use std::sync::Arc;
 use std::time::Duration;
 
diff --git a/bongonet-proxy/src/lib.rs b/bongonet-proxy/src/lib.rs
index 8b13789..4ef5248 100644
--- a/bongonet-proxy/src/lib.rs
+++ b/bongonet-proxy/src/lib.rs
@@ -1 +1,785 @@
+// Copyright 2024 Khulnasoft, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//! # bongonet-proxy +//! +//! Programmable HTTP proxy built on top of [bongonet_core]. +//! +//! # Features +//! - HTTP/1.x and HTTP/2 for both downstream and upstream +//! - Connection pooling +//! - TLSv1.3, mutual TLS, customizable CA +//! - Request/Response scanning, modification or rejection +//! - Dynamic upstream selection +//! - Configurable retry and failover +//! - Fully programmable and customizable at any stage of a HTTP request +//! +//! # How to use +//! +//! Users of this crate defines their proxy by implementing [ProxyHttp] trait, which contains the +//! callbacks to be invoked at each stage of a HTTP request. +//! +//! Then the service can be passed into [`http_proxy_service()`] for a [bongonet_core::server::Server] to +//! run it. +//! +//! See `examples/load_balancer.rs` for a detailed example. + +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::FutureExt; +use http::{header, version::Version}; +use log::{debug, error, trace, warn}; +use once_cell::sync::Lazy; +use bongonet_http::{RequestHeader, ResponseHeader}; +use std::fmt::Debug; +use std::str; +use std::sync::Arc; +use tokio::sync::{mpsc, Notify}; +use tokio::time; + +use bongonet_cache::NoCacheReason; +use bongonet_core::apps::{HttpServerApp, HttpServerOptions}; +use bongonet_core::connectors::{http::Connector, ConnectorOptions}; +use bongonet_core::modules::http::compression::ResponseCompressionBuilder; +use bongonet_core::modules::http::{HttpModuleCtx, HttpModules}; +use bongonet_core::protocols::http::client::HttpSession as ClientSession; +use bongonet_core::protocols::http::v1::client::HttpSession as HttpSessionV1; +use bongonet_core::protocols::http::HttpTask; +use bongonet_core::protocols::http::ServerSession as HttpSession; +use bongonet_core::protocols::http::SERVER_NAME; +use bongonet_core::protocols::Stream; +use bongonet_core::protocols::{Digest, UniqueID}; +use bongonet_core::server::configuration::ServerConf; +use bongonet_core::server::ShutdownWatch; +use bongonet_core::upstreams::peer::{HttpPeer, Peer}; +use bongonet_error::{Error, ErrorSource, ErrorType::*, OrErr, Result}; + +const MAX_RETRIES: usize = 16; +const TASK_BUFFER_SIZE: usize = 4; + +mod proxy_cache; +mod proxy_common; +mod proxy_h1; +mod proxy_h2; +mod proxy_purge; +mod proxy_trait; +mod subrequest; + +use subrequest::Ctx as SubReqCtx; + +pub use proxy_purge::PurgeStatus; +pub use proxy_trait::ProxyHttp; + +pub mod prelude { + pub use crate::{http_proxy_service, ProxyHttp, Session}; +} + +/// The concrete type that holds the user defined HTTP proxy. +/// +/// Users don't need to interact with this object directly. 
+pub struct HttpProxy { + inner: SV, // TODO: name it better than inner + client_upstream: Connector, + shutdown: Notify, + pub server_options: Option, + pub downstream_modules: HttpModules, +} + +impl HttpProxy { + fn new(inner: SV, conf: Arc) -> Self { + HttpProxy { + inner, + client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))), + shutdown: Notify::new(), + server_options: None, + downstream_modules: HttpModules::new(), + } + } + + fn handle_init_modules(&mut self) + where + SV: ProxyHttp, + { + self.inner + .init_downstream_modules(&mut self.downstream_modules); + } + + async fn handle_new_request( + &self, + mut downstream_session: Box, + ) -> Option> + where + SV: ProxyHttp + Send + Sync, + SV::CTX: Send + Sync, + { + // phase 1 read request header + + let res = tokio::select! { + biased; // biased select is cheaper, and we don't want to drop already buffered requests + res = downstream_session.read_request() => { res } + _ = self.shutdown.notified() => { + // service shutting down, dropping the connection to stop more req from coming in + return None; + } + }; + match res { + Ok(true) => { + // TODO: check n==0 + debug!("Successfully get a new request"); + } + Ok(false) => { + return None; // TODO: close connection? + } + Err(mut e) => { + e.as_down(); + error!("Fail to proxy: {}", e); + if matches!(e.etype, InvalidHTTPHeader) { + downstream_session.respond_error(400).await; + } // otherwise the connection must be broken, no need to send anything + downstream_session.shutdown().await; + return None; + } + } + trace!( + "Request header: {:?}", + downstream_session.req_header().as_ref() + ); + Some(downstream_session) + } + + // return bool: server_session can be reused, and error if any + async fn proxy_to_upstream( + &self, + session: &mut Session, + ctx: &mut SV::CTX, + ) -> (bool, Option>) + where + SV: ProxyHttp + Send + Sync, + SV::CTX: Send + Sync, + { + let peer = match self.inner.upstream_peer(session, ctx).await { + Ok(p) => p, + Err(e) => return (false, Some(e)), + }; + + let client_session = self.client_upstream.get_http_session(&*peer).await; + match client_session { + Ok((client_session, client_reused)) => { + let (server_reused, error) = match client_session { + ClientSession::H1(mut h1) => { + let (server_reused, client_reuse, error) = self + .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx) + .await; + if client_reuse { + let session = ClientSession::H1(h1); + self.client_upstream + .release_http_session(session, &*peer, peer.idle_timeout()) + .await; + } + (server_reused, error) + } + ClientSession::H2(mut h2) => { + let (server_reused, mut error) = self + .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx) + .await; + let session = ClientSession::H2(h2); + self.client_upstream + .release_http_session(session, &*peer, peer.idle_timeout()) + .await; + + if let Some(e) = error.as_mut() { + // try to downgrade if A. origin says so or B. origin sends an invalid + // response, which usually means origin h2 is not production ready + if matches!(e.etype, H2Downgrade | InvalidH2) { + if peer + .get_alpn() + .map_or(true, |alpn| alpn.get_min_http_version() == 1) + { + // Add the peer to prefer h1 so that all following requests + // will use h1 + self.client_upstream.prefer_h1(&*peer); + } else { + // the peer doesn't allow downgrading to h1 (e.g. 
gRPC) + e.retry = false.into(); + } + } + } + + (server_reused, error) + } + }; + ( + server_reused, + error.map(|e| { + self.inner + .error_while_proxy(&peer, session, e, ctx, client_reused) + }), + ) + } + Err(mut e) => { + e.as_up(); + let new_err = self.inner.fail_to_connect(session, &peer, ctx, e); + (false, Some(new_err.into_up())) + } + } + } + + fn upstream_filter( + &self, + session: &mut Session, + task: &mut HttpTask, + ctx: &mut SV::CTX, + ) -> Result<()> + where + SV: ProxyHttp, + { + match task { + HttpTask::Header(header, _eos) => { + self.inner.upstream_response_filter(session, header, ctx) + } + HttpTask::Body(data, eos) => self + .inner + .upstream_response_body_filter(session, data, *eos, ctx), + HttpTask::Trailer(Some(trailers)) => self + .inner + .upstream_response_trailer_filter(session, trailers, ctx)?, + _ => { + // task does not support a filter + } + } + Ok(()) + } + + async fn finish( + &self, + mut session: Session, + ctx: &mut SV::CTX, + reuse: bool, + error: Option<&Error>, + ) -> Option + where + SV: ProxyHttp + Send + Sync, + SV::CTX: Send + Sync, + { + self.inner.logging(&mut session, error, ctx).await; + + if reuse { + // TODO: log error + session.downstream_session.finish().await.ok().flatten() + } else { + None + } + } +} + +use bongonet_cache::HttpCache; +use bongonet_core::protocols::http::compression::ResponseCompressionCtx; + +/// The established HTTP session +/// +/// This object is what users interact with in order to access the request itself or change the proxy +/// behavior. +pub struct Session { + /// the HTTP session to downstream (the client) + pub downstream_session: Box, + /// The interface to control HTTP caching + pub cache: HttpCache, + /// (de)compress responses coming into the proxy (from upstream) + pub upstream_compression: ResponseCompressionCtx, + /// ignore downstream range (skip downstream range filters) + pub ignore_downstream_range: bool, + // the context from parent request + subrequest_ctx: Option>, + // Downstream filter modules + pub downstream_modules_ctx: HttpModuleCtx, +} + +impl Session { + fn new( + downstream_session: impl Into>, + downstream_modules: &HttpModules, + ) -> Self { + Session { + downstream_session: downstream_session.into(), + cache: HttpCache::new(), + // disable both upstream and downstream compression + upstream_compression: ResponseCompressionCtx::new(0, false, false), + ignore_downstream_range: false, + subrequest_ctx: None, + downstream_modules_ctx: downstream_modules.build_ctx(), + } + } + + /// Create a new [Session] from the given [Stream] + /// + /// This function is mostly used for testing and mocking. + pub fn new_h1(stream: Stream) -> Self { + let modules = HttpModules::new(); + Self::new(Box::new(HttpSession::new_http1(stream)), &modules) + } + + /// Create a new [Session] from the given [Stream] with modules + /// + /// This function is mostly used for testing and mocking. 
+ pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self { + Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules) + } + + pub fn as_downstream_mut(&mut self) -> &mut HttpSession { + &mut self.downstream_session + } + + pub fn as_downstream(&self) -> &HttpSession { + &self.downstream_session + } + + /// Write HTTP response with the given error code to the downstream + pub async fn respond_error(&mut self, error: u16) -> Result<()> { + let resp = HttpSession::generate_error(error); + self.write_response_header(Box::new(resp), true) + .await + .unwrap_or_else(|e| { + self.downstream_session.set_keepalive(None); + error!("failed to send error response to downstream: {e}"); + }); + Ok(()) + } + + /// Write the given HTTP response header to the downstream + /// + /// Different from directly calling [HttpSession::write_response_header], this function also + /// invokes the filter modules. + pub async fn write_response_header( + &mut self, + mut resp: Box, + end_of_stream: bool, + ) -> Result<()> { + self.downstream_modules_ctx + .response_header_filter(&mut resp, end_of_stream) + .await?; + self.downstream_session.write_response_header(resp).await + } + + /// Write the given HTTP response body chunk to the downstream + /// + /// Different from directly calling [HttpSession::write_response_body], this function also + /// invokes the filter modules. + pub async fn write_response_body( + &mut self, + mut body: Option, + end_of_stream: bool, + ) -> Result<()> { + self.downstream_modules_ctx + .response_body_filter(&mut body, end_of_stream)?; + + if body.is_none() && !end_of_stream { + return Ok(()); + } + + let data = body.unwrap_or_default(); + self.downstream_session + .write_response_body(data, end_of_stream) + .await + } + + pub async fn write_response_tasks(&mut self, mut tasks: Vec) -> Result { + for task in tasks.iter_mut() { + match task { + HttpTask::Header(resp, end) => { + self.downstream_modules_ctx + .response_header_filter(resp, *end) + .await?; + } + HttpTask::Body(data, end) => { + self.downstream_modules_ctx + .response_body_filter(data, *end)?; + } + HttpTask::Trailer(trailers) => { + if let Some(buf) = self + .downstream_modules_ctx + .response_trailer_filter(trailers)? + { + // Write the trailers into the body if the filter + // returns a buffer. + // + // Note, this will not work if end of stream has already + // been seen or we've written content-length bytes. 
+ *task = HttpTask::Body(Some(buf), true); + } + } + _ => { /* Done or Failed */ } + } + } + self.downstream_session.response_duplex_vec(tasks).await + } +} + +impl AsRef for Session { + fn as_ref(&self) -> &HttpSession { + &self.downstream_session + } +} + +impl AsMut for Session { + fn as_mut(&mut self) -> &mut HttpSession { + &mut self.downstream_session + } +} + +use std::ops::{Deref, DerefMut}; + +impl Deref for Session { + type Target = HttpSession; + + fn deref(&self) -> &Self::Target { + &self.downstream_session + } +} + +impl DerefMut for Session { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.downstream_session + } +} + +// generic HTTP 502 response sent when proxy_upstream_filter refuses to connect to upstream +static BAD_GATEWAY: Lazy = Lazy::new(|| { + let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap(); + resp.insert_header(header::SERVER, &SERVER_NAME[..]) + .unwrap(); + resp.insert_header(header::CONTENT_LENGTH, 0).unwrap(); + resp.insert_header(header::CACHE_CONTROL, "private, no-store") + .unwrap(); + + resp +}); + +impl HttpProxy { + async fn process_request( + self: &Arc, + mut session: Session, + mut ctx: ::CTX, + ) -> Option + where + SV: ProxyHttp + Send + Sync + 'static, + ::CTX: Send + Sync, + { + if let Err(e) = self + .inner + .early_request_filter(&mut session, &mut ctx) + .await + { + self.handle_error(&mut session, &mut ctx, e, "Fail to early filter request:") + .await; + return None; + } + + let req = session.downstream_session.req_header_mut(); + + // Built-in downstream request filters go first + if let Err(e) = session + .downstream_modules_ctx + .request_header_filter(req) + .await + { + self.handle_error( + &mut session, + &mut ctx, + e, + "Failed in downstream modules request filter:", + ) + .await; + return None; + } + + match self.inner.request_filter(&mut session, &mut ctx).await { + Ok(response_sent) => { + if response_sent { + // TODO: log error + self.inner.logging(&mut session, None, &mut ctx).await; + return session.downstream_session.finish().await.ok().flatten(); + } + /* else continue */ + } + Err(e) => { + self.handle_error(&mut session, &mut ctx, e, "Fail to filter request:") + .await; + return None; + } + } + + if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await { + // cache hit + return self.finish(session, &mut ctx, reuse, err.as_deref()).await; + } + // either uncacheable, or cache miss + + // decide if the request is allowed to go to upstream + match self + .inner + .proxy_upstream_filter(&mut session, &mut ctx) + .await + { + Ok(proxy_to_upstream) => { + if !proxy_to_upstream { + // The hook can choose to write its own response, but if it doesn't, we respond + // with a generic 502 + if session.response_written().is_none() { + match session.write_response_header_ref(&BAD_GATEWAY).await { + Ok(()) => {} + Err(e) => { + self.handle_error( + &mut session, + &mut ctx, + e, + "Error responding with Bad Gateway:", + ) + .await; + + return None; + } + } + } + + return self.finish(session, &mut ctx, false, None).await; + } + /* else continue */ + } + Err(e) => { + self.handle_error( + &mut session, + &mut ctx, + e, + "Error deciding if we should proxy to upstream:", + ) + .await; + return None; + } + } + + let mut retries: usize = 0; + + let mut server_reuse = false; + let mut proxy_error: Option> = None; + + while retries < MAX_RETRIES { + retries += 1; + + let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await; + server_reuse = reuse; + + match e { + 
Some(error) => { + let retry = error.retry(); + proxy_error = Some(error); + if !retry { + break; + } + // only log error that will be retried here, the final error will be logged below + warn!( + "Fail to proxy: {}, tries: {}, retry: {}, {}", + proxy_error.as_ref().unwrap(), + retries, + retry, + self.inner.request_summary(&session, &ctx) + ); + } + None => { + proxy_error = None; + break; + } + }; + } + + // serve stale if error + // Check both error and cache before calling the function because await is not cheap + let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() { + self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap()) + .await + } else { + None + }; + + let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result { + // don't reuse server conn if serve stale polluted it + server_reuse = server_reuse && reuse; + stale_cache_error + } else { + proxy_error + }; + + if let Some(e) = final_error.as_ref() { + // If we have errored and are still holding a cache lock, release it. + session.cache.disable(NoCacheReason::InternalError); + let status = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await; + + // final error will have > 0 status unless downstream connection is dead + if !self.inner.suppress_error_log(&session, &ctx, e) { + error!( + "Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}", + final_error.as_ref().unwrap(), + status, + retries, + false, // we never retry here + self.inner.request_summary(&session, &ctx) + ); + } + } + + // logging() will be called in finish() + self.finish(session, &mut ctx, server_reuse, final_error.as_deref()) + .await + } + + async fn handle_error( + &self, + session: &mut Session, + ctx: &mut ::CTX, + e: Box, + context: &str, + ) where + SV: ProxyHttp + Send + Sync + 'static, + ::CTX: Send + Sync, + { + if !self.inner.suppress_error_log(session, ctx, &e) { + error!( + "{context} {}, {}", + e, + self.inner.request_summary(session, ctx) + ); + } + self.inner.fail_to_proxy(session, &e, ctx).await; + self.inner.logging(session, Some(&e), ctx).await; + } +} + +/* Make process_subrequest() a trait to workaround https://github.com/rust-lang/rust/issues/78649 + if process_subrequest() is implemented as a member of HttpProxy, rust complains + +error[E0391]: cycle detected when computing type of `proxy_cache::::proxy_cache::{opaque#0}` + --> bongonet-proxy/src/proxy_cache.rs:13:10 + | +13 | ) -> Option<(bool, Option>)> + +*/ +#[async_trait] +trait Subrequest { + async fn process_subrequest( + self: &Arc, + session: Box, + sub_req_ctx: Box, + ); +} + +#[async_trait] +impl Subrequest for HttpProxy +where + SV: ProxyHttp + Send + Sync + 'static, + ::CTX: Send + Sync, +{ + async fn process_subrequest( + self: &Arc, + session: Box, + sub_req_ctx: Box, + ) { + debug!("starting subrequest"); + let mut session = match self.handle_new_request(session).await { + Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules), + None => return, // bad request + }; + + // no real downstream to keepalive, but it doesn't matter what is set here because at the end + // of this fn the dummy connection will be dropped + session.set_keepalive(None); + + session.subrequest_ctx.replace(sub_req_ctx); + trace!("processing subrequest"); + let ctx = self.inner.new_ctx(); + self.process_request(session, ctx).await; + trace!("subrequest done"); + } +} + +#[async_trait] +impl HttpServerApp for HttpProxy +where + SV: ProxyHttp + Send + Sync + 'static, + ::CTX: Send + 
Sync, +{ + async fn process_new_http( + self: &Arc, + session: HttpSession, + shutdown: &ShutdownWatch, + ) -> Option { + let session = Box::new(session); + + // TODO: keepalive pool, use stack + let mut session = match self.handle_new_request(session).await { + Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules), + None => return None, // bad request + }; + + if *shutdown.borrow() { + // stop downstream from reusing if this service is shutting down soon + session.set_keepalive(None); + } else { + // default 60s + session.set_keepalive(Some(60)); + } + + let ctx = self.inner.new_ctx(); + self.process_request(session, ctx).await + } + + async fn http_cleanup(&self) { + // Notify all keepalived requests blocking on read_request() to abort + self.shutdown.notify_waiters(); + + // TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down() + } + + fn server_options(&self) -> Option<&HttpServerOptions> { + self.server_options.as_ref() + } + + // TODO implement h2_options +} + +use bongonet_core::services::listening::Service; + +/// Create a [Service] from the user implemented [ProxyHttp]. +/// +/// The returned [Service] can be hosted by a [bongonet_core::server::Server] directly. +pub fn http_proxy_service(conf: &Arc, inner: SV) -> Service> +where + SV: ProxyHttp, +{ + http_proxy_service_with_name(conf, inner, "Bongonet HTTP Proxy Service") +} + +/// Create a [Service] from the user implemented [ProxyHttp]. +/// +/// The returned [Service] can be hosted by a [bongonet_core::server::Server] directly. +pub fn http_proxy_service_with_name( + conf: &Arc, + inner: SV, + name: &str, +) -> Service> +where + SV: ProxyHttp, +{ + let mut proxy = HttpProxy::new(inner, conf.clone()); + proxy.handle_init_modules(); + Service::new(name.to_string(), proxy) +} diff --git a/bongonet-proxy/src/proxy_cache.rs b/bongonet-proxy/src/proxy_cache.rs index a48635a..88e76b8 100644 --- a/bongonet-proxy/src/proxy_cache.rs +++ b/bongonet-proxy/src/proxy_cache.rs @@ -13,6 +13,7 @@ // limitations under the License. 
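// --- Editor's sketch (illustrative, not part of this diff) --------------------------------
// A hypothetical `ProxyHttp` implementation tying together the hooks that
// bongonet-proxy/src/lib.rs above drives: `request_filter` can answer the client directly
// and skip the upstream, while `fail_to_connect` can mark a connect error as retryable so
// the MAX_RETRIES loop in `process_request` tries again. The struct name, header check,
// status code, and upstream address are invented for illustration, and the hook signatures
// are inferred from the call sites in lib.rs above.
struct AuthedProxy;

#[async_trait]
impl ProxyHttp for AuthedProxy {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
        if session.req_header().headers.get("x-api-key").is_none() {
            // respond_error() runs the downstream filter modules before writing (see Session)
            session.respond_error(403).await?;
            return Ok(true); // response already sent; do not proxy upstream
        }
        Ok(false)
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        Ok(Box::new(HttpPeer::new(("127.0.0.1", 8080), false, String::new())))
    }

    fn fail_to_connect(
        &self,
        _session: &mut Session,
        _peer: &HttpPeer,
        _ctx: &mut Self::CTX,
        mut e: Box<Error>,
    ) -> Box<Error> {
        // mirrors the `e.retry = false.into()` pattern used in proxy_to_upstream() above
        e.retry = true.into();
        e
    }
}
// -------------------------------------------------------------------------------------------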
use super::*; +use http::StatusCode; use bongonet_cache::key::CacheHashKey; use bongonet_cache::lock::LockStatus; use bongonet_cache::max_file_size::ERR_RESPONSE_TOO_LARGE; @@ -20,7 +21,6 @@ use bongonet_cache::{HitStatus, RespCacheable::*}; use bongonet_core::protocols::http::conditional_filter::to_304; use bongonet_core::protocols::http::v1::common::header_value_content_length; use bongonet_core::ErrorType; -use http::StatusCode; impl HttpProxy { // return bool: server_session can be reused, and error if any diff --git a/bongonet-proxy/src/proxy_h2.rs b/bongonet-proxy/src/proxy_h2.rs index 37872e0..8dd8ea5 100644 --- a/bongonet-proxy/src/proxy_h2.rs +++ b/bongonet-proxy/src/proxy_h2.rs @@ -124,6 +124,9 @@ impl HttpProxy { session.upstream_compression.request_filter(&req); let body_empty = session.as_mut().is_body_empty(); + // whether we support sending END_STREAM on HEADERS if body is empty + let send_end_stream = req.send_end_stream().expect("req must be h2"); + let mut req: http::request::Parts = req.into(); // H2 requires authority to be set, so copy that from H1 host if that is set @@ -133,27 +136,25 @@ impl HttpProxy { } } - debug!("Request to h2: {:?}", req); + debug!("Request to h2: {req:?}"); - // don't send END_STREAM on HEADERS for no_header_eos - let send_header_eos = !peer.options.no_header_eos && body_empty; + // send END_STREAM on HEADERS + let send_header_eos = send_end_stream && body_empty; + debug!("send END_STREAM on HEADERS: {send_end_stream}"); let req = Box::new(RequestHeader::from(req)); - match client_session.write_request_header(req, send_header_eos) { - Ok(v) => v, - Err(e) => { - return (false, Some(e.into_up())); - } - }; + if let Err(e) = client_session.write_request_header(req, send_header_eos) { + return (false, Some(e.into_up())); + } - // send END_STREAM on empty DATA frame for no_headers_eos - if peer.options.no_header_eos && body_empty { + if !send_end_stream && body_empty { + // send END_STREAM on empty DATA frame match client_session.write_request_body(Bytes::new(), true) { Ok(()) => debug!("sent empty DATA frame to h2"), Err(e) => { return (false, Some(e.into_up())); } - }; + } } client_session.read_timeout = peer.options.read_timeout; diff --git a/bongonet-proxy/src/subrequest.rs b/bongonet-proxy/src/subrequest.rs index 15a12af..9065aa9 100644 --- a/bongonet-proxy/src/subrequest.rs +++ b/bongonet-proxy/src/subrequest.rs @@ -13,13 +13,13 @@ // limitations under the License. use async_trait::async_trait; +use core::pin::Pin; +use core::task::{Context, Poll}; use bongonet_cache::lock::WritePermit; use bongonet_core::protocols::raw_connect::ProxyDigest; use bongonet_core::protocols::{ GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, Ssl, TimingDigest, UniqueID, }; -use core::pin::Pin; -use core::task::{Context, Poll}; use std::io::Cursor; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, Error, ReadBuf}; diff --git a/bongonet-proxy/tests/test_upstream.rs b/bongonet-proxy/tests/test_upstream.rs index 25ceb7f..9046421 100644 --- a/bongonet-proxy/tests/test_upstream.rs +++ b/bongonet-proxy/tests/test_upstream.rs @@ -1077,9 +1077,6 @@ mod test_cache { init(); let url = "http://127.0.0.1:6148/sleep/test_cache_lock_network_error.txt"; - // FIXME: Dangling lock happens in this test because the first request aborted without - // properly release the lock. 
This is a bug - let task1 = tokio::spawn(async move { let res = reqwest::Client::new() .get(url) @@ -1519,6 +1516,43 @@ mod test_cache { task3.await.unwrap(); } + #[tokio::test] + async fn test_cache_streaming_multiple_writers() { + // multiple streaming writers don't conflict + init(); + let url = "http://127.0.0.1:6148/slow_body/test_cache_streaming_multiple_writers.txt"; + let task1 = tokio::spawn(async move { + let res = reqwest::Client::new() + .get(url) + .header("x-set-hello", "everyone") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + assert_eq!(headers["x-cache-status"], "miss"); + assert_eq!(res.text().await.unwrap(), "hello everyone!"); + }); + + let task2 = tokio::spawn(async move { + let res = reqwest::Client::new() + .get(url) + // don't allow using the other streaming write's result + .header("x-force-expire", "1") + .header("x-set-hello", "todo el mundo") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + assert_eq!(headers["x-cache-status"], "miss"); + assert_eq!(res.text().await.unwrap(), "hello todo el mundo!"); + }); + + task1.await.unwrap(); + task2.await.unwrap(); + } + #[tokio::test] async fn test_range_request() { init(); diff --git a/bongonet-proxy/tests/utils/cert.rs b/bongonet-proxy/tests/utils/cert.rs index 0db9ee2..92b4307 100644 --- a/bongonet-proxy/tests/utils/cert.rs +++ b/bongonet-proxy/tests/utils/cert.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use once_cell::sync::Lazy; use bongonet_core::tls::pkey::{PKey, Private}; use bongonet_core::tls::x509::X509; -use once_cell::sync::Lazy; use std::fs; pub static ROOT_CERT: Lazy = Lazy::new(|| load_cert("keys/root.crt")); diff --git a/bongonet-proxy/tests/utils/conf/origin/conf/nginx.conf b/bongonet-proxy/tests/utils/conf/origin/conf/nginx.conf index a41a743..6d5abd7 100644 --- a/bongonet-proxy/tests/utils/conf/origin/conf/nginx.conf +++ b/bongonet-proxy/tests/utils/conf/origin/conf/nginx.conf @@ -408,12 +408,13 @@ http { location /slow_body { content_by_lua_block { local sleep_sec = tonumber(ngx.var.http_x_set_sleep) or 1 + local hello_to = ngx.var.http_x_set_hello or "world" ngx.flush() ngx.sleep(sleep_sec) ngx.print("hello ") ngx.flush() ngx.sleep(sleep_sec) - ngx.print("world") + ngx.print(hello_to) ngx.sleep(sleep_sec) ngx.print("!") } diff --git a/bongonet-proxy/tests/utils/server_utils.rs b/bongonet-proxy/tests/utils/server_utils.rs index be6f2a8..7cdea14 100644 --- a/bongonet-proxy/tests/utils/server_utils.rs +++ b/bongonet-proxy/tests/utils/server_utils.rs @@ -14,6 +14,10 @@ use super::cert; use async_trait::async_trait; +use clap::Parser; +use http::header::VARY; +use http::HeaderValue; +use once_cell::sync::Lazy; use bongonet_cache::cache_control::CacheControl; use bongonet_cache::key::HashBinary; use bongonet_cache::VarianceBuilder; @@ -32,10 +36,6 @@ use bongonet_core::utils::CertKey; use bongonet_error::{Error, ErrorSource, Result}; use bongonet_http::{RequestHeader, ResponseHeader}; use bongonet_proxy::{ProxyHttp, Session}; -use clap::Parser; -use http::header::VARY; -use http::HeaderValue; -use once_cell::sync::Lazy; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::thread; @@ -399,6 +399,19 @@ impl ProxyHttp for ExampleProxyCache { Ok(()) } + async fn cache_hit_filter( + &self, + session: &Session, + _meta: &CacheMeta, + _ctx: &mut Self::CTX, + ) -> Result { + // allow test 
header to control force expiry + if session.get_header_bytes("x-force-expire") != b"" { + return Ok(true); + } + Ok(false) + } + fn cache_vary_filter( &self, meta: &CacheMeta,