From d39af8866813dc6b5c92e32e2b13f49dc2d1aa3f Mon Sep 17 00:00:00 2001 From: David Cook Date: Thu, 23 May 2024 13:46:35 -0500 Subject: [PATCH] Fix `janus_gc_deleted_batches_total` (#3160) * Add batch_aggregations rows to make test fail * GC: Fix deleted batch counter updates --- aggregator_core/src/datastore.rs | 36 ++++++++++++++------------ aggregator_core/src/datastore/tests.rs | 17 ++++++++++++ 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index c280530dd..624ba5227 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -4854,25 +4854,29 @@ deleted_aggregate_share_jobs AS ( AND (LOWER(batch_interval) < $2 OR (aggregate_share_jobs.batch_identifier = batches_to_delete.batch_identifier AND aggregate_share_jobs.aggregation_param = batches_to_delete.aggregation_param)) +), +deleted_batch_aggregations AS ( + DELETE FROM batch_aggregations + USING batches_to_delete + WHERE task_id = $1 + AND batch_aggregations.batch_identifier = batches_to_delete.batch_identifier + AND batch_aggregations.aggregation_param = batches_to_delete.aggregation_param ) -DELETE FROM batch_aggregations -USING batches_to_delete -WHERE task_id = $1 - AND batch_aggregations.batch_identifier = batches_to_delete.batch_identifier - AND batch_aggregations.aggregation_param = batches_to_delete.aggregation_param", +SELECT COUNT(1) AS batch_count FROM batches_to_delete", ) .await?; - self.execute( - &stmt, - &[ - /* task_id */ &task_info.pkey, - /* threshold */ - &task_info.report_expiry_threshold(&self.clock.now().as_naive_date_time()?)?, - /* limit */ &i64::try_from(limit)?, - ], - ) - .await - .map_err(Into::into) + let row = self + .query_one( + &stmt, + &[ + /* task_id */ &task_info.pkey, + /* threshold */ + &task_info.report_expiry_threshold(&self.clock.now().as_naive_date_time()?)?, + /* limit */ &i64::try_from(limit)?, + ], + ) + .await?; + row.get_bigint_and_convert("batch_count") } /// Retrieve all global HPKE keypairs. diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index 4ade1fd2d..7d0a02149 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -6125,6 +6125,23 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas }, ); tx.put_batch_aggregation(&batch_aggregation).await.unwrap(); + for ord in 1..8 { + let batch_aggregation = BatchAggregation::<0, Q, dummy::Vdaf>::new( + *task.id(), + batch_identifier.clone(), + dummy::AggregationParam(0), + ord, + client_timestamp_interval, + BatchAggregationState::Aggregating { + aggregate_share: None, + report_count: 0, + checksum: ReportIdChecksum::default(), + aggregation_jobs_created: 0, + aggregation_jobs_terminated: 0, + }, + ); + tx.put_batch_aggregation(&batch_aggregation).await.unwrap(); + } if task.role() == &Role::Leader { let collection_job = CollectionJob::<0, Q, dummy::Vdaf>::new(