Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run loop additional metrics #2888

Merged
merged 14 commits into from
Aug 16, 2024
115 changes: 89 additions & 26 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use {
std::{
collections::{BTreeMap, HashSet},
sync::Arc,
time::{Duration, Instant},
time::{Duration, Instant, SystemTime},
},
tokio::sync::Mutex,
tracing::Instrument,
Expand Down Expand Up @@ -66,14 +66,18 @@ impl RunLoop {
let mut last_auction = None;
let mut last_block = None;
loop {
if let RunLoopMode::SyncToBlockchain = self.synchronization {
let block = ethrpc::block_stream::next_block(self.eth.current_block()).await;
if let Err(err) = self.solvable_orders_cache.update(block.number).await {
tracing::error!(?err, "failed to build a new auction");
let init_block_timestamp = {
if let RunLoopMode::SyncToBlockchain = self.synchronization {
let block = ethrpc::block_stream::next_block(self.eth.current_block()).await;
if let Err(err) = self.solvable_orders_cache.update(block.number).await {
tracing::error!(?err, "failed to build a new auction");
}
block.timestamp
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
self.eth.current_block().borrow().timestamp
}
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}
};

if let Some(domain::AuctionWithId { id, auction }) = self.next_auction().await {
let current_block = self.eth.current_block().borrow().hash;
Expand All @@ -85,7 +89,7 @@ impl RunLoop {
observe::log_auction_delta(id, &previous, &auction);
self.liveness.auction();

self.single_run(id, &auction)
self.single_run(id, &auction, init_block_timestamp)
.instrument(tracing::info_span!("auction", id))
.await;
}
Expand Down Expand Up @@ -127,10 +131,18 @@ impl RunLoop {
Some(domain::AuctionWithId { id, auction })
}

async fn single_run(&self, auction_id: domain::auction::Id, auction: &domain::Auction) {
async fn single_run(
&self,
auction_id: domain::auction::Id,
auction: &domain::Auction,
init_block_timestamp: u64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If this variable is only passed in to trigger the metric, we could probably also simply do it where we call single_run

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fetch the latest block timestamp or what exactly? That would work as long as the auction update function takes less than 1 round. For arbitrum, this is not the case.

) {
Metrics::single_run_started(init_block_timestamp);
let single_run_start = Instant::now();
tracing::info!(?auction_id, "solving");

let auction = self.remove_in_flight_orders(auction.clone()).await;
Metrics::pre_processed(single_run_start.elapsed());

let solutions = {
let mut solutions = self.competition(auction_id, &auction).await;
Expand Down Expand Up @@ -158,18 +170,20 @@ impl RunLoop {
if let Some(Participant { driver, solution }) = solutions.last() {
tracing::info!(driver = %driver.name, solution = %solution.id(), "winner");

let reveal_start = Instant::now();
let revealed = match self.reveal(driver, auction_id, solution.id()).await {
Ok(result) => {
Metrics::reveal_ok(driver);
Metrics::reveal_ok(driver, reveal_start.elapsed());
result
}
Err(err) => {
Metrics::reveal_err(driver, &err);
Metrics::reveal_err(driver, reveal_start.elapsed(), &err);
tracing::warn!(driver = %driver.name, ?err, "failed to reveal winning solution");
return;
}
};

let post_processing_start = Instant::now();
let winner = solution.solver().into();
let winning_score = solution.score().get().0;
let reference_score = solutions
Expand Down Expand Up @@ -316,6 +330,7 @@ impl RunLoop {
Metrics::fee_policies_store_error();
tracing::warn!(?err, "failed to save fee policies");
}
Metrics::post_processed(post_processing_start.elapsed());

tracing::info!(driver = %driver.name, "settling");
let submission_start = Instant::now();
Expand All @@ -325,7 +340,7 @@ impl RunLoop {
{
Ok(()) => Metrics::settle_ok(driver, submission_start.elapsed()),
Err(err) => {
Metrics::settle_err(driver, &err, submission_start.elapsed());
Metrics::settle_err(driver, submission_start.elapsed(), &err);
tracing::warn!(?err, driver = %driver.name, "settlement failed");
}
}
Expand All @@ -342,6 +357,7 @@ impl RunLoop {
.filter(|uid| auction_uids.contains(uid))
.collect();
Metrics::matched_unsettled(driver, unsettled_orders);
Metrics::single_run_completed(single_run_start.elapsed());
}
}

Expand Down Expand Up @@ -647,11 +663,14 @@ struct Metrics {

/// Tracks the result of driver `/reveal` requests.
#[metric(labels("driver", "result"))]
reveal: prometheus::IntCounterVec,
reveal: prometheus::HistogramVec,

/// Tracks the times and results of driver `/settle` requests.
#[metric(labels("driver", "result"))]
settle_time: prometheus::IntCounterVec,
#[metric(
labels("driver", "result"),
buckets(1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 20, 25, 30, 40)
)]
settle: prometheus::HistogramVec,

/// Tracks the number of orders that were part of some but not the winning
/// solution together with the winning driver that did't include it.
Expand All @@ -661,6 +680,22 @@ struct Metrics {
/// Tracks the number of database errors.
#[metric(labels("error_type"))]
db_metric_error: prometheus::IntCounterVec,

/// Tracks the time spent in post-processing after the auction has been
/// solved and before sending a `settle` request.
auction_postprocessing_time: prometheus::Histogram,
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved

/// Tracks the time spent in pre-processing before sending a `solve`
/// request.
auction_preprocessing_time: prometheus::Histogram,

/// Total time spent in a single run of the run loop.
#[metric(buckets(1, 5, 10, 15, 20, 25, 30, 35, 40))]
single_run_time: prometheus::Histogram,

/// Time difference between the current block and when the single run
/// function is started.
single_run_delay: prometheus::Histogram,
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
}

impl Metrics {
Expand Down Expand Up @@ -710,39 +745,39 @@ impl Metrics {
.inc();
}

fn reveal_ok(driver: &infra::Driver) {
fn reveal_ok(driver: &infra::Driver, elapsed: Duration) {
Self::get()
.reveal
.with_label_values(&[&driver.name, "success"])
.inc();
.observe(elapsed.as_secs_f64());
}

fn reveal_err(driver: &infra::Driver, err: &RevealError) {
fn reveal_err(driver: &infra::Driver, elapsed: Duration, err: &RevealError) {
let label = match err {
RevealError::AuctionMismatch => "mismatch",
RevealError::Failure(_) => "error",
};
Self::get()
.reveal
.with_label_values(&[&driver.name, label])
.inc();
.observe(elapsed.as_secs_f64());
}

fn settle_ok(driver: &infra::Driver, time: Duration) {
fn settle_ok(driver: &infra::Driver, elapsed: Duration) {
Self::get()
.settle_time
.settle
.with_label_values(&[&driver.name, "success"])
.inc_by(time.as_millis().try_into().unwrap_or(u64::MAX));
.observe(elapsed.as_secs_f64());
}

fn settle_err(driver: &infra::Driver, err: &SettleError, time: Duration) {
fn settle_err(driver: &infra::Driver, elapsed: Duration, err: &SettleError) {
let label = match err {
SettleError::Failure(_) => "error",
};
Self::get()
.settle_time
.settle
.with_label_values(&[&driver.name, label])
.inc_by(time.as_millis().try_into().unwrap_or(u64::MAX));
.observe(elapsed.as_secs_f64());
}

fn matched_unsettled(winning: &infra::Driver, unsettled: HashSet<&domain::OrderUid>) {
Expand All @@ -761,6 +796,34 @@ impl Metrics {
.with_label_values(&["fee_policies_store"])
.inc();
}

fn post_processed(elapsed: Duration) {
Self::get()
.auction_postprocessing_time
.observe(elapsed.as_secs_f64());
}

fn pre_processed(elapsed: Duration) {
Self::get()
.auction_preprocessing_time
.observe(elapsed.as_secs_f64());
}

fn single_run_completed(elapsed: Duration) {
Self::get().single_run_time.observe(elapsed.as_secs_f64());
}

fn single_run_started(init_block_timestamp: u64) {
match SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs())
{
Ok(now) => Self::get()
.single_run_delay
.observe((now - init_block_timestamp) as f64),
Err(err) => tracing::error!(?err, "failed to get current time"),
}
}
}

pub mod observe {
Expand Down
77 changes: 64 additions & 13 deletions crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
},
number::conversions::u256_to_big_decimal,
primitive_types::{H160, H256, U256},
prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec},
prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec},
shared::{
account_balances::{BalanceFetching, Query},
bad_token::BadTokenDetecting,
Expand All @@ -43,6 +43,13 @@ pub struct Metrics {
#[metric(labels("result"))]
auction_update: IntCounterVec,

/// Time taken to update the solvable orders cache.
auction_update_total_time: Histogram,

/// Time spent on auction update individual stage.
#[metric( labels("stage"))]
auction_update_stage_time: HistogramVec,

/// Auction creations.
auction_creations: IntCounter,

Expand Down Expand Up @@ -151,28 +158,62 @@ impl SolvableOrdersCache {
/// the case in unit tests, then concurrent calls might overwrite each
/// other's results.
pub async fn update(&self, block: u64) -> Result<()> {
let start = Instant::now();
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
let min_valid_to = now_in_epoch_seconds() + self.min_order_validity_period.as_secs() as u32;
let db_solvable_orders = self.persistence.solvable_orders(min_valid_to).await?;

let mut counter = OrderFilterCounter::new(self.metrics, &db_solvable_orders.orders);
let mut invalid_order_uids = Vec::new();
let mut filtered_order_events = Vec::new();

let orders = filter_banned_user_orders(db_solvable_orders.orders, &self.banned_users).await;
let removed = counter.checkpoint("banned_user", &orders);
invalid_order_uids.extend(removed);
let orders = {
let _timer = self
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
.metrics
.auction_update_stage_time
.with_label_values(&["banned_user_filtering"])
.start_timer();
let orders =
filter_banned_user_orders(db_solvable_orders.orders, &self.banned_users).await;
let removed = counter.checkpoint("banned_user", &orders);
invalid_order_uids.extend(removed);
orders
};

let orders =
filter_invalid_signature_orders(orders, self.signature_validator.as_ref()).await;
let removed = counter.checkpoint("invalid_signature", &orders);
invalid_order_uids.extend(removed);
let orders = {
let _timer = self
.metrics
.auction_update_stage_time
.with_label_values(&["invalid_signature_filtering"])
.start_timer();
let orders =
filter_invalid_signature_orders(orders, self.signature_validator.as_ref()).await;
let removed = counter.checkpoint("invalid_signature", &orders);
invalid_order_uids.extend(removed);
orders
};

let orders = filter_unsupported_tokens(orders, self.bad_token_detector.as_ref()).await?;
let removed = counter.checkpoint("unsupported_token", &orders);
invalid_order_uids.extend(removed);
let orders = {
let _timer = self
.metrics
.auction_update_stage_time
.with_label_values(&["unsupported_token_filtering"])
.start_timer();
let orders =
filter_unsupported_tokens(orders, self.bad_token_detector.as_ref()).await?;
let removed = counter.checkpoint("unsupported_token", &orders);
invalid_order_uids.extend(removed);
orders
};

let missing_queries: Vec<_> = orders.iter().map(Query::from_order).collect();
let fetched_balances = self.balance_fetcher.get_balances(&missing_queries).await;
let fetched_balances = {
let _timer = self
.metrics
.auction_update_stage_time
.with_label_values(&["balance_fetch"])
.start_timer();
self.balance_fetcher.get_balances(&missing_queries).await
};
let balances = missing_queries
.into_iter()
.zip(fetched_balances)
Expand Down Expand Up @@ -218,7 +259,14 @@ impl SolvableOrdersCache {
entry.insert(weth_price);
}

let cow_amms = self.cow_amm_registry.amms().await;
let cow_amms = {
let _timer = self
.metrics
.auction_update_stage_time
.with_label_values(&["cow_amm_registry"])
.start_timer();
self.cow_amm_registry.amms().await
};
let cow_amm_tokens = cow_amms
.iter()
.flat_map(|cow_amm| cow_amm.traded_tokens())
Expand Down Expand Up @@ -304,6 +352,9 @@ impl SolvableOrdersCache {
};

tracing::debug!(%block, "updated current auction cache");
self.metrics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GIven that we know that this takes a significant amount of time we could already add metrics for the individual stages of the auction building.
I assume most of the time will likely be spent on the DB query but there will probably also be outliers in the individual steps that need to be ironed out.

Copy link
Contributor Author

@squadgazzz squadgazzz Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a HistogramVec for individual update stages except solvable order fetching since we already have a separate DB metric for this.

.auction_update_total_time
.observe(start.elapsed().as_secs_f64());
Ok(())
}

Expand Down
Loading