Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http #233

Merged
merged 12 commits into from
Oct 18, 2023
656 changes: 301 additions & 355 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ volo-http = { path = "../volo-http" }
volo-gen = { path = "./volo-gen" }
bytes.workspace = true
http.workspace = true
hyper = { version = "1.0.0-rc.4", features = ["server", "http1", "http2"] }
hyper = { version = "1.0.0-rc.3", features = ["server", "http1", "http2"] }
motore.workspace = true
serde.workspace = true
15 changes: 9 additions & 6 deletions examples/src/http/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use serde::{Deserialize, Serialize};
use volo_http::{
handler::HandlerService,
request::Json,
route::{Route, Router, Server, ServiceLayerExt},
route::{Route, Router, ServiceLayerExt},
server::Server,
HttpContext,
};

Expand Down Expand Up @@ -69,7 +70,7 @@ async fn test(

#[tokio::main(flavor = "multi_thread")]
async fn main() {
Router::new()
let app = Router::new()
.route(
"/",
Route::builder()
Expand All @@ -86,8 +87,10 @@ async fn main() {
.post(HandlerService::new(test))
.build(),
)
.layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1))))
.serve(SocketAddr::from(([127, 0, 0, 1], 3000)))
.await
.unwrap();
.layer(TimeoutLayer::new(Some(std::time::Duration::from_secs(1))));

let addr: SocketAddr = "[::]:9091".parse().unwrap();
let addr = volo::net::Address::from(addr);

Server::new(app).run(addr).await.unwrap();
}
7 changes: 4 additions & 3 deletions volo-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ categories = ["asynchronous", "network-programming", "web-programming"]
keywords = ["async", "http"]

