Skip to content

Commit

Permalink
[EASY] Parallelize post processing in run loop (#3013)
Browse files Browse the repository at this point in the history
# Description
When we briefly stored all fee policies for a given auction in the post
processing it increased the time for that critical section quite
significantly.
That change was removed since then but the learning still stands that
these operations might as well happen in parallel.

# Changes
Run the 3 DB inserts futures of the run loop post processing in
parallel.

## How to test
It's a very small change so the e2e tests should be sufficient
  • Loading branch information
MartinquaXD authored Sep 25, 2024
1 parent 440d12f commit 399903a
Showing 1 changed file with 16 additions and 30 deletions.
46 changes: 16 additions & 30 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use {
solvable_orders::SolvableOrdersCache,
},
::observe::metrics,
anyhow::{Context, Result},
anyhow::Result,
database::order_events::OrderEventLabel,
ethcontract::U256,
ethrpc::block_stream::BlockInfo,
futures::future::BoxFuture,
futures::{future::BoxFuture, TryFutureExt},
itertools::Itertools,
model::solver_competition::{
CompetitionAuction,
Expand Down Expand Up @@ -458,28 +458,21 @@ impl RunLoop {
};

tracing::trace!(?competition, "saving competition");
self.persistence
.save_competition(&competition)
.await
.context("failed to save competition")?;

self.persistence
.save_surplus_capturing_jit_orders_orders(
auction_id,
&auction.surplus_capturing_jit_order_owners,
)
.await
.context("failed to save surplus capturing jit order owners")?;
futures::try_join!(
self.persistence
.save_competition(&competition)
.map_err(|e| e.0.context("failed to save competition")),
self.persistence
.save_surplus_capturing_jit_orders_orders(
auction_id,
&auction.surplus_capturing_jit_order_owners,
)
.map_err(|e| e.0.context("failed to save jit order owners")),
self.persistence
.store_fee_policies(auction_id, fee_policies)
.map_err(|e| e.context("failed to fee_policies")),
)?;

tracing::info!("saving fee policies");
if let Err(err) = self
.persistence
.store_fee_policies(auction_id, fee_policies)
.await
{
Metrics::fee_policies_store_error();
tracing::warn!(?err, "failed to save fee policies");
}
Metrics::post_processed(start.elapsed());
Ok(())
}
Expand Down Expand Up @@ -1013,13 +1006,6 @@ impl Metrics {
.inc_by(unsettled.len() as u64);
}

fn fee_policies_store_error() {
Self::get()
.db_metric_error
.with_label_values(&["fee_policies_store"])
.inc();
}

fn post_processed(elapsed: Duration) {
Self::get()
.auction_postprocessing_time
Expand Down

0 comments on commit 399903a

Please sign in to comment.