diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index 6fb308978a..2ecde0fdc7 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -362,7 +362,7 @@ impl Competition { self.settle_queue.try_send(request).map_err(|err| { tracing::warn!(?err, "Failed to enqueue /settle request"); - Error::SubmissionError + Error::TooManyPendingSettlements })?; response_receiver.await.map_err(|err| { @@ -374,7 +374,7 @@ impl Competition { pub fn ensure_settle_queue_capacity(&self) -> Result<(), Error> { if self.settle_queue.capacity() == 0 { tracing::warn!("settlement queue is full; auction is rejected"); - Err(Error::SettlementQueueIsFull) + Err(Error::TooManyPendingSettlements) } else { Ok(()) } @@ -611,5 +611,5 @@ pub enum Error { #[error("failed to submit the solution")] SubmissionError, #[error("too many pending settlements for the same solver")] - SettlementQueueIsFull, + TooManyPendingSettlements, } diff --git a/crates/driver/src/infra/api/error.rs b/crates/driver/src/infra/api/error.rs index e02b7dcbb1..e788bbe5bf 100644 --- a/crates/driver/src/infra/api/error.rs +++ b/crates/driver/src/infra/api/error.rs @@ -11,6 +11,7 @@ use { enum Kind { QuotingFailed, SolverFailed, + TooManyPendingSettlements, SolutionNotAvailable, DeadlineExceeded, Unknown, @@ -51,6 +52,7 @@ impl From for (hyper::StatusCode, axum::Json) { or sell amount" } Kind::FailedToSubmit => "Could not submit the solution to the blockchain", + Kind::TooManyPendingSettlements => "Settlement queue is full", }; ( hyper::StatusCode::BAD_REQUEST, @@ -83,7 +85,7 @@ impl From for (hyper::StatusCode, axum::Json) { competition::Error::DeadlineExceeded(_) => Kind::DeadlineExceeded, competition::Error::Solver(_) => Kind::SolverFailed, competition::Error::SubmissionError => Kind::FailedToSubmit, - competition::Error::SettlementQueueIsFull => Kind::SolverFailed, + competition::Error::TooManyPendingSettlements => Kind::TooManyPendingSettlements, }; error.into() } diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index a743527990..f8c1394e54 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -368,7 +368,7 @@ fn competition_error(err: &competition::Error) -> &'static str { competition::Error::Solver(solver::Error::Deserialize(_)) => "SolverDeserializeError", competition::Error::Solver(solver::Error::Dto(_)) => "SolverDtoError", competition::Error::SubmissionError => "SubmissionError", - competition::Error::SettlementQueueIsFull => "SettlementQueueIsFull", + competition::Error::TooManyPendingSettlements => "TooManyPendingSettlements", } } diff --git a/crates/driver/src/tests/cases/buy_eth.rs b/crates/driver/src/tests/cases/buy_eth.rs index 35bf50f699..dde42a0c20 100644 --- a/crates/driver/src/tests/cases/buy_eth.rs +++ b/crates/driver/src/tests/cases/buy_eth.rs @@ -16,5 +16,10 @@ async fn test() { .await; let id = test.solve().await.ok().orders(&[order]).id(); - test.settle(id).await.ok().await.eth_order_executed().await; + test.settle(id) + .await + .ok() + .await + .eth_order_executed(&test) + .await; } diff --git a/crates/driver/src/tests/cases/merge_settlements.rs b/crates/driver/src/tests/cases/merge_settlements.rs index 8c16ecaa6f..2747ac2307 100644 --- a/crates/driver/src/tests/cases/merge_settlements.rs +++ b/crates/driver/src/tests/cases/merge_settlements.rs @@ -42,9 +42,9 @@ async fn possible() { // combination of the two, meaning the settlements were merged successfully. .ok() .await - .ab_order_executed() + .ab_order_executed(&test) .await - .cd_order_executed() + .cd_order_executed(&test) .await; } @@ -92,7 +92,12 @@ async fn impossible() { // Only the first A-B order gets settled. let id = test.solve().await.ok().orders(&[order]).id(); test.reveal(id).await.ok().calldata(); - test.settle(id).await.ok().await.ab_order_executed().await; + test.settle(id) + .await + .ok() + .await + .ab_order_executed(&test) + .await; } /// Test that mergable solutions don't get merged if feature was not enabled. @@ -115,5 +120,10 @@ async fn possible_but_forbidden() { // not because solution merging is not enabled by default. let id = test.solve().await.ok().orders(&[ab_order]).id(); test.reveal(id).await.ok().calldata(); - test.settle(id).await.ok().await.ab_order_executed().await; + test.settle(id) + .await + .ok() + .await + .ab_order_executed(&test) + .await; } diff --git a/crates/driver/src/tests/cases/parallel_auctions.rs b/crates/driver/src/tests/cases/parallel_auctions.rs index cac7e3f186..8acc114a25 100644 --- a/crates/driver/src/tests/cases/parallel_auctions.rs +++ b/crates/driver/src/tests/cases/parallel_auctions.rs @@ -47,7 +47,7 @@ async fn driver_handles_solutions_based_on_id() { .await .ok() .await - .eth_order_executed() + .eth_order_executed(&test) .await; // calling `/reveal` or `/settle` with for a legit solution that @@ -89,7 +89,12 @@ async fn driver_can_settle_old_solutions() { // Technically this is not super convincing since all remembered solutions // are identical but this is the best we are going to get without needing // to heavily modify the testing framework. - test.settle(id1).await.ok().await.eth_order_executed().await; + test.settle(id1) + .await + .ok() + .await + .eth_order_executed(&test) + .await; } /// Tests that the driver only remembers a relatively small number of solutions. diff --git a/crates/driver/src/tests/cases/settle.rs b/crates/driver/src/tests/cases/settle.rs index fd50e1779c..599ba35e82 100644 --- a/crates/driver/src/tests/cases/settle.rs +++ b/crates/driver/src/tests/cases/settle.rs @@ -7,6 +7,9 @@ use { setup::{ab_order, ab_pool, ab_solution}, }, }, + futures::future::join_all, + itertools::Itertools, + std::{sync::Arc, time::Duration}, web3::Transport, }; @@ -30,7 +33,12 @@ async fn matrix() { .await; let id = test.solve().await.ok().id(); - test.settle(id).await.ok().await.ab_order_executed().await; + test.settle(id) + .await + .ok() + .await + .ab_order_executed(&test) + .await; } } } @@ -110,3 +118,174 @@ async fn high_gas_limit() { .unwrap(); test.settle(id).await.ok().await; } + +#[tokio::test] +#[ignore] +async fn discards_excess_settle_and_solve_requests() { + let test = Arc::new( + tests::setup() + .allow_multiple_solve_requests() + .pool(ab_pool()) + .order(ab_order()) + .solution(ab_solution()) + .settle_submission_deadline(6) + .done() + .await, + ); + + // MAX_SOLUTION_STORAGE = 5. Since this is hardcoded, no more solutions can be + // stored. + let solution_ids = join_all(vec![ + test.solve(), + test.solve(), + test.solve(), + test.solve(), + test.solve(), + ]) + .await + .into_iter() + .map(|res| res.ok().id()) + .collect::>(); + + let unique_solutions_count = solution_ids.iter().unique().count(); + assert_eq!(unique_solutions_count, solution_ids.len()); + + // Disable auto mining to accumulate all the settlement requests. + test.set_auto_mining(false).await; + + // To avoid race conditions with the settlement queue processing, a + // `/settle` request needs to be sent first, so it is dequeued, and it's + // execution is paused before any subsequent request is received. + let test_clone = Arc::clone(&test); + let first_solution_id = solution_ids[0]; + let first_settlement_fut = + tokio::spawn(async move { test_clone.settle(first_solution_id).await }); + // Make sure the first settlement gets dequeued before sending the remaining + // requests. + tokio::time::sleep(Duration::from_millis(100)).await; + let remaining_solutions = solution_ids[1..].to_vec(); + let remaining_settlements = { + let test_clone = Arc::clone(&test); + remaining_solutions.into_iter().map(move |id| { + let test_clone = Arc::clone(&test_clone); + async move { test_clone.settle(id).await } + }) + }; + let remaining_settlements_fut = tokio::spawn(join_all(remaining_settlements)); + + // Sleep for a bit to make sure all the settlement requests are queued. + tokio::time::sleep(Duration::from_millis(500)).await; + + // While there is no room in the settlement queue, `/solve` requests must be + // rejected. + test.solve().await.err().kind("TooManyPendingSettlements"); + + // Enable auto mining to process all the settlement requests. + test.set_auto_mining(true).await; + + // The first settlement must be successful. + let first_settlement = first_settlement_fut.await.unwrap(); + first_settlement.ok().await.ab_order_executed(&test).await; + + let remaining_settlements = remaining_settlements_fut.await.unwrap(); + assert_eq!(remaining_settlements.len(), 4); + + for (idx, result) in remaining_settlements.into_iter().enumerate() { + match idx { + // The next 2 settlements failed to submit due to the framework's limitation(unable to + // fulfill the same order again). + 0 | 1 => result.err().kind("FailedToSubmit"), + // All the subsequent settlements rejected due to the settlement queue being full. + 2 | 3 => result.err().kind("TooManyPendingSettlements"), + _ => unreachable!(), + } + } + + // `/solve` works again. + test.solve().await.ok(); +} + +#[tokio::test] +#[ignore] +async fn accepts_new_settle_requests_after_timeout() { + let test = Arc::new( + tests::setup() + .allow_multiple_solve_requests() + .pool(ab_pool()) + .order(ab_order()) + .solution(ab_solution()) + .settle_submission_deadline(6) + .done() + .await, + ); + + // MAX_SOLUTION_STORAGE = 5. Since this is hardcoded, no more solutions can be + // stored. + let solution_ids = join_all(vec![ + test.solve(), + test.solve(), + test.solve(), + test.solve(), + test.solve(), + ]) + .await + .into_iter() + .map(|res| res.ok().id()) + .collect::>(); + + let unique_solutions_count = solution_ids.iter().unique().count(); + assert_eq!(unique_solutions_count, solution_ids.len()); + + // Disable auto mining to accumulate all the settlement requests. + test.set_auto_mining(false).await; + + // To avoid race conditions with the settlement queue processing, a + // `/settle` request needs to be sent first, so it is dequeued, and it's + // execution is paused before any subsequent request is received. + let test_clone = Arc::clone(&test); + let first_solution_id = solution_ids[0]; + let first_settlement_fut = + tokio::spawn(async move { test_clone.settle(first_solution_id).await }); + // Make sure the first settlement gets dequeued before sending the remaining + // requests. + tokio::time::sleep(Duration::from_millis(100)).await; + // Send only 3 more settle requests. + let additional_solutions = solution_ids[1..4].to_vec(); + let additional_settlements = { + let test_clone = Arc::clone(&test); + additional_solutions.into_iter().map(move |id| { + let test_clone = Arc::clone(&test_clone); + async move { test_clone.settle(id).await } + }) + }; + let additional_settlements_fut = tokio::spawn(join_all(additional_settlements)); + + // Sleep for a bit to make sure all the settlement requests are queued. + tokio::time::sleep(Duration::from_millis(500)).await; + test.set_auto_mining(true).await; + + let first_settlement = first_settlement_fut.await.unwrap(); + // The first settlement must be successful. + first_settlement.ok().await.ab_order_executed(&test).await; + + let additional_settlements = additional_settlements_fut.await.unwrap(); + assert_eq!(additional_settlements.len(), 3); + + for (idx, result) in additional_settlements.into_iter().enumerate() { + match idx { + // The next 2 settlements failed to submit due to the framework's limitation(unable to + // fulfill the same order again). + 0 | 1 => result.err().kind("FailedToSubmit"), + // The next request gets rejected due to the settlement queue being full. + 2 => result.err().kind("TooManyPendingSettlements"), + _ => unreachable!(), + } + } + + // Now we send the last settlement request. It fails due to the framework's + // limitation(unable to fulfill the same order again). + test.settle(solution_ids[4]) + .await + .err() + .kind("FailedToSubmit"); +} diff --git a/crates/driver/src/tests/setup/blockchain.rs b/crates/driver/src/tests/setup/blockchain.rs index 9c99cb3c8c..2a3340c719 100644 --- a/crates/driver/src/tests/setup/blockchain.rs +++ b/crates/driver/src/tests/setup/blockchain.rs @@ -12,7 +12,7 @@ use { secp256k1::SecretKey, serde_json::json, std::collections::HashMap, - web3::signing::Key, + web3::{signing::Key, Transport}, }; // TODO Possibly might be a good idea to use an enum for tokens instead of @@ -857,6 +857,14 @@ impl Blockchain { _ => self.tokens.get(token).unwrap().address(), } } + + pub async fn set_auto_mining(&self, enabled: bool) { + self.web3 + .transport() + .execute("evm_setAutomine", vec![json!(enabled)]) + .await + .unwrap(); + } } async fn primary_address(web3: &DynWeb3) -> ethcontract::H160 { diff --git a/crates/driver/src/tests/setup/mod.rs b/crates/driver/src/tests/setup/mod.rs index dc22169e29..ed57f2fb2f 100644 --- a/crates/driver/src/tests/setup/mod.rs +++ b/crates/driver/src/tests/setup/mod.rs @@ -506,6 +506,7 @@ pub fn setup() -> Setup { rpc_args: vec!["--gas-limit".into(), "10000000".into()], allow_multiple_solve_requests: false, auction_id: 1, + settle_submission_deadline: 3, ..Default::default() } } @@ -539,6 +540,9 @@ pub struct Setup { allow_multiple_solve_requests: bool, /// Auction ID used during tests auction_id: i64, + /// The maximum number of blocks to wait for a settlement to appear on + /// chain. + settle_submission_deadline: u64, } /// The validity of a solution. @@ -849,6 +853,13 @@ impl Setup { self } + /// Set the maximum number of blocks to wait for a settlement to appear on + /// chain. + pub fn settle_submission_deadline(mut self, settle_submission_deadline: u64) -> Self { + self.settle_submission_deadline = settle_submission_deadline; + self + } + /// Create the test: set up onchain contracts and pools, start a mock HTTP /// server for the solver and start the HTTP server for the driver. pub async fn done(self) -> Test { @@ -966,6 +977,7 @@ impl Setup { trades: solutions.into_iter().flat_map(|s| s.trades).collect(), trusted, deadline, + settle_submission_deadline: self.settle_submission_deadline, quoted_orders: quotes, quote: self.quote, surplus_capturing_jit_order_owners, @@ -1006,6 +1018,7 @@ pub struct Test { trades: Vec, trusted: HashSet<&'static str>, deadline: chrono::DateTime, + settle_submission_deadline: u64, /// Is this testing the /quote endpoint? quote: bool, /// List of surplus capturing JIT-order owners @@ -1094,12 +1107,9 @@ impl Test { } pub async fn settle_with_solver(&self, solver_name: &str, solution_id: u64) -> Settle { - /// The maximum number of blocks to wait for a settlement to appear on - /// chain. - const SUBMISSION_DEADLINE: u64 = 3; let submission_deadline_latest_block: u64 = u64::try_from(self.web3().eth().block_number().await.unwrap()).unwrap() - + SUBMISSION_DEADLINE; + + self.settle_submission_deadline; let old_balances = self.balances().await; let res = self .client @@ -1127,7 +1137,6 @@ impl Test { Settle { old_balances, status: settle_status, - test: self, } } @@ -1174,6 +1183,10 @@ impl Test { pub fn set_auction_id(&mut self, auction_id: i64) { self.auction_id = auction_id; } + + pub async fn set_auto_mining(&self, enabled: bool) { + self.blockchain.set_auto_mining(enabled).await + } } /// A /solve response. @@ -1200,6 +1213,11 @@ impl<'a> Solve<'a> { blockchain: self.blockchain, } } + + pub fn err(self) -> SolveErr { + assert_ne!(self.status, hyper::StatusCode::OK); + SolveErr { body: self.body } + } } impl SolveOk<'_> { @@ -1339,6 +1357,23 @@ impl SolveOk<'_> { } } +pub struct SolveErr { + body: String, +} + +impl SolveErr { + /// Check the kind field in the error response. + pub fn kind(self, expected_kind: &str) { + let result: serde_json::Value = serde_json::from_str(&self.body).unwrap(); + assert!(result.is_object()); + assert_eq!(result.as_object().unwrap().len(), 2); + assert!(result.get("kind").is_some()); + assert!(result.get("description").is_some()); + let kind = result.get("kind").unwrap().as_str().unwrap(); + assert_eq!(kind, expected_kind); + } +} + /// A /reveal response. pub struct Reveal { status: StatusCode, @@ -1588,10 +1623,9 @@ pub enum Balance { } /// A /settle response. -pub struct Settle<'a> { +pub struct Settle { old_balances: HashMap<&'static str, eth::U256>, status: SettleStatus, - test: &'a Test, } #[derive(Debug, PartialEq)] @@ -1603,8 +1637,7 @@ pub enum SettleStatus { }, } -pub struct SettleOk<'a> { - test: &'a Test, +pub struct SettleOk { old_balances: HashMap<&'static str, eth::U256>, } @@ -1612,14 +1645,13 @@ pub struct SettleErr { body: String, } -impl<'a> Settle<'a> { +impl Settle { /// Expect the /settle endpoint to have returned a 200 OK response. - pub async fn ok(self) -> SettleOk<'a> { + pub async fn ok(self) -> SettleOk { // Ensure that the response is OK. assert_eq!(self.status, SettleStatus::Ok); SettleOk { - test: self.test, old_balances: self.old_balances, } } @@ -1636,10 +1668,10 @@ impl<'a> Settle<'a> { } } -impl<'a> SettleOk<'a> { +impl SettleOk { /// Check that the user balance changed. - pub async fn balance(self, token: &'static str, balance: Balance) -> SettleOk<'a> { - let new_balances = self.test.balances().await; + pub async fn balance(self, test: &Test, token: &'static str, balance: Balance) -> SettleOk { + let new_balances = test.balances().await; let new_balance = new_balances.get(token).unwrap(); let old_balance = self.old_balances.get(token).unwrap(); match balance { @@ -1651,41 +1683,57 @@ impl<'a> SettleOk<'a> { /// Ensure that the onchain balances changed in accordance with the /// [`ab_order`]. - pub async fn ab_order_executed(self) -> SettleOk<'a> { - self.balance("A", Balance::SmallerBy(AB_ORDER_AMOUNT.ether().into_wei())) - .await - .balance("B", Balance::Greater) - .await + pub async fn ab_order_executed(self, test: &Test) -> SettleOk { + self.balance( + test, + "A", + Balance::SmallerBy(AB_ORDER_AMOUNT.ether().into_wei()), + ) + .await + .balance(test, "B", Balance::Greater) + .await } /// Ensure that the onchain balances changed in accordance with the /// [`cd_order`]. - pub async fn cd_order_executed(self) -> SettleOk<'a> { - self.balance("C", Balance::SmallerBy(CD_ORDER_AMOUNT.ether().into_wei())) - .await - .balance("D", Balance::Greater) - .await + pub async fn cd_order_executed(self, test: &Test) -> SettleOk { + self.balance( + test, + "C", + Balance::SmallerBy(CD_ORDER_AMOUNT.ether().into_wei()), + ) + .await + .balance(test, "D", Balance::Greater) + .await } /// Ensure that the onchain balances changed in accordance with the /// [`eth_order`]. - pub async fn eth_order_executed(self) -> SettleOk<'a> { - self.balance("A", Balance::SmallerBy(ETH_ORDER_AMOUNT.ether().into_wei())) - .await - .balance("ETH", Balance::Greater) - .await + pub async fn eth_order_executed(self, test: &Test) -> SettleOk { + self.balance( + test, + "A", + Balance::SmallerBy(ETH_ORDER_AMOUNT.ether().into_wei()), + ) + .await + .balance(test, "ETH", Balance::Greater) + .await } } impl SettleErr { /// Check the kind field in the error response. - pub fn kind(self, expected_kind: &str) { + pub fn kind(&self, expected_kind: &str) { + assert_eq!(self.get_kind(), expected_kind); + } + + /// Extract the kind field from the error response. + pub fn get_kind(&self) -> String { let result: serde_json::Value = serde_json::from_str(&self.body).unwrap(); assert!(result.is_object()); assert_eq!(result.as_object().unwrap().len(), 2); assert!(result.get("kind").is_some()); assert!(result.get("description").is_some()); - let kind = result.get("kind").unwrap().as_str().unwrap(); - assert_eq!(kind, expected_kind); + result.get("kind").unwrap().as_str().unwrap().to_string() } } diff --git a/crates/driver/src/tests/setup/solver.rs b/crates/driver/src/tests/setup/solver.rs index ab45bf1a94..a53ea3d1b3 100644 --- a/crates/driver/src/tests/setup/solver.rs +++ b/crates/driver/src/tests/setup/solver.rs @@ -459,7 +459,7 @@ impl Solver { .0 .to_string(); let expected = json!({ - "id": (!config.quote).then_some("1" ), + "id": (!config.quote).then_some("1"), "tokens": tokens_json, "orders": orders_json, "liquidity": [],