Skip to content

Commit

Permalink
Build auction serially in run loop (#2885)
Browse files Browse the repository at this point in the history
Fixes: #1673

# Description
Depending on the `run-loop-mode` we now either build the auction in a
background task like we do today or build it on the hot path of the
run-loop. I added the change in a somewhat weird place but mostly
because I wanted to have a minimal change given that the feature still
gets controlled via a CLI flag so the nice and tidy final design likely
cannot be achieved at the moment anyway.
  • Loading branch information
MartinquaXD authored Aug 13, 2024
1 parent e7de6bf commit a7ba48e
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ impl FromStr for CowAmmConfig {
}

/// Controls the timing of the run loop.
#[derive(clap::Parser, clap::ValueEnum, Clone, Debug, Default)]
#[derive(clap::Parser, clap::ValueEnum, Clone, Debug, Default, Copy)]
pub enum RunLoopMode {
/// The run loop starts with the next mined block.
SyncToBlockchain,
Expand Down
4 changes: 1 addition & 3 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,8 @@ pub async fn run(args: Arguments) {
),
balance_fetcher.clone(),
bad_token_detector.clone(),
eth.current_block().clone(),
native_price_estimator.clone(),
signature_validator.clone(),
args.auction_update_interval,
eth.contracts().weth().address(),
args.limit_order_price_factor
.try_into()
Expand Down Expand Up @@ -517,7 +515,7 @@ pub async fn run(args: Arguments) {
liveness: liveness.clone(),
synchronization: args.run_loop_mode,
};
run.run_forever().await;
run.run_forever(args.auction_update_interval).await;
unreachable!("run loop exited");
}

Expand Down
15 changes: 13 additions & 2 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,23 @@ pub struct RunLoop {
}

impl RunLoop {
pub async fn run_forever(self) -> ! {
pub async fn run_forever(self, update_interval: Duration) -> ! {
if let RunLoopMode::Unsynchronized = self.synchronization {
SolvableOrdersCache::spawn_background_task(
&self.solvable_orders_cache,
self.eth.current_block().clone(),
update_interval,
);
}

let mut last_auction = None;
let mut last_block = None;
loop {
if let RunLoopMode::SyncToBlockchain = self.synchronization {
let _ = ethrpc::block_stream::next_block(self.eth.current_block()).await;
let block = ethrpc::block_stream::next_block(self.eth.current_block()).await;
if let Err(err) = self.solvable_orders_cache.update(block.number).await {
tracing::error!(?err, "failed to build a new auction");
}
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}
Expand Down
17 changes: 12 additions & 5 deletions crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,8 @@ impl SolvableOrdersCache {
banned_users: banned::Users,
balance_fetcher: Arc<dyn BalanceFetching>,
bad_token_detector: Arc<dyn BadTokenDetecting>,
current_block: CurrentBlockWatcher,
native_price_estimator: Arc<CachingNativePriceEstimator>,
signature_validator: Arc<dyn SignatureValidating>,
update_interval: Duration,
weth: H160,
limit_order_price_factor: BigDecimal,
protocol_fees: domain::ProtocolFees,
Expand All @@ -126,11 +124,20 @@ impl SolvableOrdersCache {
protocol_fees,
cow_amm_registry,
});
self_
}

/// Spawns a task that periodically updates the set of open orders
/// and builds a new auction with them.
pub fn spawn_background_task(
cache: &Arc<Self>,
block_stream: CurrentBlockWatcher,
update_interval: Duration,
) {
tokio::task::spawn(
update_task(Arc::downgrade(&self_), update_interval, current_block)
update_task(Arc::downgrade(cache), update_interval, block_stream)
.instrument(tracing::info_span!("solvable_orders_cache")),
);
self_
}

pub fn current_auction(&self) -> Option<domain::Auction> {
Expand All @@ -143,7 +150,7 @@ impl SolvableOrdersCache {
/// Usually this method is called from update_task. If it isn't, which is
/// the case in unit tests, then concurrent calls might overwrite each
/// other's results.
async fn update(&self, block: u64) -> Result<()> {
pub async fn update(&self, block: u64) -> Result<()> {
let min_valid_to = now_in_epoch_seconds() + self.min_order_validity_period.as_secs() as u32;
let db_solvable_orders = self.persistence.solvable_orders(min_valid_to).await?;

Expand Down
6 changes: 3 additions & 3 deletions crates/ethrpc/src/block_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,12 @@ fn update_block_metrics(current_block: u64, new_block: u64) {
}

/// Awaits and returns the next block that will be pushed into the stream.
pub async fn next_block(current_block: &CurrentBlockWatcher) -> Option<BlockInfo> {
pub async fn next_block(current_block: &CurrentBlockWatcher) -> BlockInfo {
let mut stream = into_stream(current_block.clone());
// the stream always yields the current value right away
// so we simply ignore it
let _ = stream.next().await;
stream.next().await
stream.next().await.expect("block_stream must never end")
}

#[cfg(test)]
Expand Down Expand Up @@ -448,6 +448,6 @@ mod tests {
});

let received_block = timeout(2 * TIMEOUT, next_block(&receiver)).await;
assert_eq!(received_block, Ok(Some(new_block(1))));
assert_eq!(received_block, Ok(new_block(1)));
}
}

0 comments on commit a7ba48e

Please sign in to comment.