Skip to content

Commit

Permalink
Simplify deadline passing (#2222)
Browse files Browse the repository at this point in the history
# Description
There seems to be an issue with the deadline we are passing down to
solvers leading to timeouts while still having significant time left for
postprocessing. Take Laertes in barn auction 8222857 for instance:

> 2023-12-29T09:37:47.818Z DEBUG
request{id="2710"}:/solve{solver=laertes-solve auction_id=8222857}:
driver::infra::observe: computed deadline deadline=Deadline { driver:
2023-12-29T09:38:02.090509218Z, solvers:
2023-12-29T**09:38:00**.663289792Z } timeouts=Timeouts { http_delay:
Duration { secs: 0, nanos: 500000000 }, solving_share_of_deadline:
Percent(0.9) }
> 2023-12-29T09:37:49.476Z TRACE request{id="2710"}:/solve:
solvers::api::routes::solve: auction=Auction { id: Solve(8222857), ...
deadline: Deadline(2023-12-29T**09:37:58**.702187297Z}
...
> 2023-12-29T09:37:58.738Z WARN
request{id="2710"}:/solve:auction{id=8222857}:
solvers::domain::solver::legacy: failed to solve auction err=**Timeout**
> 2023-12-29T09:37:58.739Z DEBUG
request{id="2710"}:/solve{solver=laertes-solve auction_id=8222857}:
driver::infra::observe: postprocessing solutions solutions=0
**remaining=3.351353404s**

Here we aborted solver computation at 09:37:58 despite the original
solver deadline (first log) being almost two seconds later (09:38:00).
We can see that the deadline that is received by the solver engine is
already much smaller than what we computed in the driver. Looking at the
code we expect a reduction of `http_delay` (0.5s) but not 2s.

One thing to note is that the way we pass down solver deadlines is
surprisingly 🤡. We convert it into a duration to later on convert it
into a timestamp again. My hunch is that this is causing us to lose
precision and thus time. This PR simplifies this logic and hopes that
this will resolve the 2s time loss.

# Changes
- [x] Remove SolverTimeout type and argument being passed around in
favour of the timeouts that are already part of the auciton
- [x] Return DateTime instead of durations for solver/driver deadlines
- [x] Move `remaining` helper method into an extension trait to allow it
being used where needed
- [x] Remove the 500ms http-delay reduction in the request to the
solver.

We already have a buffer for postprocessing in the driver, and really it
should be the consumer (solver engine in this case) who adjusts the
deadline to take network latency into account. We do the same for the
autopilot<>driver deadline (the driver attempts to send their response
500ms before the deadline) and in fact also already account for another
1s buffer inside the legacy solver-engine
([code](https://github.com/cowprotocol/services/blob/1a8261857a726ffa6180533dac548ff0a0b696be/crates/shared/src/http_solver.rs#L121-L126))

## How to test
Existing tests

## Related Issues

Fixes #2211
  • Loading branch information
fleupold authored and sunce86 committed Jan 6, 2024
1 parent 96bd0a5 commit 99b69d2
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 68 deletions.
16 changes: 10 additions & 6 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
self::solution::settlement,
super::{time, Mempools},
super::{time, time::Remaining, Mempools},
crate::{
domain::{competition::solution::Settlement, eth},
infra::{
Expand Down Expand Up @@ -34,7 +34,7 @@ pub use {
risk::{ObjectiveValue, SuccessProbability},
Score,
},
solution::{Solution, SolverScore, SolverTimeout},
solution::{Solution, SolverScore},
};

/// An ongoing competition. There is one competition going on per solver at any
Expand All @@ -55,6 +55,7 @@ pub struct Competition {
impl Competition {
/// Solve an auction as part of this competition.
pub async fn solve(&self, auction: &Auction) -> Result<Option<Solved>, Error> {
tracing::warn!("solveee");
let liquidity = match self.solver.liquidity() {
solver::Liquidity::Fetch => {
self.liquidity
Expand All @@ -70,15 +71,15 @@ impl Competition {
// Fetch the solutions from the solver.
let solutions = self
.solver
.solve(auction, &liquidity, auction.deadline().solvers()?.into())
.solve(auction, &liquidity)
.await
.tap_err(|err| {
if err.is_timeout() {
notify::solver_timeout(&self.solver, auction.id());
}
})?;

observe::postprocessing(&solutions, auction.deadline().driver().unwrap_or_default());
observe::postprocessing(&solutions, auction.deadline().driver());

// Discard solutions that don't have unique ID.
let mut ids = HashSet::new();
Expand Down Expand Up @@ -125,7 +126,7 @@ impl Competition {
// timeout is reached.
let mut settlements = Vec::new();
if tokio::time::timeout(
auction.deadline().driver().unwrap_or_default(),
auction.deadline().driver().remaining().unwrap_or_default(),
merge_settlements(&mut settlements, encoded, &self.eth, &self.simulator),
)
.await
Expand Down Expand Up @@ -197,7 +198,7 @@ impl Competition {
// Re-simulate the solution on every new block until the deadline ends to make
// sure we actually submit a working solution close to when the winner
// gets picked by the procotol.
if let Ok(remaining) = auction.deadline().driver() {
if let Ok(remaining) = auction.deadline().driver().remaining() {
let score_ref = &mut score;
let simulate_on_new_blocks = async move {
let mut stream =
Expand Down Expand Up @@ -229,6 +230,7 @@ impl Competition {
}

pub async fn reveal(&self) -> Result<Revealed, Error> {
tracing::warn!("reveal");
let settlement = self
.settlement
.lock()
Expand All @@ -255,6 +257,7 @@ impl Competition {
/// Execute the solution generated as part of this competition. Use
/// [`Competition::solve`] to generate the solution.
pub async fn settle(&self) -> Result<Settled, Error> {
tracing::warn!("settle");
let settlement = self
.settlement
.lock()
Expand All @@ -263,6 +266,7 @@ impl Competition {
.ok_or(Error::SolutionNotAvailable)?;

let executed = self.mempools.execute(&self.solver, &settlement).await;
tracing::warn!("settlement executed");
notify::executed(
&self.solver,
settlement.auction_id,
Expand Down
26 changes: 0 additions & 26 deletions crates/driver/src/domain/competition/solution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use {
eth::{self, TokenAddress},
},
infra::{
self,
blockchain::{self, Ethereum},
simulator,
solver::Solver,
Expand Down Expand Up @@ -247,31 +246,6 @@ impl std::fmt::Debug for Solution {
}
}

/// The time limit passed to the solver for solving an auction.
#[derive(Debug, Clone, Copy)]
pub struct SolverTimeout(chrono::Duration);

impl SolverTimeout {
pub fn deadline(self) -> chrono::DateTime<chrono::Utc> {
infra::time::now() + self.0
}

pub fn duration(self) -> chrono::Duration {
self.0
}

#[must_use]
pub fn reduce(self, duration: chrono::Duration) -> Self {
Self(self.0 - duration)
}
}

impl From<std::time::Duration> for SolverTimeout {
fn from(duration: std::time::Duration) -> Self {
Self(chrono::Duration::from_std(duration).unwrap_or(chrono::Duration::max_value()))
}
}

/// Carries information how the score should be calculated.
#[derive(Debug, Clone)]
pub enum SolverScore {
Expand Down
8 changes: 2 additions & 6 deletions crates/driver/src/domain/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,7 @@ impl Order {
};

let solutions = solver
.solve(
&self.fake_auction(eth, tokens).await?,
&liquidity,
self.deadline.solvers()?.into(),
)
.solve(&self.fake_auction(eth, tokens).await?, &liquidity)
.await?;
Quote::new(
eth,
Expand Down Expand Up @@ -156,7 +152,7 @@ impl Order {
},
]
.into_iter(),
Default::default(),
self.deadline,
eth,
Default::default(),
)
Expand Down
19 changes: 11 additions & 8 deletions crates/driver/src/domain/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,23 @@ impl Deadline {

/// Remaining time until the deadline for driver to return solution to
/// autopilot is reached.
pub fn driver(self) -> Result<std::time::Duration, DeadlineExceeded> {
Self::remaining(self.driver)
pub fn driver(self) -> chrono::DateTime<chrono::Utc> {
self.driver
}

/// Remaining time until the deadline for solvers to return solution to
/// driver is reached.
pub fn solvers(self) -> Result<std::time::Duration, DeadlineExceeded> {
Self::remaining(self.solvers)
pub fn solvers(self) -> chrono::DateTime<chrono::Utc> {
self.solvers
}
}

fn remaining(
deadline: chrono::DateTime<chrono::Utc>,
) -> Result<std::time::Duration, DeadlineExceeded> {
let deadline = deadline - infra::time::now();
pub trait Remaining {
fn remaining(self) -> Result<std::time::Duration, DeadlineExceeded>;
}
impl Remaining for chrono::DateTime<chrono::Utc> {
fn remaining(self) -> Result<std::time::Duration, DeadlineExceeded> {
let deadline = self - infra::time::now();
if deadline <= chrono::Duration::zero() {
Err(DeadlineExceeded)
} else {
Expand Down
1 change: 1 addition & 0 deletions crates/driver/src/infra/notify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub fn executed(
Err(Error::Other(_)) => notification::Settlement::Fail,
};

tracing::warn!("notify");
solver.notify(
Some(auction_id),
solution_id,
Expand Down
6 changes: 3 additions & 3 deletions crates/driver/src/infra/observe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use {
eth::{self, Gas},
mempools,
quote::{self, Quote},
time::Deadline,
time::{Deadline, Remaining},
Liquidity,
},
infra::solver,
Expand Down Expand Up @@ -90,10 +90,10 @@ pub fn empty_solution(solver: &solver::Name, id: solution::Id) {

// Observe that postprocessing (encoding & merging) of solutions is about to
// start.
pub fn postprocessing(solutions: &[Solution], deadline: std::time::Duration) {
pub fn postprocessing(solutions: &[Solution], deadline: chrono::DateTime<chrono::Utc>) {
tracing::debug!(
solutions = ?solutions.len(),
remaining = ?deadline,
remaining = ?deadline.remaining(),
"postprocessing solutions"
);
}
Expand Down
3 changes: 1 addition & 2 deletions crates/driver/src/infra/solver/dto/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ impl Auction {
pub fn new(
auction: &competition::Auction,
liquidity: &[liquidity::Liquidity],
timeout: competition::SolverTimeout,
weth: eth::WethAddress,
) -> Self {
let mut tokens: HashMap<eth::H160, _> = auction
Expand Down Expand Up @@ -200,7 +199,7 @@ impl Auction {
.collect(),
tokens,
effective_gas_price: auction.gas_price().effective().into(),
deadline: timeout.deadline(),
deadline: auction.deadline().solvers(),
}
}
}
Expand Down
19 changes: 6 additions & 13 deletions crates/driver/src/infra/solver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use {
competition::{
auction::{self, Auction},
solution::{self, Solution},
SolverTimeout,
},
eth,
liquidity,
time::Remaining,
},
infra::blockchain::Ethereum,
util,
Expand Down Expand Up @@ -153,26 +153,17 @@ impl Solver {
&self,
auction: &Auction,
liquidity: &[liquidity::Liquidity],
timeout: SolverTimeout,
) -> Result<Vec<Solution>, Error> {
// Fetch the solutions from the solver.
let weth = self.eth.contracts().weth_address();
let body = serde_json::to_string(&dto::Auction::new(
auction,
liquidity,
// Reduce the timeout by a small buffer to account for network latency. Otherwise the
// HTTP timeout might happen before the solver times out its search algorithm.
timeout.reduce(self.config.timeouts.http_delay),
weth,
))
.unwrap();
let body = serde_json::to_string(&dto::Auction::new(auction, liquidity, weth)).unwrap();
let url = shared::url::join(&self.config.endpoint, "solve");
super::observe::solver_request(&url, &body);
let mut req = self
.client
.post(url.clone())
.body(body)
.timeout(timeout.duration().to_std().unwrap());
.timeout(auction.deadline().solvers().remaining().unwrap());
if let Some(id) = observe::request_id::get_task_local_storage() {
req = req.header("X-REQUEST-ID", id);
}
Expand Down Expand Up @@ -202,8 +193,10 @@ impl Solver {
if let Some(id) = observe::request_id::get_task_local_storage() {
req = req.header("X-REQUEST-ID", id);
}
tracing::warn!("solver notification almost sent");
let future = async move {
let _ = util::http::send(SOLVER_RESPONSE_MAX_BYTES, req).await;
let result = util::http::send(SOLVER_RESPONSE_MAX_BYTES, req).await;
tracing::warn!(?result, "solver notification sent");
};
tokio::task::spawn(future.in_current_span());
}
Expand Down
7 changes: 5 additions & 2 deletions crates/driver/src/tests/setup/solver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use {
super::{blockchain, blockchain::Blockchain, Partial},
crate::{
domain::{competition::order, time},
domain::{
competition::order,
time::{self},
},
infra::{
self,
blockchain::contracts::Addresses,
Expand Down Expand Up @@ -241,7 +244,7 @@ impl Solver {
"orders": orders_json,
"liquidity": [],
"effectiveGasPrice": effective_gas_price,
"deadline": infra::time::now() + chrono::Duration::from_std(deadline.solvers().unwrap()).unwrap() - http_delay,
"deadline": deadline.solvers(),
});
assert_eq!(req, expected, "unexpected /solve request");
let mut state = state.0.lock().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/solvers/src/api/routes/notify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub async fn notify(
let notification = notification.to_domain();
let auction_id = notification.auction_id;

tracing::trace!(?auction_id, ?notification);
tracing::warn!(?auction_id, ?notification);
state.notify(notification);

axum::http::StatusCode::OK
Expand Down
5 changes: 5 additions & 0 deletions crates/solvers/src/domain/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,9 @@ impl Deadline {
.to_std()
.ok()
}

/// Returns a new deadline with the specified duration subtracted.
pub fn reduce(self, duration: chrono::Duration) -> Self {
Self(self.0 - duration)
}
}
11 changes: 10 additions & 1 deletion crates/solvers/src/domain/solver/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ use {

pub struct Baseline(Arc<Inner>);

/// The amount of time we aim the solver to finish before the final deadline is
/// reached.
const DEADLINE_SLACK: chrono::Duration = chrono::Duration::milliseconds(500);

struct Inner {
weth: eth::WethAddress,

Expand Down Expand Up @@ -70,7 +74,12 @@ impl Baseline {
// the real async things. For larger settlements, this can block in the
// 100s of ms.
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
let deadline = auction.deadline.remaining().unwrap_or_default();
let deadline = auction
.deadline
.clone()
.reduce(DEADLINE_SLACK)
.remaining()
.unwrap_or_default();

let inner = self.0.clone();
let span = tracing::Span::current();
Expand Down

0 comments on commit 99b69d2

Please sign in to comment.