Skip to content

Commit

Permalink
Sync run loop to block production (#2884)
Browse files Browse the repository at this point in the history
# Description
fixes: #486

In order to achieve one block per auction we need to synchronize the
autopilot run loop to the block production.
At the moment this will actually slow us down because the solution
submission is not yet prepared for 1 auction per block so I tied this
behavior to a CLI flag.

After adding the `next_block` function to the `current_block` module I
noticed what a silly name that is and also renamed it to `block_stream`.
And then I also renamed `CurrentBlockStream` to `CurrentBlockWatcher`
since it's actually not a stream.
LMK if you don't like the names or feel strongly if I should do them in
a separate PR.

## How to test
Only added a unit test for a new function that awaits the next block
  • Loading branch information
MartinquaXD authored Aug 13, 2024
1 parent 776b137 commit e7de6bf
Show file tree
Hide file tree
Showing 53 changed files with 187 additions and 111 deletions.
16 changes: 16 additions & 0 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ pub struct Arguments {
/// before the deployment of the factory)
#[clap(long, env, use_value_delimiter = true)]
pub cow_amm_configs: Vec<CowAmmConfig>,

/// Controls start of the run loop.
#[clap(long, env, default_value = "unsynchronized")]
pub run_loop_mode: RunLoopMode,
}

impl std::fmt::Display for Arguments {
Expand Down Expand Up @@ -270,6 +274,7 @@ impl std::fmt::Display for Arguments {
max_settlement_transaction_wait,
s3,
cow_amm_configs,
run_loop_mode,
} = self;

write!(f, "{}", shared)?;
Expand Down Expand Up @@ -347,6 +352,7 @@ impl std::fmt::Display for Arguments {
)?;
writeln!(f, "s3: {:?}", s3)?;
writeln!(f, "cow_amm_configs: {:?}", cow_amm_configs)?;
writeln!(f, "run_loop_mode: {:?}", run_loop_mode)?;
Ok(())
}
}
Expand Down Expand Up @@ -507,6 +513,16 @@ impl FromStr for CowAmmConfig {
}
}

