Skip to content

Commit

Permalink
Abandon collection jobs early when a fatal error is encountered (#2476)
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga committed Jan 12, 2024
1 parent 79e515b commit 3a9f441
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 32 deletions.
8 changes: 4 additions & 4 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ mod tests {
assert_matches!(
error.downcast().unwrap(),
Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnauthorizedRequest);
}
);
Expand Down Expand Up @@ -2392,7 +2392,7 @@ mod tests {
assert_matches!(
error.downcast().unwrap(),
Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnauthorizedRequest);
}
);
Expand Down Expand Up @@ -3114,7 +3114,7 @@ mod tests {
assert_matches!(
error.downcast().unwrap(),
Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnrecognizedTask);
}
);
Expand Down Expand Up @@ -3502,7 +3502,7 @@ mod tests {
assert_matches!(
error.downcast().unwrap(),
Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnrecognizedTask);
}
);
Expand Down
171 changes: 159 additions & 12 deletions aggregator/src/aggregator/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use janus_aggregator_core::{
},
task,
};
use janus_core::{time::Clock, vdaf_dispatch};
use janus_core::{retries::is_retryable_http_status, time::Clock, vdaf_dispatch};
use janus_messages::{
query_type::{FixedSize, QueryType, TimeInterval},
AggregateShare, AggregateShareReq, BatchSelector,
Expand All @@ -33,7 +33,7 @@ use prio::{
use reqwest::Method;
use std::{sync::Arc, time::Duration};
use tokio::try_join;
use tracing::{info, warn};
use tracing::{error, info, warn};

/// Drives a collection job.
#[derive(Derivative)]
Expand Down Expand Up @@ -325,7 +325,7 @@ impl CollectionJobDriver {
pub async fn abandon_collection_job<C: Clock>(
&self,
datastore: Arc<Datastore<C>>,
lease: Lease<AcquiredCollectionJob>,
lease: Arc<Lease<AcquiredCollectionJob>>,
) -> Result<(), Error> {
match lease.leased().query_type() {
task::QueryType::TimeInterval => {
Expand Down Expand Up @@ -360,13 +360,12 @@ impl CollectionJobDriver {
&self,
datastore: Arc<Datastore<C>>,
vdaf: Arc<A>,
lease: Lease<AcquiredCollectionJob>,
lease: Arc<Lease<AcquiredCollectionJob>>,
) -> Result<(), Error>
where
A::AggregationParam: Send + Sync,
A::AggregateShare: Send + Sync,
{
let lease = Arc::new(lease);
datastore
.run_tx("abandon_collection_job", |tx| {
let (vdaf, lease) = (Arc::clone(&vdaf), Arc::clone(&lease));
Expand Down Expand Up @@ -428,10 +427,12 @@ impl CollectionJobDriver {
) -> impl Fn(Lease<AcquiredCollectionJob>) -> BoxFuture<'static, Result<(), super::Error>> {
move |collection_job_lease: Lease<AcquiredCollectionJob>| {
let (this, datastore) = (Arc::clone(&self), Arc::clone(&datastore));
let collection_job_lease = Arc::new(collection_job_lease);
Box::pin(async move {
let attempts = collection_job_lease.lease_attempts();
if collection_job_lease.lease_attempts() > maximum_attempts_before_failure {
warn!(
attempts = %collection_job_lease.lease_attempts(),
attempts = %attempts,
max_attempts = %maximum_attempts_before_failure,
"Abandoning job due to too many failed attempts"
);
Expand All @@ -445,11 +446,61 @@ impl CollectionJobDriver {
this.metrics.job_steps_retried_counter.add(1, &[]);
}

this.step_collection_job(datastore, Arc::new(collection_job_lease))
match this
.step_collection_job(Arc::clone(&datastore), Arc::clone(&collection_job_lease))
.await
{
Ok(_) => Ok(()),
Err(error) => {
if !Self::is_retryable_error(&error) {
// Make a best-effort attempt to immediately cancel the collection job.
// on fatal errors. This protects the helper from performing wasted
// work.
//
// Cancellation might fail, but we will return the first error, since
// that's the more interesting error for debugging purposes.
//
// If cancellation fails, the job will be picked up again. This isn't
// a big deal, since stepping a collection job is idempotent. It would
// just be some wasted work next time around.
warn!(
attempts = %attempts,
max_attempts = %maximum_attempts_before_failure,
"Abandoning job due to fatal error"
);
this.metrics.jobs_abandoned_counter.add(1, &[]);
if let Err(error) = this
.abandon_collection_job(datastore, collection_job_lease)
.await
{
error!(error = ?error, "Failed to abandon job");
}
}
Err(error)
}
}
})
}
}

/// Determines whether the given [`Error`] is retryable in the context of collection job
/// processing.
fn is_retryable_error(error: &Error) -> bool {
match error {
Error::Http(http_error_response) => {
is_retryable_http_status(http_error_response.status())
}
Error::Datastore(error) => match error {
datastore::Error::Db(_) | datastore::Error::Pool(_) => true,
datastore::Error::User(error) => match error.downcast_ref::<Error>() {
Some(error) => Self::is_retryable_error(error),
None => false,
},
_ => false,
},
_ => false,
}
}
}

/// Holds various metrics instruments for a collection job driver.
Expand Down Expand Up @@ -922,7 +973,7 @@ mod tests {
error,
Error::Http(error_response) => {
assert_matches!(error_response.dap_problem_type(), Some(DapProblemType::BatchQueriedTooManyTimes));
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
);

Expand Down Expand Up @@ -1030,8 +1081,9 @@ mod tests {
);

// Run: abandon the collection job.
let lease = Arc::new(lease.unwrap());
collection_job_driver
.abandon_collection_job(Arc::clone(&ds), lease.unwrap())
.abandon_collection_job(Arc::clone(&ds), lease)
.await
.unwrap();

Expand Down Expand Up @@ -1066,7 +1118,101 @@ mod tests {
}

#[tokio::test]
async fn abandon_failing_collection_job() {
async fn abandon_failing_collection_job_with_fatal_error() {
install_test_trace_subscriber();
let mut server = mockito::Server::new_async().await;
let clock = MockClock::default();
let mut runtime_manager = TestRuntimeManager::new();
let ephemeral_datastore = ephemeral_datastore().await;
let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);
let stopper = Stopper::new();

let (task, _, collection_job) =
setup_collection_job_test_case(&mut server, clock.clone(), Arc::clone(&ds), false)
.await;

// Set up the collection job driver
let collection_job_driver = Arc::new(CollectionJobDriver::new(
reqwest::Client::new(),
&noop_meter(),
1,
));
let job_driver = Arc::new(
JobDriver::new(
clock.clone(),
runtime_manager.with_label("stepper"),
noop_meter(),
stopper.clone(),
StdDuration::from_secs(1),
10,
StdDuration::from_secs(60),
collection_job_driver.make_incomplete_job_acquirer_callback(
Arc::clone(&ds),
StdDuration::from_secs(600),
),
collection_job_driver.make_job_stepper_callback(Arc::clone(&ds), 3),
)
.unwrap(),
);

// Set up an error response from the server that returns a non-retryable error.
let failure_mock = server
.mock("POST", task.aggregate_shares_uri().unwrap().path())
.with_status(404)
.expect(1)
.create_async()
.await;
// Set up an extra response that should never be used, to make sure the job driver doesn't
// make more requests than we expect. If there were no remaining mocks, mockito would have
// respond with a fallback error response instead.
let no_more_requests_mock = server
.mock("POST", task.aggregate_shares_uri().unwrap().path())
.with_status(502)
.expect(1)
.create_async()
.await;

// Start up the job driver.
let task_handle = runtime_manager.with_label("driver").spawn(job_driver.run());

// Wait for the next task to be spawned and to complete.
runtime_manager.wait_for_completed_tasks("stepper", 1).await;
// Advance the clock by the lease duration, so that the job driver can pick up the job
// and try again.
clock.advance(&Duration::from_seconds(600));

// Shut down the job driver.
stopper.stop();
task_handle.await.unwrap();

// Check that the job driver made the HTTP requests we expected.
failure_mock.assert_async().await;
assert!(!no_more_requests_mock.matched_async().await);

// Confirm that the collection job was abandoned.
let collection_job_after = ds
.run_unnamed_tx(|tx| {
let collection_job = collection_job.clone();
Box::pin(async move {
tx.get_collection_job::<0, TimeInterval, dummy_vdaf::Vdaf>(
&dummy_vdaf::Vdaf::new(),
collection_job.task_id(),
collection_job.id(),
)
.await
})
})
.await
.unwrap()
.unwrap();
assert_eq!(
collection_job_after,
collection_job.with_state(CollectionJobState::Abandoned),
);
}

#[tokio::test]
async fn abandon_failing_collection_job_with_retryable_error() {
install_test_trace_subscriber();
let mut server = mockito::Server::new_async().await;
let clock = MockClock::default();
Expand Down Expand Up @@ -1104,10 +1250,11 @@ mod tests {
);

// Set up three error responses from our mock helper. These will cause errors in the
// leader, because the response body is empty and cannot be decoded.
// leader, because the response body is empty and cannot be decoded. The error status
// indicates that the error is retryable.
let failure_mock = server
.mock("POST", task.aggregate_shares_uri().unwrap().path())
.with_status(500)
.with_status(502)
.expect(3)
.create_async()
.await;
Expand Down
4 changes: 2 additions & 2 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ mod tests {
assert_matches!(
client.upload(&1).await,
Err(Error::Http(error_response)) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::NOT_IMPLEMENTED);
assert_eq!(error_response.status(), StatusCode::NOT_IMPLEMENTED);
}
);

Expand Down Expand Up @@ -613,7 +613,7 @@ mod tests {
assert_matches!(
client.upload(&1).await,
Err(Error::Http(error_response)) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.status(), StatusCode::BAD_REQUEST);
assert_eq!(
error_response.type_uri().unwrap(),
"urn:ietf:params:ppm:dap:error:invalidMessage"
Expand Down
16 changes: 8 additions & 8 deletions collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1408,7 +1408,7 @@ mod tests {
.await
.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert!(error_response.dap_problem_type().is_none());
});

Expand All @@ -1432,7 +1432,7 @@ mod tests {
.await
.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.type_uri().unwrap(), "http://example.com/test_server_error");
assert!(error_response.dap_problem_type().is_none());
});
Expand Down Expand Up @@ -1461,7 +1461,7 @@ mod tests {
.await
.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.status(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.type_uri().unwrap(), "urn:ietf:params:ppm:dap:error:invalidMessage");
assert_eq!(error_response.detail().unwrap(), "The message type for a response was incorrect or the payload was malformed.");
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::InvalidMessage);
Expand Down Expand Up @@ -1506,7 +1506,7 @@ mod tests {
.unwrap();
let error = collector.poll_once(&job).await.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert!(error_response.dap_problem_type().is_none());
});

Expand All @@ -1528,7 +1528,7 @@ mod tests {

let error = collector.poll_once(&job).await.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.type_uri().unwrap(), "http://example.com/test_server_error");
assert!(error_response.dap_problem_type().is_none());
});
Expand All @@ -1552,7 +1552,7 @@ mod tests {

let error = collector.poll_once(&job).await.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.status(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.type_uri().unwrap(), "urn:ietf:params:ppm:dap:error:invalidMessage");
assert_eq!(error_response.detail().unwrap(), "The message type for a response was incorrect or the payload was malformed.");
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::InvalidMessage);
Expand Down Expand Up @@ -1701,7 +1701,7 @@ mod tests {
.await;
let error = collector.poll_until_complete(&job).await.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert!(error_response.dap_problem_type().is_none());
});
mock_collection_job_always_fail.assert_async().await;
Expand Down Expand Up @@ -1948,7 +1948,7 @@ mod tests {
.await
.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
});

mock_error.assert_async().await;
Expand Down
5 changes: 3 additions & 2 deletions core/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ impl HttpErrorResponse {
}

/// The HTTP status code returned by the server.
pub fn status(&self) -> Option<&StatusCode> {
self.problem_details.status.as_ref()
pub fn status(&self) -> StatusCode {
// Unwrap safety: Self::from_response() always populates this field.
self.problem_details.status.unwrap()
}

/// A URI that identifies the problem type.
Expand Down
Loading

0 comments on commit 3a9f441

Please sign in to comment.