diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index 13d69fb32b..986ae2f489 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -1,6 +1,6 @@ use { self::solution::settlement, - super::{time, Mempools}, + super::{time, time::Remaining, Mempools}, crate::{ domain::{competition::solution::Settlement, eth}, infra::{ @@ -34,7 +34,7 @@ pub use { risk::{ObjectiveValue, SuccessProbability}, Score, }, - solution::{Solution, SolverScore, SolverTimeout}, + solution::{Solution, SolverScore}, }; /// An ongoing competition. There is one competition going on per solver at any @@ -55,6 +55,7 @@ pub struct Competition { impl Competition { /// Solve an auction as part of this competition. pub async fn solve(&self, auction: &Auction) -> Result, Error> { + tracing::warn!("solveee"); let liquidity = match self.solver.liquidity() { solver::Liquidity::Fetch => { self.liquidity @@ -70,7 +71,7 @@ impl Competition { // Fetch the solutions from the solver. let solutions = self .solver - .solve(auction, &liquidity, auction.deadline().solvers()?.into()) + .solve(auction, &liquidity) .await .tap_err(|err| { if err.is_timeout() { @@ -78,7 +79,7 @@ impl Competition { } })?; - observe::postprocessing(&solutions, auction.deadline().driver().unwrap_or_default()); + observe::postprocessing(&solutions, auction.deadline().driver()); // Discard solutions that don't have unique ID. let mut ids = HashSet::new(); @@ -125,7 +126,7 @@ impl Competition { // timeout is reached. let mut settlements = Vec::new(); if tokio::time::timeout( - auction.deadline().driver().unwrap_or_default(), + auction.deadline().driver().remaining().unwrap_or_default(), merge_settlements(&mut settlements, encoded, &self.eth, &self.simulator), ) .await @@ -197,7 +198,7 @@ impl Competition { // Re-simulate the solution on every new block until the deadline ends to make // sure we actually submit a working solution close to when the winner // gets picked by the procotol. - if let Ok(remaining) = auction.deadline().driver() { + if let Ok(remaining) = auction.deadline().driver().remaining() { let score_ref = &mut score; let simulate_on_new_blocks = async move { let mut stream = @@ -229,6 +230,7 @@ impl Competition { } pub async fn reveal(&self) -> Result { + tracing::warn!("reveal"); let settlement = self .settlement .lock() @@ -255,6 +257,7 @@ impl Competition { /// Execute the solution generated as part of this competition. Use /// [`Competition::solve`] to generate the solution. pub async fn settle(&self) -> Result { + tracing::warn!("settle"); let settlement = self .settlement .lock() @@ -263,6 +266,7 @@ impl Competition { .ok_or(Error::SolutionNotAvailable)?; let executed = self.mempools.execute(&self.solver, &settlement).await; + tracing::warn!("settlement executed"); notify::executed( &self.solver, settlement.auction_id, diff --git a/crates/driver/src/domain/competition/solution/mod.rs b/crates/driver/src/domain/competition/solution/mod.rs index 6ab6773aa0..716c7b994e 100644 --- a/crates/driver/src/domain/competition/solution/mod.rs +++ b/crates/driver/src/domain/competition/solution/mod.rs @@ -6,7 +6,6 @@ use { eth::{self, TokenAddress}, }, infra::{ - self, blockchain::{self, Ethereum}, simulator, solver::Solver, @@ -247,31 +246,6 @@ impl std::fmt::Debug for Solution { } } -/// The time limit passed to the solver for solving an auction. -#[derive(Debug, Clone, Copy)] -pub struct SolverTimeout(chrono::Duration); - -impl SolverTimeout { - pub fn deadline(self) -> chrono::DateTime { - infra::time::now() + self.0 - } - - pub fn duration(self) -> chrono::Duration { - self.0 - } - - #[must_use] - pub fn reduce(self, duration: chrono::Duration) -> Self { - Self(self.0 - duration) - } -} - -impl From for SolverTimeout { - fn from(duration: std::time::Duration) -> Self { - Self(chrono::Duration::from_std(duration).unwrap_or(chrono::Duration::max_value())) - } -} - /// Carries information how the score should be calculated. #[derive(Debug, Clone)] pub enum SolverScore { diff --git a/crates/driver/src/domain/quote.rs b/crates/driver/src/domain/quote.rs index 67146641bb..8a5142b48d 100644 --- a/crates/driver/src/domain/quote.rs +++ b/crates/driver/src/domain/quote.rs @@ -85,11 +85,7 @@ impl Order { }; let solutions = solver - .solve( - &self.fake_auction(eth, tokens).await?, - &liquidity, - self.deadline.solvers()?.into(), - ) + .solve(&self.fake_auction(eth, tokens).await?, &liquidity) .await?; Quote::new( eth, @@ -156,7 +152,7 @@ impl Order { }, ] .into_iter(), - Default::default(), + self.deadline, eth, Default::default(), ) diff --git a/crates/driver/src/domain/time.rs b/crates/driver/src/domain/time.rs index 86408e72e9..9bd7cc637b 100644 --- a/crates/driver/src/domain/time.rs +++ b/crates/driver/src/domain/time.rs @@ -36,20 +36,23 @@ impl Deadline { /// Remaining time until the deadline for driver to return solution to /// autopilot is reached. - pub fn driver(self) -> Result { - Self::remaining(self.driver) + pub fn driver(self) -> chrono::DateTime { + self.driver } /// Remaining time until the deadline for solvers to return solution to /// driver is reached. - pub fn solvers(self) -> Result { - Self::remaining(self.solvers) + pub fn solvers(self) -> chrono::DateTime { + self.solvers } +} - fn remaining( - deadline: chrono::DateTime, - ) -> Result { - let deadline = deadline - infra::time::now(); +pub trait Remaining { + fn remaining(self) -> Result; +} +impl Remaining for chrono::DateTime { + fn remaining(self) -> Result { + let deadline = self - infra::time::now(); if deadline <= chrono::Duration::zero() { Err(DeadlineExceeded) } else { diff --git a/crates/driver/src/infra/notify/mod.rs b/crates/driver/src/infra/notify/mod.rs index cd96a9006c..d2bb3880aa 100644 --- a/crates/driver/src/infra/notify/mod.rs +++ b/crates/driver/src/infra/notify/mod.rs @@ -120,6 +120,7 @@ pub fn executed( Err(Error::Other(_)) => notification::Settlement::Fail, }; + tracing::warn!("notify"); solver.notify( Some(auction_id), solution_id, diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index bb476d453b..b41b1efb79 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -18,7 +18,7 @@ use { eth::{self, Gas}, mempools, quote::{self, Quote}, - time::Deadline, + time::{Deadline, Remaining}, Liquidity, }, infra::solver, @@ -90,10 +90,10 @@ pub fn empty_solution(solver: &solver::Name, id: solution::Id) { // Observe that postprocessing (encoding & merging) of solutions is about to // start. -pub fn postprocessing(solutions: &[Solution], deadline: std::time::Duration) { +pub fn postprocessing(solutions: &[Solution], deadline: chrono::DateTime) { tracing::debug!( solutions = ?solutions.len(), - remaining = ?deadline, + remaining = ?deadline.remaining(), "postprocessing solutions" ); } diff --git a/crates/driver/src/infra/solver/dto/auction.rs b/crates/driver/src/infra/solver/dto/auction.rs index df424c86c5..5021bebe68 100644 --- a/crates/driver/src/infra/solver/dto/auction.rs +++ b/crates/driver/src/infra/solver/dto/auction.rs @@ -16,7 +16,6 @@ impl Auction { pub fn new( auction: &competition::Auction, liquidity: &[liquidity::Liquidity], - timeout: competition::SolverTimeout, weth: eth::WethAddress, ) -> Self { let mut tokens: HashMap = auction @@ -200,7 +199,7 @@ impl Auction { .collect(), tokens, effective_gas_price: auction.gas_price().effective().into(), - deadline: timeout.deadline(), + deadline: auction.deadline().solvers(), } } } diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index 133c2a3add..8bae1d41e9 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -5,10 +5,10 @@ use { competition::{ auction::{self, Auction}, solution::{self, Solution}, - SolverTimeout, }, eth, liquidity, + time::Remaining, }, infra::blockchain::Ethereum, util, @@ -153,26 +153,17 @@ impl Solver { &self, auction: &Auction, liquidity: &[liquidity::Liquidity], - timeout: SolverTimeout, ) -> Result, Error> { // Fetch the solutions from the solver. let weth = self.eth.contracts().weth_address(); - let body = serde_json::to_string(&dto::Auction::new( - auction, - liquidity, - // Reduce the timeout by a small buffer to account for network latency. Otherwise the - // HTTP timeout might happen before the solver times out its search algorithm. - timeout.reduce(self.config.timeouts.http_delay), - weth, - )) - .unwrap(); + let body = serde_json::to_string(&dto::Auction::new(auction, liquidity, weth)).unwrap(); let url = shared::url::join(&self.config.endpoint, "solve"); super::observe::solver_request(&url, &body); let mut req = self .client .post(url.clone()) .body(body) - .timeout(timeout.duration().to_std().unwrap()); + .timeout(auction.deadline().solvers().remaining().unwrap()); if let Some(id) = observe::request_id::get_task_local_storage() { req = req.header("X-REQUEST-ID", id); } @@ -202,8 +193,10 @@ impl Solver { if let Some(id) = observe::request_id::get_task_local_storage() { req = req.header("X-REQUEST-ID", id); } + tracing::warn!("solver notification almost sent"); let future = async move { - let _ = util::http::send(SOLVER_RESPONSE_MAX_BYTES, req).await; + let result = util::http::send(SOLVER_RESPONSE_MAX_BYTES, req).await; + tracing::warn!(?result, "solver notification sent"); }; tokio::task::spawn(future.in_current_span()); } diff --git a/crates/driver/src/tests/setup/solver.rs b/crates/driver/src/tests/setup/solver.rs index 10fdf778d0..ae165cbbca 100644 --- a/crates/driver/src/tests/setup/solver.rs +++ b/crates/driver/src/tests/setup/solver.rs @@ -1,7 +1,10 @@ use { super::{blockchain, blockchain::Blockchain, Partial}, crate::{ - domain::{competition::order, time}, + domain::{ + competition::order, + time::{self}, + }, infra::{ self, blockchain::contracts::Addresses, @@ -241,7 +244,7 @@ impl Solver { "orders": orders_json, "liquidity": [], "effectiveGasPrice": effective_gas_price, - "deadline": infra::time::now() + chrono::Duration::from_std(deadline.solvers().unwrap()).unwrap() - http_delay, + "deadline": deadline.solvers(), }); assert_eq!(req, expected, "unexpected /solve request"); let mut state = state.0.lock().unwrap(); diff --git a/crates/solvers/src/api/routes/notify/mod.rs b/crates/solvers/src/api/routes/notify/mod.rs index f2e47b52a9..821ef7ae16 100644 --- a/crates/solvers/src/api/routes/notify/mod.rs +++ b/crates/solvers/src/api/routes/notify/mod.rs @@ -10,7 +10,7 @@ pub async fn notify( let notification = notification.to_domain(); let auction_id = notification.auction_id; - tracing::trace!(?auction_id, ?notification); + tracing::warn!(?auction_id, ?notification); state.notify(notification); axum::http::StatusCode::OK diff --git a/crates/solvers/src/domain/auction.rs b/crates/solvers/src/domain/auction.rs index 35ecc9fd86..26ac8cc104 100644 --- a/crates/solvers/src/domain/auction.rs +++ b/crates/solvers/src/domain/auction.rs @@ -99,4 +99,9 @@ impl Deadline { .to_std() .ok() } + + /// Returns a new deadline with the specified duration subtracted. + pub fn reduce(self, duration: chrono::Duration) -> Self { + Self(self.0 - duration) + } } diff --git a/crates/solvers/src/domain/solver/baseline.rs b/crates/solvers/src/domain/solver/baseline.rs index d3a0e805c5..a90ee2131c 100644 --- a/crates/solvers/src/domain/solver/baseline.rs +++ b/crates/solvers/src/domain/solver/baseline.rs @@ -24,6 +24,10 @@ use { pub struct Baseline(Arc); +/// The amount of time we aim the solver to finish before the final deadline is +/// reached. +const DEADLINE_SLACK: chrono::Duration = chrono::Duration::milliseconds(500); + struct Inner { weth: eth::WethAddress, @@ -70,7 +74,12 @@ impl Baseline { // the real async things. For larger settlements, this can block in the // 100s of ms. let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel(); - let deadline = auction.deadline.remaining().unwrap_or_default(); + let deadline = auction + .deadline + .clone() + .reduce(DEADLINE_SLACK) + .remaining() + .unwrap_or_default(); let inner = self.0.clone(); let span = tracing::Span::current();