Skip to content

Commit

Permalink
fix: lint
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAlcibiades committed Dec 22, 2024
1 parent 4c45624 commit 0997bde
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 93 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ readme = "README.md"
repository = "https://github.com/warlock-labs/postel"
version = "0.7.1"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.6", optional = true }

[dependencies]
async-stream = "0.3.6"
bytes = "1.9.0"
Expand Down Expand Up @@ -50,6 +47,7 @@ tokio-util = { version = "0.7.13", features = ["compat"] }
tonic = "0.12.3"
tonic-health = "0.12.3"
tracing-subscriber = "0.3.19"
lazy_static = "1.5.0"

[[bench]]
name = "hello_world_tower_hyper_tls_tcp"
Expand All @@ -65,7 +63,6 @@ path = "examples/tonic.rs"

[features]
default = []
jemalloc = ["tikv-jemallocator"]
dev-profiling = ["pprof"]

[profile.release]
Expand Down
116 changes: 101 additions & 15 deletions benches/hello_world_tower_hyper_tls_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use rustls::crypto::aws_lc_rs::Ticketer;
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use rustls::server::ServerSessionMemoryCache;
use rustls::{ClientConfig, RootCertStore, ServerConfig};
use std::convert::Infallible;
use tokio::net::TcpSocket;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -206,27 +207,72 @@ fn generate_shared_ecdsa_config() -> TlsConfig {
fn create_optimized_runtime(thread_count: usize) -> io::Result<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(thread_count)
.max_blocking_threads(thread_count * 2)
.max_blocking_threads(thread_count)
.enable_all()
.build()
}

