Skip to content

Commit

Permalink
Task aggregator cache: remove redundant reads on concurrent requests. (
Browse files Browse the repository at this point in the history
…#3028)

Previously, if multiple requestors concurrently retrieved the same task
from the task aggregator cache, we would query the DB for the task for
each requestor.

Now, we use an additional layer of per-task locking so that, in the
concurrent-requestor scenario, one requestor will read the task from the
DB, and the remaining tasks will wait on that requestor's value.
  • Loading branch information
branlwyd authored Apr 18, 2024
1 parent 69b5e67 commit 117a807
Showing 1 changed file with 49 additions and 36 deletions.
85 changes: 49 additions & 36 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub struct Aggregator<C: Clock> {
/// Report writer, with support for batching.
report_writer: Arc<ReportWriteBatcher<C>>,
/// Cache of task aggregators.
task_aggregators: Mutex<HashMap<TaskId, Arc<TaskAggregator<C>>>>,
task_aggregators: TaskAggregatorCache<C>,
/// Metrics.
metrics: AggregatorMetrics,

Expand All @@ -151,6 +151,9 @@ pub struct Aggregator<C: Clock> {
peer_aggregators: PeerAggregatorCache,
}

type TaskAggregatorCache<C> =
SyncMutex<HashMap<TaskId, Arc<Mutex<Option<Arc<TaskAggregator<C>>>>>>>;

#[derive(Clone)]
struct AggregatorMetrics {
/// Counter tracking the number of failed decryptions while handling the
Expand Down Expand Up @@ -271,7 +274,7 @@ impl<C: Clock> Aggregator<C> {
clock,
cfg,
report_writer,
task_aggregators: Mutex::new(HashMap::new()),
task_aggregators: SyncMutex::new(HashMap::new()),
metrics: AggregatorMetrics {
upload_decrypt_failure_counter,
upload_decode_failure_counter,
Expand Down Expand Up @@ -676,45 +679,55 @@ impl<C: Clock> Aggregator<C> {
&self,
task_id: &TaskId,
) -> Result<Option<Arc<TaskAggregator<C>>>, Error> {
// TODO(#238): don't cache forever (decide on & implement some cache eviction policy).
// This is important both to avoid ever-growing resource usage, and to allow aggregators to
// TODO(#238): don't cache forever (decide on & implement some cache eviction policy). This
// is important both to avoid ever-growing resource usage, and to allow aggregators to
// notice when a task changes (e.g. due to key rotation).

// Fast path: grab an existing task aggregator if one exists for this task.
{
let task_aggs = self.task_aggregators.lock().await;
if let Some(task_agg) = task_aggs.get(task_id) {
return Ok(Some(Arc::clone(task_agg)));
}
}

// TODO(#1639): not holding the lock while querying means that multiple tokio::tasks could
// enter this section and redundantly query the database. This could be costly at high QPS.
// Step one: grab the existing entry for this task, if one exists. If there is no existing
// entry, write a new (empty) entry.
let cache_entry = {
// Unwrap safety: mutex poisoning.
let mut task_aggs = self.task_aggregators.lock().unwrap();
Arc::clone(
task_aggs
.entry(*task_id)
.or_insert_with(|| Arc::new(Mutex::default())),
)
};

// Slow path: retrieve task, create a task aggregator, store it to the cache, then return it.
let task_opt = self
.datastore
.run_tx("task_aggregator_get_task", |tx| {
let task_id = *task_id;
Box::pin(async move { tx.get_aggregator_task(&task_id).await })
})
.await?;
match task_opt {
Some(task) => {
let task_agg =
Arc::new(TaskAggregator::new(task, Arc::clone(&self.report_writer))?);
{
let mut task_aggs = self.task_aggregators.lock().await;
Ok(Some(Arc::clone(
task_aggs.entry(*task_id).or_insert(task_agg),
)))
}
// Step two: if the entry is empty, fill it via a database query. Concurrent callers
// requesting the same task will contend over this lock while awaiting the result of the DB
// query, ensuring that in the common case only a single query will be made for each task.
let task_aggregator = {
let mut cache_entry = cache_entry.lock().await;
if cache_entry.is_none() {
*cache_entry = self
.datastore
.run_tx("task_aggregator_get_task", |tx| {
let task_id = *task_id;
Box::pin(async move { tx.get_aggregator_task(&task_id).await })
})
.await?
.map(|task| TaskAggregator::new(task, Arc::clone(&self.report_writer)))
.transpose()?
.map(Arc::new);
}
// Avoid caching None, in case a previously non-existent task is provisioned while the
// system is live. Note that for #238, if we're improving this cache to indeed cache
// None, we must provide some mechanism for taskprov tasks to force a cache refresh.
None => Ok(None),
cache_entry.as_ref().map(Arc::clone)
};

// If the task doesn't exist, remove the task entry from the cache to avoid caching a
// negative result. Then return the result.
//
// TODO(#238): once cache eviction is implemented, we can likely remove this step. We only
// need to do this to avoid trivial DoS via a requestor spraying many nonexistent task IDs.
// However, we need to consider the taskprov case, where an aggregator can add a task and
// expect it to be immediately visible.
if task_aggregator.is_none() {
// Unwrap safety: mutex poisoning.
let mut task_aggs = self.task_aggregators.lock().unwrap();
task_aggs.remove(task_id);
}
Ok(task_aggregator)
}

/// Opts in or out of a taskprov task.
Expand Down

0 comments on commit 117a807

Please sign in to comment.