diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index c1cdf4e129..7a57d503af 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -277,7 +277,7 @@ where { async fn call_async(&mut self, dst: Uri) -> Result { trace!( - "Http::connect; scheme={:?}, host={:?}, port={:?}", + "Server::connect; scheme={:?}, host={:?}, port={:?}", dst.scheme(), dst.host(), dst.port(), diff --git a/src/common/exec.rs b/src/common/exec.rs index f4e80ead5a..f66e16f04d 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use crate::body::{Body, HttpBody}; use crate::proto::h2::server::H2Stream; -use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; +use crate::server::conn::spawn_all::SvcTask; use crate::service::HttpService; /// An executor of futures. @@ -18,8 +18,8 @@ pub trait H2Exec: Clone { fn execute_h2stream(&mut self, fut: H2Stream); } -pub trait NewSvcExec, E, W: Watcher>: Clone { - fn execute_new_svc(&mut self, fut: NewSvcTask); +pub trait SvcExec, E>: Clone { + fn execute_svc(&self, fut: SvcTask); } pub type BoxSendFuture = Pin + Send>>; @@ -74,13 +74,12 @@ where } } -impl NewSvcExec for Exec +impl SvcExec for Exec where - NewSvcTask: Future + Send + 'static, + SvcTask: Future + Send + 'static, S: HttpService, - W: Watcher, { - fn execute_new_svc(&mut self, fut: NewSvcTask) { + fn execute_svc(&self, fut: SvcTask) { self.execute(fut) } } @@ -98,14 +97,13 @@ where } } -impl NewSvcExec for E +impl SvcExec for E where - E: Executor> + Clone, - NewSvcTask: Future, + E: Executor> + Clone, + SvcTask: Future, S: HttpService, - W: Watcher, { - fn execute_new_svc(&mut self, fut: NewSvcTask) { + fn execute_svc(&self, fut: SvcTask) { self.execute(fut) } } diff --git a/src/error.rs b/src/error.rs index 2cffe3d9e4..47d258c6a8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -37,7 +37,8 @@ pub(crate) enum Kind { /// Error creating a TcpListener. #[cfg(feature = "tcp")] Listen, - /// Error accepting on an Incoming stream. + #[cfg(feature = "tcp")] + /// Error while accepting a new connection. Accept, /// Error while reading a body from connection. Body, @@ -67,8 +68,6 @@ pub(crate) enum Parse { pub(crate) enum User { /// Error calling user's HttpBody::poll_data(). Body, - /// Error calling user's MakeService. - MakeService, /// Error from future of user's Service. Service, /// User tried to send a certain header in an unexpected context. @@ -212,6 +211,7 @@ impl Error { Error::new(Kind::Listen).with(cause) } + #[cfg(feature = "tcp")] pub(crate) fn new_accept>(cause: E) -> Error { Error::new(Kind::Accept).with(cause) } @@ -268,10 +268,6 @@ impl Error { Error::new_user(User::ManualUpgrade) } - pub(crate) fn new_user_make_service>(cause: E) -> Error { - Error::new_user(User::MakeService).with(cause) - } - pub(crate) fn new_user_service>(cause: E) -> Error { Error::new_user(User::Service).with(cause) } @@ -308,7 +304,8 @@ impl Error { Kind::Canceled => "operation was canceled", #[cfg(feature = "tcp")] Kind::Listen => "error creating server listener", - Kind::Accept => "error accepting connection", + #[cfg(feature = "tcp")] + Kind::Accept => "error accepting a new connection", Kind::Body => "error reading a body from connection", Kind::BodyWrite => "error writing a body to connection", Kind::BodyWriteAborted => "body write aborted", @@ -317,7 +314,6 @@ impl Error { Kind::Io => "connection error", Kind::User(User::Body) => "error from user's HttpBody stream", - Kind::User(User::MakeService) => "error from user's MakeService", Kind::User(User::Service) => "error from user's Service", Kind::User(User::UnexpectedHeader) => "user sent unexpected header", Kind::User(User::UnsupportedVersion) => "request has unsupported HTTP version", diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index ed1b731306..1d0250286d 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -18,7 +18,7 @@ pub const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE; /// The default maximum read buffer size. If the buffer gets this big and /// a message is still not complete, a `TooLarge` error is triggered. -// Note: if this changes, update server::conn::Http::max_buf_size docs. +// Note: if this changes, update server::conn::Server::max_buf_size docs. pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; /// The maximum number of distinct `Buf`s to hold in a list before requiring diff --git a/src/server/accept.rs b/src/server/accept.rs deleted file mode 100644 index 4ec287129d..0000000000 --- a/src/server/accept.rs +++ /dev/null @@ -1,107 +0,0 @@ -//! The `Accept` trait and supporting types. -//! -//! This module contains: -//! -//! - The [`Accept`](Accept) trait used to asynchronously accept incoming -//! connections. -//! - Utilities like `poll_fn` to ease creating a custom `Accept`. - -#[cfg(feature = "stream")] -use futures_core::Stream; -#[cfg(feature = "stream")] -use pin_project::pin_project; - -use crate::common::{ - task::{self, Poll}, - Pin, -}; - -/// Asynchronously accept incoming connections. -pub trait Accept { - /// The connection type that can be accepted. - type Conn; - /// The error type that can occur when accepting a connection. - type Error; - - /// Poll to accept the next connection. - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>>; -} - -/// Create an `Accept` with a polling function. -/// -/// # Example -/// -/// ``` -/// use std::task::Poll; -/// use hyper::server::{accept, Server}; -/// -/// # let mock_conn = (); -/// // If we created some mocked connection... -/// let mut conn = Some(mock_conn); -/// -/// // And accept just the mocked conn once... -/// let once = accept::poll_fn(move |cx| { -/// Poll::Ready(conn.take().map(Ok::<_, ()>)) -/// }); -/// -/// let builder = Server::builder(once); -/// ``` -pub fn poll_fn(func: F) -> impl Accept -where - F: FnMut(&mut task::Context<'_>) -> Poll>>, -{ - struct PollFn(F); - - // The closure `F` is never pinned - impl Unpin for PollFn {} - - impl Accept for PollFn - where - F: FnMut(&mut task::Context<'_>) -> Poll>>, - { - type Conn = IO; - type Error = E; - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>> { - (self.get_mut().0)(cx) - } - } - - PollFn(func) -} - -/// Adapt a `Stream` of incoming connections into an `Accept`. -/// -/// # Optional -/// -/// This function requires enabling the `stream` feature in your -/// `Cargo.toml`. -#[cfg(feature = "stream")] -pub fn from_stream(stream: S) -> impl Accept -where - S: Stream>, -{ - #[pin_project] - struct FromStream(#[pin] S); - - impl Accept for FromStream - where - S: Stream>, - { - type Conn = IO; - type Error = E; - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>> { - self.project().0.poll_next(cx) - } - } - - FromStream(stream) -} diff --git a/src/server/conn.rs b/src/server/conn.rs index 0fe7a22aba..13b6b9db46 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -26,7 +26,7 @@ //! loop { //! let (tcp_stream, _) = tcp_listener.accept().await?; //! tokio::task::spawn(async move { -//! if let Err(http_err) = Http::new() +//! if let Err(http_err) = Server::new() //! .http1_only(true) //! .keep_alive(true) //! .serve_connection(tcp_stream, service_fn(hello)) @@ -46,99 +46,24 @@ use std::error::Error as StdError; use std::fmt; use std::mem; -#[cfg(feature = "tcp")] -use std::net::SocketAddr; -#[cfg(feature = "runtime")] -use std::time::Duration; use bytes::Bytes; use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; -use super::Accept; use crate::body::{Body, HttpBody}; -use crate::common::exec::{Exec, H2Exec, NewSvcExec}; +use crate::common::exec::{Exec, H2Exec}; use crate::common::io::Rewind; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::error::{Kind, Parse}; use crate::proto; -use crate::service::{HttpService, MakeServiceRef}; +use crate::service::HttpService; use crate::upgrade::Upgraded; -use self::spawn_all::NewSvcTask; -pub(super) use self::spawn_all::NoopWatcher; -pub(super) use self::spawn_all::Watcher; pub(super) use self::upgrades::UpgradeableConnection; -#[cfg(feature = "tcp")] -pub use super::tcp::{AddrIncoming, AddrStream}; - -/// A lower-level configuration of the HTTP protocol. -/// -/// This structure is used to configure options for an HTTP server connection. -/// -/// If you don't have need to manage connections yourself, consider using the -/// higher-level [Server](super) API. -#[derive(Clone, Debug)] -pub struct Http { - exec: E, - h1_half_close: bool, - h1_keep_alive: bool, - h1_writev: Option, - h2_builder: proto::h2::server::Config, - mode: ConnectionMode, - max_buf_size: Option, - pipeline_flush: bool, -} - -/// The internal mode of HTTP protocol which indicates the behavior when a parse error occurs. -#[derive(Clone, Debug, PartialEq)] -enum ConnectionMode { - /// Always use HTTP/1 and do not upgrade when a parse error occurs. - H1Only, - /// Always use HTTP/2. - H2Only, - /// Use HTTP/1 and try to upgrade to h2 when a parse error occurs. - Fallback, -} - -/// A stream mapping incoming IOs to new services. -/// -/// Yields `Connecting`s that are futures that should be put on a reactor. -#[must_use = "streams do nothing unless polled"] -#[pin_project] -#[derive(Debug)] -pub(super) struct Serve { - #[pin] - incoming: I, - make_service: S, - protocol: Http, -} - -/// A future building a new `Service` to a `Connection`. -/// -/// Wraps the future returned from `MakeService` into one that returns -/// a `Connection`. -#[must_use = "futures do nothing unless polled"] -#[pin_project] -#[derive(Debug)] -pub struct Connecting { - #[pin] - future: F, - io: Option, - protocol: Http, -} - -#[must_use = "futures do nothing unless polled"] -#[pin_project] -#[derive(Debug)] -pub(super) struct SpawnAll { - // TODO: re-add `pub(super)` once rustdoc can handle this. - // - // See https://github.com/rust-lang/rust/issues/64705 - #[pin] - pub serve: Serve, -} +// #[cfg(feature = "tcp")] +// pub use super::tcp::{AddrIncoming, AddrStream}; /// A future binding a connection with a Service. /// @@ -150,7 +75,7 @@ where S: HttpService, { pub(super) conn: Option>, - fallback: Fallback, + pub(super) fallback: Fallback, } #[pin_project(project = ProtoServerProj)] @@ -172,7 +97,7 @@ where } #[derive(Clone, Debug)] -enum Fallback { +pub(super) enum Fallback { ToHttp2(proto::h2::server::Config, E), Http1Only, } @@ -210,340 +135,6 @@ pub struct Parts { _inner: (), } -// ===== impl Http ===== - -impl Http { - /// Creates a new instance of the HTTP protocol, ready to spawn a server or - /// start accepting connections. - pub fn new() -> Http { - Http { - exec: Exec::Default, - h1_half_close: false, - h1_keep_alive: true, - h1_writev: None, - h2_builder: Default::default(), - mode: ConnectionMode::Fallback, - max_buf_size: None, - pipeline_flush: false, - } - } -} - -impl Http { - /// Sets whether HTTP1 is required. - /// - /// Default is false - pub fn http1_only(&mut self, val: bool) -> &mut Self { - if val { - self.mode = ConnectionMode::H1Only; - } else { - self.mode = ConnectionMode::Fallback; - } - self - } - - /// Set whether HTTP/1 connections should support half-closures. - /// - /// Clients can chose to shutdown their write-side while waiting - /// for the server to respond. Setting this to `true` will - /// prevent closing the connection immediately if `read` - /// detects an EOF in the middle of a request. - /// - /// Default is `false`. - pub fn http1_half_close(&mut self, val: bool) -> &mut Self { - self.h1_half_close = val; - self - } - - /// Enables or disables HTTP/1 keep-alive. - /// - /// Default is true. - pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self { - self.h1_keep_alive = val; - self - } - - // renamed due different semantics of http2 keep alive - #[doc(hidden)] - #[deprecated(note = "renamed to `http1_keep_alive`")] - pub fn keep_alive(&mut self, val: bool) -> &mut Self { - self.http1_keep_alive(val) - } - - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - #[inline] - pub fn http1_writev(&mut self, val: bool) -> &mut Self { - self.h1_writev = Some(val); - self - } - - /// Sets whether HTTP2 is required. - /// - /// Default is false - pub fn http2_only(&mut self, val: bool) -> &mut Self { - if val { - self.mode = ConnectionMode::H2Only; - } else { - self.mode = ConnectionMode::Fallback; - } - self - } - - /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 - /// stream-level flow control. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.adaptive_window = false; - self.h2_builder.initial_stream_window_size = sz; - } - self - } - - /// Sets the max connection-level flow control for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - pub fn http2_initial_connection_window_size( - &mut self, - sz: impl Into>, - ) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.adaptive_window = false; - self.h2_builder.initial_conn_window_size = sz; - } - self - } - - /// Sets whether to use an adaptive flow control. - /// - /// Enabling this will override the limits set in - /// `http2_initial_stream_window_size` and - /// `http2_initial_connection_window_size`. - pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { - use proto::h2::SPEC_WINDOW_SIZE; - - self.h2_builder.adaptive_window = enabled; - if enabled { - self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; - self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; - } - self - } - - /// Sets the maximum frame size to use for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - pub fn http2_max_frame_size(&mut self, sz: impl Into>) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.max_frame_size = sz; - } - self - } - - /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 - /// connections. - /// - /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS - pub fn http2_max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { - self.h2_builder.max_concurrent_streams = max.into(); - self - } - - /// Sets an interval for HTTP2 Ping frames should be sent to keep a - /// connection alive. - /// - /// Pass `None` to disable HTTP2 keep-alive. - /// - /// Default is currently disabled. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(feature = "runtime")] - pub fn http2_keep_alive_interval( - &mut self, - interval: impl Into>, - ) -> &mut Self { - self.h2_builder.keep_alive_interval = interval.into(); - self - } - - /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. - /// - /// If the ping is not acknowledged within the timeout, the connection will - /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. - /// - /// Default is 20 seconds. - /// - /// # Cargo Feature - /// - /// Requires the `runtime` cargo feature to be enabled. - #[cfg(feature = "runtime")] - pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { - self.h2_builder.keep_alive_timeout = timeout; - self - } - - /// Set the maximum buffer size for the connection. - /// - /// Default is ~400kb. - /// - /// # Panics - /// - /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. - pub fn max_buf_size(&mut self, max: usize) -> &mut Self { - assert!( - max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE, - "the max_buf_size cannot be smaller than the minimum that h1 specifies." - ); - self.max_buf_size = Some(max); - self - } - - /// Aggregates flushes to better support pipelined responses. - /// - /// Experimental, may have bugs. - /// - /// Default is false. - pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self { - self.pipeline_flush = enabled; - self - } - - /// Set the executor used to spawn background tasks. - /// - /// Default uses implicit default (like `tokio::spawn`). - pub fn with_executor(self, exec: E2) -> Http { - Http { - exec, - h1_half_close: self.h1_half_close, - h1_keep_alive: self.h1_keep_alive, - h1_writev: self.h1_writev, - h2_builder: self.h2_builder, - mode: self.mode, - max_buf_size: self.max_buf_size, - pipeline_flush: self.pipeline_flush, - } - } - - /// Bind a connection together with a [`Service`](crate::service::Service). - /// - /// This returns a Future that must be polled in order for HTTP to be - /// driven on the connection. - /// - /// # Example - /// - /// ``` - /// # use hyper::{Body, Request, Response}; - /// # use hyper::service::Service; - /// # use hyper::server::conn::Http; - /// # use tokio::io::{AsyncRead, AsyncWrite}; - /// # async fn run(some_io: I, some_service: S) - /// # where - /// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - /// # S: Service, Response=hyper::Response> + Send + 'static, - /// # S::Error: Into>, - /// # S::Future: Send, - /// # { - /// let http = Http::new(); - /// let conn = http.serve_connection(some_io, some_service); - /// - /// if let Err(e) = conn.await { - /// eprintln!("server connection error: {}", e); - /// } - /// # } - /// # fn main() {} - /// ``` - pub fn serve_connection(&self, io: I, service: S) -> Connection - where - S: HttpService, - S::Error: Into>, - Bd: HttpBody + 'static, - Bd::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin, - E: H2Exec, - { - let proto = match self.mode { - ConnectionMode::H1Only | ConnectionMode::Fallback => { - let mut conn = proto::Conn::new(io); - if !self.h1_keep_alive { - conn.disable_keep_alive(); - } - if self.h1_half_close { - conn.set_allow_half_close(); - } - if let Some(writev) = self.h1_writev { - if writev { - conn.set_write_strategy_queue(); - } else { - conn.set_write_strategy_flatten(); - } - } - conn.set_flush_pipeline(self.pipeline_flush); - if let Some(max) = self.max_buf_size { - conn.set_max_buf_size(max); - } - let sd = proto::h1::dispatch::Server::new(service); - ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn)) - } - ConnectionMode::H2Only => { - let rewind_io = Rewind::new(io); - let h2 = - proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone()); - ProtoServer::H2(h2) - } - }; - - Connection { - conn: Some(proto), - fallback: if self.mode == ConnectionMode::Fallback { - Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone()) - } else { - Fallback::Http1Only - }, - } - } - - pub(super) fn serve(&self, incoming: I, make_service: S) -> Serve - where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin, - S: MakeServiceRef, - S::Error: Into>, - Bd: HttpBody, - E: H2Exec<>::Future, Bd>, - { - Serve { - incoming, - make_service, - protocol: self.clone(), - } - } -} - // ===== impl Connection ===== impl Connection @@ -736,135 +327,6 @@ where f.debug_struct("Connection").finish() } } -// ===== impl Serve ===== - -impl Serve { - /// Get a reference to the incoming stream. - #[inline] - pub fn incoming_ref(&self) -> &I { - &self.incoming - } - - /* - /// Get a mutable reference to the incoming stream. - #[inline] - pub fn incoming_mut(&mut self) -> &mut I { - &mut self.incoming - } - */ - - /// Spawn all incoming connections onto the executor in `Http`. - pub(super) fn spawn_all(self) -> SpawnAll { - SpawnAll { serve: self } - } -} - -impl Serve -where - I: Accept, - IO: AsyncRead + AsyncWrite + Unpin, - IE: Into>, - S: MakeServiceRef, - B: HttpBody, - E: H2Exec<>::Future, B>, -{ - fn poll_next_( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>>> { - let me = self.project(); - match ready!(me.make_service.poll_ready_ref(cx)) { - Ok(()) => (), - Err(e) => { - trace!("make_service closed"); - return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); - } - } - - if let Some(item) = ready!(me.incoming.poll_accept(cx)) { - let io = item.map_err(crate::Error::new_accept)?; - let new_fut = me.make_service.make_service_ref(&io); - Poll::Ready(Some(Ok(Connecting { - future: new_fut, - io: Some(io), - protocol: me.protocol.clone(), - }))) - } else { - Poll::Ready(None) - } - } -} - -// ===== impl Connecting ===== - -impl Future for Connecting -where - I: AsyncRead + AsyncWrite + Unpin, - F: Future>, - S: HttpService, - B: HttpBody + 'static, - B::Error: Into>, - E: H2Exec, -{ - type Output = Result, FE>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let me = self.project(); - let service = ready!(me.future.poll(cx))?; - let io = me.io.take().expect("polled after complete"); - Poll::Ready(Ok(me.protocol.serve_connection(io, service))) - } -} - -// ===== impl SpawnAll ===== - -#[cfg(feature = "tcp")] -impl SpawnAll { - pub(super) fn local_addr(&self) -> SocketAddr { - self.serve.incoming.local_addr() - } -} - -impl SpawnAll { - pub(super) fn incoming_ref(&self) -> &I { - self.serve.incoming_ref() - } -} - -impl SpawnAll -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - B: HttpBody, - E: H2Exec<>::Future, B>, -{ - pub(super) fn poll_watch( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - watcher: &W, - ) -> Poll> - where - E: NewSvcExec, - W: Watcher, - { - let mut me = self.project(); - loop { - if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) { - let fut = NewSvcTask::new(connecting, watcher.clone()); - me.serve - .as_mut() - .project() - .protocol - .exec - .execute_new_svc(fut); - } else { - return Poll::Ready(Ok(())); - } - } - } -} // ===== impl ProtoServer ===== @@ -891,46 +353,14 @@ pub(crate) mod spawn_all { use std::error::Error as StdError; use tokio::io::{AsyncRead, AsyncWrite}; - use super::{Connecting, UpgradeableConnection}; + // use super::UpgradeableConnection; + use super::Connection; use crate::body::{Body, HttpBody}; use crate::common::exec::H2Exec; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::HttpService; use pin_project::pin_project; - // Used by `SpawnAll` to optionally watch a `Connection` future. - // - // The regular `hyper::Server` just uses a `NoopWatcher`, which does - // not need to watch anything, and so returns the `Connection` untouched. - // - // The `Server::with_graceful_shutdown` needs to keep track of all active - // connections, and signal that they start to shutdown when prompted, so - // it has a `GracefulWatcher` implementation to do that. - pub trait Watcher, E>: Clone { - type Future: Future>; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future; - } - - #[allow(missing_debug_implementations)] - #[derive(Copy, Clone)] - pub struct NoopWatcher; - - impl Watcher for NoopWatcher - where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: HttpService, - E: H2Exec, - S::ResBody: 'static, - ::Error: Into>, - { - type Future = UpgradeableConnection; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future { - conn - } - } - // This is a `Future` spawned to an `Executor` inside // the `SpawnAll`. By being a nameable type, we can be generic over the // user's `Service::Future`, and thus an `Executor` can execute it. @@ -943,35 +373,24 @@ pub(crate) mod spawn_all { #[pin_project] #[allow(missing_debug_implementations)] - pub struct NewSvcTask, E, W: Watcher> { + pub struct SvcTask, E> { #[pin] - state: State, - } - - #[pin_project(project = StateProj)] - pub enum State, E, W: Watcher> { - Connecting(#[pin] Connecting, W), - Connected(#[pin] W::Future), + conn: Connection, } - impl, E, W: Watcher> NewSvcTask { - pub(super) fn new(connecting: Connecting, watcher: W) -> Self { - NewSvcTask { - state: State::Connecting(connecting, watcher), - } + impl, E> SvcTask { + pub(crate) fn new(conn: Connection) -> Self { + SvcTask { conn } } } - impl Future for NewSvcTask + impl Future for SvcTask where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - N: Future>, - NE: Into>, S: HttpService, B: HttpBody + 'static, B::Error: Into>, E: H2Exec, - W: Watcher, { type Output = (); @@ -980,35 +399,11 @@ pub(crate) mod spawn_all { // could be projected to the `Serve` executor, this could just be // an `async fn`, and much safer. Woe is me. - let mut me = self.project(); - loop { - let next = { - match me.state.as_mut().project() { - StateProj::Connecting(connecting, watcher) => { - let res = ready!(connecting.poll(cx)); - let conn = match res { - Ok(conn) => conn, - Err(err) => { - let err = crate::Error::new_user_make_service(err); - debug!("connecting error: {}", err); - return Poll::Ready(()); - } - }; - let connected = watcher.watch(conn.with_upgrades()); - State::Connected(connected) - } - StateProj::Connected(future) => { - return future.poll(cx).map(|res| { - if let Err(err) = res { - debug!("connection error: {}", err); - } - }); - } - } - }; - - me.state.set(next); - } + return self.project().conn.poll(cx).map(|res| { + if let Err(err) = res { + debug!("connection error: {}", err); + } + }); } } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 63bf2988ad..fd0a47b0a0 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -51,198 +51,102 @@ //! # fn main() {} //! ``` -pub mod accept; -pub mod conn; -mod shutdown; +pub(crate) mod conn; #[cfg(feature = "tcp")] mod tcp; -use std::error::Error as StdError; -use std::fmt; +pub use conn::{Connection, Parts}; #[cfg(feature = "tcp")] -use std::net::{SocketAddr, TcpListener as StdTcpListener}; +pub use tcp::{AddrIncoming, AddrStream}; -#[cfg(feature = "tcp")] -use std::time::Duration; +use std::error::Error as StdError; -use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; -use self::accept::Accept; -use crate::body::{Body, HttpBody}; -use crate::common::exec::{Exec, H2Exec, NewSvcExec}; -use crate::common::{task, Future, Pin, Poll, Unpin}; -use crate::service::{HttpService, MakeServiceRef}; -// Renamed `Http` as `Http_` for now so that people upgrading don't see an -// error that `hyper::server::Http` is private... -use self::conn::{Http as Http_, NoopWatcher, SpawnAll}; -use self::shutdown::{Graceful, GracefulWatcher}; -#[cfg(feature = "tcp")] -use self::tcp::AddrIncoming; - -/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. -/// -/// `Server` is a `Future` mapping a bound listener with a set of service -/// handlers. It is built using the [`Builder`](Builder), and the future -/// completes when the server has been shutdown. It should be run by an -/// `Executor`. -#[pin_project] -pub struct Server { - #[pin] - spawn_all: SpawnAll, -} - -/// A builder for a [`Server`](Server). -#[derive(Debug)] -pub struct Builder { - incoming: I, - protocol: Http_, -} - -// ===== impl Server ===== - -impl Server { - /// Starts a [`Builder`](Builder) with the provided incoming stream. - pub fn builder(incoming: I) -> Builder { - Builder { - incoming, - protocol: Http_::new(), - } - } -} +use crate::common::exec::{Exec, H2Exec}; +use crate::common::io::Rewind; +use crate::common::Future; +use crate::error::Error; +use crate::proto; +use crate::service::{util::ServiceFn, HttpService}; +use crate::{ + body::{Body, HttpBody}, + common::exec::SvcExec, +}; +use crate::{Request, Response}; #[cfg(feature = "tcp")] -impl Server { - /// Binds to the provided address, and returns a [`Builder`](Builder). - /// - /// # Panics - /// - /// This method will panic if binding to the address fails. For a method - /// to bind to an address and return a `Result`, see `Server::try_bind`. - pub fn bind(addr: &SocketAddr) -> Builder { - let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| { - panic!("error binding to {}: {}", addr, e); - }); - Server::builder(incoming) - } +use std::net::SocketAddr; +#[cfg(feature = "runtime")] +use std::time::Duration; - /// Tries to bind to the provided address, and returns a [`Builder`](Builder). - pub fn try_bind(addr: &SocketAddr) -> crate::Result> { - AddrIncoming::new(addr).map(Server::builder) - } +use conn::{Fallback, ProtoServer}; - /// Create a new instance from a `std::net::TcpListener` instance. - pub fn from_tcp(listener: StdTcpListener) -> Result, crate::Error> { - AddrIncoming::from_std(listener).map(Server::builder) - } +/// A lower-level configuration of the HTTP protocol. +/// +/// This structure is used to configure options for an HTTP server connection. +/// +/// If you don't have need to manage connections yourself, consider using the +/// higher-level [Server](super) API. +#[derive(Clone, Debug)] +pub struct Server { + exec: E, + h1_half_close: bool, + h1_keep_alive: bool, + h1_writev: Option, + h2_builder: proto::h2::server::Config, + mode: ConnectionMode, + max_buf_size: Option, + pipeline_flush: bool, } -#[cfg(feature = "tcp")] -impl Server { - /// Returns the local address that this server is bound to. - pub fn local_addr(&self) -> SocketAddr { - self.spawn_all.local_addr() - } +/// The internal mode of HTTP protocol which indicates the behavior when a parse error occurs. +#[derive(Clone, Debug, PartialEq)] +enum ConnectionMode { + /// Always use HTTP/1 and do not upgrade when a parse error occurs. + H1Only, + /// Always use HTTP/2. + H2Only, + /// Use HTTP/1 and try to upgrade to h2 when a parse error occurs. + Fallback, } -impl Server -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - S::Error: Into>, - B: HttpBody + Send + Sync + 'static, - B::Error: Into>, - E: H2Exec<>::Future, B>, - E: NewSvcExec, -{ - /// Prepares a server to handle graceful shutdown when the provided future - /// completes. - /// - /// # Example - /// - /// ``` - /// # fn main() {} - /// # #[cfg(feature = "tcp")] - /// # async fn run() { - /// # use hyper::{Body, Response, Server, Error}; - /// # use hyper::service::{make_service_fn, service_fn}; - /// # let make_service = make_service_fn(|_| async { - /// # Ok::<_, Error>(service_fn(|_req| async { - /// # Ok::<_, Error>(Response::new(Body::from("Hello World"))) - /// # })) - /// # }); - /// // Make a server from the previous examples... - /// let server = Server::bind(&([127, 0, 0, 1], 3000).into()) - /// .serve(make_service); - /// - /// // Prepare some signal for when the server should start shutting down... - /// let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - /// let graceful = server - /// .with_graceful_shutdown(async { - /// rx.await.ok(); - /// }); - /// - /// // Await the `server` receiving the signal... - /// if let Err(e) = graceful.await { - /// eprintln!("server error: {}", e); - /// } - /// - /// // And later, trigger the signal by calling `tx.send(())`. - /// let _ = tx.send(()); - /// # } - /// ``` - pub fn with_graceful_shutdown(self, signal: F) -> Graceful - where - F: Future, - { - Graceful::new(self.spawn_all, signal) - } +/// DOX +#[derive(Debug)] +pub struct Serve { + server: Server, + incoming: AddrIncoming, } -impl Future for Server -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - S::Error: Into>, - B: HttpBody + 'static, - B::Error: Into>, - E: H2Exec<>::Future, B>, - E: NewSvcExec, -{ - type Output = crate::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - self.project().spawn_all.poll_watch(cx, &NoopWatcher) - } -} +// ===== impl Server ===== -impl fmt::Debug for Server { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Server") - .field("listener", &self.spawn_all.incoming_ref()) - .finish() +impl Server { + /// Creates a new instance of the HTTP protocol, ready to spawn a server or + /// start accepting connections. + pub fn new() -> Server { + Server { + exec: Exec::Default, + h1_half_close: false, + h1_keep_alive: true, + h1_writev: None, + h2_builder: Default::default(), + mode: ConnectionMode::Fallback, + max_buf_size: None, + pipeline_flush: false, + } } } -// ===== impl Builder ===== - -impl Builder { - /// Start a new builder, wrapping an incoming stream and low-level options. - /// - /// For a more convenient constructor, see [`Server::bind`](Server::bind). - pub fn new(incoming: I, protocol: Http_) -> Self { - Builder { incoming, protocol } - } - - /// Sets whether to use keep-alive for HTTP/1 connections. +impl Server { + /// Sets whether HTTP1 is required. /// - /// Default is `true`. - pub fn http1_keepalive(mut self, val: bool) -> Self { - self.protocol.http1_keep_alive(val); + /// Default is false + pub fn http1_only(&mut self, val: bool) -> &mut Self { + if val { + self.mode = ConnectionMode::H1Only; + } else { + self.mode = ConnectionMode::Fallback; + } self } @@ -254,35 +158,30 @@ impl Builder { /// detects an EOF in the middle of a request. /// /// Default is `false`. - pub fn http1_half_close(mut self, val: bool) -> Self { - self.protocol.http1_half_close(val); + pub fn http1_half_close(&mut self, val: bool) -> &mut Self { + self.h1_half_close = val; self } - /// Set the maximum buffer size. + /// Enables or disables HTTP/1 keep-alive. /// - /// Default is ~ 400kb. - pub fn http1_max_buf_size(mut self, val: usize) -> Self { - self.protocol.max_buf_size(val); + /// Default is true. + pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self { + self.h1_keep_alive = val; self } - // Sets whether to bunch up HTTP/1 writes until the read buffer is empty. - // - // This isn't really desirable in most cases, only really being useful in - // silly pipeline benchmarks. + // renamed due different semantics of http2 keep alive #[doc(hidden)] - pub fn http1_pipeline_flush(mut self, val: bool) -> Self { - self.protocol.pipeline_flush(val); - self + #[deprecated(note = "renamed to `http1_keep_alive`")] + pub fn keep_alive(&mut self, val: bool) -> &mut Self { + self.http1_keep_alive(val) } /// Set whether HTTP/1 connections should try to use vectored writes, /// or always flatten into a single buffer. /// - /// # Note - /// - /// Setting this to `false` may mean more copies of body data, + /// Note that setting this to false may mean more copies of body data, /// but may also improve performance when an IO transport doesn't /// support vectored writes well, such as most TLS implementations. /// @@ -291,24 +190,21 @@ impl Builder { /// /// Default is `auto`. In this mode hyper will try to guess which /// mode to use - pub fn http1_writev(mut self, val: bool) -> Self { - self.protocol.http1_writev(val); + #[inline] + pub fn http1_writev(&mut self, val: bool) -> &mut Self { + self.h1_writev = Some(val); self } - /// Sets whether HTTP/1 is required. + /// Sets whether HTTP2 is required. /// - /// Default is `false`. - pub fn http1_only(mut self, val: bool) -> Self { - self.protocol.http1_only(val); - self - } - - /// Sets whether HTTP/2 is required. - /// - /// Default is `false`. - pub fn http2_only(mut self, val: bool) -> Self { - self.protocol.http2_only(val); + /// Default is false + pub fn http2_only(&mut self, val: bool) -> &mut Self { + if val { + self.mode = ConnectionMode::H2Only; + } else { + self.mode = ConnectionMode::Fallback; + } self } @@ -320,19 +216,27 @@ impl Builder { /// If not set, hyper will use a default. /// /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - pub fn http2_initial_stream_window_size(mut self, sz: impl Into>) -> Self { - self.protocol.http2_initial_stream_window_size(sz.into()); + pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { + if let Some(sz) = sz.into() { + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_stream_window_size = sz; + } self } - /// Sets the max connection-level flow control for HTTP2 + /// Sets the max connection-level flow control for HTTP2. /// /// Passing `None` will do nothing. /// /// If not set, hyper will use a default. - pub fn http2_initial_connection_window_size(mut self, sz: impl Into>) -> Self { - self.protocol - .http2_initial_connection_window_size(sz.into()); + pub fn http2_initial_connection_window_size( + &mut self, + sz: impl Into>, + ) -> &mut Self { + if let Some(sz) = sz.into() { + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_conn_window_size = sz; + } self } @@ -341,8 +245,14 @@ impl Builder { /// Enabling this will override the limits set in /// `http2_initial_stream_window_size` and /// `http2_initial_connection_window_size`. - pub fn http2_adaptive_window(mut self, enabled: bool) -> Self { - self.protocol.http2_adaptive_window(enabled); + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + use proto::h2::SPEC_WINDOW_SIZE; + + self.h2_builder.adaptive_window = enabled; + if enabled { + self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; + self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; + } self } @@ -351,8 +261,10 @@ impl Builder { /// Passing `None` will do nothing. /// /// If not set, hyper will use a default. - pub fn http2_max_frame_size(mut self, sz: impl Into>) -> Self { - self.protocol.http2_max_frame_size(sz); + pub fn http2_max_frame_size(&mut self, sz: impl Into>) -> &mut Self { + if let Some(sz) = sz.into() { + self.h2_builder.max_frame_size = sz; + } self } @@ -362,8 +274,8 @@ impl Builder { /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing. /// /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS - pub fn http2_max_concurrent_streams(mut self, max: impl Into>) -> Self { - self.protocol.http2_max_concurrent_streams(max.into()); + pub fn http2_max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { + self.h2_builder.max_concurrent_streams = max.into(); self } @@ -378,8 +290,11 @@ impl Builder { /// /// Requires the `runtime` cargo feature to be enabled. #[cfg(feature = "runtime")] - pub fn http2_keep_alive_interval(mut self, interval: impl Into>) -> Self { - self.protocol.http2_keep_alive_interval(interval); + pub fn http2_keep_alive_interval( + &mut self, + interval: impl Into>, + ) -> &mut Self { + self.h2_builder.keep_alive_interval = interval.into(); self } @@ -394,104 +309,185 @@ impl Builder { /// /// Requires the `runtime` cargo feature to be enabled. #[cfg(feature = "runtime")] - pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self { - self.protocol.http2_keep_alive_timeout(timeout); + pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { + self.h2_builder.keep_alive_timeout = timeout; + self + } + + /// Set the maximum buffer size for the connection. + /// + /// Default is ~400kb. + /// + /// # Panics + /// + /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. + pub fn max_buf_size(&mut self, max: usize) -> &mut Self { + assert!( + max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE, + "the max_buf_size cannot be smaller than the minimum that h1 specifies." + ); + self.max_buf_size = Some(max); self } - /// Sets the `Executor` to deal with connection tasks. + /// Aggregates flushes to better support pipelined responses. /// - /// Default is `tokio::spawn`. - pub fn executor(self, executor: E2) -> Builder { - Builder { - incoming: self.incoming, - protocol: self.protocol.with_executor(executor), + /// Experimental, may have bugs. + /// + /// Default is false. + pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self { + self.pipeline_flush = enabled; + self + } + + /// Set the executor used to spawn background tasks. + /// + /// Default uses implicit default (like `tokio::spawn`). + pub fn with_executor(self, exec: E2) -> Server { + Server { + exec, + h1_half_close: self.h1_half_close, + h1_keep_alive: self.h1_keep_alive, + h1_writev: self.h1_writev, + h2_builder: self.h2_builder, + mode: self.mode, + max_buf_size: self.max_buf_size, + pipeline_flush: self.pipeline_flush, } } - /// Consume this `Builder`, creating a [`Server`](Server). + /// DOX + #[cfg(feature = "tcp")] + pub fn bind(&self, addr: &SocketAddr) -> Result, Error> + where + E: Clone, + { + let incoming = AddrIncoming::bind(addr).map_err(Error::new_listen)?; + let server = self.clone(); + + Ok(Serve { server, incoming }) + } + + /// Bind a connection together with a [`Service`](crate::service::Service). + /// + /// This returns a Future that must be polled in order for HTTP to be + /// driven on the connection. /// /// # Example /// /// ``` - /// # #[cfg(feature = "tcp")] - /// # async fn run() { - /// use hyper::{Body, Error, Response, Server}; - /// use hyper::service::{make_service_fn, service_fn}; - /// - /// // Construct our SocketAddr to listen on... - /// let addr = ([127, 0, 0, 1], 3000).into(); - /// - /// // And a MakeService to handle each connection... - /// let make_svc = make_service_fn(|_| async { - /// Ok::<_, Error>(service_fn(|_req| async { - /// Ok::<_, Error>(Response::new(Body::from("Hello World"))) - /// })) - /// }); - /// - /// // Then bind and serve... - /// let server = Server::bind(&addr) - /// .serve(make_svc); - /// - /// // Run forever-ish... - /// if let Err(err) = server.await { - /// eprintln!("server error: {}", err); + /// # use hyper::{Body, Request, Response}; + /// # use hyper::service::Service; + /// # use hyper::server::conn::Http; + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # async fn run(some_io: I, some_service: S) + /// # where + /// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + /// # S: Service, Response=hyper::Response> + Send + 'static, + /// # S::Error: Into>, + /// # S::Future: Send, + /// # { + /// let http = Server::new(); + /// let conn = http.serve_connection(some_io, some_service); + /// + /// if let Err(e) = conn.await { + /// eprintln!("server connection error: {}", e); /// } /// # } + /// # fn main() {} /// ``` - pub fn serve(self, new_service: S) -> Server + pub fn serve_connection(&self, io: I, service: S) -> Connection where - I: Accept, - I::Error: Into>, - I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, + S: HttpService, S::Error: Into>, - B: HttpBody + 'static, - B::Error: Into>, - E: NewSvcExec, - E: H2Exec<>::Future, B>, + Bd: HttpBody + 'static, + Bd::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin, + E: H2Exec, { - let serve = self.protocol.serve(self.incoming, new_service); - let spawn_all = serve.spawn_all(); - Server { spawn_all } + let proto = match self.mode { + ConnectionMode::H1Only | ConnectionMode::Fallback => { + let mut conn = proto::Conn::new(io); + if !self.h1_keep_alive { + conn.disable_keep_alive(); + } + if self.h1_half_close { + conn.set_allow_half_close(); + } + if let Some(writev) = self.h1_writev { + if writev { + conn.set_write_strategy_queue(); + } else { + conn.set_write_strategy_flatten(); + } + } + conn.set_flush_pipeline(self.pipeline_flush); + if let Some(max) = self.max_buf_size { + conn.set_max_buf_size(max); + } + let sd = proto::h1::dispatch::Server::new(service); + ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn)) + } + ConnectionMode::H2Only => { + let rewind_io = Rewind::new(io); + let h2 = + proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone()); + ProtoServer::H2(h2) + } + }; + + Connection { + conn: Some(proto), + fallback: if self.mode == ConnectionMode::Fallback { + Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone()) + } else { + Fallback::Http1Only + }, + } } } -#[cfg(feature = "tcp")] -impl Builder { - /// Set whether TCP keepalive messages are enabled on accepted connections. - /// - /// If `None` is specified, keepalive is disabled, otherwise the duration - /// specified will be the time to remain idle before sending TCP keepalive - /// probes. - pub fn tcp_keepalive(mut self, keepalive: Option) -> Self { - self.incoming.set_keepalive(keepalive); - self +// ===== impl Serve ===== + +impl Serve { + /// DOX + pub fn local_addr(&self) -> SocketAddr { + self.incoming.local_addr() } - /// Set the value of `TCP_NODELAY` option for accepted connections. - pub fn tcp_nodelay(mut self, enabled: bool) -> Self { - self.incoming.set_nodelay(enabled); - self + /// DOX! + #[cfg(all(feature = "tcp", feature = "runtime"))] + pub async fn serve_fn(&mut self, f: F) -> Result<(), Error> + where + F: FnMut(Request) -> R + Clone, + R: Future, Er>>, + Er: Into>, + Bd: HttpBody + 'static, + Bd::Error: Into>, + E: SvcExec, E>, + E: H2Exec, + { + let svc = crate::service::service_fn(f); + self.serve(svc).await } - /// Set whether to sleep on accept errors. - /// - /// A possible scenario is that the process has hit the max open files - /// allowed, and so trying to accept a new connection will fail with - /// EMFILE. In some cases, it's preferable to just wait for some time, if - /// the application will likely close some files (or connections), and try - /// to accept the connection again. If this option is true, the error will - /// be logged at the error level, since it is still a big deal, and then - /// the listener will sleep for 1 second. - /// - /// In other cases, hitting the max open files should be treat similarly - /// to being out-of-memory, and simply error (and shutdown). Setting this - /// option to false will allow that. - /// - /// For more details see [`AddrIncoming::set_sleep_on_errors`] - pub fn tcp_sleep_on_accept_errors(mut self, val: bool) -> Self { - self.incoming.set_sleep_on_errors(val); - self + /// DOX + #[cfg(all(feature = "tcp", feature = "runtime"))] + pub async fn serve(&mut self, service: S) -> Result<(), Error> + where + S: HttpService + Clone, + S::Error: Into>, + Bd: HttpBody + 'static, + Bd::Error: Into>, + E: SvcExec, + E: H2Exec, + { + loop { + let conn = self.incoming.accept().await.map_err(Error::new_accept)?; + + let fut = self.server.serve_connection(conn, service.clone()); + let svc_task = conn::spawn_all::SvcTask::new(fut); + self.server.exec.execute_svc(svc_task); + } } } diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs deleted file mode 100644 index 543859379a..0000000000 --- a/src/server/shutdown.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::error::Error as StdError; - -use pin_project::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; - -use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; -use super::Accept; -use crate::body::{Body, HttpBody}; -use crate::common::drain::{self, Draining, Signal, Watch, Watching}; -use crate::common::exec::{H2Exec, NewSvcExec}; -use crate::common::{task, Future, Pin, Poll, Unpin}; -use crate::service::{HttpService, MakeServiceRef}; - -#[allow(missing_debug_implementations)] -#[pin_project] -pub struct Graceful { - #[pin] - state: State, -} - -#[pin_project(project = StateProj)] -pub(super) enum State { - Running { - drain: Option<(Signal, Watch)>, - #[pin] - spawn_all: SpawnAll, - #[pin] - signal: F, - }, - Draining(Draining), -} - -impl Graceful { - pub(super) fn new(spawn_all: SpawnAll, signal: F) -> Self { - let drain = Some(drain::channel()); - Graceful { - state: State::Running { - drain, - spawn_all, - signal, - }, - } - } -} - -impl Future for Graceful -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - S::Error: Into>, - B: HttpBody + Send + Sync + 'static, - B::Error: Into>, - F: Future, - E: H2Exec<>::Future, B>, - E: NewSvcExec, -{ - type Output = crate::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut me = self.project(); - loop { - let next = { - match me.state.as_mut().project() { - StateProj::Running { - drain, - spawn_all, - signal, - } => match signal.poll(cx) { - Poll::Ready(()) => { - debug!("signal received, starting graceful shutdown"); - let sig = drain.take().expect("drain channel").0; - State::Draining(sig.drain()) - } - Poll::Pending => { - let watch = drain.as_ref().expect("drain channel").1.clone(); - return spawn_all.poll_watch(cx, &GracefulWatcher(watch)); - } - }, - StateProj::Draining(ref mut draining) => { - return Pin::new(draining).poll(cx).map(Ok); - } - } - }; - me.state.set(next); - } - } -} - -#[allow(missing_debug_implementations)] -#[derive(Clone)] -pub struct GracefulWatcher(Watch); - -impl Watcher for GracefulWatcher -where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: HttpService, - E: H2Exec, - S::ResBody: Send + Sync + 'static, - ::Error: Into>, -{ - type Future = - Watching, fn(Pin<&mut UpgradeableConnection>)>; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future { - self.0.clone().watch(conn, on_drain) - } -} - -fn on_drain(conn: Pin<&mut UpgradeableConnection>) -where - S: HttpService, - S::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin, - S::ResBody: HttpBody + Send + 'static, - ::Error: Into>, - E: H2Exec, -{ - conn.graceful_shutdown() -} diff --git a/src/server/tcp.rs b/src/server/tcp.rs index e526303429..48c8558e8f 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -6,11 +6,13 @@ use std::time::Duration; use tokio::net::TcpListener; use tokio::time::Delay; +use futures_util::future::poll_fn; + use crate::common::{task, Future, Pin, Poll}; pub use self::addr_stream::AddrStream; -use super::Accept; +// TODO(lucio): impl `Stream` on this. /// A stream of connections from binding to an address. #[must_use = "streams do nothing unless polled"] pub struct AddrIncoming { @@ -29,7 +31,8 @@ impl AddrIncoming { AddrIncoming::from_std(std_listener) } - pub(super) fn from_std(std_listener: StdTcpListener) -> crate::Result { + /// Create an `AddrIncoming` from a `std::net::TcpListener`. + pub fn from_std(std_listener: StdTcpListener) -> crate::Result { let listener = TcpListener::from_std(std_listener).map_err(crate::Error::new_listen)?; let addr = listener.local_addr().map_err(crate::Error::new_listen)?; Ok(AddrIncoming { @@ -87,6 +90,35 @@ impl AddrIncoming { self.sleep_on_errors = val; } + /// Accepts a new incoming connection from this listener. + /// + /// This function will yield once a new TCP connection is established. When + /// established, the corresponding [`AddrStream`] and the remote peer's + /// address will be returned. + /// + /// [`AddrStream`]: struct@crate::server::AddrStream + /// + /// # Examples + /// + /// ```no_run + /// use hyper::server::AddrIncoming; + /// + /// use std::io; + /// + /// #async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; + /// + /// match listener.accept().await { + /// Ok((_socket, addr)) => println!("new client: {:?}", addr), + /// Err(e) => println!("couldn't get client: {:?}", e), + /// } + /// # Ok(()) + /// #} + /// ``` + pub async fn accept(&mut self) -> io::Result { + poll_fn(|cx| self.poll_next_(cx)).await + } + fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll> { // Check if a previous timeout is active that was set by IO errors. if let Some(ref mut to) = self.timeout { @@ -140,19 +172,6 @@ impl AddrIncoming { } } -impl Accept for AddrIncoming { - type Conn = AddrStream; - type Error = io::Error; - - fn poll_accept( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>> { - let result = ready!(self.poll_next_(cx)); - Poll::Ready(Some(result)) - } -} - /// This function defines errors that are per-connection. Which basically /// means that if we get this error from `accept()` system call it means /// next connection might be ready to be accepted. diff --git a/src/service/make.rs b/src/service/make.rs index 074d66f1b9..53c36cd833 100644 --- a/src/service/make.rs +++ b/src/service/make.rs @@ -1,10 +1,6 @@ -use std::error::Error as StdError; -use std::fmt; - use tokio::io::{AsyncRead, AsyncWrite}; -use super::{HttpService, Service}; -use crate::body::HttpBody; +use super::Service; use crate::common::{task, Future, Poll}; // The same "trait alias" as tower::MakeConnection, but inlined to reduce @@ -38,149 +34,8 @@ where } } -// Just a sort-of "trait alias" of `MakeService`, not to be implemented -// by anyone, only used as bounds. -pub trait MakeServiceRef: self::sealed::Sealed<(Target, ReqBody)> { - type ResBody: HttpBody; - type Error: Into>; - type Service: HttpService; - type MakeError: Into>; - type Future: Future>; - - // Acting like a #[non_exhaustive] for associated types of this trait. - // - // Basically, no one outside of hyper should be able to set this type - // or declare bounds on it, so it should prevent people from creating - // trait objects or otherwise writing code that requires using *all* - // of the associated types. - // - // Why? So we can add new associated types to this alias in the future, - // if necessary. - type __DontNameMe: self::sealed::CantImpl; - - fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll>; - - fn make_service_ref(&mut self, target: &Target) -> Self::Future; -} - -impl MakeServiceRef for T -where - T: for<'a> Service<&'a Target, Error = ME, Response = S, Future = F>, - E: Into>, - ME: Into>, - S: HttpService, - F: Future>, - IB: HttpBody, - OB: HttpBody, -{ - type Error = E; - type Service = S; - type ResBody = OB; - type MakeError = ME; - type Future = F; - - type __DontNameMe = self::sealed::CantName; - - fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.poll_ready(cx) - } - - fn make_service_ref(&mut self, target: &Target) -> Self::Future { - self.call(target) - } -} - -impl self::sealed::Sealed<(Target, B1)> for T -where - T: for<'a> Service<&'a Target, Response = S>, - S: HttpService, - B1: HttpBody, - B2: HttpBody, -{ -} - -/// Create a `MakeService` from a function. -/// -/// # Example -/// -/// ``` -/// # #[cfg(feature = "runtime")] -/// # async fn run() { -/// use std::convert::Infallible; -/// use hyper::{Body, Request, Response, Server}; -/// use hyper::server::conn::AddrStream; -/// use hyper::service::{make_service_fn, service_fn}; -/// -/// let addr = ([127, 0, 0, 1], 3000).into(); -/// -/// let make_svc = make_service_fn(|socket: &AddrStream| { -/// let remote_addr = socket.remote_addr(); -/// async move { -/// Ok::<_, Infallible>(service_fn(move |_: Request| async move { -/// Ok::<_, Infallible>( -/// Response::new(Body::from(format!("Hello, {}!", remote_addr))) -/// ) -/// })) -/// } -/// }); -/// -/// // Then bind and serve... -/// let server = Server::bind(&addr) -/// .serve(make_svc); -/// -/// // Finally, spawn `server` onto an Executor... -/// if let Err(e) = server.await { -/// eprintln!("server error: {}", e); -/// } -/// # } -/// # fn main() {} -/// ``` -pub fn make_service_fn(f: F) -> MakeServiceFn -where - F: FnMut(&Target) -> Ret, - Ret: Future, -{ - MakeServiceFn { f } -} - -/// `MakeService` returned from [`make_service_fn`] -#[derive(Clone, Copy)] -pub struct MakeServiceFn { - f: F, -} - -impl<'t, F, Ret, Target, Svc, MkErr> Service<&'t Target> for MakeServiceFn -where - F: FnMut(&Target) -> Ret, - Ret: Future>, - MkErr: Into>, -{ - type Error = MkErr; - type Response = Svc; - type Future = Ret; - - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, target: &'t Target) -> Self::Future { - (self.f)(target) - } -} - -impl fmt::Debug for MakeServiceFn { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MakeServiceFn").finish() - } -} - mod sealed { pub trait Sealed {} pub trait CantImpl {} - - #[allow(missing_debug_implementations)] - pub enum CantName {} - - impl CantImpl for CantName {} } diff --git a/src/service/mod.rs b/src/service/mod.rs index 6310a10fa3..222da8eedf 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -40,11 +40,10 @@ pub use tower_service::Service; mod http; mod make; mod oneshot; -mod util; +pub(crate) mod util; pub(crate) use self::http::HttpService; -pub(crate) use self::make::{MakeConnection, MakeServiceRef}; +pub(crate) use self::make::MakeConnection; pub(crate) use self::oneshot::{oneshot, Oneshot}; -pub use self::make::make_service_fn; pub use self::util::service_fn; diff --git a/tests/client.rs b/tests/client.rs index 576423768f..a8160115a5 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1808,7 +1808,7 @@ mod dispatch_impl { #[test] fn alpn_h2() { - use hyper::server::conn::Http; + use hyper::server::Server; use hyper::service::service_fn; use hyper::Response; use tokio::net::TcpListener; @@ -1827,7 +1827,7 @@ mod dispatch_impl { rt.spawn(async move { let (socket, _addr) = listener.accept().await.expect("accept"); - Http::new() + Server::new() .http2_only(true) .serve_connection( socket, @@ -2472,29 +2472,38 @@ mod conn { } #[tokio::test] + #[ignore] async fn http2_detect_conn_eof() { use futures_util::future; - use hyper::service::{make_service_fn, service_fn}; use hyper::{Response, Server}; let _ = pretty_env_logger::try_init(); - let server = Server::bind(&([127, 0, 0, 1], 0).into()) + let mut serve = Server::new() .http2_only(true) - .serve(make_service_fn(|_| async move { - Ok::<_, hyper::Error>(service_fn(|_req| { - future::ok::<_, hyper::Error>(Response::new(Body::empty())) - })) - })); - let addr = server.local_addr(); + .bind(&([127, 0, 0, 1], 0).into()) + .expect("bind error"); + + let addr = serve.local_addr(); + let (shdn_tx, shdn_rx) = oneshot::channel(); tokio::task::spawn(async move { - server - .with_graceful_shutdown(async move { - let _ = shdn_rx.await; - }) - .await - .expect("server") + let srv = + serve.serve_fn(|_req| future::ok::<_, hyper::Error>(Response::new(Body::empty()))); + + tokio::select! { + res = srv => { + res.expect("server"); + }, + _ = shdn_rx => {}, + } + // TODO(lucio): Add graceful shutdown back + // serve + // .with_graceful_shutdown(async move { + // let _ = shdn_rx.await; + // }) + // .await + // .expect("server") }); let io = tcp_connect(&addr).await.expect("tcp connect"); @@ -2675,7 +2684,7 @@ mod conn { // Spawn an HTTP2 server that reads the whole body and responds tokio::spawn(async move { let sock = listener.accept().await.unwrap().0; - hyper::server::conn::Http::new() + hyper::server::Server::new() .http2_only(true) .serve_connection( sock, diff --git a/tests/server.rs b/tests/server.rs index 19b4cac25b..97790c2390 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -24,9 +24,8 @@ use tokio::runtime::Runtime; use hyper::body::HttpBody as _; use hyper::client::Client; -use hyper::server::conn::Http; use hyper::server::Server; -use hyper::service::{make_service_fn, service_fn}; +use hyper::service::service_fn; use hyper::{Body, Request, Response, StatusCode, Version}; #[test] @@ -815,7 +814,7 @@ async fn expect_continue_waits_for_body_poll() { let (socket, _) = listener.accept().await.expect("accept"); - Http::new() + Server::new() .serve_connection( socket, service_fn(|req| { @@ -973,7 +972,7 @@ async fn disable_keep_alive_mid_request() { }); let (socket, _) = listener.accept().await.unwrap(); - let srv = Http::new().serve_connection(socket, HelloWorld); + let srv = Server::new().serve_connection(socket, HelloWorld); future::try_select(srv, rx1) .then(|r| match r { Ok(Either::Left(_)) => panic!("expected rx first"), @@ -1026,7 +1025,7 @@ async fn disable_keep_alive_post_request() { stream: socket, _debug: dropped2, }; - let server = Http::new().serve_connection(transport, HelloWorld); + let server = Server::new().serve_connection(transport, HelloWorld); let fut = future::try_select(server, rx1).then(|r| match r { Ok(Either::Left(_)) => panic!("expected rx first"), Ok(Either::Right(((), mut conn))) => { @@ -1054,7 +1053,7 @@ async fn empty_parse_eof_does_not_return_error() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .serve_connection(socket, HelloWorld) .await .expect("empty parse eof is ok"); @@ -1071,7 +1070,7 @@ async fn nonempty_parse_eof_returns_error() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .serve_connection(socket, HelloWorld) .await .expect_err("partial parse eof is error"); @@ -1095,7 +1094,7 @@ async fn http1_allow_half_close() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .http1_half_close(true) .serve_connection( socket, @@ -1122,7 +1121,7 @@ async fn disconnect_after_reading_request_before_responding() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .http1_half_close(false) .serve_connection( socket, @@ -1154,7 +1153,7 @@ async fn returning_1xx_response_is_error() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .serve_connection( socket, service_fn(|_| async move { @@ -1222,7 +1221,7 @@ async fn upgrades() { }); let (socket, _) = listener.accept().await.unwrap(); - let conn = Http::new().serve_connection( + let conn = Server::new().serve_connection( socket, service_fn(|_| { let res = Response::builder() @@ -1279,7 +1278,7 @@ async fn http_connect() { }); let (socket, _) = listener.accept().await.unwrap(); - let conn = Http::new().serve_connection( + let conn = Server::new().serve_connection( socket, service_fn(|_| { let res = Response::builder() @@ -1350,7 +1349,7 @@ async fn upgrades_new() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .serve_connection(socket, svc) .with_upgrades() .await @@ -1385,7 +1384,7 @@ async fn upgrades_ignored() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .serve_connection(socket, svc) .with_upgrades() .await @@ -1456,7 +1455,7 @@ async fn http_connect_new() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .serve_connection(socket, svc) .with_upgrades() .await @@ -1494,7 +1493,7 @@ async fn parse_errors_send_4xx_response() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .serve_connection(socket, HelloWorld) .await .expect_err("HTTP parse error"); @@ -1517,7 +1516,7 @@ async fn illegal_request_length_returns_400_response() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .serve_connection(socket, HelloWorld) .await .expect_err("illegal Content-Length should error"); @@ -1527,12 +1526,12 @@ async fn illegal_request_length_returns_400_response() { #[should_panic] fn max_buf_size_panic_too_small() { const MAX: usize = 8191; - Http::new().max_buf_size(MAX); + Server::new().max_buf_size(MAX); } #[test] fn max_buf_size_no_panic() { const MAX: usize = 8193; - Http::new().max_buf_size(MAX); + Server::new().max_buf_size(MAX); } #[tokio::test] @@ -1555,7 +1554,7 @@ async fn max_buf_size() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + Server::new() .max_buf_size(MAX) .serve_connection(socket, HelloWorld) .await @@ -1730,42 +1729,42 @@ impl tower_service::Service> for Http2ReadyErrorSvc { } } -#[tokio::test] -#[ignore] // sometimes ECONNRESET wins the race -async fn http2_service_poll_ready_error_sends_goaway() { - use std::error::Error; +// #[tokio::test] +// #[ignore] // sometimes ECONNRESET wins the race +// async fn http2_service_poll_ready_error_sends_goaway() { +// use std::error::Error; - let _ = pretty_env_logger::try_init(); +// let _ = pretty_env_logger::try_init(); - let server = hyper::Server::bind(&([127, 0, 0, 1], 0).into()) - .http2_only(true) - .serve(make_service_fn(|_| async move { - Ok::<_, BoxError>(Http2ReadyErrorSvc) - })); +// let server = hyper::Server::bind(&([127, 0, 0, 1], 0).into()) +// .http2_only(true) +// .serve(make_service_fn(|_| async move { +// Ok::<_, BoxError>(Http2ReadyErrorSvc) +// })); - let addr_str = format!("http://{}", server.local_addr()); +// let addr_str = format!("http://{}", server.local_addr()); - tokio::task::spawn(async move { - server.await.expect("server"); - }); +// tokio::task::spawn(async move { +// server.await.expect("server"); +// }); - let uri = addr_str.parse().expect("server addr should parse"); - let err = dbg!(Client::builder() - .http2_only(true) - .build_http::() - .get(uri) - .await - .expect_err("client.get should fail")); +// let uri = addr_str.parse().expect("server addr should parse"); +// let err = dbg!(Client::builder() +// .http2_only(true) +// .build_http::() +// .get(uri) +// .await +// .expect_err("client.get should fail")); - // client request should have gotten the specific GOAWAY error... - let h2_err = err - .source() - .expect("source") - .downcast_ref::() - .expect("downcast"); +// // client request should have gotten the specific GOAWAY error... +// let h2_err = err +// .source() +// .expect("source") +// .downcast_ref::() +// .expect("downcast"); - assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); -} +// assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); +// } #[test] fn skips_content_length_for_304_responses() { @@ -1856,7 +1855,7 @@ async fn http2_keep_alive_detects_unresponsive_client() { let (socket, _) = listener.accept().await.expect("accept"); - let err = Http::new() + let err = Server::new() .http2_only(true) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) @@ -1877,7 +1876,7 @@ async fn http2_keep_alive_with_responsive_client() { tokio::spawn(async move { let (socket, _) = listener.accept().await.expect("accept"); - Http::new() + Server::new() .http2_only(true) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) @@ -2214,28 +2213,30 @@ impl ServeOptions { .expect("rt new"); rt.block_on(async move { - let service = make_service_fn(|_| { - let msg_tx = msg_tx.clone(); - let reply_rx = reply_rx.clone(); - future::ok::<_, BoxError>(TestService { - tx: msg_tx, - reply: reply_rx, - }) - }); - - let server = Server::bind(&addr) - .http1_only(options.http1_only) - .http1_keepalive(options.keep_alive) - .http1_pipeline_flush(options.pipeline) - .serve(service); + let svc = TestService { + tx: msg_tx.clone(), + reply: reply_rx.clone(), + }; - addr_tx.send(server.local_addr()).expect("server addr tx"); + let mut server = Server::new(); - server - .with_graceful_shutdown(async { - let _ = shutdown_rx.await; - }) - .await + let mut serve = server + .http1_only(options.http1_only) + .http1_keep_alive(options.keep_alive) + .pipeline_flush(options.pipeline) + .bind(&addr) + .expect("bind error"); + + addr_tx.send(serve.local_addr()).expect("server addr tx"); + + tokio::select! { + res = serve.serve(svc)=> { + res + }, + _ = shutdown_rx => { + Ok(()) + } + } }) .expect("serve()"); }) diff --git a/tests/support/mod.rs b/tests/support/mod.rs index fd089bc561..36354576df 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -6,7 +6,8 @@ use std::sync::{ }; use hyper::client::HttpConnector; -use hyper::service::{make_service_fn, service_fn}; +use hyper::server::AddrIncoming; +use hyper::service::service_fn; use hyper::{Body, Client, Request, Response, Server, Version}; pub use futures_util::{ @@ -332,49 +333,59 @@ async fn async_test(cfg: __TestConfig) { let expected_connections = cfg.connections; let mut cnt = 0; - let new_service = make_service_fn(move |_| { - cnt += 1; - assert!( - cnt <= expected_connections, - "server expected {} connections, received {}", - expected_connections, - cnt - ); - - // Move a clone into the service_fn - let serve_handles = serve_handles.clone(); - future::ok::<_, hyper::Error>(service_fn(move |req: Request| { - let (sreq, sres) = serve_handles.lock().unwrap().remove(0); - - assert_eq!(req.uri().path(), sreq.uri, "client path"); - assert_eq!(req.method(), &sreq.method, "client method"); - assert_eq!(req.version(), version, "client version"); - for func in &sreq.headers { - func(&req.headers()); - } - let sbody = sreq.body; - hyper::body::to_bytes(req).map_ok(move |body| { - assert_eq!(body.as_ref(), sbody.as_slice(), "client body"); - - let mut res = Response::builder() - .status(sres.status) - .body(Body::from(sres.body)) - .expect("Response::build"); - *res.headers_mut() = sres.headers; - res - }) - })) - }); - let server = hyper::Server::bind(&SocketAddr::from(([127, 0, 0, 1], 0))) - .http2_only(cfg.server_version == 2) - .serve(new_service); + let mut incoming = + AddrIncoming::bind(&SocketAddr::from(([127, 0, 0, 1], 0))).expect("bind error"); + let mut addr = incoming.local_addr(); + + let mut server = Server::new(); + server.http2_only(cfg.server_version == 2); + + let srv = async move { + loop { + let conn = incoming.accept().await.expect("accept error"); + + cnt += 1; + assert!( + cnt <= expected_connections, + "server expected {} connections, received {}", + expected_connections, + cnt + ); + + // Move a clone into the service_fn + let serve_handles = serve_handles.clone(); + + let svc = service_fn(move |req: Request| { + let (sreq, sres) = serve_handles.lock().unwrap().remove(0); + + assert_eq!(req.uri().path(), sreq.uri, "client path"); + assert_eq!(req.method(), &sreq.method, "client method"); + assert_eq!(req.version(), version, "client version"); + for func in &sreq.headers { + func(&req.headers()); + } + let sbody = sreq.body; + hyper::body::to_bytes(req).map_ok(move |body| { + assert_eq!(body.as_ref(), sbody.as_slice(), "client body"); + + let mut res = Response::builder() + .status(sres.status) + .body(Body::from(sres.body)) + .expect("Response::build"); + *res.headers_mut() = sres.headers; + res + }) + }); - let mut addr = server.local_addr(); + server + .serve_connection(conn, svc) + .await + .expect("connection error"); + } + }; - tokio::task::spawn(server.map(|result| { - result.expect("server error"); - })); + tokio::spawn(srv); if cfg.proxy { let (proxy_addr, proxy) = naive_proxy(ProxyConfig { @@ -455,18 +466,35 @@ fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) { let max_connections = cfg.connections; let counter = AtomicUsize::new(0); - let srv = Server::bind(&([127, 0, 0, 1], 0).into()).serve(make_service_fn(move |_| { - let prev = counter.fetch_add(1, Ordering::Relaxed); - assert!(max_connections > prev, "proxy max connections"); - let client = client.clone(); - future::ok::<_, hyper::Error>(service_fn(move |mut req| { - let uri = format!("http://{}{}", dst_addr, req.uri().path()) - .parse() - .expect("proxy new uri parse"); - *req.uri_mut() = uri; - client.request(req) - })) - })); - let proxy_addr = srv.local_addr(); - (proxy_addr, srv.map(|res| res.expect("proxy error"))) + let mut incoming = AddrIncoming::bind(&([127, 0, 0, 1], 0).into()).expect("proxy bind error"); + let proxy_addr = incoming.local_addr(); + + let server = Server::new(); + + let srv = async move { + loop { + let conn = incoming.accept().await.expect("Accept error"); + + let prev = counter.fetch_add(1, Ordering::Relaxed); + assert!(max_connections > prev, "proxy max connections"); + let client = client.clone(); + + let conn_fut = server.serve_connection( + conn, + service_fn(move |mut req| { + let uri = format!("http://{}{}", dst_addr, req.uri().path()) + .parse() + .expect("proxy new uri parse"); + *req.uri_mut() = uri; + client.request(req) + }), + ); + + tokio::spawn(async move { + conn_fut.await.expect("connection error"); + }); + } + }; + + (proxy_addr, srv) }