[dependencies]
hyper = { version = "1.0.0-rc.4", features = ["server", "http1", "http2"] }
volo = { version = "0.5", path = "../volo" }
hyper = { version = "=1.0.0-rc.3", features = ["server", "http1", "http2"] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1.0-rc.3"
http-body-util = "=0.1.0-rc.2"
http = { version = "0.2" }
hyper-util = { git = "https://github.com/hyperium/hyper-util.git" }
matchit = { version = "0.7" }
motore = { version = "0.3" }
tracing.workspace = true
Expand All @@ -29,6 +29,7 @@ thiserror.workspace = true
mime = "0.3"
serde = "1"
async-trait.workspace = true
parking_lot.workspace = true

[dev-dependencies]
serde = { version = "1", features = ["derive"] }
33 changes: 14 additions & 19 deletions volo-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,22 @@ pub mod param;
pub mod request;
pub mod response;
pub mod route;
pub mod server;

use std::{future::Future, net::SocketAddr};
use std::future::Future;

use http::{Extensions, HeaderMap, HeaderValue, Method, Uri, Version};
use hyper::{
body::{Body, Incoming},
Request, Response,
};
use param::Params;
use volo::net::Address;

pub type DynError = Box<dyn std::error::Error + Send + Sync>;

pub struct HttpContextInner {
pub(crate) peer: SocketAddr,

pub(crate) method: Method,
pub(crate) uri: Uri,
pub(crate) version: Version,
pub(crate) headers: HeaderMap<HeaderValue>,
pub(crate) extensions: Extensions,
}

pub struct HttpContext {
pub peer: SocketAddr,
pub peer: Address,
pub method: Method,
pub uri: Uri,
pub version: Version,
Expand All @@ -43,14 +35,14 @@ pub struct HttpContext {

#[derive(Clone)]
pub struct MotoreService<S> {
peer: SocketAddr,
inner: S,
pub peer: Address,
pub inner: S,
}

impl<OB, S> hyper::service::Service<Request<Incoming>> for MotoreService<S>
where
OB: Body<Error = DynError>,
S: motore::Service<(), (HttpContextInner, Incoming), Response = Response<OB>> + Clone,
S: motore::Service<HttpContext, Incoming, Response = Response<OB>> + Clone,
S::Error: Into<DynError>,
{
type Response = S::Response;
Expand All @@ -59,20 +51,23 @@ where

type Future = impl Future<Output = Result<Self::Response, Self::Error>>;

fn call(&self, req: Request<Incoming>) -> Self::Future {
fn call(&mut self, req: Request<Incoming>) -> Self::Future {
let s = self.inner.clone();
let peer = self.peer;
let peer = self.peer.clone();
async move {
let (parts, req) = req.into_parts();
let cx = HttpContextInner {
let mut cx = HttpContext {
peer,
method: parts.method,
uri: parts.uri,
version: parts.version,
headers: parts.headers,
extensions: parts.extensions,
params: Params {
inner: Vec::with_capacity(0),
},
};
s.call(&mut (), (cx, req)).await
s.call(&mut cx, req).await
}
}
}
2 changes: 1 addition & 1 deletion volo-http/src/param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::slice::Iter;
use bytes::{BufMut, Bytes, BytesMut};

pub struct Params {
inner: Vec<(Bytes, Bytes)>,
pub(crate) inner: Vec<(Bytes, Bytes)>,
}

impl From<matchit::Params<'_, '_>> for Params {
Expand Down
71 changes: 7 additions & 64 deletions volo-http/src/route.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
use std::{future::Future, net::SocketAddr};
use std::future::Future;

use http::{Method, Response, StatusCode};
use http_body_util::Full;
use hyper::{
body::{Body, Bytes, Incoming},
server::conn::http1,
};
use hyper_util::rt::TokioIo;
use hyper::body::{Bytes, Incoming};
use motore::layer::Layer;
use tokio::net::TcpListener;

use crate::{
dispatch::DispatchService, request::FromRequest, response::RespBody, DynError, HttpContext,
HttpContextInner, MotoreService,
};

pub type DynService = motore::BoxCloneService<HttpContext, Incoming, Response<RespBody>, DynError>;
Expand All @@ -22,37 +16,24 @@ pub struct Router {
inner: matchit::Router<DynService>,
}

impl motore::Service<(), (HttpContextInner, Incoming)> for Router {
impl motore::Service<HttpContext, Incoming> for Router {
type Response = Response<RespBody>;

type Error = DynError;

type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + Send + 'cx
where
HttpContextInner: 'cx,
HttpContext: 'cx,
Self: 'cx;

fn call<'cx, 's>(
&'s self,
_cx: &'cx mut (),
cxreq: (HttpContextInner, Incoming),
) -> Self::Future<'cx>
fn call<'cx, 's>(&'s self, cx: &'cx mut HttpContext, req: Incoming) -> Self::Future<'cx>
where
's: 'cx,
{
async move {
let (cx, req) = cxreq;
if let Ok(matched) = self.inner.at(cx.uri.path()) {
let mut context = HttpContext {
peer: cx.peer,
method: cx.method,
uri: cx.uri.clone(),
version: cx.version,
headers: cx.headers,
extensions: cx.extensions,
params: matched.params.into(),
};
matched.value.call(&mut context, req).await
cx.params = matched.params.into();
matched.value.call(cx, req).await
} else {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -99,44 +80,6 @@ impl<S> ServiceLayerExt for S {
}
}

#[async_trait::async_trait]
pub trait Server {
async fn serve(self, addr: SocketAddr) -> Result<(), DynError>;
}
#[async_trait::async_trait]
impl<S, OB> Server for S
where
S: motore::Service<(), (HttpContextInner, Incoming), Response = Response<OB>>
+ Clone
+ Send
+ Sync
+ 'static,
OB: Body<Error = DynError> + Send + 'static,
<OB as Body>::Data: Send,
<S as motore::Service<(), (HttpContextInner, Incoming)>>::Error: Into<DynError>,
{
async fn serve(self, addr: SocketAddr) -> Result<(), DynError> {
let listener = TcpListener::bind(addr).await?;

let service = self;
loop {
let s = service.clone();
let (stream, peer) = listener.accept().await?;

let io = TokioIo::new(stream);

tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, MotoreService { peer, inner: s })
.await
{
tracing::warn!("error serving connection: {:?}", err);
}
});
}
}
}

#[derive(Default, Clone)]
pub struct Route {
options: Option<DynService>,
Expand Down
Loading
Loading