Skip to content

Commit

Permalink
Leader: do not send empty aggregation initialization requests. (#2439)
Browse files Browse the repository at this point in the history
Before this PR, this would happen if every report in the job was
filtered (for example, because the report was GC'ed before the
aggregation job was sent). DAP did not proscribe how to handle this;
a Helper Janus would return an error, causing the aggregation job to
become stuck.

Now, instead of sending an empty aggregation initialization request, the
Leader sends no request at all and continues processing as if it
received an empty response. This will cause the aggregation job to be
finished; the Helper will not know about this aggregation job, but
that's OK since the aggregation job will not contribute anything to the
eventual aggregate.
  • Loading branch information
branlwyd authored Jan 5, 2024
1 parent d65263d commit 313fa72
Showing 1 changed file with 194 additions and 24 deletions.
218 changes: 194 additions & 24 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,31 +408,38 @@ impl AggregationJobDriver {
}
}

// Construct request, send it to the helper, and process the response.
// TODO(#235): abandon work immediately on "terminal" failures from helper, or other
// unexpected cases such as unknown/unexpected content type.
let req = AggregationJobInitializeReq::<Q>::new(
aggregation_job.aggregation_parameter().get_encoded(),
PartialBatchSelector::new(aggregation_job.partial_batch_identifier().clone()),
prepare_inits,
);
let resp = if !prepare_inits.is_empty() {
// Construct request, send it to the helper, and process the response.
// TODO(#235): abandon work immediately on "terminal" failures from helper, or other
// unexpected cases such as unknown/unexpected content type.
let req = AggregationJobInitializeReq::<Q>::new(
aggregation_job.aggregation_parameter().get_encoded(),
PartialBatchSelector::new(aggregation_job.partial_batch_identifier().clone()),
prepare_inits,
);

let resp_bytes = send_request_to_helper(
&self.http_client,
Method::PUT,
task.aggregation_job_uri(aggregation_job.id())?
.ok_or_else(|| anyhow!("task is not leader and has no aggregation job URI"))?,
AGGREGATION_JOB_ROUTE,
AggregationJobInitializeReq::<Q>::MEDIA_TYPE,
req,
// The only way a task wouldn't have an aggregator auth token in it is in the taskprov
// case, and Janus never acts as the leader with taskprov enabled.
task.aggregator_auth_token()
.ok_or_else(|| anyhow!("task has no aggregator auth token"))?,
&self.http_request_duration_histogram,
)
.await?;
let resp = AggregationJobResp::get_decoded(&resp_bytes)?;
let resp_bytes = send_request_to_helper(
&self.http_client,
Method::PUT,
task.aggregation_job_uri(aggregation_job.id())?
.ok_or_else(|| anyhow!("task is not leader and has no aggregation job URI"))?,
AGGREGATION_JOB_ROUTE,
AggregationJobInitializeReq::<Q>::MEDIA_TYPE,
req,
// The only way a task wouldn't have an aggregator auth token in it is in the taskprov
// case, and Janus never acts as the leader with taskprov enabled.
task.aggregator_auth_token()
.ok_or_else(|| anyhow!("task has no aggregator auth token"))?,
&self.http_request_duration_histogram,
)
.await?;
AggregationJobResp::get_decoded(&resp_bytes)?
} else {
// If there are no prepare inits to send (because every report aggregation was filtered by
// the block above), don't send a request to the Helper at all and process an artificial
// aggregation job response instead, which will finish the aggregation job.
AggregationJobResp::new(Vec::new())
};