/// Controls the timing of the run loop.
#[derive(clap::Parser, clap::ValueEnum, Clone, Debug, Default)]
pub enum RunLoopMode {
/// The run loop starts with the next mined block.
SyncToBlockchain,
/// The run loop starts whenever the previous loop ends.
#[default]
Unsynchronized,
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/boundary/events/settlement.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{database::Postgres, on_settlement_event_updater::OnSettlementEventUpdater},
anyhow::Result,
ethrpc::current_block::RangeInclusive,
ethrpc::block_stream::RangeInclusive,
shared::{event_handling::EventStoring, impl_event_retrieving},
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
crate::database::{events::bytes_to_order_uid, Postgres},
anyhow::Result,
database::ethflow_orders::Refund,
ethrpc::current_block::RangeInclusive,
ethrpc::block_stream::RangeInclusive,
shared::event_handling::EventStoring,
};

Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/database/onchain_order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use {
},
ethcontract::{Event as EthContractEvent, H160},
ethrpc::{
current_block::{timestamp_of_block_in_seconds, RangeInclusive},
block_stream::{timestamp_of_block_in_seconds, RangeInclusive},
Web3,
},
futures::{stream, StreamExt},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
},
ethcontract::Event as EthContractEvent,
ethrpc::{
current_block::{block_number_to_block_number_hash, BlockNumberHash},
block_stream::{block_number_to_block_number_hash, BlockNumberHash},
Web3,
},
hex_literal::hex,
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/event_updater.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
anyhow::Result,
ethrpc::current_block::{BlockNumberHash, BlockRetrieving},
ethrpc::block_stream::{BlockNumberHash, BlockRetrieving},
shared::{
event_handling::{EventHandler, EventRetrieving, EventStoring},
maintenance::Maintaining,
Expand Down
8 changes: 4 additions & 4 deletions crates/autopilot/src/infra/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
self::contracts::Contracts,
crate::{boundary, domain::eth},
ethcontract::dyns::DynWeb3,
ethrpc::current_block::CurrentBlockStream,
ethrpc::block_stream::CurrentBlockWatcher,
primitive_types::U256,
std::time::Duration,
thiserror::Error,
Expand Down Expand Up @@ -72,7 +72,7 @@ impl Rpc {
pub struct Ethereum {
web3: DynWeb3,
chain: ChainId,
current_block: CurrentBlockStream,
current_block: CurrentBlockWatcher,
contracts: Contracts,
}

Expand All @@ -93,7 +93,7 @@ impl Ethereum {
let contracts = Contracts::new(&web3, &chain, addresses).await;

Self {
current_block: ethrpc::current_block::current_block_stream(url, poll_interval)
current_block: ethrpc::block_stream::current_block_stream(url, poll_interval)
.await
.expect("couldn't initialize current block stream"),
web3,
Expand All @@ -108,7 +108,7 @@ impl Ethereum {

/// Returns a stream that monitors the block chain to inform about the
/// current and new blocks.
pub fn current_block(&self) -> &CurrentBlockStream {
pub fn current_block(&self) -> &CurrentBlockWatcher {
&self.current_block
}

Expand Down
12 changes: 11 additions & 1 deletion crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
clap::Parser,
contracts::{BalancerV2Vault, IUniswapV3Factory},
ethcontract::{dyns::DynWeb3, errors::DeployError, BlockNumber},
ethrpc::current_block::block_number_to_block_number_hash,
ethrpc::block_stream::block_number_to_block_number_hash,
futures::StreamExt,
model::DomainSeparator,
shared::{
Expand Down Expand Up @@ -515,6 +515,7 @@ pub async fn run(args: Arguments) {
in_flight_orders: Default::default(),
persistence: persistence.clone(),
liveness: liveness.clone(),
synchronization: args.run_loop_mode,
};
run.run_forever().await;
unreachable!("run loop exited");
Expand Down Expand Up @@ -568,12 +569,21 @@ async fn shadow_mode(args: Arguments) -> ! {
let liveness = Arc::new(Liveness::new(args.max_auction_age));
shared::metrics::serve_metrics(liveness.clone(), args.metrics_address);

let current_block = ethrpc::block_stream::current_block_stream(
args.shared.node_url,
args.shared.current_block.block_stream_poll_interval,
)
.await
.expect("couldn't initialize current block stream");

let shadow = shadow::RunLoop::new(
orderbook,
drivers,
trusted_tokens,
args.solve_deadline,
liveness.clone(),
args.run_loop_mode,
current_block,
);
shadow.run_forever().await;

Expand Down
14 changes: 9 additions & 5 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use {
crate::{
arguments::RunLoopMode,
database::competition::Competition,
domain::{
self,
auction::order::Class,
competition::{
SolutionError,
{self},
},
competition::{self, SolutionError},
OrderUid,
},
infra::{
Expand Down Expand Up @@ -52,13 +50,20 @@ pub struct RunLoop {
pub solve_deadline: Duration,
pub in_flight_orders: Arc<Mutex<Option<InFlightOrders>>>,
pub liveness: Arc<Liveness>,
pub synchronization: RunLoopMode,
}

impl RunLoop {
pub async fn run_forever(self) -> ! {
let mut last_auction = None;
let mut last_block = None;
loop {
if let RunLoopMode::SyncToBlockchain = self.synchronization {
let _ = ethrpc::block_stream::next_block(self.eth.current_block()).await;
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}

if let Some(domain::AuctionWithId { id, auction }) = self.next_auction().await {
let current_block = self.eth.current_block().borrow().hash;
// Only run the solvers if the auction or block has changed.
Expand All @@ -74,7 +79,6 @@ impl RunLoop {
.await;
}
};
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

Expand Down
11 changes: 11 additions & 0 deletions crates/autopilot/src/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use {
crate::{
arguments::RunLoopMode,
domain::{self, auction::order::Class},
infra::{
self,
Expand All @@ -18,6 +19,7 @@ use {
run_loop::observe,
},
::observe::metrics,
ethrpc::block_stream::CurrentBlockWatcher,
number::nonzero::U256 as NonZeroU256,
primitive_types::{H160, U256},
rand::seq::SliceRandom,
Expand All @@ -34,6 +36,8 @@ pub struct RunLoop {
block: u64,
solve_deadline: Duration,
liveness: Arc<Liveness>,
synchronization: RunLoopMode,
current_block: CurrentBlockWatcher,
}

impl RunLoop {
Expand All @@ -43,6 +47,8 @@ impl RunLoop {
trusted_tokens: AutoUpdatingTokenList,
solve_deadline: Duration,
liveness: Arc<Liveness>,
synchronization: RunLoopMode,
current_block: CurrentBlockWatcher,
) -> Self {
Self {
orderbook,
Expand All @@ -52,12 +58,17 @@ impl RunLoop {
block: 0,
solve_deadline,
liveness,
synchronization,
current_block,
}
}

pub async fn run_forever(mut self) -> ! {
let mut previous = None;
loop {
if let RunLoopMode::SyncToBlockchain = self.synchronization {
let _ = ethrpc::block_stream::next_block(&self.current_block).await;
};
let Some(domain::AuctionWithId { id, auction }) = self.next_auction().await else {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
Expand Down
6 changes: 3 additions & 3 deletions crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
anyhow::Result,
bigdecimal::BigDecimal,
database::order_events::OrderEventLabel,
ethrpc::current_block::CurrentBlockStream,
ethrpc::block_stream::CurrentBlockWatcher,
indexmap::IndexSet,
itertools::Itertools,
model::{
Expand Down Expand Up @@ -99,7 +99,7 @@ impl SolvableOrdersCache {
banned_users: banned::Users,
balance_fetcher: Arc<dyn BalanceFetching>,
bad_token_detector: Arc<dyn BadTokenDetecting>,
current_block: CurrentBlockStream,
current_block: CurrentBlockWatcher,
native_price_estimator: Arc<CachingNativePriceEstimator>,
signature_validator: Arc<dyn SignatureValidating>,
update_interval: Duration,
Expand Down Expand Up @@ -463,7 +463,7 @@ fn filter_dust_orders(mut orders: Vec<Order>, balances: &Balances) -> Vec<Order>
async fn update_task(
cache: Weak<SolvableOrdersCache>,
update_interval: Duration,
current_block: CurrentBlockStream,
current_block: CurrentBlockWatcher,
) {
loop {
// We are not updating on block changes because
Expand Down
2 changes: 1 addition & 1 deletion crates/cow-amm/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::Amm,
contracts::{cow_amm_legacy_helper::Event as CowAmmEvent, CowAmmLegacyHelper},
ethcontract::Address,
ethrpc::current_block::RangeInclusive,
ethrpc::block_stream::RangeInclusive,
shared::event_handling::EventStoring,
std::{collections::BTreeMap, sync::Arc},
tokio::sync::RwLock,
Expand Down
6 changes: 3 additions & 3 deletions crates/cow-amm/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{cache::Storage, factory::Factory, maintainers::EmptyPoolRemoval, Amm},
contracts::CowAmmLegacyHelper,
ethcontract::Address,
ethrpc::{current_block::CurrentBlockStream, Web3},
ethrpc::{block_stream::CurrentBlockWatcher, Web3},
shared::{
event_handling::EventHandler,
maintenance::{Maintaining, ServiceMaintenance},
Expand All @@ -15,12 +15,12 @@ use {
#[derive(Clone, Debug)]
pub struct Registry {
web3: Web3,
current_block_stream: CurrentBlockStream,
current_block_stream: CurrentBlockWatcher,
storage: Arc<RwLock<Vec<Storage>>>,
}

impl Registry {
pub fn new(web3: Web3, current_block_stream: CurrentBlockStream) -> Self {
pub fn new(web3: Web3, current_block_stream: CurrentBlockWatcher) -> Self {
Self {
storage: Default::default(),
web3,
Expand Down
6 changes: 3 additions & 3 deletions crates/driver/src/boundary/liquidity/balancer/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use {
BalancerV2WeightedPoolFactoryV3,
GPv2Settlement,
},
ethrpc::current_block::{BlockRetrieving, CurrentBlockStream},
ethrpc::block_stream::{BlockRetrieving, CurrentBlockWatcher},
shared::{
http_solver::model::TokenAmount,
sources::balancer_v2::{
Expand Down Expand Up @@ -76,7 +76,7 @@ fn to_interaction(

pub fn collector(
eth: &Ethereum,
block_stream: CurrentBlockStream,
block_stream: CurrentBlockWatcher,
block_retriever: Arc<dyn BlockRetrieving>,
config: &infra::liquidity::config::BalancerV2,
) -> Box<dyn LiquidityCollecting> {
Expand All @@ -99,7 +99,7 @@ pub fn collector(

async fn init_liquidity(
eth: &Ethereum,
block_stream: &CurrentBlockStream,
block_stream: &CurrentBlockWatcher,
block_retriever: Arc<dyn BlockRetrieving>,
config: &infra::liquidity::config::BalancerV2,
) -> Result<impl LiquidityCollecting> {
Expand Down
4 changes: 2 additions & 2 deletions crates/driver/src/boundary/liquidity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
infra::{self, blockchain::Ethereum},
},
anyhow::Result,
ethrpc::current_block::CurrentBlockStream,
ethrpc::block_stream::CurrentBlockWatcher,
futures::future,
model::TokenPair,
shared::{
Expand Down Expand Up @@ -52,7 +52,7 @@ fn http_client() -> reqwest::Client {
}

pub struct Fetcher {
blocks: CurrentBlockStream,
blocks: CurrentBlockWatcher,
inner: LiquidityCollector,
swapr_routers: HashSet<eth::ContractAddress>,
}
Expand Down
4 changes: 2 additions & 2 deletions crates/driver/src/boundary/liquidity/swapr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
domain::liquidity::{self, swapr},
infra::{self, blockchain::Ethereum},
},
ethrpc::current_block::CurrentBlockStream,
ethrpc::block_stream::CurrentBlockWatcher,
shared::sources::{
swapr::reader::SwaprPoolReader,
uniswap_v2::pool_fetching::DefaultPoolReader,
Expand Down Expand Up @@ -40,7 +40,7 @@ pub fn to_domain(id: liquidity::Id, pool: ConstantProductOrder) -> Result<liquid

pub async fn collector(
eth: &Ethereum,
blocks: &CurrentBlockStream,
blocks: &CurrentBlockWatcher,
config: &infra::liquidity::config::Swapr,
) -> Result<Box<dyn LiquidityCollecting>> {
let eth = eth.with_metric_label("swapr".into());
Expand Down
Loading

0 comments on commit e7de6bf

Please sign in to comment.