From b6ac2325dac195f5ff217727e9dc9e482e6a90a5 Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Mon, 24 Apr 2023 16:07:42 -0700 Subject: [PATCH 1/2] Do not store output share in report aggregations. This was never necessary: when an output share is recovered, its contribution to the relevant batch aggregation is always recorded, and the collection process solely uses the batch aggregation values. The output share was stored but never (meaningfully) read. Historically, this is an artifact of the initial implementation of the aggregation process, which occurred before the collection process was written. At the time of the initial implementation, there was no batch aggregation to record aggregated values into. --- aggregator/src/aggregator.rs | 26 ------- .../aggregator/aggregation_job_continue.rs | 2 +- .../src/aggregator/aggregation_job_driver.rs | 11 ++- .../src/aggregator/collection_job_driver.rs | 6 +- .../src/aggregator/collection_job_tests.rs | 2 +- aggregator/src/aggregator/http_handlers.rs | 4 +- aggregator_core/src/datastore.rs | 70 ++++++------------- db/20230424220336_rm-out-share.down.sql | 1 + db/20230424220336_rm-out-share.up.sql | 2 + 9 files changed, 35 insertions(+), 89 deletions(-) create mode 100644 db/20230424220336_rm-out-share.down.sql create mode 100644 db/20230424220336_rm-out-share.up.sql diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index e2a4d8a49..346f33534 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -2,7 +2,6 @@ pub use crate::aggregator::error::Error; use crate::aggregator::{ - accumulator::Accumulator, aggregate_share::compute_aggregate_share, error::BatchMismatch, http_handlers::aggregator_handler, @@ -281,7 +280,6 @@ impl Aggregator { .handle_aggregate_init( &self.datastore, &self.aggregate_step_failure_counter, - self.cfg.batch_aggregation_shard_count, aggregation_job_id, req_bytes, ) @@ -594,7 +592,6 @@ impl TaskAggregator { &self, datastore: &Datastore,
aggregate_step_failure_counter: &Counter, - batch_aggregation_shard_count: u64, aggregation_job_id: &AggregationJobId, req_bytes: &[u8], ) -> Result { @@ -603,7 +600,6 @@ impl TaskAggregator { datastore, aggregate_step_failure_counter, Arc::clone(&self.task), - batch_aggregation_shard_count, aggregation_job_id, req_bytes, ) @@ -853,7 +849,6 @@ impl VdafOps { datastore: &Datastore, aggregate_step_failure_counter: &Counter, task: Arc, - batch_aggregation_shard_count: u64, aggregation_job_id: &AggregationJobId, req_bytes: &[u8], ) -> Result { @@ -865,7 +860,6 @@ impl VdafOps { vdaf, aggregate_step_failure_counter, task, - batch_aggregation_shard_count, aggregation_job_id, verify_key, req_bytes, @@ -880,7 +874,6 @@ impl VdafOps { vdaf, aggregate_step_failure_counter, task, - batch_aggregation_shard_count, aggregation_job_id, verify_key, req_bytes, @@ -1203,7 +1196,6 @@ impl VdafOps { vdaf: &A, aggregate_step_failure_counter: &Counter, task: Arc, - batch_aggregation_shard_count: u64, aggregation_job_id: &AggregationJobId, verify_key: &VerifyKey, req_bytes: &[u8], @@ -1500,12 +1492,6 @@ impl VdafOps { // Construct a response and write any new report shares and report aggregations // as we go. if !replayed_request { - let mut accumulator = Accumulator::::new( - Arc::clone(&task), - batch_aggregation_shard_count, - aggregation_job.aggregation_parameter().clone(), - ); - for report_share_data in &mut report_share_data { // Write client report & report aggregation. 
@@ -1526,19 +1512,7 @@ impl VdafOps { } } tx.put_report_aggregation(&report_share_data.report_aggregation).await?; - - if let ReportAggregationState::Finished(output_share) = report_share_data.report_aggregation.state() - { - accumulator.update( - aggregation_job.partial_batch_identifier(), - report_share_data.report_share.metadata().id(), - report_share_data.report_share.metadata().time(), - output_share, - )?; - } } - - accumulator.flush_to_datastore(tx, &vdaf).await?; } Ok(Self::aggregation_job_resp_for(report_share_data.into_iter().map(|data| data.report_aggregation))) diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index 82f3f433f..fec4f0e77 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -154,7 +154,7 @@ impl VdafOps { )?; *report_aggregation = report_aggregation .clone() - .with_state(ReportAggregationState::Finished(output_share)) + .with_state(ReportAggregationState::Finished) .with_last_prep_step(Some(PrepareStep::new( *prep_step.report_id(), PrepareStepResult::Finished, diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index ec2a940b6..0783a0b66 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -218,7 +218,7 @@ impl AggregationJobDriver { match report_aggregation.state() { ReportAggregationState::Start => saw_start = true, ReportAggregationState::Waiting(_, _) => saw_waiting = true, - ReportAggregationState::Finished(_) => saw_finished = true, + ReportAggregationState::Finished => saw_finished = true, ReportAggregationState::Failed(_) | ReportAggregationState::Invalid => (), // ignore failure aggregation states } } @@ -622,7 +622,7 @@ impl AggregationJobDriver { report_aggregation.time(), out_share, ) { - Ok(_) => 
ReportAggregationState::Finished(out_share.clone()), + Ok(_) => ReportAggregationState::Finished, Err(error) => { warn!(report_id = %report_aggregation.report_id(), ?error, "Could not update batch aggregation"); self.aggregate_step_failure_counter.add( @@ -1107,7 +1107,7 @@ mod tests { *report.metadata().time(), 0, None, - ReportAggregationState::Finished(transcript.output_share(Role::Leader).clone()), + ReportAggregationState::Finished, ); let (got_aggregation_job, got_report_aggregation) = ds @@ -1863,7 +1863,7 @@ mod tests { *report.metadata().time(), 0, None, - ReportAggregationState::Finished(transcript.output_share(Role::Leader).clone()), + ReportAggregationState::Finished, ); let batch_interval_start = report .metadata() @@ -2147,7 +2147,6 @@ mod tests { AggregationJobState::Finished, AggregationJobRound::from(1), ); - let leader_output_share = transcript.output_share(Role::Leader); let want_report_aggregation = ReportAggregation::::new( *task.id(), aggregation_job_id, @@ -2155,7 +2154,7 @@ mod tests { *report.metadata().time(), 0, None, - ReportAggregationState::Finished(leader_output_share.clone()), + ReportAggregationState::Finished, ); let want_batch_aggregations = Vec::from([BatchAggregation::< PRIO3_VERIFY_KEY_LENGTH, diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index 2c082e60a..a801f276d 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -484,7 +484,7 @@ mod tests { use janus_core::{ task::VdafInstance, test_util::{ - dummy_vdaf::{self, AggregationParam, OutputShare}, + dummy_vdaf::{self, AggregationParam}, install_test_trace_subscriber, runtime::TestRuntimeManager, }, @@ -571,7 +571,7 @@ mod tests { *report.metadata().time(), 0, None, - ReportAggregationState::Finished(OutputShare()), + ReportAggregationState::Finished, )) .await?; @@ -700,7 +700,7 @@ mod tests { *report.metadata().time(), 0, None, - 
ReportAggregationState::Finished(OutputShare()), + ReportAggregationState::Finished, )) .await?; diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index 3f68782eb..077b4c346 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -197,7 +197,7 @@ async fn setup_fixed_size_current_batch_collection_job_test_case( time, ord, None, - ReportAggregationState::Finished(dummy_vdaf::OutputShare()), + ReportAggregationState::Finished, )) .await .unwrap(); diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index a421c31c0..4dd7cb047 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -2323,9 +2323,7 @@ mod tests { *report_metadata_0.id(), PrepareStepResult::Finished )), - ReportAggregationState::Finished( - transcript_0.output_share(Role::Helper).clone() - ), + ReportAggregationState::Finished, ), ReportAggregation::new( *task.id(), diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 81be1b816..30e4a81a2 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -63,7 +63,7 @@ pub mod test_util; /// List of schema versions that this version of Janus can safely run on. If any other schema /// version is seen, [`Datastore::new`] fails. -const SUPPORTED_SCHEMA_VERSIONS: &[i64] = &[20230405185602, 20230417204528]; +const SUPPORTED_SCHEMA_VERSIONS: &[i64] = &[20230405185602, 20230417204528, 20230424220336]; /// Datastore represents a datastore for Janus, with support for transactional reads and writes. /// In practice, Datastore instances are currently backed by a PostgreSQL database. 
@@ -1802,8 +1802,7 @@ impl Transaction<'_, C> { client_reports.report_id, client_reports.client_timestamp, report_aggregations.ord, report_aggregations.state, report_aggregations.prep_state, report_aggregations.prep_msg, - report_aggregations.out_share, report_aggregations.error_code, - report_aggregations.last_prep_step, aggregation_jobs.aggregation_param + report_aggregations.error_code, report_aggregations.last_prep_step FROM report_aggregations JOIN client_reports ON client_reports.id = report_aggregations.client_report_id JOIN aggregation_jobs @@ -1857,8 +1856,7 @@ impl Transaction<'_, C> { client_reports.report_id, client_reports.client_timestamp, report_aggregations.ord, report_aggregations.state, report_aggregations.prep_state, report_aggregations.prep_msg, - report_aggregations.out_share, report_aggregations.error_code, - report_aggregations.last_prep_step, aggregation_jobs.aggregation_param + report_aggregations.error_code, report_aggregations.last_prep_step FROM report_aggregations JOIN client_reports ON client_reports.id = report_aggregations.client_report_id JOIN aggregation_jobs @@ -1911,9 +1909,8 @@ impl Transaction<'_, C> { aggregation_jobs.aggregation_job_id, client_reports.report_id, client_reports.client_timestamp, report_aggregations.ord, report_aggregations.state, report_aggregations.prep_state, - report_aggregations.prep_msg, report_aggregations.out_share, - report_aggregations.error_code, report_aggregations.last_prep_step, - aggregation_jobs.aggregation_param + report_aggregations.prep_msg, report_aggregations.error_code, + report_aggregations.last_prep_step FROM report_aggregations JOIN client_reports ON client_reports.id = report_aggregations.client_report_id JOIN aggregation_jobs @@ -1953,10 +1950,8 @@ impl Transaction<'_, C> { let state: ReportAggregationStateCode = row.get("state"); let prep_state_bytes: Option> = row.get("prep_state"); let prep_msg_bytes: Option> = row.get("prep_msg"); - let out_share_bytes: Option> = 
row.get("out_share"); let error_code: Option = row.get("error_code"); let last_prep_step_bytes: Option> = row.get("last_prep_step"); - let aggregation_param_bytes = row.get("aggregation_param"); let error_code = match error_code { Some(c) => { @@ -1997,18 +1992,7 @@ impl Transaction<'_, C> { ReportAggregationState::Waiting(prep_state, prep_msg) } - ReportAggregationStateCode::Finished => { - let aggregation_param = A::AggregationParam::get_decoded(aggregation_param_bytes)?; - ReportAggregationState::Finished(A::OutputShare::get_decoded_with_param( - &(vdaf, &aggregation_param), - &out_share_bytes.ok_or_else(|| { - Error::DbState( - "report aggregation in state FINISHED but out_share is NULL" - .to_string(), - ) - })?, - )?) - } + ReportAggregationStateCode::Finished => ReportAggregationState::Finished, ReportAggregationStateCode::Failed => { ReportAggregationState::Failed(error_code.ok_or_else(|| { @@ -2053,12 +2037,12 @@ impl Transaction<'_, C> { .prepare_cached( "INSERT INTO report_aggregations (aggregation_job_id, client_report_id, ord, state, prep_state, prep_msg, - out_share, error_code, last_prep_step) + error_code, last_prep_step) VALUES ((SELECT id FROM aggregation_jobs WHERE aggregation_job_id = $1), (SELECT id FROM client_reports WHERE task_id = (SELECT id FROM tasks WHERE task_id = $2) AND report_id = $3), - $4, $5, $6, $7, $8, $9, $10)", + $4, $5, $6, $7, $8, $9)", ) .await?; self.execute( @@ -2072,7 +2056,6 @@ impl Transaction<'_, C> { /* state */ &report_aggregation.state().state_code(), /* prep_state */ &encoded_state_values.prep_state, /* prep_msg */ &encoded_state_values.prep_msg, - /* out_share */ &encoded_state_values.output_share, /* error_code */ &encoded_state_values.report_share_err, /* last_prep_step */ &encoded_last_prep_step, ], @@ -2100,13 +2083,13 @@ impl Transaction<'_, C> { let stmt = self .prepare_cached( "UPDATE report_aggregations SET - ord = $1, state = $2, prep_state = $3, prep_msg = $4, out_share = $5, - error_code = $6, 
last_prep_step = $7 + ord = $1, state = $2, prep_state = $3, prep_msg = $4, error_code = $5, + last_prep_step = $6 WHERE aggregation_job_id = (SELECT id FROM aggregation_jobs WHERE - aggregation_job_id = $8) + aggregation_job_id = $7) AND client_report_id = (SELECT id FROM client_reports - WHERE task_id = (SELECT id FROM tasks WHERE task_id = $9) - AND report_id = $10)", + WHERE task_id = (SELECT id FROM tasks WHERE task_id = $8) + AND report_id = $9)", ) .await?; check_single_row_mutation( @@ -2117,7 +2100,6 @@ impl Transaction<'_, C> { /* state */ &report_aggregation.state().state_code(), /* prep_state */ &encoded_state_values.prep_state, /* prep_msg */ &encoded_state_values.prep_msg, - /* out_share */ &encoded_state_values.output_share, /* error_code */ &encoded_state_values.report_share_err, /* last_prep_step */ &encoded_last_prep_step, /* aggregation_job_id */ @@ -4598,7 +4580,7 @@ pub mod models { #[derivative(Debug = "ignore")] A::PrepareState, #[derivative(Debug = "ignore")] Option, ), - Finished(#[derivative(Debug = "ignore")] A::OutputShare), + Finished, Failed(ReportShareError), Invalid, } @@ -4610,7 +4592,7 @@ pub mod models { match self { ReportAggregationState::Start => ReportAggregationStateCode::Start, ReportAggregationState::Waiting(_, _) => ReportAggregationStateCode::Waiting, - ReportAggregationState::Finished(_) => ReportAggregationStateCode::Finished, + ReportAggregationState::Finished => ReportAggregationStateCode::Finished, ReportAggregationState::Failed(_) => ReportAggregationStateCode::Failed, ReportAggregationState::Invalid => ReportAggregationStateCode::Invalid, } @@ -4632,12 +4614,7 @@ pub mod models { ..Default::default() } } - ReportAggregationState::Finished(output_share) => { - EncodedReportAggregationStateValues { - output_share: Some(output_share.get_encoded()), - ..Default::default() - } - } + ReportAggregationState::Finished => EncodedReportAggregationStateValues::default(), ReportAggregationState::Failed(report_share_err) => { 
EncodedReportAggregationStateValues { report_share_err: Some(*report_share_err as i16), @@ -4653,7 +4630,6 @@ pub mod models { pub(super) struct EncodedReportAggregationStateValues { pub(super) prep_state: Option>, pub(super) prep_msg: Option>, - pub(super) output_share: Option>, pub(super) report_share_err: Option, } @@ -4690,9 +4666,6 @@ pub mod models { Self::Waiting(lhs_prep_state, lhs_prep_msg), Self::Waiting(rhs_prep_state, rhs_prep_msg), ) => lhs_prep_state == rhs_prep_state && lhs_prep_msg == rhs_prep_msg, - (Self::Finished(lhs_out_share), Self::Finished(rhs_out_share)) => { - lhs_out_share == rhs_out_share - } (Self::Failed(lhs_report_share_err), Self::Failed(rhs_report_share_err)) => { lhs_report_share_err == rhs_report_share_err } @@ -7272,7 +7245,7 @@ mod tests { Some(vdaf_transcript.prepare_messages[0].clone()), ), ReportAggregationState::Waiting(leader_prep_state.clone(), None), - ReportAggregationState::Finished(vdaf_transcript.output_share(Role::Leader).clone()), + ReportAggregationState::Finished, ReportAggregationState::Failed(ReportShareError::VdafPrepError), ReportAggregationState::Invalid, ] @@ -7596,11 +7569,10 @@ mod tests { let want_report_aggregations = ds .run_tx(|tx| { - let (task, prep_msg, prep_state, output_share) = ( + let (task, prep_msg, prep_state) = ( task.clone(), vdaf_transcript.prepare_messages[0].clone(), vdaf_transcript.leader_prep_state(0).clone(), - vdaf_transcript.output_share(Role::Leader).clone(), ); Box::pin(async move { tx.put_task(&task).await?; @@ -7624,7 +7596,7 @@ mod tests { for (ord, state) in [ ReportAggregationState::::Start, ReportAggregationState::Waiting(prep_state.clone(), Some(prep_msg)), - ReportAggregationState::Finished(output_share), + ReportAggregationState::Finished, ReportAggregationState::Failed(ReportShareError::VdafPrepError), ReportAggregationState::Invalid, ] @@ -9523,7 +9495,7 @@ mod tests { clock.now(), 0, None, - ReportAggregationState::Finished(dummy_vdaf::OutputShare()), // Counted among 
min_size and max_size. + ReportAggregationState::Finished, // Counted among min_size and max_size. ); let report_aggregation_1_1 = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( *task.id(), @@ -9532,7 +9504,7 @@ mod tests { clock.now(), 1, None, - ReportAggregationState::Finished(dummy_vdaf::OutputShare()), // Counted among min_size and max_size. + ReportAggregationState::Finished, // Counted among min_size and max_size. ); let report_aggregation_1_2 = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( *task.id(), diff --git a/db/20230424220336_rm-out-share.down.sql b/db/20230424220336_rm-out-share.down.sql new file mode 100644 index 000000000..5921163ed --- /dev/null +++ b/db/20230424220336_rm-out-share.down.sql @@ -0,0 +1 @@ +ALTER TABLE report_aggregations ADD COLUMN out_share BYTEA; \ No newline at end of file diff --git a/db/20230424220336_rm-out-share.up.sql b/db/20230424220336_rm-out-share.up.sql new file mode 100644 index 000000000..bd4f936ad --- /dev/null +++ b/db/20230424220336_rm-out-share.up.sql @@ -0,0 +1,2 @@ + +ALTER TABLE report_aggregations DROP COLUMN out_share; \ No newline at end of file From 2ffa664fcc3589128465e8d1e4dd60187a962bd5 Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Mon, 24 Apr 2023 16:13:22 -0700 Subject: [PATCH 2/2] newlines --- db/20230424220336_rm-out-share.down.sql | 2 +- db/20230424220336_rm-out-share.up.sql | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/db/20230424220336_rm-out-share.down.sql b/db/20230424220336_rm-out-share.down.sql index 5921163ed..264083cc1 100644 --- a/db/20230424220336_rm-out-share.down.sql +++ b/db/20230424220336_rm-out-share.down.sql @@ -1 +1 @@ -ALTER TABLE report_aggregations ADD COLUMN out_share BYTEA; \ No newline at end of file +ALTER TABLE report_aggregations ADD COLUMN out_share BYTEA; diff --git a/db/20230424220336_rm-out-share.up.sql b/db/20230424220336_rm-out-share.up.sql index bd4f936ad..b3483049c 100644 --- a/db/20230424220336_rm-out-share.up.sql +++ 
b/db/20230424220336_rm-out-share.up.sql @@ -1,2 +1 @@ - -ALTER TABLE report_aggregations DROP COLUMN out_share; \ No newline at end of file +ALTER TABLE report_aggregations DROP COLUMN out_share;