diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index b2de59476..1e8ca2519 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -1500,7 +1500,7 @@ mod tests { assert_matches!( error.downcast().unwrap(), Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnauthorizedRequest); } ); @@ -2072,7 +2072,7 @@ mod tests { assert_matches!( error.downcast().unwrap(), Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnauthorizedRequest); } ); @@ -2794,7 +2794,7 @@ mod tests { assert_matches!( error.downcast().unwrap(), Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnrecognizedTask); } ); @@ -3182,7 +3182,7 @@ mod tests { assert_matches!( error.downcast().unwrap(), Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnrecognizedTask); } ); diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index d8d58aa5f..ef8d24d4e 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -16,7 +16,7 @@ use janus_aggregator_core::{ }, task, }; -use janus_core::{time::Clock, vdaf_dispatch}; +use janus_core::{retries::is_retryable_http_status, time::Clock, vdaf_dispatch}; use janus_messages::{ query_type::{FixedSize, QueryType, TimeInterval}, AggregateShare, AggregateShareReq, BatchSelector, @@ -33,7 +33,7 @@ use prio::{ use reqwest::Method; use std::{sync::Arc, time::Duration}; use tokio::try_join; -use tracing::{info, warn}; +use tracing::{error, info, warn}; /// Drives a collection job. #[derive(Derivative)] @@ -325,7 +325,7 @@ impl CollectionJobDriver { pub async fn abandon_collection_job( &self, datastore: Arc>, - lease: Lease, + lease: Arc>, ) -> Result<(), Error> { match lease.leased().query_type() { task::QueryType::TimeInterval => { @@ -360,13 +360,12 @@ impl CollectionJobDriver { &self, datastore: Arc>, vdaf: Arc, - lease: Lease, + lease: Arc>, ) -> Result<(), Error> where A::AggregationParam: Send + Sync, A::AggregateShare: Send + Sync, { - let lease = Arc::new(lease); datastore .run_tx("abandon_collection_job", |tx| { let (vdaf, lease) = (Arc::clone(&vdaf), Arc::clone(&lease)); @@ -428,10 +427,12 @@ impl CollectionJobDriver { ) -> impl Fn(Lease) -> BoxFuture<'static, Result<(), super::Error>> { move |collection_job_lease: Lease| { let (this, datastore) = (Arc::clone(&self), Arc::clone(&datastore)); + let collection_job_lease = Arc::new(collection_job_lease); Box::pin(async move { + let attempts = collection_job_lease.lease_attempts(); if collection_job_lease.lease_attempts() > maximum_attempts_before_failure { warn!( - attempts = %collection_job_lease.lease_attempts(), + attempts = %attempts, max_attempts = %maximum_attempts_before_failure, "Abandoning job due to too many failed attempts" ); @@ -445,11 +446,61 @@ impl CollectionJobDriver { this.metrics.job_steps_retried_counter.add(1, &[]); } - this.step_collection_job(datastore, Arc::new(collection_job_lease)) + match this + .step_collection_job(Arc::clone(&datastore), Arc::clone(&collection_job_lease)) .await + { + Ok(_) => Ok(()), + Err(error) => { + if !Self::is_retryable_error(&error) { + // Make a best-effort attempt to immediately cancel the collection job. + // on fatal errors. This protects the helper from performing wasted + // work. + // + // Cancellation might fail, but we will return the first error, since + // that's the more interesting error for debugging purposes. + // + // If cancellation fails, the job will be picked up again. This isn't + // a big deal, since stepping a collection job is idempotent. It would + // just be some wasted work next time around. + warn!( + attempts = %attempts, + max_attempts = %maximum_attempts_before_failure, + "Abandoning job due to fatal error" + ); + this.metrics.jobs_abandoned_counter.add(1, &[]); + if let Err(error) = this + .abandon_collection_job(datastore, collection_job_lease) + .await + { + error!(error = ?error, "Failed to abandon job"); + } + } + Err(error) + } + } }) } } + + /// Determines whether the given [`Error`] is retryable in the context of collection job + /// processing. + fn is_retryable_error(error: &Error) -> bool { + match error { + Error::Http(http_error_response) => { + is_retryable_http_status(http_error_response.status()) + } + Error::Datastore(error) => match error { + datastore::Error::Db(_) | datastore::Error::Pool(_) => true, + datastore::Error::User(error) => match error.downcast_ref::() { + Some(error) => Self::is_retryable_error(error), + None => false, + }, + _ => false, + }, + _ => false, + } + } } /// Holds various metrics instruments for a collection job driver. @@ -922,7 +973,7 @@ mod tests { error, Error::Http(error_response) => { assert_matches!(error_response.dap_problem_type(), Some(DapProblemType::BatchQueriedTooManyTimes)); - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); } ); @@ -1030,8 +1081,9 @@ mod tests { ); // Run: abandon the collection job. + let lease = Arc::new(lease.unwrap()); collection_job_driver - .abandon_collection_job(Arc::clone(&ds), lease.unwrap()) + .abandon_collection_job(Arc::clone(&ds), lease) .await .unwrap(); @@ -1066,7 +1118,101 @@ mod tests { } #[tokio::test] - async fn abandon_failing_collection_job() { + async fn abandon_failing_collection_job_with_fatal_error() { + install_test_trace_subscriber(); + let mut server = mockito::Server::new_async().await; + let clock = MockClock::default(); + let mut runtime_manager = TestRuntimeManager::new(); + let ephemeral_datastore = ephemeral_datastore().await; + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); + let stopper = Stopper::new(); + + let (task, _, collection_job) = + setup_collection_job_test_case(&mut server, clock.clone(), Arc::clone(&ds), false) + .await; + + // Set up the collection job driver + let collection_job_driver = Arc::new(CollectionJobDriver::new( + reqwest::Client::new(), + &noop_meter(), + 1, + )); + let job_driver = Arc::new( + JobDriver::new( + clock.clone(), + runtime_manager.with_label("stepper"), + noop_meter(), + stopper.clone(), + StdDuration::from_secs(1), + 10, + StdDuration::from_secs(60), + collection_job_driver.make_incomplete_job_acquirer_callback( + Arc::clone(&ds), + StdDuration::from_secs(600), + ), + collection_job_driver.make_job_stepper_callback(Arc::clone(&ds), 3), + ) + .unwrap(), + ); + + // Set up an error response from the server that returns a non-retryable error. + let failure_mock = server + .mock("POST", task.aggregate_shares_uri().unwrap().path()) + .with_status(404) + .expect(1) + .create_async() + .await; + // Set up an extra response that should never be used, to make sure the job driver doesn't + // make more requests than we expect. If there were no remaining mocks, mockito would have + // respond with a fallback error response instead. + let no_more_requests_mock = server + .mock("POST", task.aggregate_shares_uri().unwrap().path()) + .with_status(502) + .expect(1) + .create_async() + .await; + + // Start up the job driver. + let task_handle = runtime_manager.with_label("driver").spawn(job_driver.run()); + + // Wait for the next task to be spawned and to complete. + runtime_manager.wait_for_completed_tasks("stepper", 1).await; + // Advance the clock by the lease duration, so that the job driver can pick up the job + // and try again. + clock.advance(&Duration::from_seconds(600)); + + // Shut down the job driver. + stopper.stop(); + task_handle.await.unwrap(); + + // Check that the job driver made the HTTP requests we expected. + failure_mock.assert_async().await; + assert!(!no_more_requests_mock.matched_async().await); + + // Confirm that the collection job was abandoned. + let collection_job_after = ds + .run_unnamed_tx(|tx| { + let collection_job = collection_job.clone(); + Box::pin(async move { + tx.get_collection_job::<0, TimeInterval, dummy_vdaf::Vdaf>( + &dummy_vdaf::Vdaf::new(), + collection_job.task_id(), + collection_job.id(), + ) + .await + }) + }) + .await + .unwrap() + .unwrap(); + assert_eq!( + collection_job_after, + collection_job.with_state(CollectionJobState::Abandoned), + ); + } + + #[tokio::test] + async fn abandon_failing_collection_job_with_retryable_error() { install_test_trace_subscriber(); let mut server = mockito::Server::new_async().await; let clock = MockClock::default(); @@ -1104,10 +1250,11 @@ mod tests { ); // Set up three error responses from our mock helper. These will cause errors in the - // leader, because the response body is empty and cannot be decoded. + // leader, because the response body is empty and cannot be decoded. The error status + // indicates that the error is retryable. let failure_mock = server .mock("POST", task.aggregate_shares_uri().unwrap().path()) - .with_status(500) + .with_status(502) .expect(3) .create_async() .await; diff --git a/client/src/lib.rs b/client/src/lib.rs index ac28e0176..4bb0f5121 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -580,7 +580,7 @@ mod tests { assert_matches!( client.upload(&1).await, Err(Error::Http(error_response)) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::NOT_IMPLEMENTED); + assert_eq!(error_response.status(), StatusCode::NOT_IMPLEMENTED); } ); @@ -613,7 +613,7 @@ mod tests { assert_matches!( client.upload(&1).await, Err(Error::Http(error_response)) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::BAD_REQUEST); + assert_eq!(error_response.status(), StatusCode::BAD_REQUEST); assert_eq!( error_response.type_uri().unwrap(), "urn:ietf:params:ppm:dap:error:invalidMessage" diff --git a/collector/src/lib.rs b/collector/src/lib.rs index 1df020fcb..8c2b8969f 100644 --- a/collector/src/lib.rs +++ b/collector/src/lib.rs @@ -1408,7 +1408,7 @@ mod tests { .await .unwrap_err(); assert_matches!(error, Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert!(error_response.dap_problem_type().is_none()); }); @@ -1432,7 +1432,7 @@ mod tests { .await .unwrap_err(); assert_matches!(error, Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(error_response.type_uri().unwrap(), "http://example.com/test_server_error"); assert!(error_response.dap_problem_type().is_none()); }); @@ -1461,7 +1461,7 @@ mod tests { .await .unwrap_err(); assert_matches!(error, Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::BAD_REQUEST); + assert_eq!(error_response.status(), StatusCode::BAD_REQUEST); assert_eq!(error_response.type_uri().unwrap(), "urn:ietf:params:ppm:dap:error:invalidMessage"); assert_eq!(error_response.detail().unwrap(), "The message type for a response was incorrect or the payload was malformed."); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::InvalidMessage); @@ -1506,7 +1506,7 @@ mod tests { .unwrap(); let error = collector.poll_once(&job).await.unwrap_err(); assert_matches!(error, Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert!(error_response.dap_problem_type().is_none()); }); @@ -1528,7 +1528,7 @@ mod tests { let error = collector.poll_once(&job).await.unwrap_err(); assert_matches!(error, Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(error_response.type_uri().unwrap(), "http://example.com/test_server_error"); assert!(error_response.dap_problem_type().is_none()); }); @@ -1552,7 +1552,7 @@ mod tests { let error = collector.poll_once(&job).await.unwrap_err(); assert_matches!(error, Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::BAD_REQUEST); + assert_eq!(error_response.status(), StatusCode::BAD_REQUEST); assert_eq!(error_response.type_uri().unwrap(), "urn:ietf:params:ppm:dap:error:invalidMessage"); assert_eq!(error_response.detail().unwrap(), "The message type for a response was incorrect or the payload was malformed."); assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::InvalidMessage); @@ -1701,7 +1701,7 @@ mod tests { .await; let error = collector.poll_until_complete(&job).await.unwrap_err(); assert_matches!(error, Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert!(error_response.dap_problem_type().is_none()); }); mock_collection_job_always_fail.assert_async().await; @@ -1948,7 +1948,7 @@ mod tests { .await .unwrap_err(); assert_matches!(error, Error::Http(error_response) => { - assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR); }); mock_error.assert_async().await; diff --git a/core/src/http.rs b/core/src/http.rs index 483b53b02..8c0229a33 100644 --- a/core/src/http.rs +++ b/core/src/http.rs @@ -36,8 +36,9 @@ impl HttpErrorResponse { } /// The HTTP status code returned by the server. - pub fn status(&self) -> Option<&StatusCode> { - self.problem_details.status.as_ref() + pub fn status(&self) -> StatusCode { + // Unwrap safety: Self::from_response() always populates this field. + self.problem_details.status.unwrap() } /// A URI that identifies the problem type. diff --git a/core/src/retries.rs b/core/src/retries.rs index 7d8c7a250..85b1dd32b 100644 --- a/core/src/retries.rs +++ b/core/src/retries.rs @@ -86,10 +86,7 @@ where // reqwest::Response, which the caller may need in order to examine its body or headers. match request_fn().await { Ok(response) => { - if (response.status().is_server_error() - && response.status() != StatusCode::NOT_IMPLEMENTED) - || response.status() == StatusCode::TOO_MANY_REQUESTS - { + if is_retryable_http_status(response.status()) { warn!(?response, "encountered retryable server error"); return Err(backoff::Error::transient(Ok(response))); } @@ -120,6 +117,11 @@ where .await } +pub fn is_retryable_http_status(status: StatusCode) -> bool { + (status.is_server_error() && status != StatusCode::NOT_IMPLEMENTED) + || status == StatusCode::TOO_MANY_REQUESTS +} + #[cfg(test)] mod tests { use crate::{