diff --git a/Cargo.lock b/Cargo.lock index 652f97516..7de359d42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -257,7 +257,7 @@ dependencies = [ "log", "parking", "polling", - "rustix 0.37.3", + "rustix", "slab", "socket2 0.4.9", "waker-fn", diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index e2a4d8a49..346f33534 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -2,7 +2,6 @@ pub use crate::aggregator::error::Error; use crate::aggregator::{ - accumulator::Accumulator, aggregate_share::compute_aggregate_share, error::BatchMismatch, http_handlers::aggregator_handler, @@ -281,7 +280,6 @@ impl Aggregator { .handle_aggregate_init( &self.datastore, &self.aggregate_step_failure_counter, - self.cfg.batch_aggregation_shard_count, aggregation_job_id, req_bytes, ) @@ -594,7 +592,6 @@ impl TaskAggregator { &self, datastore: &Datastore, aggregate_step_failure_counter: &Counter, - batch_aggregation_shard_count: u64, aggregation_job_id: &AggregationJobId, req_bytes: &[u8], ) -> Result { @@ -603,7 +600,6 @@ impl TaskAggregator { datastore, aggregate_step_failure_counter, Arc::clone(&self.task), - batch_aggregation_shard_count, aggregation_job_id, req_bytes, ) @@ -853,7 +849,6 @@ impl VdafOps { datastore: &Datastore, aggregate_step_failure_counter: &Counter, task: Arc, - batch_aggregation_shard_count: u64, aggregation_job_id: &AggregationJobId, req_bytes: &[u8], ) -> Result { @@ -865,7 +860,6 @@ impl VdafOps { vdaf, aggregate_step_failure_counter, task, - batch_aggregation_shard_count, aggregation_job_id, verify_key, req_bytes, @@ -880,7 +874,6 @@ impl VdafOps { vdaf, aggregate_step_failure_counter, task, - batch_aggregation_shard_count, aggregation_job_id, verify_key, req_bytes, @@ -1203,7 +1196,6 @@ impl VdafOps { vdaf: &A, aggregate_step_failure_counter: &Counter, task: Arc, - batch_aggregation_shard_count: u64, aggregation_job_id: &AggregationJobId, verify_key: &VerifyKey, req_bytes: &[u8], @@ -1500,12 +1492,6 @@ impl VdafOps { // Construct a response and write any new report shares and report aggregations // as we go. if !replayed_request { - let mut accumulator = Accumulator::::new( - Arc::clone(&task), - batch_aggregation_shard_count, - aggregation_job.aggregation_parameter().clone(), - ); - for report_share_data in &mut report_share_data { // Write client report & report aggregation. @@ -1526,19 +1512,7 @@ impl VdafOps { } } tx.put_report_aggregation(&report_share_data.report_aggregation).await?; - - if let ReportAggregationState::Finished(output_share) = report_share_data.report_aggregation.state() - { - accumulator.update( - aggregation_job.partial_batch_identifier(), - report_share_data.report_share.metadata().id(), - report_share_data.report_share.metadata().time(), - output_share, - )?; - } } - - accumulator.flush_to_datastore(tx, &vdaf).await?; } Ok(Self::aggregation_job_resp_for(report_share_data.into_iter().map(|data| data.report_aggregation))) diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index 82f3f433f..fec4f0e77 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -154,7 +154,7 @@ impl VdafOps { )?; *report_aggregation = report_aggregation .clone() - .with_state(ReportAggregationState::Finished(output_share)) + .with_state(ReportAggregationState::Finished) .with_last_prep_step(Some(PrepareStep::new( *prep_step.report_id(), PrepareStepResult::Finished, diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index ec2a940b6..0783a0b66 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -218,7 +218,7 @@ impl AggregationJobDriver { match report_aggregation.state() { ReportAggregationState::Start => saw_start = true, ReportAggregationState::Waiting(_, _) => saw_waiting = true, - ReportAggregationState::Finished(_) => saw_finished = true, + ReportAggregationState::Finished => saw_finished = true, ReportAggregationState::Failed(_) | ReportAggregationState::Invalid => (), // ignore failure aggregation states } } @@ -622,7 +622,7 @@ impl AggregationJobDriver { report_aggregation.time(), out_share, ) { - Ok(_) => ReportAggregationState::Finished(out_share.clone()), + Ok(_) => ReportAggregationState::Finished, Err(error) => { warn!(report_id = %report_aggregation.report_id(), ?error, "Could not update batch aggregation"); self.aggregate_step_failure_counter.add( @@ -1107,7 +1107,7 @@ mod tests { *report.metadata().time(), 0, None, - ReportAggregationState::Finished(transcript.output_share(Role::Leader).clone()), + ReportAggregationState::Finished, ); let (got_aggregation_job, got_report_aggregation) = ds @@ -1863,7 +1863,7 @@ mod tests { *report.metadata().time(), 0, None, - ReportAggregationState::Finished(transcript.output_share(Role::Leader).clone()), + ReportAggregationState::Finished, ); let batch_interval_start = report .metadata() @@ -2147,7 +2147,6 @@ mod tests { AggregationJobState::Finished, AggregationJobRound::from(1), ); - let leader_output_share = transcript.output_share(Role::Leader); let want_report_aggregation = ReportAggregation::::new( *task.id(), aggregation_job_id, @@ -2155,7 +2154,7 @@ mod tests { *report.metadata().time(), 0, None, - ReportAggregationState::Finished(leader_output_share.clone()), + ReportAggregationState::Finished, ); let want_batch_aggregations = Vec::from([BatchAggregation::< PRIO3_VERIFY_KEY_LENGTH, diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index 2c082e60a..a801f276d 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -484,7 +484,7 @@ mod tests { use janus_core::{ task::VdafInstance, test_util::{ - dummy_vdaf::{self, AggregationParam, OutputShare}, + dummy_vdaf::{self, AggregationParam}, install_test_trace_subscriber, runtime::TestRuntimeManager, }, @@ -571,7 +571,7 @@ mod tests { *report.metadata().time(), 0, None, - ReportAggregationState::Finished(OutputShare()), + ReportAggregationState::Finished, )) .await?; @@ -700,7 +700,7 @@ mod tests { *report.metadata().time(), 0, None, - ReportAggregationState::Finished(OutputShare()), + ReportAggregationState::Finished, )) .await?; diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index 3f68782eb..077b4c346 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -197,7 +197,7 @@ async fn setup_fixed_size_current_batch_collection_job_test_case( time, ord, None, - ReportAggregationState::Finished(dummy_vdaf::OutputShare()), + ReportAggregationState::Finished, )) .await .unwrap(); diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index a421c31c0..4dd7cb047 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -2323,9 +2323,7 @@ mod tests { *report_metadata_0.id(), PrepareStepResult::Finished )), - ReportAggregationState::Finished( - transcript_0.output_share(Role::Helper).clone() - ), + ReportAggregationState::Finished, ), ReportAggregation::new( *task.id(), diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 8c92ca1a6..46dd1678e 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -86,7 +86,7 @@ macro_rules! supported_schema_versions { // List of schema versions that this version of Janus can safely run on. If any other schema // version is seen, [`Datastore::new`] fails. -supported_schema_versions!(20230417204528); +supported_schema_versions!(20230417204528, 20230424220336); /// Datastore represents a datastore for Janus, with support for transactional reads and writes. /// In practice, Datastore instances are currently backed by a PostgreSQL database. @@ -1825,8 +1825,7 @@ impl Transaction<'_, C> { client_reports.report_id, client_reports.client_timestamp, report_aggregations.ord, report_aggregations.state, report_aggregations.prep_state, report_aggregations.prep_msg, - report_aggregations.out_share, report_aggregations.error_code, - report_aggregations.last_prep_step, aggregation_jobs.aggregation_param + report_aggregations.error_code, report_aggregations.last_prep_step FROM report_aggregations JOIN client_reports ON client_reports.id = report_aggregations.client_report_id JOIN aggregation_jobs @@ -1880,8 +1879,7 @@ impl Transaction<'_, C> { client_reports.report_id, client_reports.client_timestamp, report_aggregations.ord, report_aggregations.state, report_aggregations.prep_state, report_aggregations.prep_msg, - report_aggregations.out_share, report_aggregations.error_code, - report_aggregations.last_prep_step, aggregation_jobs.aggregation_param + report_aggregations.error_code, report_aggregations.last_prep_step FROM report_aggregations JOIN client_reports ON client_reports.id = report_aggregations.client_report_id JOIN aggregation_jobs @@ -1934,9 +1932,8 @@ impl Transaction<'_, C> { aggregation_jobs.aggregation_job_id, client_reports.report_id, client_reports.client_timestamp, report_aggregations.ord, report_aggregations.state, report_aggregations.prep_state, - report_aggregations.prep_msg, report_aggregations.out_share, - report_aggregations.error_code, report_aggregations.last_prep_step, - aggregation_jobs.aggregation_param + report_aggregations.prep_msg, report_aggregations.error_code, + report_aggregations.last_prep_step FROM report_aggregations JOIN client_reports ON client_reports.id = report_aggregations.client_report_id JOIN aggregation_jobs @@ -1976,10 +1973,8 @@ impl Transaction<'_, C> { let state: ReportAggregationStateCode = row.get("state"); let prep_state_bytes: Option> = row.get("prep_state"); let prep_msg_bytes: Option> = row.get("prep_msg"); - let out_share_bytes: Option> = row.get("out_share"); let error_code: Option = row.get("error_code"); let last_prep_step_bytes: Option> = row.get("last_prep_step"); - let aggregation_param_bytes = row.get("aggregation_param"); let error_code = match error_code { Some(c) => { @@ -2020,18 +2015,7 @@ impl Transaction<'_, C> { ReportAggregationState::Waiting(prep_state, prep_msg) } - ReportAggregationStateCode::Finished => { - let aggregation_param = A::AggregationParam::get_decoded(aggregation_param_bytes)?; - ReportAggregationState::Finished(A::OutputShare::get_decoded_with_param( - &(vdaf, &aggregation_param), - &out_share_bytes.ok_or_else(|| { - Error::DbState( - "report aggregation in state FINISHED but out_share is NULL" - .to_string(), - ) - })?, - )?) - } + ReportAggregationStateCode::Finished => ReportAggregationState::Finished, ReportAggregationStateCode::Failed => { ReportAggregationState::Failed(error_code.ok_or_else(|| { @@ -2076,12 +2060,12 @@ impl Transaction<'_, C> { .prepare_cached( "INSERT INTO report_aggregations (aggregation_job_id, client_report_id, ord, state, prep_state, prep_msg, - out_share, error_code, last_prep_step) + error_code, last_prep_step) VALUES ((SELECT id FROM aggregation_jobs WHERE aggregation_job_id = $1), (SELECT id FROM client_reports WHERE task_id = (SELECT id FROM tasks WHERE task_id = $2) AND report_id = $3), - $4, $5, $6, $7, $8, $9, $10)", + $4, $5, $6, $7, $8, $9)", ) .await?; self.execute( @@ -2095,7 +2079,6 @@ impl Transaction<'_, C> { /* state */ &report_aggregation.state().state_code(), /* prep_state */ &encoded_state_values.prep_state, /* prep_msg */ &encoded_state_values.prep_msg, - /* out_share */ &encoded_state_values.output_share, /* error_code */ &encoded_state_values.report_share_err, /* last_prep_step */ &encoded_last_prep_step, ], @@ -2123,13 +2106,13 @@ impl Transaction<'_, C> { let stmt = self .prepare_cached( "UPDATE report_aggregations SET - ord = $1, state = $2, prep_state = $3, prep_msg = $4, out_share = $5, - error_code = $6, last_prep_step = $7 + ord = $1, state = $2, prep_state = $3, prep_msg = $4, error_code = $5, + last_prep_step = $6 WHERE aggregation_job_id = (SELECT id FROM aggregation_jobs WHERE - aggregation_job_id = $8) + aggregation_job_id = $7) AND client_report_id = (SELECT id FROM client_reports - WHERE task_id = (SELECT id FROM tasks WHERE task_id = $9) - AND report_id = $10)", + WHERE task_id = (SELECT id FROM tasks WHERE task_id = $8) + AND report_id = $9)", ) .await?; check_single_row_mutation( @@ -2140,7 +2123,6 @@ impl Transaction<'_, C> { /* state */ &report_aggregation.state().state_code(), /* prep_state */ &encoded_state_values.prep_state, /* prep_msg */ &encoded_state_values.prep_msg, - /* out_share */ &encoded_state_values.output_share, /* error_code */ &encoded_state_values.report_share_err, /* last_prep_step */ &encoded_last_prep_step, /* aggregation_job_id */ @@ -4621,7 +4603,7 @@ pub mod models { #[derivative(Debug = "ignore")] A::PrepareState, #[derivative(Debug = "ignore")] Option, ), - Finished(#[derivative(Debug = "ignore")] A::OutputShare), + Finished, Failed(ReportShareError), Invalid, } @@ -4633,7 +4615,7 @@ pub mod models { match self { ReportAggregationState::Start => ReportAggregationStateCode::Start, ReportAggregationState::Waiting(_, _) => ReportAggregationStateCode::Waiting, - ReportAggregationState::Finished(_) => ReportAggregationStateCode::Finished, + ReportAggregationState::Finished => ReportAggregationStateCode::Finished, ReportAggregationState::Failed(_) => ReportAggregationStateCode::Failed, ReportAggregationState::Invalid => ReportAggregationStateCode::Invalid, } @@ -4655,12 +4637,7 @@ pub mod models { ..Default::default() } } - ReportAggregationState::Finished(output_share) => { - EncodedReportAggregationStateValues { - output_share: Some(output_share.get_encoded()), - ..Default::default() - } - } + ReportAggregationState::Finished => EncodedReportAggregationStateValues::default(), ReportAggregationState::Failed(report_share_err) => { EncodedReportAggregationStateValues { report_share_err: Some(*report_share_err as i16), @@ -4676,7 +4653,6 @@ pub mod models { pub(super) struct EncodedReportAggregationStateValues { pub(super) prep_state: Option>, pub(super) prep_msg: Option>, - pub(super) output_share: Option>, pub(super) report_share_err: Option, } @@ -4713,9 +4689,6 @@ pub mod models { Self::Waiting(lhs_prep_state, lhs_prep_msg), Self::Waiting(rhs_prep_state, rhs_prep_msg), ) => lhs_prep_state == rhs_prep_state && lhs_prep_msg == rhs_prep_msg, - (Self::Finished(lhs_out_share), Self::Finished(rhs_out_share)) => { - lhs_out_share == rhs_out_share - } (Self::Failed(lhs_report_share_err), Self::Failed(rhs_report_share_err)) => { lhs_report_share_err == rhs_report_share_err } @@ -7300,7 +7273,7 @@ mod tests { Some(vdaf_transcript.prepare_messages[0].clone()), ), ReportAggregationState::Waiting(leader_prep_state.clone(), None), - ReportAggregationState::Finished(vdaf_transcript.output_share(Role::Leader).clone()), + ReportAggregationState::Finished, ReportAggregationState::Failed(ReportShareError::VdafPrepError), ReportAggregationState::Invalid, ] @@ -7624,11 +7597,10 @@ mod tests { let want_report_aggregations = ds .run_tx(|tx| { - let (task, prep_msg, prep_state, output_share) = ( + let (task, prep_msg, prep_state) = ( task.clone(), vdaf_transcript.prepare_messages[0].clone(), vdaf_transcript.leader_prep_state(0).clone(), - vdaf_transcript.output_share(Role::Leader).clone(), ); Box::pin(async move { tx.put_task(&task).await?; @@ -7652,7 +7624,7 @@ mod tests { for (ord, state) in [ ReportAggregationState::::Start, ReportAggregationState::Waiting(prep_state.clone(), Some(prep_msg)), - ReportAggregationState::Finished(output_share), + ReportAggregationState::Finished, ReportAggregationState::Failed(ReportShareError::VdafPrepError), ReportAggregationState::Invalid, ] @@ -9563,7 +9535,7 @@ mod tests { clock.now(), 0, None, - ReportAggregationState::Finished(dummy_vdaf::OutputShare()), // Counted among min_size and max_size. + ReportAggregationState::Finished, // Counted among min_size and max_size. ); let report_aggregation_1_1 = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( *task.id(), @@ -9572,7 +9544,7 @@ mod tests { clock.now(), 1, None, - ReportAggregationState::Finished(dummy_vdaf::OutputShare()), // Counted among min_size and max_size. + ReportAggregationState::Finished, // Counted among min_size and max_size. ); let report_aggregation_1_2 = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( *task.id(), diff --git a/db/20230424220336_rm-out-share.down.sql b/db/20230424220336_rm-out-share.down.sql new file mode 100644 index 000000000..264083cc1 --- /dev/null +++ b/db/20230424220336_rm-out-share.down.sql @@ -0,0 +1 @@ +ALTER TABLE report_aggregations ADD COLUMN out_share BYTEA; diff --git a/db/20230424220336_rm-out-share.up.sql b/db/20230424220336_rm-out-share.up.sql new file mode 100644 index 000000000..b3483049c --- /dev/null +++ b/db/20230424220336_rm-out-share.up.sql @@ -0,0 +1 @@ +ALTER TABLE report_aggregations DROP COLUMN out_share;