self.process_response_from_helper(
datastore,
Expand Down Expand Up @@ -2397,6 +2404,169 @@ mod tests {
assert_eq!(want_batch, got_batch);
}

#[tokio::test]
async fn step_fixed_size_aggregation_job_init_missing_report() {
// Setup: insert a client report and add it to a new aggregation job.
install_test_trace_subscriber();
let server = mockito::Server::new_async().await;
let clock = MockClock::default();
let ephemeral_datastore = ephemeral_datastore().await;
let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);
let vdaf = Arc::new(Prio3::new_count(2).unwrap());

let task = TaskBuilder::new(
QueryType::FixedSize {
max_batch_size: 10,
batch_time_window_size: None,
},
VdafInstance::Prio3Count,
)
.with_helper_aggregator_endpoint(server.url().parse().unwrap())
.build();

let leader_task = task.leader_view().unwrap();

// We make up a report ID & report_timestamp to place into the report aggregation, in order
// to simulate a report aggregation referring to a report which has been garbage collected.
let report_id = random();
let report_timestamp = clock
.now()
.to_batch_interval_start(task.time_precision())
.unwrap();
let batch_id = random();
let aggregation_job_id = random();

let lease = ds
.run_unnamed_tx(|tx| {
let leader_task = leader_task.clone();

Box::pin(async move {
tx.put_aggregator_task(&leader_task).await?;

tx.put_aggregation_job(&AggregationJob::<
VERIFY_KEY_LENGTH,
FixedSize,
Prio3Count,
>::new(
*leader_task.id(),
aggregation_job_id,
(),
batch_id,
Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1))
.unwrap(),
AggregationJobState::InProgress,
AggregationJobStep::from(0),
))
.await?;

tx.put_report_aggregation(
&ReportAggregation::<VERIFY_KEY_LENGTH, Prio3Count>::new(
*leader_task.id(),
aggregation_job_id,
report_id,
report_timestamp,
0,
None,
ReportAggregationState::Start,
),
)
.await?;

tx.put_batch(&Batch::<VERIFY_KEY_LENGTH, FixedSize, Prio3Count>::new(
*leader_task.id(),
batch_id,
(),
BatchState::Open,
1,
Interval::from_time(&report_timestamp).unwrap(),
))
.await?;

Ok(tx
.acquire_incomplete_aggregation_jobs(&StdDuration::from_secs(60), 1)
.await?
.remove(0))
})
})
.await
.unwrap();
assert_eq!(lease.leased().task_id(), task.id());
assert_eq!(lease.leased().aggregation_job_id(), &aggregation_job_id);

// Run: create an aggregation job driver & try to step the aggregation we've created.
let aggregation_job_driver = AggregationJobDriver::new(
reqwest::Client::builder().build().unwrap(),
&noop_meter(),
32,
);
aggregation_job_driver
.step_aggregation_job(ds.clone(), Arc::new(lease))
.await
.unwrap();

let want_aggregation_job = AggregationJob::<VERIFY_KEY_LENGTH, FixedSize, Prio3Count>::new(
*task.id(),
aggregation_job_id,
(),
batch_id,
Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(),
AggregationJobState::Finished,
AggregationJobStep::from(1),
);
let want_report_aggregation = ReportAggregation::<VERIFY_KEY_LENGTH, Prio3Count>::new(
*task.id(),
aggregation_job_id,
report_id,
report_timestamp,
0,
None,
ReportAggregationState::Failed(PrepareError::ReportDropped),
);
let want_batch = Batch::<VERIFY_KEY_LENGTH, FixedSize, Prio3Count>::new(
*task.id(),
batch_id,
(),
BatchState::Open,
0,
Interval::from_time(&report_timestamp).unwrap(),
);

let (got_aggregation_job, got_report_aggregation, got_batch) = ds
.run_unnamed_tx(|tx| {
let vdaf = Arc::clone(&vdaf);
let task = task.clone();

Box::pin(async move {
let aggregation_job = tx
.get_aggregation_job::<VERIFY_KEY_LENGTH, FixedSize, Prio3Count>(
task.id(),
&aggregation_job_id,
)
.await?
.unwrap();
let report_aggregation = tx
.get_report_aggregation(
vdaf.as_ref(),
&Role::Leader,
task.id(),
&aggregation_job_id,
aggregation_job.aggregation_parameter(),
&report_id,
)
.await?
.unwrap();
let batch = tx.get_batch(task.id(), &batch_id, &()).await?.unwrap();
Ok((aggregation_job, report_aggregation, batch))
})
})
.await
.unwrap();

assert_eq!(want_aggregation_job, got_aggregation_job);
assert_eq!(want_report_aggregation, got_report_aggregation);
assert_eq!(want_batch, got_batch);
}

#[tokio::test]
async fn step_time_interval_aggregation_job_continue() {
// Setup: insert a client report and add it to an aggregation job whose state has already
Expand Down

0 comments on commit 313fa72

Please sign in to comment.