From bb5a241789576c33e9d1307e296be11ce6af5b14 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Thu, 7 Dec 2023 16:58:28 +0100 Subject: [PATCH] Speed up token meta data look up (#2134) # Description We've noticed weird delays between the autopilot sending an auction and the driver being ready to start working on it. The linked issue suggests that we spend a lot of time converting the auction and enriching it with token meta data. This should be super fast because all the data should be cached at all times. However, was one token that caused it all to break down: `0xeeeee...` (the address we use to denote the chains native token) This is roughly what happened: 1. every driver receives auction 2. every driver collects all the addresses for which we don't have cached metadata 3. this always yielded `0xeeee...` (it's not a real token so we never get data when calling ERC20 functions on it) 4. every driver wasted some time sending RPC requests for `0xeeee` 5. every driver tried to take an exclusive lock to write no data (empty vec) to the cache Overall this wasted time with unnecessary network requests and serially taking an exclusive lock for all drivers. # Changes * most importantly filter out `0xeee` * avoid sending duplicate RPC requests by using `RequestSharing` * don't take an exclusive lock if some other driver cached the same data in the meantime * moved logging of token balance update task out of the critical section All changes combined results in us now spending a couple microseconds on getting cached balances instead of seconds. And because this was the bulk of the work captured by the log `auction task execution time` we reduced that time to ~11ms. ## How to test e2e tests should still pass and manual test confirmed the reduced latency. Fixes #2133 --- .../driver/src/infra/api/routes/solve/mod.rs | 2 +- crates/driver/src/infra/tokens.rs | 132 ++++++++++++------ 2 files changed, 88 insertions(+), 46 deletions(-) diff --git a/crates/driver/src/infra/api/routes/solve/mod.rs b/crates/driver/src/infra/api/routes/solve/mod.rs index 997995f5cb..07fc1916fa 100644 --- a/crates/driver/src/infra/api/routes/solve/mod.rs +++ b/crates/driver/src/infra/api/routes/solve/mod.rs @@ -30,7 +30,7 @@ async fn route( .tap_err(|err| { observe::invalid_dto(err, "auction"); })?; - tracing::debug!(elapsed=?start.elapsed(), auction_id=%auction_id, "auction task execution time"); + tracing::debug!(elapsed = ?start.elapsed(), "auction task execution time"); let auction = state.pre_processor().prioritize(auction).await; let competition = state.competition(); let result = competition.solve(&auction).await; diff --git a/crates/driver/src/infra/tokens.rs b/crates/driver/src/infra/tokens.rs index f255943bf6..946c7319b6 100644 --- a/crates/driver/src/infra/tokens.rs +++ b/crates/driver/src/infra/tokens.rs @@ -3,10 +3,14 @@ use { domain::eth, infra::{blockchain, Ethereum}, }, + anyhow::Result, ethrpc::current_block::{self, CurrentBlockStream}, - futures::StreamExt, + futures::{FutureExt, StreamExt}, + itertools::Itertools, + model::order::BUY_ETH_ADDRESS, + shared::request_sharing::BoxRequestSharing, std::{ - collections::{HashMap, HashSet}, + collections::HashMap, sync::{Arc, RwLock}, }, tracing::Instrument, @@ -29,6 +33,7 @@ impl Fetcher { let inner = Arc::new(Inner { eth, cache: RwLock::new(HashMap::new()), + requests: BoxRequestSharing::labelled("token_info".into()), }); tokio::task::spawn( update_task(block_stream, Arc::downgrade(&inner)) @@ -70,10 +75,6 @@ async fn update_balances(inner: Arc) -> Result<(), blockchain::Error> { let futures = { let cache = inner.cache.read().unwrap(); let tokens = cache.keys().cloned().collect::>(); - tracing::debug!( - tokens = tokens.len(), - "updating settlement contract balances" - ); tokens.into_iter().map(|token| { let erc20 = inner.eth.erc20(token); async move { @@ -85,6 +86,11 @@ async fn update_balances(inner: Arc) -> Result<(), blockchain::Error> { }) }; + tracing::debug!( + tokens = futures.len(), + "updating settlement contract balances" + ); + // Don't hold on to the lock while fetching balances to allow concurrent // updates. This may lead to new entries arriving in the meantime, however // their balances should already be up-to-date. @@ -93,14 +99,22 @@ async fn update_balances(inner: Arc) -> Result<(), blockchain::Error> { .into_iter() .collect::>(); - let mut cache = inner.cache.write().unwrap(); - for (key, entry) in cache.iter_mut() { - if let Some(balance) = balances.remove(key) { - entry.balance = balance; - } else { - tracing::info!(?key, "key without balance update"); + let mut keys_without_balances = vec![]; + { + let mut cache = inner.cache.write().unwrap(); + for (key, entry) in cache.iter_mut() { + if let Some(balance) = balances.remove(key) { + entry.balance = balance; + } else { + // Avoid logging while holding the exclusive lock. + keys_without_balances.push(*key); + } } } + if !keys_without_balances.is_empty() { + tracing::info!(keys = ?keys_without_balances, "updated keys without balance"); + } + Ok(()) } @@ -108,60 +122,88 @@ async fn update_balances(inner: Arc) -> Result<(), blockchain::Error> { struct Inner { eth: Ethereum, cache: RwLock>, + requests: BoxRequestSharing>, } impl Inner { /// Fetches `Metadata` of the requested tokens from a node. async fn fetch_token_infos( &self, - tokens: HashSet, - ) -> Vec> { + tokens: &[eth::TokenAddress], + ) -> Vec> { let settlement = self.eth.contracts().settlement().address().into(); - let futures = tokens.into_iter().map(|token| async move { - let token = self.eth.erc20(token); - // Use `try_join` because these calls get batched under the hood - // so if one of them fails the others will as well. - // Also this way we won't get incomplete data for a token. - let (decimals, symbol, balance) = futures::future::try_join3( - token.decimals(), - token.symbol(), - token.balance(settlement), - ) - .await?; - Ok(( - token.address(), - Metadata { - decimals, - symbol, - balance, - }, - )) + let futures = tokens.iter().map(|token| { + let build_request = |token: ð::TokenAddress| { + let token = self.eth.erc20(*token); + async move { + // Use `try_join` because these calls get batched under the hood + // so if one of them fails the others will as well. + // Also this way we won't get incomplete data for a token. + let (decimals, symbol, balance) = futures::future::try_join3( + token.decimals(), + token.symbol(), + token.balance(settlement), + ) + .await + .ok()?; + + Some(( + token.address(), + Metadata { + decimals, + symbol, + balance, + }, + )) + } + .boxed() + }; + + self.requests.shared_or_else(*token, build_request) }); futures::future::join_all(futures).await } + /// Ensures that all the missing tokens are in the cache afterwards while + /// taking into account that the function might be called multiple times + /// for the same tokens. + async fn cache_missing_tokens(&self, tokens: &[eth::TokenAddress]) { + if tokens.is_empty() { + return; + } + + let fetched = self.fetch_token_infos(tokens).await; + { + let cache = self.cache.read().unwrap(); + if tokens.iter().all(|token| cache.contains_key(token)) { + // Often multiple callers are racing to fetch the same Metadata. + // If somebody else already cached the data we don't want to take an + // exclusive lock for nothing. + return; + } + } + self.cache + .write() + .unwrap() + .extend(fetched.into_iter().flatten()); + } + async fn get(&self, addresses: &[eth::TokenAddress]) -> HashMap { - let to_fetch: HashSet<_> = { + let to_fetch: Vec<_> = { let cache = self.cache.read().unwrap(); // Compute set of requested addresses that are not in cache. addresses .iter() - .filter(|address| !cache.contains_key(*address)) + // BUY_ETH_ADDRESS is just a marker and not a real address. We'll never be able to + // fetch data for it so ignore it to avoid taking exclusive locks all the time. + .filter(|address| !cache.contains_key(*address) && address.0.0 != BUY_ETH_ADDRESS) .cloned() + .unique() .collect() }; - // Fetch token infos not yet in cache. - if !to_fetch.is_empty() { - let fetched = self.fetch_token_infos(to_fetch).await; - - // Add valid token infos to cache. - self.cache - .write() - .unwrap() - .extend(fetched.into_iter().flatten()); - }; + self.cache_missing_tokens(&to_fetch).await; let cache = self.cache.read().unwrap(); // Return token infos from the cache.