From 712bec0000021234e6ec78d99684c31214dd6386 Mon Sep 17 00:00:00 2001 From: Ameer Ghani Date: Mon, 22 Jan 2024 13:33:04 -0500 Subject: [PATCH 1/2] Abandon aggregation jobs early when a fatal error is encountered (#2502) * Abandon aggregation jobs early when a fatal error is encountered * Update aggregator/src/aggregator/aggregation_job_driver.rs Co-authored-by: Brandon Pitman * Rebase pain --------- Co-authored-by: Brandon Pitman --- .../src/aggregator/aggregation_job_driver.rs | 371 +++++++++++++++--- .../src/aggregator/collection_job_driver.rs | 23 +- 2 files changed, 333 insertions(+), 61 deletions(-) diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index 1aa1e2f0d..3caaddeb4 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -1,9 +1,10 @@ +use super::{error::handle_ping_pong_error, Error}; use crate::aggregator::{ accumulator::Accumulator, aggregate_step_failure_counter, aggregation_job_writer::AggregationJobWriter, http_handlers::AGGREGATION_JOB_ROUTE, query_type::CollectableQueryType, send_request_to_helper, }; -use anyhow::{anyhow, Context as _, Result}; +use anyhow::{anyhow, Context as _}; use derivative::Derivative; use futures::future::{try_join_all, BoxFuture, FutureExt}; use janus_aggregator_core::{ @@ -17,7 +18,7 @@ use janus_aggregator_core::{ }, task::{self, AggregatorTask, VerifyKey}, }; -use janus_core::{time::Clock, vdaf_dispatch}; +use janus_core::{retries::is_retryable_http_status, time::Clock, vdaf_dispatch}; use janus_messages::{ query_type::{FixedSize, TimeInterval}, AggregationJobContinueReq, AggregationJobInitializeReq, AggregationJobResp, @@ -40,9 +41,7 @@ use std::{ time::Duration, }; use tokio::try_join; -use tracing::{debug, info, trace_span, warn}; - -use super::error::handle_ping_pong_error; +use tracing::{debug, error, info, trace_span, warn}; #[derive(Derivative)] #[derivative(Debug)] @@ -103,7 +102,7 @@ impl AggregationJobDriver { &self, datastore: Arc>, lease: Arc>, - ) -> Result<()> { + ) -> Result<(), Error> { match lease.leased().query_type() { task::QueryType::TimeInterval => { vdaf_dispatch!(lease.leased().vdaf(), (vdaf, VdafType, VERIFY_KEY_LENGTH) => { @@ -128,7 +127,7 @@ impl AggregationJobDriver { datastore: Arc>, vdaf: Arc, lease: Arc>, - ) -> Result<()> + ) -> Result<(), Error> where A: 'static + Send + Sync, A::AggregationParam: Send + Sync + PartialEq + Eq, @@ -239,8 +238,9 @@ impl AggregationJobDriver { ReportAggregationState::Start => saw_start = true, ReportAggregationState::WaitingLeader(_) => saw_waiting = true, ReportAggregationState::WaitingHelper(_) => { - return Err(anyhow!( + return Err(Error::Internal( "Leader encountered unexpected ReportAggregationState::WaitingHelper" + .to_string(), )); } ReportAggregationState::Finished => saw_finished = true, @@ -276,13 +276,10 @@ impl AggregationJobDriver { .await } - _ => Err(anyhow!( - "unexpected combination of report aggregation states (saw_start = {}, saw_waiting \ - = {}, saw_finished = {})", - saw_start, - saw_waiting, - saw_finished - )), + _ => Err(Error::Internal(format!( + "unexpected combination of report aggregation states (saw_start = {saw_start}, \ + saw_waiting = {saw_waiting}, saw_finished = {saw_finished})", + ))), } } @@ -302,7 +299,7 @@ impl AggregationJobDriver { report_aggregations: Vec>, client_reports: HashMap>, verify_key: VerifyKey, - ) -> Result<()> + ) -> Result<(), Error> where A: 'static, A::AggregationParam: Send + Sync + PartialEq + Eq, @@ -410,8 +407,6 @@ impl AggregationJobDriver { let resp = if !prepare_inits.is_empty() { // Construct request, send it to the helper, and process the response. - // TODO(#235): abandon work immediately on "terminal" failures from helper, or other - // unexpected cases such as unknown/unexpected content type. let req = AggregationJobInitializeReq::::new( aggregation_job.aggregation_parameter().get_encoded(), PartialBatchSelector::new(aggregation_job.partial_batch_identifier().clone()), @@ -422,14 +417,19 @@ impl AggregationJobDriver { &self.http_client, Method::PUT, task.aggregation_job_uri(aggregation_job.id())? - .ok_or_else(|| anyhow!("task is not leader and has no aggregation job URI"))?, + .ok_or_else(|| { + Error::InvalidConfiguration( + "task is not leader and has no aggregate share URI", + ) + })?, AGGREGATION_JOB_ROUTE, AggregationJobInitializeReq::::MEDIA_TYPE, req, // The only way a task wouldn't have an aggregator auth token in it is in the taskprov // case, and Janus never acts as the leader with taskprov enabled. - task.aggregator_auth_token() - .ok_or_else(|| anyhow!("task has no aggregator auth token"))?, + task.aggregator_auth_token().ok_or_else(|| { + Error::InvalidConfiguration("no aggregator auth token in task") + })?, &self.http_request_duration_histogram, ) .await?; @@ -468,7 +468,7 @@ impl AggregationJobDriver { task: Arc, aggregation_job: AggregationJob, report_aggregations: Vec>, - ) -> Result<()> + ) -> Result<(), Error> where A: 'static, A::AggregationParam: Send + Sync + PartialEq + Eq, @@ -515,22 +515,22 @@ impl AggregationJobDriver { } // Construct request, send it to the helper, and process the response. - // TODO(#235): abandon work immediately on "terminal" failures from helper, or other - // unexpected cases such as unknown/unexpected content type. let req = AggregationJobContinueReq::new(aggregation_job.step(), prepare_continues); let resp_bytes = send_request_to_helper( &self.http_client, Method::POST, task.aggregation_job_uri(aggregation_job.id())? - .ok_or_else(|| anyhow!("task is not leader and has no aggregation job URI"))?, + .ok_or_else(|| { + Error::InvalidConfiguration("task is not leader and has no aggregate share URI") + })?, AGGREGATION_JOB_ROUTE, AggregationJobContinueReq::MEDIA_TYPE, req, // The only way a task wouldn't have an aggregator auth token in it is in the taskprov // case, and Janus never acts as the leader with taskprov enabled. task.aggregator_auth_token() - .ok_or_else(|| anyhow!("task has no aggregator auth token"))?, + .ok_or_else(|| Error::InvalidConfiguration("no aggregator auth token in task"))?, &self.http_request_duration_histogram, ) .await?; @@ -567,7 +567,7 @@ impl AggregationJobDriver { mut report_aggregations_to_write: Vec>, report_ids_to_scrub: Vec, helper_prep_resps: &[PrepareResp], - ) -> Result<()> + ) -> Result<(), Error> where A: 'static, A::AggregationParam: Send + Sync + Eq + PartialEq, @@ -579,8 +579,9 @@ impl AggregationJobDriver { { // Handle response, computing the new report aggregations to be stored. if stepped_aggregations.len() != helper_prep_resps.len() { - return Err(anyhow!( + return Err(Error::Internal( "missing, duplicate, out-of-order, or unexpected prepare steps in response" + .to_string(), )); } let mut accumulator = Accumulator::::new( @@ -592,8 +593,9 @@ impl AggregationJobDriver { stepped_aggregations.iter().zip(helper_prep_resps) { if helper_prep_resp.report_id() != stepped_aggregation.report_aggregation.report_id() { - return Err(anyhow!( + return Err(Error::Internal( "missing, duplicate, out-of-order, or unexpected prepare steps in response" + .to_string(), )); } @@ -758,11 +760,11 @@ impl AggregationJobDriver { Ok(()) } - async fn cancel_aggregation_job( + async fn abandon_aggregation_job( &self, datastore: Arc>, - lease: Lease, - ) -> Result<()> { + lease: Arc>, + ) -> Result<(), Error> { match lease.leased().query_type() { task::QueryType::TimeInterval => { vdaf_dispatch!(lease.leased().vdaf(), (vdaf, VdafType, VERIFY_KEY_LENGTH) => { @@ -798,8 +800,8 @@ impl AggregationJobDriver { &self, vdaf: A, datastore: Arc>, - lease: Lease, - ) -> Result<()> + lease: Arc>, + ) -> Result<(), Error> where A: Send + Sync + 'static, A::AggregateShare: Send + Sync, @@ -809,7 +811,6 @@ impl AggregationJobDriver { for<'a> A::PrepareState: Send + Sync + Encode + ParameterizedDecode<(&'a A, usize)>, { let vdaf = Arc::new(vdaf); - let lease = Arc::new(lease); datastore .run_tx("cancel_aggregation_job", |tx| { let vdaf = Arc::clone(&vdaf); @@ -899,29 +900,80 @@ impl AggregationJobDriver { self: Arc, datastore: Arc>, maximum_attempts_before_failure: usize, - ) -> impl Fn(Lease) -> BoxFuture<'static, Result<(), anyhow::Error>> - { + ) -> impl Fn(Lease) -> BoxFuture<'static, Result<(), Error>> { move |lease| { let (this, datastore) = (Arc::clone(&self), Arc::clone(&datastore)); + let lease = Arc::new(lease); Box::pin(async move { - if lease.lease_attempts() > maximum_attempts_before_failure { + let attempts = lease.lease_attempts(); + if attempts > maximum_attempts_before_failure { warn!( attempts = %lease.lease_attempts(), max_attempts = %maximum_attempts_before_failure, - "Canceling job due to too many failed attempts" + "Abandoning job due to too many failed attempts" ); this.job_cancel_counter.add(1, &[]); - return this.cancel_aggregation_job(datastore, lease).await; + return this.abandon_aggregation_job(datastore, lease).await; } - if lease.lease_attempts() > 1 { + if attempts > 1 { this.job_retry_counter.add(1, &[]); } - this.step_aggregation_job(datastore, Arc::new(lease)).await + match this + .step_aggregation_job(Arc::clone(&datastore), Arc::clone(&lease)) + .await + { + Ok(_) => Ok(()), + Err(error) => { + if !Self::is_retryable_error(&error) { + // Make a best-effort attempt to immediately cancel the aggregation job. + // on fatal errors. This protects the helper from performing wasted + // work. + // + // Cancellation might fail, but we will return the first error, since + // that's the more interesting error for debugging purposes. + // + // If cancellation fails, the job will be picked up again. This isn't + // a big deal, since stepping an aggregation job is idempotent. It would + // just be some wasted work next time around. + warn!( + %attempts, + max_attempts = %maximum_attempts_before_failure, + ?error, + "Abandoning job due to fatal error" + ); + this.job_cancel_counter.add(1, &[]); + if let Err(error) = this.abandon_aggregation_job(datastore, lease).await + { + error!(error = ?error, "Failed to abandon job"); + } + } + Err(error) + } + } }) } } + + /// Determines whether the given [`Error`] is retryable in the context of aggregation job + /// processing. + fn is_retryable_error(error: &Error) -> bool { + match error { + Error::Http(http_error_response) => { + is_retryable_http_status(http_error_response.status()) + } + Error::Datastore(error) => match error { + datastore::Error::Db(_) | datastore::Error::Pool(_) => true, + datastore::Error::User(error) => match error.downcast_ref::() { + Some(error) => Self::is_retryable_error(error), + None => false, + }, + _ => false, + }, + _ => false, + } + } } /// SteppedAggregation represents a report aggregation along with the associated preparation-state @@ -1498,7 +1550,7 @@ mod tests { .await .unwrap_err(); assert_matches!( - error.downcast().unwrap(), + error, Error::Http(error_response) => { assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnauthorizedRequest); @@ -2390,7 +2442,7 @@ mod tests { .await .unwrap_err(); assert_matches!( - error.downcast().unwrap(), + error, Error::Http(error_response) => { assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnauthorizedRequest); @@ -3112,7 +3164,7 @@ mod tests { .await .unwrap_err(); assert_matches!( - error.downcast().unwrap(), + error, Error::Http(error_response) => { assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnrecognizedTask); @@ -3500,7 +3552,7 @@ mod tests { .await .unwrap_err(); assert_matches!( - error.downcast().unwrap(), + error, Error::Http(error_response) => { assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnrecognizedTask); @@ -3750,7 +3802,7 @@ mod tests { 32, ); aggregation_job_driver - .cancel_aggregation_job(Arc::clone(&ds), lease) + .abandon_aggregation_job(Arc::clone(&ds), Arc::new(lease)) .await .unwrap(); @@ -3845,7 +3897,7 @@ mod tests { } #[tokio::test] - async fn abandon_failing_aggregation_job() { + async fn abandon_failing_aggregation_job_with_retryable_error() { install_test_trace_subscriber(); let mut server = mockito::Server::new_async().await; let clock = MockClock::default(); @@ -4066,4 +4118,227 @@ mod tests { ), ); } + + #[tokio::test] + async fn abandon_failing_aggregation_job_with_fatal_error() { + install_test_trace_subscriber(); + let mut server = mockito::Server::new_async().await; + let clock = MockClock::default(); + let mut runtime_manager = TestRuntimeManager::new(); + let ephemeral_datastore = ephemeral_datastore().await; + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); + let stopper = Stopper::new(); + + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count) + .with_helper_aggregator_endpoint(server.url().parse().unwrap()) + .build(); + + let leader_task = task.leader_view().unwrap(); + let agg_auth_token = task.aggregator_auth_token(); + let aggregation_job_id = random(); + let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); + + let helper_hpke_keypair = generate_test_hpke_config_and_private_key(); + + let vdaf = Prio3::new_count(2).unwrap(); + let time = clock + .now() + .to_batch_interval_start(task.time_precision()) + .unwrap(); + let batch_identifier = TimeInterval::to_batch_identifier(&leader_task, &(), &time).unwrap(); + let report_metadata = ReportMetadata::new(random(), time); + let transcript = run_vdaf( + &vdaf, + verify_key.as_bytes(), + &(), + report_metadata.id(), + &false, + ); + let report = generate_report::( + *task.id(), + report_metadata, + helper_hpke_keypair.config(), + transcript.public_share, + Vec::new(), + &transcript.leader_input_share, + &transcript.helper_input_share, + ); + + // Set up fixtures in the database. + ds.run_unnamed_tx(|tx| { + let vdaf = vdaf.clone(); + let task = leader_task.clone(); + let report = report.clone(); + Box::pin(async move { + tx.put_aggregator_task(&task).await?; + + // We need to store a well-formed report, as it will get parsed by the leader and + // run through initial VDAF preparation before sending a request to the helper. + tx.put_client_report(&vdaf, &report).await?; + + tx.put_aggregation_job(&AggregationJob::< + VERIFY_KEY_LENGTH, + TimeInterval, + Prio3Count, + >::new( + *task.id(), + aggregation_job_id, + (), + (), + Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) + .unwrap(), + AggregationJobState::InProgress, + AggregationJobStep::from(0), + )) + .await?; + + tx.put_report_aggregation( + &ReportAggregation::::new( + *task.id(), + aggregation_job_id, + *report.metadata().id(), + *report.metadata().time(), + 0, + None, + ReportAggregationState::Start, + ), + ) + .await?; + + tx.put_batch(&Batch::::new( + *task.id(), + batch_identifier, + (), + BatchState::Open, + 1, + Interval::from_time(report.metadata().time()).unwrap(), + )) + .await?; + + Ok(()) + }) + }) + .await + .unwrap(); + + // Set up the aggregation job driver. + let aggregation_job_driver = Arc::new(AggregationJobDriver::new( + reqwest::Client::new(), + &noop_meter(), + 32, + )); + let job_driver = Arc::new( + JobDriver::new( + clock.clone(), + runtime_manager.with_label("stepper"), + noop_meter(), + stopper.clone(), + StdDuration::from_secs(1), + 10, + StdDuration::from_secs(60), + aggregation_job_driver.make_incomplete_job_acquirer_callback( + Arc::clone(&ds), + StdDuration::from_secs(600), + ), + aggregation_job_driver.make_job_stepper_callback(Arc::clone(&ds), 3), + ) + .unwrap(), + ); + + // Set up one fatal error response from our mock helper. These will cause errors in the + // leader, because the response body is empty and cannot be decoded. + let (header, value) = agg_auth_token.request_authentication(); + let failure_mock = server + .mock( + "PUT", + task.aggregation_job_uri(&aggregation_job_id) + .unwrap() + .path(), + ) + .match_header(header, value.as_str()) + .match_header( + CONTENT_TYPE.as_str(), + AggregationJobInitializeReq::::MEDIA_TYPE, + ) + .with_status(404) + .expect(1) + .create_async() + .await; + // Set up an extra response that should never be used, to make sure the job driver doesn't + // make more requests than we expect. If there were no remaining mocks, mockito would have + // respond with a fallback error response instead. + let no_more_requests_mock = server + .mock( + "PUT", + task.aggregation_job_uri(&aggregation_job_id) + .unwrap() + .path(), + ) + .match_header(header, value.as_str()) + .match_header( + CONTENT_TYPE.as_str(), + AggregationJobInitializeReq::::MEDIA_TYPE, + ) + .with_status(500) + .expect(1) + .create_async() + .await; + + // Start up the job driver. + let task_handle = runtime_manager.with_label("driver").spawn(job_driver.run()); + // Wait for the next task to be spawned and to complete. + runtime_manager.wait_for_completed_tasks("stepper", 1).await; + // Advance the clock by the lease duration, so that the job driver can pick up the job + // and try again. + clock.advance(&Duration::from_seconds(600)); + stopper.stop(); + task_handle.await.unwrap(); + + // Check that the job driver made the HTTP requests we expected. + failure_mock.assert_async().await; + assert!(!no_more_requests_mock.matched_async().await); + + // Confirm in the database that the job was abandoned. + let (got_aggregation_job, got_batch) = ds + .run_unnamed_tx(|tx| { + let task = task.clone(); + Box::pin(async move { + let got_aggregation_job = tx + .get_aggregation_job(task.id(), &aggregation_job_id) + .await? + .unwrap(); + let got_batch = tx + .get_batch(task.id(), &batch_identifier, &()) + .await? + .unwrap(); + Ok((got_aggregation_job, got_batch)) + }) + }) + .await + .unwrap(); + assert_eq!( + got_aggregation_job, + AggregationJob::::new( + *task.id(), + aggregation_job_id, + (), + (), + Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) + .unwrap(), + AggregationJobState::Abandoned, + AggregationJobStep::from(0), + ), + ); + assert_eq!( + got_batch, + Batch::::new( + *task.id(), + batch_identifier, + (), + BatchState::Open, + 0, + Interval::from_time(report.metadata().time()).unwrap(), + ), + ); + } } diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index ef8d24d4e..9711f2317 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -425,29 +425,27 @@ impl CollectionJobDriver { datastore: Arc>, maximum_attempts_before_failure: usize, ) -> impl Fn(Lease) -> BoxFuture<'static, Result<(), super::Error>> { - move |collection_job_lease: Lease| { + move |lease: Lease| { let (this, datastore) = (Arc::clone(&self), Arc::clone(&datastore)); - let collection_job_lease = Arc::new(collection_job_lease); + let lease = Arc::new(lease); Box::pin(async move { - let attempts = collection_job_lease.lease_attempts(); - if collection_job_lease.lease_attempts() > maximum_attempts_before_failure { + let attempts = lease.lease_attempts(); + if attempts > maximum_attempts_before_failure { warn!( - attempts = %attempts, + %attempts, max_attempts = %maximum_attempts_before_failure, "Abandoning job due to too many failed attempts" ); this.metrics.jobs_abandoned_counter.add(1, &[]); - return this - .abandon_collection_job(datastore, collection_job_lease) - .await; + return this.abandon_collection_job(datastore, lease).await; } - if collection_job_lease.lease_attempts() > 1 { + if attempts > 1 { this.metrics.job_steps_retried_counter.add(1, &[]); } match this - .step_collection_job(Arc::clone(&datastore), Arc::clone(&collection_job_lease)) + .step_collection_job(Arc::clone(&datastore), Arc::clone(&lease)) .await { Ok(_) => Ok(()), @@ -466,12 +464,11 @@ impl CollectionJobDriver { warn!( attempts = %attempts, max_attempts = %maximum_attempts_before_failure, + ?error, "Abandoning job due to fatal error" ); this.metrics.jobs_abandoned_counter.add(1, &[]); - if let Err(error) = this - .abandon_collection_job(datastore, collection_job_lease) - .await + if let Err(error) = this.abandon_collection_job(datastore, lease).await { error!(error = ?error, "Failed to abandon job"); } From 9d856296478685e3b5d74a58c0ffaac18f5dd829 Mon Sep 17 00:00:00 2001 From: Ameer Ghani Date: Mon, 22 Jan 2024 13:46:56 -0500 Subject: [PATCH 2/2] Fix for 0.6 --- aggregator/src/aggregator/aggregation_job_driver.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index 3caaddeb4..7708c0b5a 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -4147,13 +4147,7 @@ mod tests { .unwrap(); let batch_identifier = TimeInterval::to_batch_identifier(&leader_task, &(), &time).unwrap(); let report_metadata = ReportMetadata::new(random(), time); - let transcript = run_vdaf( - &vdaf, - verify_key.as_bytes(), - &(), - report_metadata.id(), - &false, - ); + let transcript = run_vdaf(&vdaf, verify_key.as_bytes(), &(), report_metadata.id(), &0); let report = generate_report::( *task.id(), report_metadata,