Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync commits #59

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
819e3bc852ba912c6fcf02bc9aa4f2c5d6a61d79
c90e4ce2596840c60b5ff1737e2141447e5953e1
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:

- name: Run cargo clippy
run: |
[[ ${{ matrix.toolchain }} == nightly ]] || cargo clippy --all-targets --all -- --deny=warnings
[[ ${{ matrix.toolchain }} == nightly ]] || cargo clippy --all-targets --all -- --allow=unknown-lints --deny=warnings

- name: Run cargo audit
run: |
Expand Down
1 change: 1 addition & 0 deletions pingora-cache/src/eviction/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::time::SystemTime;
///
/// - Space optimized in-memory LRU (see [pingora_lru]).
/// - Instead of a single giant LRU, this struct shards the assets into `N` independent LRUs.
///
/// This allows [EvictionManager::save()] not to lock the entire cache manager while performing
/// serialization.
pub struct Manager<const N: usize>(Lru<CompactCacheKey, N>);
Expand Down
6 changes: 3 additions & 3 deletions pingora-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ impl HttpCache {
/// - `storage`: the cache storage backend that implements [storage::Storage]
/// - `eviction`: optionally the eviction manager, without it, nothing will be evicted from the storage
/// - `predictor`: optionally a cache predictor. The cache predictor predicts whether something is likely
/// to be cacheable or not. This is useful because the proxy can apply different types of optimization to
/// cacheable and uncacheable requests.
/// to be cacheable or not. This is useful because the proxy can apply different types of optimization to
/// cacheable and uncacheable requests.
/// - `cache_lock`: optionally a cache lock which handles concurrent lookups to the same asset. Without it
/// such lookups will all be allowed to fetch the asset independently.
/// such lookups will all be allowed to fetch the asset independently.
pub fn enable(
&mut self,
storage: &'static (dyn storage::Storage + Sync),
Expand Down
2 changes: 1 addition & 1 deletion pingora-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,4 @@ jemallocator = "0.5"
default = ["openssl"]
openssl = ["pingora-openssl"]
boringssl = ["pingora-boringssl"]
patched_http1 = []
patched_http1 = []
6 changes: 3 additions & 3 deletions pingora-core/src/apps/http_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::protocols::Stream;
use crate::server::ShutdownWatch;

/// This trait defines how to map a request to a response
#[cfg_attr(not(doc_async_trait), async_trait)]
#[async_trait]
pub trait ServeHttp {
/// Define the mapping from a request to a response.
/// Note that the request header is already read, but the implementation needs to read the
Expand All @@ -42,7 +42,7 @@ pub trait ServeHttp {
}

// TODO: remove this in favor of HttpServer?
#[cfg_attr(not(doc_async_trait), async_trait)]
#[async_trait]
impl<SV> HttpServerApp for SV
where
SV: ServeHttp + Send + Sync,
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<SV> HttpServer<SV> {
}
}

#[cfg_attr(not(doc_async_trait), async_trait)]
#[async_trait]
impl<SV> HttpServerApp for HttpServer<SV>
where
SV: ServeHttp + Send + Sync,
Expand Down
6 changes: 3 additions & 3 deletions pingora-core/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::protocols::Digest;
use crate::protocols::Stream;
use crate::protocols::ALPN;

#[cfg_attr(not(doc_async_trait), async_trait)]
#[async_trait]
/// This trait defines the interface of a transport layer (TCP or TLS) application.
pub trait ServerApp {
/// Whenever a new connection is established, this function will be called with the established
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct HttpServerOptions {
}

/// This trait defines the interface of an HTTP application.
#[cfg_attr(not(doc_async_trait), async_trait)]
#[async_trait]
pub trait HttpServerApp {
/// Similar to the [`ServerApp`], this function is called whenever a new HTTP session is established.
///
Expand Down Expand Up @@ -95,7 +95,7 @@ pub trait HttpServerApp {
async fn http_cleanup(&self) {}
}

#[cfg_attr(not(doc_async_trait), async_trait)]
#[async_trait]
impl<T> ServerApp for T
where
T: HttpServerApp + Send + Sync + 'static,
Expand Down
2 changes: 1 addition & 1 deletion pingora-core/src/apps/prometheus_http_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::protocols::http::ServerSession;
/// collected via the [Prometheus](https://docs.rs/prometheus/) crate;
pub struct PrometheusHttpApp;

#[cfg_attr(not(doc_async_trait), async_trait)]
#[async_trait]
impl ServeHttp for PrometheusHttpApp {
async fn response(&self, _http_session: &mut ServerSession) -> Response<Vec<u8>> {
let encoder = TextEncoder::new();
Expand Down
160 changes: 98 additions & 62 deletions pingora-core/src/connectors/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use log::debug;
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
use rand::seq::SliceRandom;
Expand All @@ -26,6 +27,12 @@ use crate::protocols::l4::stream::Stream;
use crate::protocols::{GetSocketDigest, SocketDigest};
use crate::upstreams::peer::Peer;

/// The interface to establish a L4 connection
#[async_trait]
pub trait Connect: std::fmt::Debug {
async fn connect(&self, addr: &SocketAddr) -> Result<Stream>;
}

/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
Expand All @@ -37,72 +44,78 @@ where
.err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
}
let peer_addr = peer.address();
let mut stream: Stream = match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
if peer.tcp_fast_open() {
set_tcp_fastopen_connect(socket.as_raw_fd())?;
}
if let Some(recv_buf) = peer.tcp_recv_buf() {
debug!("Setting recv buf size");
set_recv_buf(socket.as_raw_fd(), recv_buf)?;
}
if let Some(dscp) = peer.dscp() {
debug!("Setting dscp");
set_dscp(socket.as_raw_fd(), dscp)?;
}
Ok(())
});
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
.explain_err(ConnectTimedout, |_| {
format!("timeout {t:?} connecting to server {peer}")
})?,
None => connect_future.await,
};
match conn_res {
Ok(socket) => {
debug!("connected to new server: {}", peer.address());
Ok(socket.into())
}
Err(e) => {
let c = format!("Fail to connect to {peer}");
match e.etype() {
SocketError | BindError => Error::e_because(InternalError, c, e),
_ => Err(e.more_context(c)),
let mut stream: Stream =
if let Some(custom_l4) = peer.get_peer_options().and_then(|o| o.custom_l4.as_ref()) {
custom_l4.connect(peer_addr).await?
} else {
match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
if peer.tcp_fast_open() {
set_tcp_fastopen_connect(socket.as_raw_fd())?;
}
if let Some(recv_buf) = peer.tcp_recv_buf() {
debug!("Setting recv buf size");
set_recv_buf(socket.as_raw_fd(), recv_buf)?;
}
if let Some(dscp) = peer.dscp() {
debug!("Setting dscp");
set_dscp(socket.as_raw_fd(), dscp)?;
}
Ok(())
});
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
.explain_err(ConnectTimedout, |_| {
format!("timeout {t:?} connecting to server {peer}")
})?,
None => connect_future.await,
};
match conn_res {
Ok(socket) => {
debug!("connected to new server: {}", peer.address());
Ok(socket.into())
}
Err(e) => {
let c = format!("Fail to connect to {peer}");
match e.etype() {
SocketError | BindError => Error::e_because(InternalError, c, e),
_ => Err(e.more_context(c)),
}
}
}
}
}
}
SocketAddr::Unix(addr) => {
let connect_future = connect_uds(
addr.as_pathname()
.expect("non-pathname unix sockets not supported as peer"),
);
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
.explain_err(ConnectTimedout, |_| {
format!("timeout {t:?} connecting to server {peer}")
})?,
None => connect_future.await,
};
match conn_res {
Ok(socket) => {
debug!("connected to new server: {}", peer.address());
Ok(socket.into())
}
Err(e) => {
let c = format!("Fail to connect to {peer}");
match e.etype() {
SocketError | BindError => Error::e_because(InternalError, c, e),
_ => Err(e.more_context(c)),
SocketAddr::Unix(addr) => {
let connect_future = connect_uds(
addr.as_pathname()
.expect("non-pathname unix sockets not supported as peer"),
);
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
.explain_err(ConnectTimedout, |_| {
format!("timeout {t:?} connecting to server {peer}")
})?,
None => connect_future.await,
};
match conn_res {
Ok(socket) => {
debug!("connected to new server: {}", peer.address());
Ok(socket.into())
}
Err(e) => {
let c = format!("Fail to connect to {peer}");
match e.etype() {
SocketError | BindError => Error::e_because(InternalError, c, e),
_ => Err(e.more_context(c)),
}
}
}
}
}
}
}?;
}?
};

