Skip to content

Commit

Permalink
Check existing collect jobs before resolving query (#1896)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored Sep 7, 2023
1 parent 577888c commit c8dd494
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 22 deletions.
44 changes: 22 additions & 22 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2153,28 +2153,6 @@ impl VdafOps {
Arc::clone(&aggregation_param),
);
Box::pin(async move {
let collection_identifier =
Q::collection_identifier_for_query(tx, &task, req.query())
.await?
.ok_or_else(|| {
datastore::Error::User(
Error::BatchInvalid(
*task.id(),
"no batch ready for collection".to_string(),
)
.into(),
)
})?;

// Check that the batch interval is valid for the task
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-02.html#section-4.5.6.1.1
if !Q::validate_collection_identifier(&task, &collection_identifier) {
return Err(datastore::Error::User(
Error::BatchInvalid(*task.id(), format!("{collection_identifier}"))
.into(),
));
}

// Check if this collection job already exists, ensuring that all parameters match.
if let Some(collection_job) = tx
.get_collection_job::<SEED_SIZE, Q, A>(&vdaf, task.id(), &collection_job_id)
Expand All @@ -2200,6 +2178,28 @@ impl VdafOps {
}
}

let collection_identifier =
Q::collection_identifier_for_query(tx, &task, req.query())
.await?
.ok_or_else(|| {
datastore::Error::User(
Error::BatchInvalid(
*task.id(),
"no batch ready for collection".to_string(),
)
.into(),
)
})?;

// Check that the batch interval is valid for the task
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-02.html#section-4.5.6.1.1
if !Q::validate_collection_identifier(&task, &collection_identifier) {
return Err(datastore::Error::User(
Error::BatchInvalid(*task.id(), format!("{collection_identifier}"))
.into(),
));
}

debug!(collect_request = ?req, "Cache miss, creating new collection job");
let (_, report_count, batches, batches_with_reports) = try_join!(
Q::validate_query_count::<SEED_SIZE, C, A>(
Expand Down
44 changes: 44 additions & 0 deletions aggregator/src/aggregator/collection_job_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,50 @@ async fn collection_job_put_idempotence_fixed_size_current_batch_mutate_aggregat
assert_eq!(response.status(), Some(Status::Conflict));
}

#[tokio::test]
async fn collection_job_put_idempotence_fixed_size_current_batch_no_extra_reports() {
let (test_case, _batch_id_1, _batch_id_2, _) =
setup_fixed_size_current_batch_collection_job_test_case().await;

let collection_job_id_1 = random();
let collection_job_id_2 = random();
let request = Arc::new(CollectionReq::new(
Query::new_fixed_size(FixedSizeQuery::CurrentBatch),
AggregationParam(0).get_encoded(),
));

// Create the first collection job.
let response = test_case
.put_collection_job(&collection_job_id_1, &request)
.await;
assert_eq!(response.status(), Some(Status::Created));

// Fetch the first collection job, to advance the current batch.
let response = test_case.post_collection_job(&collection_job_id_1).await;
assert_eq!(response.status(), Some(Status::Accepted));

// Create the second collection job.
let response = test_case
.put_collection_job(&collection_job_id_2, &request)
.await;
assert_eq!(response.status(), Some(Status::Created));

// Fetch the second collection job, to advance the current batch. There are now no outstanding
// batches left.
let response = test_case.post_collection_job(&collection_job_id_2).await;
assert_eq!(response.status(), Some(Status::Accepted));

// Re-send the collection job creation requests to confirm they are still idempotent.
let response = test_case
.put_collection_job(&collection_job_id_1, &request)
.await;
assert_eq!(response.status(), Some(Status::Created));
let response = test_case
.put_collection_job(&collection_job_id_2, &request)
.await;
assert_eq!(response.status(), Some(Status::Created));
}

#[tokio::test]
async fn collection_job_put_idempotence_fixed_size_by_batch_id() {
let test_case = setup_collection_job_test_case(
Expand Down

0 comments on commit c8dd494

Please sign in to comment.