#[inline]
async fn echo(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&hyper::Method::GET, "/") => Ok(Response::new(Full::new(Bytes::from("Hello, World!")))),
(&hyper::Method::POST, "/echo") => {
let body = req.collect().await?.to_bytes();
Ok(Response::new(Full::new(body)))
}
_ => {
let mut res = Response::new(Full::new(Bytes::from("Not Found")));
*res.status_mut() = StatusCode::NOT_FOUND;
Ok(res)
// Pre-allocate static responses
const HELLO: &[u8] = b"Hello, World!";
const NOT_FOUND: &[u8] = b"Not Found";

const BASE_PATH: &str = "/";

const ECHO_PATH: &str = "/echo";

#[derive(Clone, Copy)]
struct EchoService {
hello_response: &'static [u8],
not_found_response: &'static [u8],
}

impl EchoService {
const fn new() -> Self {
Self {
hello_response: HELLO,
not_found_response: NOT_FOUND,
}
}
}

impl tower::Service<Request<Incoming>> for EchoService {
type Response = Response<Full<Bytes>>;
type Error = Infallible;
type Future = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
>;

#[inline]
fn poll_ready(
&mut self,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

#[inline]
fn call(&mut self, req: Request<Incoming>) -> Self::Future {
let service = *self; // Copy the service since it's Copy
Box::pin(async move {
Ok(match (req.method(), req.uri().path()) {
(&hyper::Method::GET, BASE_PATH) => {
Response::new(Full::new(Bytes::from_static(service.hello_response)))
}
(&hyper::Method::POST, ECHO_PATH) => {
let body = req.collect().await.unwrap().to_bytes();
Response::new(Full::new(body))
}
_ => {
let mut res =
Response::new(Full::new(Bytes::from_static(service.not_found_response)));
*res.status_mut() = StatusCode::NOT_FOUND;
res
}
})
})
}
}

async fn setup_server(
) -> Result<(TcpListenerStream, SocketAddr), Box<dyn std::error::Error + Send + Sync>> {
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
Expand Down Expand Up @@ -255,9 +301,49 @@ async fn start_server(
let tls_config = Arc::new(tls_config);
let (incoming, server_addr) = setup_server().await?;
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let http_server_builder = HttpConnectionBuilder::new(TokioExecutor::new());
let mut http_server_builder = HttpConnectionBuilder::new(TokioExecutor::new());
http_server_builder
// HTTP/1 optimizations
.http1()
// Enable half-close for better connection handling
.half_close(true)
// Enable keep-alive to reduce overhead for multiple requests
.keep_alive(true)
// Increase max buffer size to 1MB for better performance with larger payloads
.max_buf_size(1024 * 1024)
// Enable immediate flushing of pipelined responses for lower latency
.pipeline_flush(true)
// Preserve original header case for compatibility
.preserve_header_case(true)
// Disable automatic title casing of headers to reduce processing overhead
.title_case_headers(false)
// HTTP/2 optimizations
.http2()
// Add the timer to the builder to avoid potential issues
.timer(TokioTimer::new())
// Increase initial stream window size to 4MB for better throughput
.initial_stream_window_size(Some(4 * 1024 * 1024))
// Increase initial connection window size to 8MB for improved performance
.initial_connection_window_size(Some(8 * 1024 * 1024))
// Enable adaptive window for dynamic flow control
.adaptive_window(true)
// Increase max frame size to 1MB for larger data chunks
.max_frame_size(Some(1024 * 1024))
// Allow up to 1024 concurrent streams for better parallelism without overwhelming the connection
.max_concurrent_streams(Some(1024))
// Increase max send buffer size to 4MB for improved write performance
.max_send_buf_size(4 * 1024 * 1024)
// Enable CONNECT protocol support for proxying and tunneling
.enable_connect_protocol()
// Increase max header list size to 64KB to handle larger headers
.max_header_list_size(64 * 1024)
// Set keep-alive interval to 30 seconds for more responsive connection management
.keep_alive_interval(Some(Duration::from_secs(30)))
// Set keep-alive timeout to 60 seconds to balance connection reuse and resource conservation
.keep_alive_timeout(Duration::from_secs(60));

let tower_service_fn = EchoService::new();

let tower_service_fn = tower::service_fn(echo);
let hyper_service = TowerToHyperService::new(tower_service_fn);
tokio::spawn(async move {
serve_http_with_shutdown(
Expand Down
99 changes: 79 additions & 20 deletions examples/tower_service_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,25 @@ use bytes::Bytes;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::{TokioExecutor, TokioTimer};
use hyper_util::server::conn::auto::Builder as HttpConnectionBuilder;
use hyper_util::service::TowerToHyperService;
use rustls::ServerConfig;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tower::{Layer, ServiceBuilder};
use tracing::{debug, info, trace};
use tracing::{debug, info, trace, Level};

use postel::{load_certs, load_private_key, serve_http_with_shutdown};

// Define a simple service that responds with "Hello, World!"
async fn hello(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
lazy_static::lazy_static! {
static ref HELLO: Bytes = Bytes::from("Hello, World!");
}

async fn hello(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(HELLO.clone()))) // Zero-copy clone
}
// Define a Custom middleware to add a header to all responses, for example
struct AddHeaderLayer;

Expand Down Expand Up @@ -69,8 +72,11 @@ where
}
}

#[tokio::main(flavor = "current_thread")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Initialize logging
tracing_subscriber::fmt().with_max_level(Level::INFO).init();

// 1. Set up the TCP listener
let addr = SocketAddr::from(([127, 0, 0, 1], 8443));

Expand All @@ -79,7 +85,46 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let incoming = TcpListenerStream::new(listener);

// 2. Create the HTTP connection builder
let builder = HttpConnectionBuilder::new(TokioExecutor::new());
let mut builder = HttpConnectionBuilder::new(TokioExecutor::new());
builder
// HTTP/1 optimizations
.http1()
// Enable half-close for better connection handling
.half_close(true)
// Enable keep-alive to reduce overhead for multiple requests
.keep_alive(true)
// Increase max buffer size to 1MB for better performance with larger payloads
.max_buf_size(1024 * 1024)
// Enable immediate flushing of pipelined responses for lower latency
.pipeline_flush(true)
// Preserve original header case for compatibility
.preserve_header_case(true)
// Disable automatic title casing of headers to reduce processing overhead
.title_case_headers(false)
// HTTP/2 optimizations
.http2()
// Add the timer to the builder to avoid potential issues
.timer(TokioTimer::new())
// Increase initial stream window size to 4MB for better throughput
.initial_stream_window_size(Some(4 * 1024 * 1024))
// Increase initial connection window size to 8MB for improved performance
.initial_connection_window_size(Some(8 * 1024 * 1024))
// Enable adaptive window for dynamic flow control
.adaptive_window(true)
// Increase max frame size to 1MB for larger data chunks
.max_frame_size(Some(1024 * 1024))
// Allow up to 1024 concurrent streams for better parallelism without overwhelming the connection
.max_concurrent_streams(Some(1024))
// Increase max send buffer size to 4MB for improved write performance
.max_send_buf_size(4 * 1024 * 1024)
// Enable CONNECT protocol support for proxying and tunneling
.enable_connect_protocol()
// Increase max header list size to 64KB to handle larger headers
.max_header_list_size(64 * 1024)
// Set keep-alive interval to 30 seconds for more responsive connection management
.keep_alive_interval(Some(Duration::from_secs(30)))
// Set keep-alive timeout to 60 seconds to balance connection reuse and resource conservation
.keep_alive_timeout(Duration::from_secs(60));

// 3. Set up the Tower service with middleware
let svc = tower::service_fn(hello);
Expand Down Expand Up @@ -107,7 +152,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let tls_config = Arc::new(config);

// 6. Set up graceful shutdown
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let (shutdown_tx, _shutdown_rx) = tokio::sync::oneshot::channel::<()>();

// Spawn a task to send the shutdown signal after 1 second
tokio::spawn(async move {
Expand All @@ -116,19 +161,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
debug!("Shutdown signal sent");
});

// 7. Start the server
info!("Starting HTTPS server...");
serve_http_with_shutdown(
svc,
incoming,
builder,
Some(tls_config),
Some(async {
shutdown_rx.await.ok();
info!("Shutdown signal received, starting graceful shutdown");
}),
)
.await?;
// 7. Create a shutdown signal
let (shutdown_tx, _shutdown_rx) = tokio::sync::oneshot::channel();

// 8. Start the server
let server = tokio::spawn(async move {
info!("Starting HTTPS server...");
serve_http_with_shutdown(
svc,
incoming,
builder,
Some(tls_config),
Some(async {
_shutdown_rx.await.ok();
info!("Shutdown signal received");
}),
)
.await
.expect("Server failed unexpectedly");
});

// Keep the main thread running until Ctrl+C
tokio::signal::ctrl_c().await?;
info!("Initiating graceful shutdown");
let _ = shutdown_tx.send(());

// Wait for server to shutdown
server.await?;

info!("Server has shut down");
// Et voilà!
Expand Down
Loading

0 comments on commit 0997bde

Please sign in to comment.