let tracer = peer.get_tracer();
if let Some(t) = tracer {
t.0.on_connected();
Expand Down Expand Up @@ -249,6 +262,29 @@ mod tests {
assert_eq!(new_session.unwrap_err().etype(), &ConnectTimedout)
}

#[tokio::test]
async fn test_custom_connect() {
#[derive(Debug)]
struct MyL4;
#[async_trait]
impl Connect for MyL4 {
async fn connect(&self, _addr: &SocketAddr) -> Result<Stream> {
tokio::net::TcpStream::connect("1.1.1.1:80")
.await
.map(|s| s.into())
.or_fail()
}
}
// :79 shouldn't be able to be connected to
let mut peer = BasicPeer::new("1.1.1.1:79");
peer.options.custom_l4 = Some(std::sync::Arc::new(MyL4 {}));

let new_session = connect(&peer, None).await;

// but MyL4 connects to :80 instead
assert!(new_session.is_ok());
}

#[tokio::test]
async fn test_connect_proxy_fail() {
let mut peer = HttpPeer::new("1.1.1.1:80".to_string(), false, "".to_string());
Expand Down
1 change: 1 addition & 0 deletions pingora-core/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::tls::ssl::SslConnector;
use crate::upstreams::peer::{Peer, ALPN};

use l4::connect as l4_connect;
pub use l4::Connect as L4Connect;
use log::{debug, error, warn};
use offload::OffloadRuntime;
use parking_lot::RwLock;
Expand Down
2 changes: 0 additions & 2 deletions pingora-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
#![allow(clippy::match_wild_err_arm)]
#![allow(clippy::missing_safety_doc)]
#![allow(clippy::upper_case_acronyms)]
// enable nightly feature async trait so that the docs are cleaner
#![cfg_attr(doc_async_trait, feature(async_fn_in_trait))]

//! # Pingora
//!
Expand Down
15 changes: 14 additions & 1 deletion pingora-core/src/protocols/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Session {
/// else with the session.
/// - `Ok(true)`: successful
/// - `Ok(false)`: client exit without sending any bytes. This is normal on reused connection.
/// In this case the user should give up this session.
/// In this case the user should give up this session.
pub async fn read_request(&mut self) -> Result<bool> {
match self {
Self::H1(s) => {
Expand Down Expand Up @@ -218,6 +218,19 @@ impl Session {
}
}

/// Sets whether we ignore writing informational responses downstream.
///
/// For HTTP/1.1 this is a noop if the response is Upgrade or Continue and
/// Expect: 100-continue was set on the request.
///
/// This is a noop for h2 because informational responses are always ignored.
pub fn set_ignore_info_resp(&mut self, ignore: bool) {
match self {
Self::H1(s) => s.set_ignore_info_resp(ignore),
Self::H2(_) => {} // always ignored
}
}

/// Return a digest of the request including the method, path and Host header
// TODO: make this use a `Formatter`
pub fn request_summary(&self) -> String {
Expand Down
8 changes: 8 additions & 0 deletions pingora-core/src/protocols/http/v1/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ pub(super) fn is_upgrade_req(req: &RequestHeader) -> bool {
req.version == http::Version::HTTP_11 && req.headers.get(header::UPGRADE).is_some()
}

pub(super) fn is_expect_continue_req(req: &RequestHeader) -> bool {
req.version == http::Version::HTTP_11
// https://www.rfc-editor.org/rfc/rfc9110#section-10.1.1
&& req.headers.get(header::EXPECT).map_or(false, |v| {
v.as_bytes().eq_ignore_ascii_case(b"100-continue")
})
}

// Unlike the upgrade check on request, this function doesn't check the Upgrade or Connection header
// because when seeing 101, we assume the server accepts to switch protocol.
// In reality it is not common that some servers don't send all the required headers to establish
Expand Down
Loading