helper stores prep state instead of PingPongTransition

And move some more tests to Poplar1 so we can exercise continuation
tgeoghegan committed Sep 13, 2023
1 parent 384ed5e commit fff1d93
Showing 11 changed files with 410 additions and 225 deletions.
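
For orientation before the diffs: prior to this commit, both aggregators parked an in-progress report aggregation in a single ReportAggregationState::Waiting variant holding a PingPongTransition. This commit splits that state by role: the Leader keeps the full transition, since it must later re-derive both its prepare state and the message it owes the Helper, while the Helper has already sent its message in the HTTP response and so only needs the bare VDAF prepare state. A minimal sketch of the resulting shape, with simplified generics (the real janus definition carries more type parameters and serialization machinery, so treat the exact signature here as an assumption):

    // Sketch only: PrepState and Transition stand in for the VDAF prepare
    // state and prio's PingPongTransition; PrepareError stands in for the
    // janus type describing why preparation failed.
    enum ReportAggregationState<PrepState, Transition, PrepareError> {
        // No preparation has happened for this report yet.
        Start,
        // Leader: stores the whole ping-pong transition so that evaluate()
        // can reproduce both the next prepare state and the outgoing message.
        WaitingLeader(Transition),
        // Helper: stores only the prepare state, because its outgoing message
        // has already been returned to the Leader.
        WaitingHelper(PrepState),
        // Preparation finished and the output share was committed.
        Finished,
        // Preparation failed with a DAP prepare error.
        Failed(PrepareError),
    }
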
14 changes: 4 additions & 10 deletions aggregator/src/aggregator.rs
@@ -1776,13 +1776,7 @@ impl VdafOps {
                         &input_share,
                         prepare_init.message(),
                     )
-                    .and_then(|transition| {
-                        transition
-                            .evaluate(vdaf)
-                            .map(|(ping_pong_state, outgoing_message)| {
-                                (transition, ping_pong_state, outgoing_message)
-                            })
-                    })
+                    .and_then(|transition| transition.evaluate(vdaf))
                     .map_err(|error| {
                         handle_ping_pong_error(
                             task.id(),
@@ -1796,18 +1790,18 @@ impl VdafOps {
                 });

                 let (report_aggregation_state, prepare_step_result) = match init_rslt {
-                    Ok((transition, PingPongState::Continued(_), outgoing_message)) => {
+                    Ok((PingPongState::Continued(prep_state), outgoing_message)) => {
                         // Helper is not finished. Await the next message from the Leader to advance to
                         // the next round.
                         saw_continue = true;
                         (
-                            ReportAggregationState::Waiting(transition),
+                            ReportAggregationState::WaitingHelper(prep_state),
                             PrepareStepResult::Continue {
                                 message: outgoing_message,
                             },
                         )
                     }
-                    Ok((_, PingPongState::Finished(output_share), outgoing_message)) => {
+                    Ok((PingPongState::Finished(output_share), outgoing_message)) => {
                         // Helper finished. Unlike the Leader, the Helper does not wait for confirmation
                         // that the Leader finished before accumulating its output share.
                         accumulator.update(
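
The hunk above is the Helper's initialization path. helper_initialized now hands back a transition whose evaluate() call yields the (state, outgoing message) pair directly, so the closure that previously threaded the transition itself through a tuple is gone, and only the match below decides what to persist. A self-contained toy of that match, using stand-in types (assumed shapes, not the janus or prio definitions):

    // Stand-ins for the real types; only the control flow is the point.
    enum PingPongState<PS, OS> {
        Continued(PS),
        Finished(OS),
    }

    enum ReportAggregationState<PS> {
        WaitingHelper(PS),
        Finished,
    }

    // Returns the state to persist plus an output share to accumulate, if any.
    fn helper_init_step<PS, OS>(
        state: PingPongState<PS, OS>,
    ) -> (ReportAggregationState<PS>, Option<OS>) {
        match state {
            // Not finished: persist only the prepare state and await the
            // Leader's next message to advance to the next round.
            PingPongState::Continued(prep_state) => {
                (ReportAggregationState::WaitingHelper(prep_state), None)
            }
            // Finished: accumulate immediately; unlike the Leader, the Helper
            // does not wait for confirmation that the Leader also finished.
            PingPongState::Finished(output_share) => {
                (ReportAggregationState::Finished, Some(output_share))
            }
        }
    }
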
74 changes: 58 additions & 16 deletions aggregator/src/aggregator/aggregate_init_tests.rs
@@ -1,6 +1,9 @@
 use crate::aggregator::{
-    http_handlers::aggregator_handler, tests::generate_helper_report_share, Config,
+    http_handlers::{aggregator_handler, test_util::decode_response_body},
+    tests::generate_helper_report_share,
+    Config,
 };
+use assert_matches::assert_matches;
 use janus_aggregator_core::{
     datastore::{
         test_util::{ephemeral_datastore, EphemeralDatastore},
@@ -15,15 +18,16 @@ use janus_core::{
     time::{Clock, MockClock, TimeExt as _},
 };
 use janus_messages::{
-    query_type::TimeInterval, AggregationJobId, AggregationJobInitializeReq, PartialBatchSelector,
-    PrepareInit, ReportMetadata, Role,
+    query_type::TimeInterval, AggregationJobId, AggregationJobInitializeReq, AggregationJobResp,
+    PartialBatchSelector, PrepareInit, PrepareStepResult, ReportMetadata, Role,
 };
 use prio::{
     codec::Encode,
     idpf::IdpfInput,
     vdaf::{
         self,
         poplar1::{Poplar1, Poplar1AggregationParam},
+        prg::PrgSha3,
     },
 };
 use rand::random;
@@ -115,6 +119,7 @@ pub(super) struct AggregationJobInitTestCase<
     pub(super) prepare_inits: Vec<PrepareInit>,
     pub(super) aggregation_job_id: AggregationJobId,
     aggregation_job_init_req: AggregationJobInitializeReq<TimeInterval>,
+    aggregation_job_init_resp: Option<AggregationJobResp>,
     pub(super) aggregation_param: V::AggregationParam,
     pub(super) handler: Box<dyn Handler>,
     pub(super) datastore: Arc<Datastore<MockClock>>,
@@ -131,6 +136,20 @@ pub(super) async fn setup_aggregate_init_test() -> AggregationJobInitTestCase<0,
     .await
 }

+async fn setup_poplar1_aggregate_init_test() -> AggregationJobInitTestCase<16, Poplar1<PrgSha3, 16>>
+{
+    let aggregation_param =
+        Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])]))
+            .unwrap();
+    setup_aggregate_init_test_for_vdaf(
+        Poplar1::new_sha3(1),
+        VdafInstance::Poplar1 { bits: 1 },
+        aggregation_param,
+        IdpfInput::from_bools(&[true]),
+    )
+    .await
+}
+
 async fn setup_aggregate_init_test_for_vdaf<
     const VERIFY_KEY_SIZE: usize,
     V: vdaf::Aggregator<VERIFY_KEY_SIZE, 16> + vdaf::Client<16>,
@@ -140,15 +159,15 @@ async fn setup_aggregate_init_test_for_vdaf<
     aggregation_param: V::AggregationParam,
     measurement: V::Measurement,
 ) -> AggregationJobInitTestCase<VERIFY_KEY_SIZE, V> {
-    let test_case = setup_aggregate_init_test_without_sending_request(
+    let mut test_case = setup_aggregate_init_test_without_sending_request(
         vdaf,
         vdaf_instance,
         aggregation_param,
         measurement,
     )
     .await;

-    let response = put_aggregation_job(
+    let mut response = put_aggregation_job(
         &test_case.task,
         &test_case.aggregation_job_id,
         &test_case.aggregation_job_init_req,
@@ -157,6 +176,17 @@
     .await;
     assert_eq!(response.status(), Some(Status::Ok));

+    let aggregation_job_init_resp: AggregationJobResp = decode_response_body(&mut response).await;
+    assert_eq!(
+        aggregation_job_init_resp.prepare_resps().len(),
+        test_case.aggregation_job_init_req.prepare_inits().len(),
+    );
+    assert_matches!(
+        aggregation_job_init_resp.prepare_resps()[0].result(),
+        &PrepareStepResult::Continue { .. }
+    );
+
+    test_case.aggregation_job_init_resp = Some(aggregation_job_init_resp);
     test_case
 }

@@ -209,6 +239,7 @@ async fn setup_aggregate_init_test_without_sending_request<
         prepare_init_generator,
         aggregation_job_id,
         aggregation_job_init_req,
+        aggregation_job_init_resp: None,
         aggregation_param,
         handler: Box::new(handler),
         datastore,
@@ -376,17 +407,7 @@ async fn aggregation_job_mutation_report_shares() {
 #[tokio::test]
 async fn aggregation_job_mutation_report_aggregations() {
     // We must run Poplar1 in this test so that the aggregation job won't finish on the first step
-
-    let aggregation_param =
-        Poplar1AggregationParam::try_from_prefixes(Vec::from([IdpfInput::from_bools(&[false])]))
-            .unwrap();
-    let test_case = setup_aggregate_init_test_for_vdaf(
-        Poplar1::new_sha3(1),
-        VdafInstance::Poplar1 { bits: 1 },
-        aggregation_param,
-        IdpfInput::from_bools(&[true]),
-    )
-    .await;
+    let test_case = setup_poplar1_aggregate_init_test().await;

     // Generate some new reports using the existing reports' metadata, but varying the measurement
     // values such that the prepare state computed during aggregation initialization won't match the
@@ -420,3 +441,24 @@ async fn aggregation_job_mutation_report_aggregations() {
     .await;
     assert_eq!(response.status(), Some(Status::Conflict));
 }
+
+#[tokio::test]
+async fn aggregation_job_init_two_round_vdaf_idempotence() {
+    // We must run Poplar1 in this test so that the aggregation job won't finish on the first step
+    let test_case = setup_poplar1_aggregate_init_test().await;
+
+    // Send the aggregation job init request again. We should get an identical response back.
+    let mut response = put_aggregation_job(
+        &test_case.task,
+        &test_case.aggregation_job_id,
+        &test_case.aggregation_job_init_req,
+        &test_case.handler,
+    )
+    .await;
+
+    let aggregation_job_resp: AggregationJobResp = decode_response_body(&mut response).await;
+    assert_eq!(
+        aggregation_job_resp,
+        test_case.aggregation_job_init_resp.unwrap()
+    );
+}
49 changes: 25 additions & 24 deletions aggregator/src/aggregator/aggregation_job_continue.rs
@@ -72,7 +72,7 @@ impl VdafOps {
                 if report_agg.report_id() != prep_step.report_id() {
                     // This report was omitted by the leader because of a prior failure. Note that
                     // the report was dropped (if it's not already in an error state) and continue.
-                    if matches!(report_agg.state(), ReportAggregationState::Waiting(_)) {
+                    if matches!(report_agg.state(), ReportAggregationState::WaitingHelper(_)) {
                         *report_agg = report_agg
                             .clone()
                             .with_state(ReportAggregationState::Failed(PrepareError::ReportDropped))
@@ -103,8 +103,8 @@ impl VdafOps {
                 continue;
             }

-            let transition = match report_aggregation.state() {
-                ReportAggregationState::Waiting(transition) => transition,
+            let prep_state = match report_aggregation.state() {
+                ReportAggregationState::WaitingHelper(prep_state) => prep_state,
                 _ => {
                     return Err(datastore::Error::User(
                         Error::UnrecognizedMessage(
@@ -119,25 +119,23 @@ impl VdafOps {
             let (report_aggregation_state, prepare_step_result, output_share) =
                 trace_span!("VDAF preparation")
                     .in_scope(|| {
-                        // Evaluate the stored transition to recover our current state.
-                        transition
-                            .evaluate(vdaf.as_ref())
-                            .and_then(|(state, _)| {
-                                // Then continue with the incoming message.
-                                vdaf.helper_continued(state, prep_step.message())
-                            })
-                            .and_then(|continued_value| match continued_value {
-                                PingPongContinuedValue::WithMessage {
-                                    transition: new_transition,
-                                } => {
+                        // Continue with the incoming message.
+                        vdaf.helper_continued(
+                            PingPongState::Continued(prep_state.clone()),
+                            prep_step.message(),
+                        )
+                        .and_then(
+                            |continued_value| match continued_value {
+                                PingPongContinuedValue::WithMessage { transition } => {
                                     let (new_state, message) =
-                                        new_transition.evaluate(vdaf.as_ref())?;
+                                        transition.evaluate(vdaf.as_ref())?;
                                     let (report_aggregation_state, output_share) = match new_state {
-                                        // Helper did not finish. Store the new transition and await the next message
-                                        // from the Leader to advance preparation.
-                                        PingPongState::Continued(_) => {
-                                            (ReportAggregationState::Waiting(new_transition), None)
-                                        }
+                                        // Helper did not finish. Store the new state and await the
+                                        // next message from the Leader to advance preparation.
+                                        PingPongState::Continued(prep_state) => (
+                                            ReportAggregationState::WaitingHelper(prep_state),
+                                            None,
+                                        ),
                                         // Helper finished. Commit the output share.
                                         PingPongState::Finished(output_share) => {
                                             (ReportAggregationState::Finished, Some(output_share))
@@ -156,7 +154,8 @@ impl VdafOps {
                                     PrepareStepResult::Finished,
                                     Some(output_share),
                                 )),
-                            })
+                            },
+                        )
                     })
                     .map_err(|error| {
                         handle_ping_pong_error(
@@ -195,7 +194,7 @@ impl VdafOps {
         for report_agg in report_aggregations_iter {
             // This report was omitted by the leader because of a prior failure. Note that the
             // report was dropped (if it's not already in an error state) and continue.
-            if matches!(report_agg.state(), ReportAggregationState::Waiting(_)) {
+            if matches!(report_agg.state(), ReportAggregationState::WaitingHelper(_)) {
                 *report_agg = report_agg
                     .clone()
                     .with_state(ReportAggregationState::Failed(PrepareError::ReportDropped))
@@ -494,8 +493,10 @@ mod tests {
                 *prepare_init.report_share().metadata().time(),
                 0,
                 None,
-                ReportAggregationState::Waiting(
-                    transcript.helper_prepare_transitions[0].transition.clone(),
+                ReportAggregationState::WaitingHelper(
+                    transcript.helper_prepare_transitions[0]
+                        .prepare_state()
+                        .clone(),
                 ),
             ),
         )
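
This file holds the other half of the change. Instead of persisting a PingPongTransition and re-evaluating it on every continuation step, the Helper now rehydrates its stored prepare state into PingPongState::Continued and passes that to helper_continued. A self-contained toy of the resulting flow, with stand-in types and a stubbed evaluate() (assumed shapes; the real prio signatures and error handling are elided):

    #[derive(Clone)]
    struct PrepState(u8);
    struct OutputShare(u64);
    struct Message(Vec<u8>);

    enum PingPongState {
        Continued(PrepState),
        Finished(OutputShare),
    }

    // What helper_continued() yields: either a deferred transition carrying a
    // message for the Leader, or a finished output share with nothing to send.
    enum PingPongContinuedValue {
        WithMessage { transition: Transition },
        FinishedNoMessage { output_share: OutputShare },
    }

    struct Transition(PrepState);

    impl Transition {
        // Stand-in for PingPongTransition::evaluate: runs the deferred VDAF
        // computation, yielding the next state and the outgoing message.
        fn evaluate(self) -> (PingPongState, Message) {
            (PingPongState::Continued(self.0), Message(Vec::new()))
        }
    }

    enum ReportAggregationState {
        WaitingHelper(PrepState),
        Finished,
    }

    // Classify the continuation result into (state to persist, share to
    // accumulate, message to send back), mirroring the hunks above.
    fn helper_continue_step(
        continued: PingPongContinuedValue,
    ) -> (ReportAggregationState, Option<OutputShare>, Option<Message>) {
        match continued {
            PingPongContinuedValue::WithMessage { transition } => {
                let (new_state, message) = transition.evaluate();
                match new_state {
                    // Not finished: store only the new prepare state.
                    PingPongState::Continued(prep_state) => (
                        ReportAggregationState::WaitingHelper(prep_state),
                        None,
                        Some(message),
                    ),
                    // Finished, with one last message for the Leader.
                    PingPongState::Finished(output_share) => (
                        ReportAggregationState::Finished,
                        Some(output_share),
                        Some(message),
                    ),
                }
            }
            // Finished with nothing further to send.
            PingPongContinuedValue::FinishedNoMessage { output_share } => {
                (ReportAggregationState::Finished, Some(output_share), None)
            }
        }
    }
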
16 changes: 8 additions & 8 deletions aggregator/src/aggregator/aggregation_job_driver.rs
@@ -236,9 +236,9 @@ impl AggregationJobDriver {
         for report_aggregation in &report_aggregations {
             match report_aggregation.state() {
                 ReportAggregationState::Start => saw_start = true,
-                ReportAggregationState::Waiting(_) => saw_waiting = true,
+                ReportAggregationState::WaitingLeader(_) => saw_waiting = true,
                 ReportAggregationState::Finished => saw_finished = true,
-                ReportAggregationState::Failed(_) => (), // ignore failed aggregations
+                _ => (), // ignore failed aggregations
             }
         }
         match (saw_start, saw_waiting, saw_finished) {
@@ -458,7 +458,7 @@ impl AggregationJobDriver {
         let mut prepare_continues = Vec::new();
         let mut stepped_aggregations = Vec::new();
         for report_aggregation in report_aggregations {
-            if let ReportAggregationState::Waiting(transition) = report_aggregation.state() {
+            if let ReportAggregationState::WaitingLeader(transition) = report_aggregation.state() {
                 let (prep_state, message) = match transition.evaluate(vdaf.as_ref()) {
                     Ok((state, message)) => (state, message),
                     Err(error) => {
@@ -589,7 +589,7 @@ impl AggregationJobDriver {
                             // VDAF level (i.e., state may be PingPongState::Finished) but we cannot
                             // finish at the DAP layer and commit the output share until we get
                             // confirmation from the Helper that they finished, too.
-                            ReportAggregationState::Waiting(transition)
+                            ReportAggregationState::WaitingLeader(transition)
                         }
                         Ok(PingPongContinuedValue::FinishedNoMessage { output_share }) => {
                             // We finished and have no outgoing message, meaning the Helper was
@@ -1804,7 +1804,7 @@ mod tests {
                 *report.metadata().time(),
                 0,
                 None,
-                ReportAggregationState::Waiting(
+                ReportAggregationState::WaitingLeader(
                     transcript.leader_prepare_transitions[1]
                         .transition
                         .clone()
@@ -2313,7 +2313,7 @@ mod tests {
                 *report.metadata().time(),
                 0,
                 None,
-                ReportAggregationState::Waiting(
+                ReportAggregationState::WaitingLeader(
                     transcript.leader_prepare_transitions[1]
                         .transition
                         .clone()
@@ -2480,7 +2480,7 @@ mod tests {
                 *report.metadata().time(),
                 0,
                 None,
-                ReportAggregationState::Waiting(
+                ReportAggregationState::WaitingLeader(
                     transcript.leader_prepare_transitions[1]
                         .transition
                         .clone()
@@ -2879,7 +2879,7 @@ mod tests {
                 *report.metadata().time(),
                 0,
                 None,
-                ReportAggregationState::Waiting(
+                ReportAggregationState::WaitingLeader(
                     transcript.leader_prepare_transitions[1]
                         .transition
                         .clone()
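
The Leader-side hunks above are renames rather than representation changes, and the asymmetry is deliberate: whenever the Leader drives a round it must re-derive both its prepare state and the message for the next AggregationJobContinueReq, so it still persists the whole PingPongTransition. A hedged sketch of that stepping loop, closely following the hunk at -458 above (simplified, with error handling elided and the surrounding types assumed):

    for report_aggregation in report_aggregations {
        if let ReportAggregationState::WaitingLeader(transition) = report_aggregation.state() {
            // evaluate() recovers the Leader's prepare state together with the
            // message to embed in the next AggregationJobContinueReq.
            let (prep_state, message) = transition.evaluate(vdaf.as_ref())?;
            // `message` is queued for the Helper; `prep_state` feeds the
            // leader_continued() call once the Helper's response arrives.
        }
    }

The Helper never needs this replay, because its outgoing message already left in the HTTP response; that is what makes the bare-prepare-state WaitingHelper variant sufficient.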