Skip to content

Commit

Permalink
Split database query for unaggregated reports (#2808)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored Mar 12, 2024
1 parent ac0e6da commit fdaca31
Showing 1 changed file with 49 additions and 28 deletions.
77 changes: 49 additions & 28 deletions aggregator_core/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1191,14 +1191,23 @@ impl<C: Clock> Transaction<'_, C> {
A::InputShare: PartialEq,
A::PublicShare: PartialEq,
{
// Fetch the task's primary key and calculate the report timestamp threshold in a separate
// query. This will allow the query planner to make more accurate row count estimates, by
// comparing concrete values to the statistics of the `client_reports` table, and help the
// query planner reason that there will be only one task_id, so it can satisfy the `ORDER BY
// client_timestamp DESC` clause by using a reverse index scan, without an intermediate
// sort.
let (id, threshold) = self
.get_task_primary_key_and_expiry_threshold(task_id)
.await?;

let stmt = self
.prepare_cached(
"WITH unaggregated_reports AS (
SELECT client_reports.id FROM client_reports
JOIN tasks ON tasks.id = client_reports.task_id
WHERE tasks.task_id = $1
WHERE client_reports.task_id = $1
AND client_reports.aggregation_started = FALSE
AND client_reports.client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)
AND client_reports.client_timestamp >= $2
ORDER BY client_timestamp DESC
FOR UPDATE OF client_reports SKIP LOCKED
LIMIT $5::BIGINT
Expand All @@ -1213,8 +1222,8 @@ impl<C: Clock> Transaction<'_, C> {
.query(
&stmt,
&[
/* task_id */ &task_id.as_ref(),
/* now */ &self.clock.now().as_naive_date_time()?,
/* task_id */ &id,
/* threshold */ &threshold,
/* updated_at */ &self.clock.now().as_naive_date_time()?,
/* updated_by */ &self.name,
/* limit */ &i64::try_from(limit)?,
Expand Down Expand Up @@ -4323,30 +4332,11 @@ impl<C: Clock> Transaction<'_, C> {
// client_timestamp column's histogram, and make an accurate row count estimate. If the
// threshold is determined in a single query via a join, the query planner is not able to
// predict the task's report_expiry_age, and the accuracy of the row count estimate suffers.
let stmt_1 = self
.prepare_cached(
"SELECT
id,
COALESCE(
$2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL,
'-infinity'::TIMESTAMP
) AS threshold
FROM tasks WHERE tasks.task_id = $1",
)
let (id, threshold) = self
.get_task_primary_key_and_expiry_threshold(task_id)
.await?;
let row = self
.query_one(
&stmt_1,
&[
/* task_id */ &task_id.get_encoded(),
/* now */ &self.clock.now().as_naive_date_time()?,
],
)
.await?;
let id = row.get::<_, i64>("id");
let threshold = row.get::<_, Timestamp<NaiveDateTime>>("threshold");

let stmt_2 = self
let stmt = self
.prepare_cached(
"WITH client_reports_to_delete AS (
SELECT client_reports.id FROM client_reports
Expand All @@ -4360,7 +4350,7 @@ impl<C: Clock> Transaction<'_, C> {
)
.await?;
self.execute(
&stmt_2,
&stmt,
&[
/* id */ &id,
/* threshold */ &threshold,
Expand All @@ -4371,6 +4361,37 @@ impl<C: Clock> Transaction<'_, C> {
.map_err(Into::into)
}

/// Resolves a task's database primary key and its report-expiry threshold in one query.
///
/// The threshold is computed in SQL as the current time (taken from `self.clock`) minus the
/// task's `report_expiry_age`, interpreted as a number of seconds. When that expression is
/// NULL, the `COALESCE` substitutes `'-infinity'`, so any timestamp compares as at or after
/// the threshold.
///
/// Returns `(id, threshold)` read from the `id` and `threshold` columns of the single
/// result row. Errors from preparing the statement, reading the clock, or running the query
/// are propagated to the caller.
///
/// NOTE(review): presumably `query_one` fails when no row matches `task_id`; confirm
/// against the wrapper's definition.
async fn get_task_primary_key_and_expiry_threshold(
    &self,
    task_id: &TaskId,
) -> Result<(i64, Timestamp<NaiveDateTime>), Error> {
    // The SQL text is reproduced verbatim, including its line layout.
    let lookup_stmt = self
        .prepare_cached(
            "SELECT
id,
COALESCE(
$2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL,
'-infinity'::TIMESTAMP
) AS threshold
FROM tasks WHERE tasks.task_id = $1",
        )
        .await?;

    // Evaluate the parameters in the same order as they are bound: task ID, then clock.
    let encoded_task_id = task_id.get_encoded();
    let now = self.clock.now().as_naive_date_time()?;

    let task_row = self
        .query_one(
            &lookup_stmt,
            &[/* task_id */ &encoded_task_id, /* now */ &now],
        )
        .await?;

    Ok((
        task_row.get::<_, i64>("id"),
        task_row.get::<_, Timestamp<NaiveDateTime>>("threshold"),
    ))
}

/// Deletes old aggregation artifacts (aggregation jobs/report aggregations) for a given task,
/// that is, aggregation artifacts for which the aggregation job's maximum client timestamp is
/// older than the task's report expiry age. Up to `limit` aggregation jobs will be deleted,
Expand Down

0 comments on commit fdaca31

Please sign in to comment.