diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs
index 9197fd5da..545f3a83c 100644
--- a/aggregator_core/src/datastore.rs
+++ b/aggregator_core/src/datastore.rs
@@ -1191,14 +1191,23 @@ impl Transaction<'_, C> {
         A::InputShare: PartialEq,
         A::PublicShare: PartialEq,
     {
+        // Fetch the task's primary key and calculate the report timestamp threshold in a separate
+        // query. This will allow the query planner to make more accurate row count estimates, by
+        // comparing concrete values to the statistics of the `client_reports` table, and help the
+        // query planner reason that there will be only one task_id, so it can satisfy the `ORDER BY
+        // client_timestamp DESC` clause by using a reverse index scan, without an intermediate
+        // sort.
+        let (id, threshold) = self
+            .get_task_primary_key_and_expiry_threshold(task_id)
+            .await?;
+
         let stmt = self
             .prepare_cached(
                 "WITH unaggregated_reports AS (
                     SELECT client_reports.id FROM client_reports
-                    JOIN tasks ON tasks.id = client_reports.task_id
-                    WHERE tasks.task_id = $1
+                    WHERE client_reports.task_id = $1
                     AND client_reports.aggregation_started = FALSE
-                    AND client_reports.client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)
+                    AND client_reports.client_timestamp >= $2
                     ORDER BY client_timestamp DESC
                     FOR UPDATE OF client_reports SKIP LOCKED LIMIT $5::BIGINT
@@ -1213,8 +1222,8 @@ impl Transaction<'_, C> {
             .query(
                 &stmt,
                 &[
-                    /* task_id */ &task_id.as_ref(),
-                    /* now */ &self.clock.now().as_naive_date_time()?,
+                    /* task_id */ &id,
+                    /* threshold */ &threshold,
                     /* updated_at */ &self.clock.now().as_naive_date_time()?,
                     /* updated_by */ &self.name,
                     /* limit */ &i64::try_from(limit)?,
@@ -4323,30 +4332,11 @@ impl Transaction<'_, C> {
         // client_timestamp column's histogram, and make an accurate row count estimate. If the
         // threshold is determined in a single query via a join, the query planner is not able to
         // predict the task's report_expiry_age, and the accuracy of the row count estimate suffers.
-        let stmt_1 = self
-            .prepare_cached(
-                "SELECT
-                    id,
-                    COALESCE(
-                        $2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL,
-                        '-infinity'::TIMESTAMP
-                    ) AS threshold
-                FROM tasks WHERE tasks.task_id = $1",
-            )
+        let (id, threshold) = self
+            .get_task_primary_key_and_expiry_threshold(task_id)
             .await?;
-        let row = self
-            .query_one(
-                &stmt_1,
-                &[
-                    /* task_id */ &task_id.get_encoded(),
-                    /* now */ &self.clock.now().as_naive_date_time()?,
-                ],
-            )
-            .await?;
-        let id = row.get::<_, i64>("id");
-        let threshold = row.get::<_, Timestamp>("threshold");
 
-        let stmt_2 = self
+        let stmt = self
             .prepare_cached(
                 "WITH client_reports_to_delete AS (
                     SELECT client_reports.id FROM client_reports
@@ -4360,7 +4350,7 @@ impl Transaction<'_, C> {
             )
             .await?;
         self.execute(
-            &stmt_2,
+            &stmt,
             &[
                 /* id */ &id,
                 /* threshold */ &threshold,
@@ -4371,6 +4361,37 @@ impl Transaction<'_, C> {
         .map_err(Into::into)
     }
 
+    /// Helper function to look up a task's primary key, and compute a garbage collection visibility
+    /// threshold timestamp from its report expiry duration.
+    async fn get_task_primary_key_and_expiry_threshold(
+        &self,
+        task_id: &TaskId,
+    ) -> Result<(i64, Timestamp), Error> {
+        let stmt = self
+            .prepare_cached(
+                "SELECT
+                    id,
+                    COALESCE(
+                        $2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL,
+                        '-infinity'::TIMESTAMP
+                    ) AS threshold
+                FROM tasks WHERE tasks.task_id = $1",
+            )
+            .await?;
+        let row = self
+            .query_one(
+                &stmt,
+                &[
+                    /* task_id */ &task_id.get_encoded(),
+                    /* now */ &self.clock.now().as_naive_date_time()?,
+                ],
+            )
+            .await?;
+        let id = row.get::<_, i64>("id");
+        let threshold = row.get::<_, Timestamp>("threshold");
+        Ok((id, threshold))
+    }
+
     /// Deletes old aggregation artifacts (aggregation jobs/report aggregations) for a given task,
     /// that is, aggregation artifacts for which the aggregation job's maximum client timestamp is
     /// older than the task's report expiry age. Up to `limit` aggregation jobs will be deleted,
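The planner argument in the patch comments can be checked empirically: once task_id and the expiry threshold arrive as concrete values rather than through a join, PostgreSQL can estimate row counts from the client_timestamp histogram and satisfy the ORDER BY client_timestamp DESC clause with a backward index scan. Below is a minimal sketch of such a check, assuming a hypothetical index covering (task_id, client_timestamp); the real index names in the schema may differ, and the FOR UPDATE ... SKIP LOCKED clause is omitted for simplicity.

    -- Sketch only: literal values and index names are illustrative assumptions.
    EXPLAIN (ANALYZE, BUFFERS)
    SELECT client_reports.id FROM client_reports
    WHERE client_reports.task_id = 42                      -- concrete tasks.id value
    AND client_reports.aggregation_started = FALSE
    AND client_reports.client_timestamp >= '2024-01-01'    -- precomputed threshold
    ORDER BY client_timestamp DESC
    LIMIT 5000;

If the optimization described above takes effect, the reported plan should show an Index Scan Backward node feeding the LIMIT directly, with no intermediate Sort node over the filtered rows.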