diff --git a/.cargo/config b/.cargo/config.toml similarity index 100% rename from .cargo/config rename to .cargo/config.toml diff --git a/Cargo.toml b/Cargo.toml index ee68bc3..ffbb94a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,13 +4,13 @@ version = "0.1.0" edition = "2021" [dependencies] -axum = { version = "0.6.18", features = ["macros"] } +axum = { version = "^0.7", features = ["macros"] } async-trait = "0.1.57" clap = { version = "4.2.1", features = ["derive"] } color-eyre = { version = "0.6.2", default-features = false } ctrlc = "3.2.3" futures-util = "0.3.21" -reqwest = { version = "0.11.16", default-features = false, features = [ +reqwest = { version = "^0.12", default-features = false, features = [ "json", "native-tls-vendored", ] } @@ -29,19 +29,22 @@ serde = "*" left-right = "0.11.5" tonic = { version = "*", optional = true } prost = { version = "*", optional = true } -hyper = { version = "0.14.26", features = ["stream"] } -http = "0.2.9" -tower-http = { version = "0.4.3", default-features = false, features = [ +hyper = { version = "^1" } +http = "^1" +tower-http = { version = "^0", default-features = false, features = [ "compression-gzip", ] } -flate2 = "1.0.26" +flate2 = "^1.0" opentelemetry = "0.19.0" tracing-opentelemetry = "0.19.0" opentelemetry-jaeger = "0.18.0" +hyper-util = { version = "0.1.6", features = ["server", "http2", "tokio"] } +tower = "0.4.13" [dev-dependencies] rand = { version = "*" } criterion = "*" +http-body-util = "0.1.2" [features] default = ["grpc", "codec"] @@ -55,3 +58,6 @@ harness = false [[bench]] name = "depth_monitor_scale" harness = false + +#[patch."https://github.com/CAGS295/lob.git"] +#lob = { path = "../lob" } diff --git a/benches/depth_monitor_scale.rs b/benches/depth_monitor_scale.rs index 1436b23..5985942 100644 --- a/benches/depth_monitor_scale.rs +++ b/benches/depth_monitor_scale.rs @@ -1,24 +1,27 @@ +use axum::body::Body; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use flate2::read::GzDecoder; use http::header::ACCEPT_ENCODING; use http::Request; -use hyper::body::to_bytes; -use hyper::Body; -use hyper::Client; +use http::Response; +use http_body_util::BodyExt; +use hyper_util::client::legacy::Client; +use hyper_util::rt::TokioExecutor; +use hyper_util::rt::TokioTimer; use lob::Decode; use lob::LimitOrderBook; -use std::io::Read; -use std::time::Duration; +use std::{io::Read, time::Duration}; fn random_gets(c: &mut Criterion) { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .expect("Failed building the Runtime"); - let client = Client::builder() + let client = Client::builder(TokioExecutor::new()) .http2_only(true) - .http2_keep_alive_interval(Some(Duration::from_millis(20))) - .build_http::(); + .http2_keep_alive_interval(Some(Duration::from_millis(200))) + .timer(TokioTimer::new()) + .build_http(); c.bench_function("depth scale", |b| { b.iter(|| { @@ -26,15 +29,15 @@ fn random_gets(c: &mut Criterion) { .header(ACCEPT_ENCODING, "gzip") .method("GET") .uri("http://[::1]:50051/scale/depth/btcusdt") - .body(Body::default()) + .body(Body::empty()) .unwrap(); - let response = rt.block_on(client.request(req)).unwrap(); + let response: Response<_> = rt.block_on(client.request(req)).unwrap(); let body = response.into_body(); - let bytes = rt.block_on(to_bytes(body)).unwrap(); - let mut x = Vec::new(); - GzDecoder::new(&bytes[..]).read_to_end(&mut x).unwrap(); + let bytes: Vec = rt.block_on(body.collect()).unwrap().to_bytes().into(); + let mut buf = Vec::new(); + GzDecoder::new(&bytes[..]).read_to_end(&mut buf).unwrap(); - let book_snapshot = LimitOrderBook::decode(&mut x.as_slice()).unwrap(); + let book_snapshot = LimitOrderBook::decode(&mut buf.as_slice()).unwrap(); black_box(book_snapshot); }) diff --git a/examples/depth_sampler_scale.rs b/examples/depth_sampler_scale.rs index d6c222d..0d0538b 100644 --- a/examples/depth_sampler_scale.rs +++ b/examples/depth_sampler_scale.rs @@ -1,12 +1,11 @@ use flate2::read::GzDecoder; use http::header::{ACCEPT_ENCODING, CONTENT_ENCODING}; -use http::{Request, StatusCode}; -use hyper::body::to_bytes; -use hyper::{Body, Client}; +use http::StatusCode; use lob::Decode; use lob::LimitOrderBook; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; +use reqwest::{Client, Method}; use std::io::Read; use std::time::Duration; use std::{println, thread}; @@ -30,19 +29,16 @@ async fn main() -> Result<(), Box> { b / 2 ); - let client = Client::builder().http2_only(true).build_http::(); + let client = Client::new(); while !signal.is_terminated() { - let request = Request::builder() + let req = client + .request(Method::GET, "http://[::1]:50051/scale/depth/btcusdt") .header(ACCEPT_ENCODING, "gzip") - .method("GET") - .uri("http://[::1]:50051/scale/depth/btcusdt") - .body(Body::default()) - .unwrap(); + .build()?; - let future = client.request(request); + let response = client.execute(req).await?; - let response = future.await?; match response.status() { StatusCode::OK => {} err => { @@ -63,7 +59,7 @@ async fn main() -> Result<(), Box> { } None => false, }; - let bytes = to_bytes(response.into_body()).await?; + let bytes = response.bytes().await?; let mut decode_output_buffer; let mut bytes = if decode { decode_output_buffer = Vec::new(); diff --git a/src/connectors/handler.rs b/src/connectors/handler.rs index af659bc..4fef097 100644 --- a/src/connectors/handler.rs +++ b/src/connectors/handler.rs @@ -15,12 +15,12 @@ pub(crate) trait EventHandler { /// A handler shoud be identifiable from a Self::Update. /// This is necessary to route updates to their handler when processing multiple subscriptions from the same source. - fn to_id<'a>(event: &'a Self::Update) -> &'a str; + fn to_id(event: &Self::Update) -> &str; /// Use the Result to break out of the handler loop; /// Handle a raw message. fn handle(&mut self, msg: Message) -> Result<(), ()> { - let Some(update) = Self::parse_update(msg)? else{ + let Some(update) = Self::parse_update(msg)? else { //Skip if not a relevant update. return Ok(()); }; diff --git a/src/connectors/multiplexor.rs b/src/connectors/multiplexor.rs index cc978a9..3afddd5 100644 --- a/src/connectors/multiplexor.rs +++ b/src/connectors/multiplexor.rs @@ -24,14 +24,14 @@ where H::parse_update(event) } - fn to_id<'b>(event: &'b Self::Update) -> &'b str { + fn to_id(event: &Self::Update) -> &str { H::to_id(event) } fn handle_update(&mut self, update: Self::Update) -> Result<(), ()> { let id = Self::to_id(&update); - let Some(handle) = self.writers.get_mut(id)else{ + let Some(handle) = self.writers.get_mut(id) else { error!("Unknown handle {id}"); return Ok(()); }; diff --git a/src/monitor/order_book.rs b/src/monitor/order_book.rs index 26a6320..13ecab6 100644 --- a/src/monitor/order_book.rs +++ b/src/monitor/order_book.rs @@ -95,7 +95,7 @@ impl EventHandler for OrderBook { } } - fn to_id<'a>(update: &'a DepthUpdate) -> &'a str { + fn to_id(update: &DepthUpdate) -> &str { &update.event.symbol } @@ -176,7 +176,7 @@ mod test { }; let x = DepthUpdate { - asks: asks, + asks, ..Default::default() }; //Skip the first publish optimization. diff --git a/src/providers/binance.rs b/src/providers/binance.rs index 6176696..62950da 100644 --- a/src/providers/binance.rs +++ b/src/providers/binance.rs @@ -39,7 +39,7 @@ mod test { #[test] fn subscription_serialization() { - let symbols = vec!["a", "b"]; + let symbols = ["a", "b"]; let value = Binance.ws_subscriptions(symbols.iter()); assert_eq!( value, diff --git a/src/servers/mod.rs b/src/servers/mod.rs index 6b4b9b1..9c0685d 100644 --- a/src/servers/mod.rs +++ b/src/servers/mod.rs @@ -1,15 +1,18 @@ pub mod grpc; pub mod scale; -use axum::Router; +use axum::{extract::Request, Router}; +use futures_util::TryFutureExt; #[cfg(feature = "grpc")] pub use grpc::{limit_order_book_service_client, Pair}; -use hyper::server::conn::AddrIncoming; +use hyper::body::Incoming; +use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; use left_right::ReadHandleFactory; use std::collections::HashMap; use std::net::Ipv6Addr; use std::time::Duration; use tokio::sync::mpsc::UnboundedReceiver; +use tower::Service; use tracing::{error, info, warn}; #[derive(Clone)] @@ -53,28 +56,50 @@ async fn inner_start( #[cfg(feature = "codec")] let router = { - use axum::{handler::Handler, routing::get}; use tower_http::compression::CompressionLayer; router.route( "/scale/depth/:symbol", - get(scale::serve_book.layer(CompressionLayer::new())).with_state(Hook(factory)), + axum::routing::get(scale::serve_book) + .with_state(Hook(factory)) + .layer(CompressionLayer::new()), ) }; - let app = router.into_make_service(); - let incoming = AddrIncoming::bind(&addr).unwrap(); - axum::Server::builder(incoming) - .tcp_keepalive_interval(Some(Duration::from_millis(500))) - .tcp_nodelay(true) - .http2_only(true) - .serve(app) - .await - .map_err(|e| { - error!("{e}"); - })?; + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - Ok(()) + loop { + let (stream, addr) = match listener.accept().await { + Ok(conn) => conn, + Err(e) => { + error!("{e}"); + continue; + } + }; + + info!("Connection {addr}, accepted"); + + stream.set_nodelay(true).unwrap(); + + let tower_service = router.clone(); + tokio::spawn(async move { + let stream = TokioIo::new(stream); + let hyper_service = hyper::service::service_fn(move |req: Request| { + tower_service.clone().call(req) + }); + + let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); + //builder.keep_alive_interval(Some(Duration::from_millis(500))); + //builder.timer(TokioTimer::new()); + + let x = builder + .serve_connection(stream, hyper_service) + .map_err(|e| { + error!("{e}"); + }); + x.await.unwrap(); + }); + } } pub fn start( diff --git a/src/signals.rs b/src/signals.rs index 732303f..31ec4bc 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -4,6 +4,12 @@ use std::sync::{atomic::AtomicBool, Arc}; #[derive(Clone)] pub struct Terminate(Arc); +impl Default for Terminate { + fn default() -> Self { + Self::new() + } +} + impl Terminate { pub fn new() -> Terminate { let flag = Terminate(Arc::new(AtomicBool::new(false)));