From 9c9556a214c9f05a8bd505ac29ead1b789d99d62 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Fri, 12 Feb 2021 07:58:57 -0800 Subject: [PATCH] facilitator: fail aggregations on bad batch The change in af63ea39 was based on faulty reasoning: you can't drop individual batches from the aggregation if the other side hasn't also dropped those batches, because otherwise their sum part will include shares that aren't matched by ours, introducing (as @winstrom put it) "infinite noise". What's worse, because I had punted on differentiating recoverable errors (e.g., getting a 500 response from GCS: if we retry, it'll probably work) from unrecoverable ones (e.g., an invalid signature), we effectively stopped retrying aggregation tasks, degrading our instance's output. This commit partially reverts af63ea39, restoring the prior behavior where a single error causes facilitator to abort the entire aggregation. This means that PubSub (or SQS) will redeliver the task message, so we will once again get a reasonable number of retries for recoverable errors. We don't completely revert af63ea39 because that change did include some stuff we want to keep: metrics on invalid own and peer validation batches as well as some test coverage for these cases of aggregation failures. 
--- facilitator/src/aggregation.rs | 14 +--- facilitator/tests/integration_tests.rs | 108 +++++-------------------- 2 files changed, 22 insertions(+), 100 deletions(-) diff --git a/facilitator/src/aggregation.rs b/facilitator/src/aggregation.rs index f83dda55a..f6a5c0c76 100644 --- a/facilitator/src/aggregation.rs +++ b/facilitator/src/aggregation.rs @@ -108,18 +108,8 @@ impl<'a> BatchAggregator<'a> { .collect::>(); for batch_id in batch_ids { - if let Err(error) = - self.aggregate_share(&batch_id.0, &batch_id.1, &mut servers, &mut invalid_uuids) - { - log::info!( - "ignoring batch {}/{} due to error {:?}", - batch_id.0, - batch_id.1, - error - ); - } else { - included_batch_uuids.push(batch_id.0); - } + self.aggregate_share(&batch_id.0, &batch_id.1, &mut servers, &mut invalid_uuids)?; + included_batch_uuids.push(batch_id.0); callback(); } diff --git a/facilitator/tests/integration_tests.rs b/facilitator/tests/integration_tests.rs index 48285fc83..d22b058a6 100644 --- a/facilitator/tests/integration_tests.rs +++ b/facilitator/tests/integration_tests.rs @@ -34,9 +34,9 @@ fn inconsistent_ingestion_batches() { end_to_end_test(Some(3), Some(4)) } -/// This test verifies that if some subset of the batches being aggregated are -/// invalid due to either an invalid signature or an invalid packet file digest, -/// the remainder of the valid batches will still be summed. +/// This test verifies that aggregations fail as expected if some subset of the +/// batches being aggregated are invalid due to either an invalid signature or +/// an invalid packet file digest. 
#[test] fn aggregation_including_invalid_batch() { log_init(); @@ -82,10 +82,10 @@ fn aggregation_including_invalid_batch() { // We generate four batches: // - batches 1 and 2 will have valid ingestion, own and peer batches and // should be summed normally - // - batch 3's peer validation batches will be signed with the wrong key - // by PHA and facilitator - // - batch 4's own validation batches will have the wrong packet file - // digest inserted into their headers by PHA and facilitator + // - batch 3: PHA will sign the peer validation batch sent to facilitator + // with the wrong key + // - batch 4: facilitator will insert the wrong packet file digest into + // the header of the peer validation batch sent to PHA let batch_uuids_and_dates = vec![ (Uuid::new_v4(), date), (Uuid::new_v4(), date), @@ -168,23 +168,13 @@ fn aggregation_including_invalid_batch() { // Facilitator uses this transport to send correctly signed validation // batches to PHA - let mut facilitator_to_pha_validation_transport_valid_key = SignableTransport { + let mut facilitator_to_pha_validation_transport = SignableTransport { transport: Box::new(LocalFileTransport::new( pha_tempdir.path().join("peer-validation"), )), batch_signing_key: default_facilitator_signing_private_key(), }; - // Facilitator uses this transport to send incorrectly signed validation - // batches to facilitator - let mut facilitator_to_pha_validation_transport_wrong_key = SignableTransport { - transport: Box::new(LocalFileTransport::new( - pha_tempdir.path().join("peer-validation"), - )), - // Intentionally the wrong key - batch_signing_key: default_pha_signing_private_key(), - }; - // PHA uses this transport to send correctly signed validation batches to // itself let mut pha_own_validation_transport = SignableTransport { @@ -207,17 +197,11 @@ fn aggregation_including_invalid_batch() { // tampering with signatures or batch headers as needed. 
for (index, (uuid, _)) in batch_uuids_and_dates.iter().enumerate() { let pha_peer_validation_transport = if index == 2 { - log::info!("using wrong key for peer validations"); + log::info!("pha using wrong key for peer validations"); &mut pha_to_facilitator_validation_transport_wrong_key } else { &mut pha_to_facilitator_validation_transport_valid_key }; - let facilitator_peer_validation_transport = if index == 2 { - log::info!("using wrong key for peer validations"); - &mut facilitator_to_pha_validation_transport_wrong_key - } else { - &mut facilitator_to_pha_validation_transport_valid_key - }; let mut pha_batch_intaker = BatchIntaker::new( aggregation_name, @@ -236,7 +220,7 @@ fn aggregation_including_invalid_batch() { &date, &mut facilitator_ingest_transport, &mut facilitator_own_validation_transport, - facilitator_peer_validation_transport, + &mut facilitator_to_pha_validation_transport, false, // is_first ) .unwrap(); @@ -313,7 +297,7 @@ fn aggregation_including_invalid_batch() { }; // Perform the aggregation on PHA and facilitator - BatchAggregator::new( + let err = BatchAggregator::new( instance_name, aggregation_name, &start_date, @@ -326,9 +310,12 @@ fn aggregation_including_invalid_batch() { ) .unwrap() .generate_sum_part(&batch_uuids_and_dates, || {}) - .unwrap(); + .unwrap_err(); + // Ideally we would be able to match on a variant in an error enum to check + // what the failure was but for now check the error description + assert!(err.to_string().contains("packet file digest in header")); - BatchAggregator::new( + let err = BatchAggregator::new( instance_name, aggregation_name, &start_date, @@ -341,65 +328,10 @@ fn aggregation_including_invalid_batch() { ) .unwrap() .generate_sum_part(&batch_uuids_and_dates, || {}) - .unwrap(); - - // Read in sum parts written by PHA and facilitator - let mut pha_aggregation_batch_reader: BatchReader<'_, SumPart, InvalidPacket> = - BatchReader::new( - Batch::new_sum( - instance_name, - aggregation_name, - &start_date, - 
&end_date, - true, // is_first - ), - &mut *pha_aggregation_transport.transport, - ); - let pha_sum_part = pha_aggregation_batch_reader.header(&pha_pub_keys).unwrap(); - - let mut facilitator_aggregation_batch_reader: BatchReader<'_, SumPart, InvalidPacket> = - BatchReader::new( - Batch::new_sum( - instance_name, - aggregation_name, - &start_date, - &end_date, - false, // is_first - ), - &mut *facilitator_aggregation_transport.transport, - ); - let facilitator_sum_part = facilitator_aggregation_batch_reader - .header(&facilitator_pub_keys) - .unwrap(); - - // We expect only the first two batches to be included in the aggregation - // because we tampered with 3 and 4 - let reference_sum = reconstruct_shares(&reference_sums[0].sum, &reference_sums[1].sum).unwrap(); - let expected_included_batch_uuids = - vec![batch_uuids_and_dates[0].0, batch_uuids_and_dates[1].0]; - let expected_contributions_count = - reference_sums[0].contributions as i64 + reference_sums[1].contributions as i64; - - assert_eq!( - pha_sum_part.total_individual_clients, - expected_contributions_count - ); - assert_eq!(pha_sum_part.batch_uuids, expected_included_batch_uuids); - assert_eq!( - facilitator_sum_part.total_individual_clients, - expected_contributions_count - ); - assert_eq!( - facilitator_sum_part.batch_uuids, - expected_included_batch_uuids - ); - - let reconstructed_sum = reconstruct_shares( - &pha_sum_part.sum().unwrap(), - &facilitator_sum_part.sum().unwrap(), - ) - .unwrap(); - assert_eq!(reference_sum, reconstructed_sum); + .unwrap_err(); + assert!(err + .to_string() + .contains("key identifier default-facilitator-signing-key not present in key map")); } fn end_to_end_test(drop_nth_pha: Option, drop_nth_facilitator: Option) {