Skip to content

Commit

Permalink
Fix janus_gc_deleted_batches_total (#3160)
Browse files Browse the repository at this point in the history
* Add batch_aggregations rows to make test fail

* GC: Fix deleted batch counter updates
  • Loading branch information
divergentdave authored May 23, 2024
1 parent fc0b52e commit d39af88
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 16 deletions.
36 changes: 20 additions & 16 deletions aggregator_core/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4854,25 +4854,29 @@ deleted_aggregate_share_jobs AS (
AND (LOWER(batch_interval) < $2
OR (aggregate_share_jobs.batch_identifier = batches_to_delete.batch_identifier
AND aggregate_share_jobs.aggregation_param = batches_to_delete.aggregation_param))
),
deleted_batch_aggregations AS (
DELETE FROM batch_aggregations
USING batches_to_delete
WHERE task_id = $1
AND batch_aggregations.batch_identifier = batches_to_delete.batch_identifier
AND batch_aggregations.aggregation_param = batches_to_delete.aggregation_param
)
DELETE FROM batch_aggregations
USING batches_to_delete
WHERE task_id = $1
AND batch_aggregations.batch_identifier = batches_to_delete.batch_identifier
AND batch_aggregations.aggregation_param = batches_to_delete.aggregation_param",
SELECT COUNT(1) AS batch_count FROM batches_to_delete",
)
.await?;
self.execute(
&stmt,
&[
/* task_id */ &task_info.pkey,
/* threshold */
&task_info.report_expiry_threshold(&self.clock.now().as_naive_date_time()?)?,
/* limit */ &i64::try_from(limit)?,
],
)
.await
.map_err(Into::into)
let row = self
.query_one(
&stmt,
&[
/* task_id */ &task_info.pkey,
/* threshold */
&task_info.report_expiry_threshold(&self.clock.now().as_naive_date_time()?)?,
/* limit */ &i64::try_from(limit)?,
],
)
.await?;
row.get_bigint_and_convert("batch_count")
}

/// Retrieve all global HPKE keypairs.
Expand Down
17 changes: 17 additions & 0 deletions aggregator_core/src/datastore/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6125,6 +6125,23 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas
},
);
tx.put_batch_aggregation(&batch_aggregation).await.unwrap();
for ord in 1..8 {
let batch_aggregation = BatchAggregation::<0, Q, dummy::Vdaf>::new(
*task.id(),
batch_identifier.clone(),
dummy::AggregationParam(0),
ord,
client_timestamp_interval,
BatchAggregationState::Aggregating {
aggregate_share: None,
report_count: 0,
checksum: ReportIdChecksum::default(),
aggregation_jobs_created: 0,
aggregation_jobs_terminated: 0,
},
);
tx.put_batch_aggregation(&batch_aggregation).await.unwrap();
}

if task.role() == &Role::Leader {
let collection_job = CollectionJob::<0, Q, dummy::Vdaf>::new(
Expand Down

0 comments on commit d39af88

Please sign in to comment.