Skip to content

Commit

Permalink
Return max batch size from trait
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinquaXD committed Aug 14, 2024
1 parent ffe855b commit 32acb70
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 38 deletions.
75 changes: 37 additions & 38 deletions crates/shared/src/price_estimation/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ pub struct Configuration {
///
/// Specifying `None` means no limit on concurrency.
pub max_concurrent_requests: Option<NonZeroUsize>,
/// The maximum batch size.
pub max_batch_len: usize,
/// An additional minimum delay to wait for collecting requests.
///
/// The delay to start counting after receiving the first request.
Expand Down Expand Up @@ -61,6 +59,9 @@ pub trait NativePriceBatchFetching: Sync + Send + NativePriceEstimating {
>
where
'b: 'a;

/// Returns the number of prices that can be fetched in a single batch.
fn max_batch_size(&self) -> usize;
}

/// Buffered implementation that implements automatic batching of
Expand Down Expand Up @@ -160,38 +161,43 @@ where
requests: mpsc::UnboundedReceiver<H160>,
results_sender: broadcast::Sender<NativePriceResult>,
) -> JoinHandle<()> {
tokio::task::spawn(batched_for_each(config, requests, move |batch| {
let inner = inner.clone();
let results_sender = results_sender.clone();
async move {
if batch.is_empty() {
return;
}
let batch = batch.into_iter().collect::<HashSet<_>>();
let results: Vec<_> = match inner.fetch_native_prices(&batch).await {
Ok(results) => results
.into_iter()
.map(|(token, price)| NativePriceResult {
token,
result: price,
})
.collect(),
Err(err) => {
tracing::error!(?err, "failed to send native price batch request");
batch
tokio::task::spawn(batched_for_each(
config,
requests,
inner.max_batch_size(),
move |batch| {
let inner = inner.clone();
let results_sender = results_sender.clone();
async move {
if batch.is_empty() {
return;
}
let batch = batch.into_iter().collect::<HashSet<_>>();
let results: Vec<_> = match inner.fetch_native_prices(&batch).await {
Ok(results) => results
.into_iter()
.map(|token| NativePriceResult {
.map(|(token, price)| NativePriceResult {
token,
result: Err(err.clone()),
result: price,
})
.collect()
.collect(),
Err(err) => {
tracing::error!(?err, "failed to send native price batch request");
batch
.into_iter()
.map(|token| NativePriceResult {
token,
result: Err(err.clone()),
})
.collect()
}
};
for result in results {
let _ = results_sender.send(result);
}
};
for result in results {
let _ = results_sender.send(result);
}
}
}))
},
))
}
}

Expand All @@ -204,6 +210,7 @@ where
fn batched_for_each<T, St, F, Fut>(
config: Configuration,
items: St,
max_batch_size: usize,
work: F,
) -> impl Future<Output = ()>
where
Expand All @@ -222,7 +229,7 @@ where
// Append new elements to the bulk until reaching either of the scenarios:
// - reach maximum number of elements per batch (`max_batch_len)
// - we reach the `debouncing_time`
while chunk.len() < config.max_batch_len {
while chunk.len() < max_batch_size {
futures::select_biased! {
item = items.next() => match item {
Some(item) => chunk.push(item),
Expand Down Expand Up @@ -292,7 +299,6 @@ mod tests {
});
let config = Configuration {
max_concurrent_requests: NonZeroUsize::new(1),
max_batch_len: 20,
debouncing_time: Duration::from_millis(50),
result_ready_timeout: Duration::from_millis(500),
broadcast_channel_capacity: 50,
Expand All @@ -319,7 +325,6 @@ mod tests {
});
let config = Configuration {
max_concurrent_requests: NonZeroUsize::new(1),
max_batch_len: 20,
debouncing_time: Duration::from_millis(50),
result_ready_timeout: Duration::from_millis(500),
broadcast_channel_capacity: 50,
Expand All @@ -345,7 +350,6 @@ mod tests {

let config = Configuration {
max_concurrent_requests: NonZeroUsize::new(1),
max_batch_len: 20,
debouncing_time: Duration::from_millis(50),
result_ready_timeout: Duration::from_millis(500),
broadcast_channel_capacity: 50,
Expand Down Expand Up @@ -402,7 +406,6 @@ mod tests {

let config = Configuration {
max_concurrent_requests: NonZeroUsize::new(1),
max_batch_len: 20,
debouncing_time: Duration::from_millis(50),
result_ready_timeout: Duration::from_millis(500),
broadcast_channel_capacity: 50,
Expand Down Expand Up @@ -440,7 +443,6 @@ mod tests {

let config = Configuration {
max_concurrent_requests: NonZeroUsize::new(1),
max_batch_len: 20,
debouncing_time: Duration::from_millis(50),
result_ready_timeout: Duration::from_millis(500),
broadcast_channel_capacity: 50,
Expand Down Expand Up @@ -490,7 +492,6 @@ mod tests {

let config = Configuration {
max_concurrent_requests: NonZeroUsize::new(2),
max_batch_len: 20,
debouncing_time: Duration::from_millis(50),
result_ready_timeout: Duration::from_millis(500),
broadcast_channel_capacity: 50,
Expand All @@ -513,7 +514,6 @@ mod tests {
.never();
let config = Configuration {
max_concurrent_requests: NonZeroUsize::new(2),
max_batch_len: 20,
debouncing_time: Duration::from_millis(50),
result_ready_timeout: Duration::from_millis(500),
broadcast_channel_capacity: 50,
Expand Down Expand Up @@ -545,7 +545,6 @@ mod tests {

let config = Configuration {
max_concurrent_requests: NonZeroUsize::new(2),
max_batch_len: 20,
debouncing_time: Duration::from_millis(10),
result_ready_timeout: Duration::from_millis(500),
broadcast_channel_capacity: 50,
Expand Down
13 changes: 13 additions & 0 deletions crates/shared/src/price_estimation/native/coingecko.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,19 @@ impl NativePriceBatchFetching for CoinGecko {
}
.boxed()
}

fn max_batch_size(&self) -> usize {
/// maximum number of price the coingecko API returns in a single batch
const MAX_BATCH_SIZE: usize = 20;

match self.quote_token {
QuoteToken::Eth => MAX_BATCH_SIZE,
// when fetching price denominated in a custom token we need to
// fetch the price for that token in addition to the requested
// tokens so we reserve 1 spot in the batch
QuoteToken::Other(_) => MAX_BATCH_SIZE - 1,
}
}
}

impl NativePriceEstimating for CoinGecko {
Expand Down

0 comments on commit 32acb70

Please sign in to comment.