Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abandon collection jobs early when a fatal error is encountered #2476

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ mod tests {
assert_matches!(
error.downcast().unwrap(),
Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnauthorizedRequest);
}
);
Expand Down Expand Up @@ -2072,7 +2072,7 @@ mod tests {
assert_matches!(
error.downcast().unwrap(),
Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnauthorizedRequest);
}
);
Expand Down Expand Up @@ -2794,7 +2794,7 @@ mod tests {
assert_matches!(
error.downcast().unwrap(),
Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnrecognizedTask);
}
);
Expand Down Expand Up @@ -3182,7 +3182,7 @@ mod tests {
assert_matches!(
error.downcast().unwrap(),
Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::UnrecognizedTask);
}
);
Expand Down
171 changes: 159 additions & 12 deletions aggregator/src/aggregator/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use janus_aggregator_core::{
},
task,
};
use janus_core::{time::Clock, vdaf_dispatch};
use janus_core::{retries::is_retryable_http_status, time::Clock, vdaf_dispatch};
use janus_messages::{
query_type::{FixedSize, QueryType, TimeInterval},
AggregateShare, AggregateShareReq, BatchSelector,
Expand All @@ -33,7 +33,7 @@ use prio::{
use reqwest::Method;
use std::{sync::Arc, time::Duration};
use tokio::try_join;
use tracing::{info, warn};
use tracing::{error, info, warn};

/// Drives a collection job.
#[derive(Derivative)]
Expand Down Expand Up @@ -325,7 +325,7 @@ impl CollectionJobDriver {
pub async fn abandon_collection_job<C: Clock>(
&self,
datastore: Arc<Datastore<C>>,
lease: Lease<AcquiredCollectionJob>,
lease: Arc<Lease<AcquiredCollectionJob>>,
) -> Result<(), Error> {
match lease.leased().query_type() {
task::QueryType::TimeInterval => {
Expand Down Expand Up @@ -360,13 +360,12 @@ impl CollectionJobDriver {
&self,
datastore: Arc<Datastore<C>>,
vdaf: Arc<A>,
lease: Lease<AcquiredCollectionJob>,
lease: Arc<Lease<AcquiredCollectionJob>>,
) -> Result<(), Error>
where
A::AggregationParam: Send + Sync,
A::AggregateShare: Send + Sync,
{
let lease = Arc::new(lease);
datastore
.run_tx("abandon_collection_job", |tx| {
let (vdaf, lease) = (Arc::clone(&vdaf), Arc::clone(&lease));
Expand Down Expand Up @@ -428,10 +427,12 @@ impl CollectionJobDriver {
) -> impl Fn(Lease<AcquiredCollectionJob>) -> BoxFuture<'static, Result<(), super::Error>> {
move |collection_job_lease: Lease<AcquiredCollectionJob>| {
let (this, datastore) = (Arc::clone(&self), Arc::clone(&datastore));
let collection_job_lease = Arc::new(collection_job_lease);
Box::pin(async move {
let attempts = collection_job_lease.lease_attempts();
if collection_job_lease.lease_attempts() > maximum_attempts_before_failure {
warn!(
attempts = %collection_job_lease.lease_attempts(),
attempts = %attempts,
max_attempts = %maximum_attempts_before_failure,
"Abandoning job due to too many failed attempts"
);
Expand All @@ -445,11 +446,61 @@ impl CollectionJobDriver {
this.metrics.job_steps_retried_counter.add(1, &[]);
}

this.step_collection_job(datastore, Arc::new(collection_job_lease))
match this
.step_collection_job(Arc::clone(&datastore), Arc::clone(&collection_job_lease))
.await
{
Ok(_) => Ok(()),
Err(error) => {
if !Self::is_retryable_error(&error) {
// Make a best-effort attempt to immediately cancel the collection job
// on fatal errors. This protects the helper from performing wasted
// work.
//
// Cancellation might fail, but we will return the first error, since
// that's the more interesting error for debugging purposes.
//
// If cancellation fails, the job will be picked up again. This isn't
// a big deal, since stepping a collection job is idempotent. It would
// just be some wasted work next time around.
warn!(
attempts = %attempts,
max_attempts = %maximum_attempts_before_failure,
"Abandoning job due to fatal error"
);
this.metrics.jobs_abandoned_counter.add(1, &[]);
if let Err(error) = this
.abandon_collection_job(datastore, collection_job_lease)
.await
{
error!(error = ?error, "Failed to abandon job");
}
}
Err(error)
}
}
})
}
}

/// Determines whether the given [`Error`] is retryable in the context of collection job
/// processing.
fn is_retryable_error(error: &Error) -> bool {
match error {
Error::Http(http_error_response) => {
is_retryable_http_status(http_error_response.status())
}
Error::Datastore(error) => match error {
datastore::Error::Db(_) | datastore::Error::Pool(_) => true,
datastore::Error::User(error) => match error.downcast_ref::<Error>() {
Some(error) => Self::is_retryable_error(error),
None => false,
},
_ => false,
},
_ => false,
}
}
}

/// Holds various metrics instruments for a collection job driver.
Expand Down Expand Up @@ -922,7 +973,7 @@ mod tests {
error,
Error::Http(error_response) => {
assert_matches!(error_response.dap_problem_type(), Some(DapProblemType::BatchQueriedTooManyTimes));
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
);

Expand Down Expand Up @@ -1030,8 +1081,9 @@ mod tests {
);

// Run: abandon the collection job.
let lease = Arc::new(lease.unwrap());
collection_job_driver
.abandon_collection_job(Arc::clone(&ds), lease.unwrap())
.abandon_collection_job(Arc::clone(&ds), lease)
.await
.unwrap();

Expand Down Expand Up @@ -1066,7 +1118,101 @@ mod tests {
}

#[tokio::test]
async fn abandon_failing_collection_job() {
async fn abandon_failing_collection_job_with_fatal_error() {
install_test_trace_subscriber();
let mut server = mockito::Server::new_async().await;
let clock = MockClock::default();
let mut runtime_manager = TestRuntimeManager::new();
let ephemeral_datastore = ephemeral_datastore().await;
let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await);
let stopper = Stopper::new();

let (task, _, collection_job) =
setup_collection_job_test_case(&mut server, clock.clone(), Arc::clone(&ds), false)
.await;

// Set up the collection job driver
let collection_job_driver = Arc::new(CollectionJobDriver::new(
reqwest::Client::new(),
&noop_meter(),
1,
));
let job_driver = Arc::new(
JobDriver::new(
clock.clone(),
runtime_manager.with_label("stepper"),
noop_meter(),
stopper.clone(),
StdDuration::from_secs(1),
10,
StdDuration::from_secs(60),
collection_job_driver.make_incomplete_job_acquirer_callback(
Arc::clone(&ds),
StdDuration::from_secs(600),
),
collection_job_driver.make_job_stepper_callback(Arc::clone(&ds), 3),
)
.unwrap(),
);

// Set up an error response from the server that returns a non-retryable error.
let failure_mock = server
.mock("POST", task.aggregate_shares_uri().unwrap().path())
.with_status(404)
.expect(1)
.create_async()
.await;
// Set up an extra response that should never be used, to make sure the job driver doesn't
// make more requests than we expect. If there were no remaining mocks, mockito would
// respond with a fallback error response instead.
let no_more_requests_mock = server
.mock("POST", task.aggregate_shares_uri().unwrap().path())
.with_status(502)
.expect(1)
.create_async()
.await;

// Start up the job driver.
let task_handle = runtime_manager.with_label("driver").spawn(job_driver.run());

// Wait for the next task to be spawned and to complete.
runtime_manager.wait_for_completed_tasks("stepper", 1).await;
// Advance the clock by the lease duration, so that the job driver can pick up the job
// and try again.
clock.advance(&Duration::from_seconds(600));

// Shut down the job driver.
stopper.stop();
task_handle.await.unwrap();

// Check that the job driver made the HTTP requests we expected.
failure_mock.assert_async().await;
assert!(!no_more_requests_mock.matched_async().await);

// Confirm that the collection job was abandoned.
let collection_job_after = ds
.run_unnamed_tx(|tx| {
let collection_job = collection_job.clone();
Box::pin(async move {
tx.get_collection_job::<0, TimeInterval, dummy_vdaf::Vdaf>(
&dummy_vdaf::Vdaf::new(),
collection_job.task_id(),
collection_job.id(),
)
.await
})
})
.await
.unwrap()
.unwrap();
assert_eq!(
collection_job_after,
collection_job.with_state(CollectionJobState::Abandoned),
);
}

#[tokio::test]
async fn abandon_failing_collection_job_with_retryable_error() {
install_test_trace_subscriber();
let mut server = mockito::Server::new_async().await;
let clock = MockClock::default();
Expand Down Expand Up @@ -1104,10 +1250,11 @@ mod tests {
);

// Set up three error responses from our mock helper. These will cause errors in the
// leader, because the response body is empty and cannot be decoded.
// leader, because the response body is empty and cannot be decoded. The error status
// indicates that the error is retryable.
let failure_mock = server
.mock("POST", task.aggregate_shares_uri().unwrap().path())
.with_status(500)
.with_status(502)
.expect(3)
.create_async()
.await;
Expand Down
4 changes: 2 additions & 2 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ mod tests {
assert_matches!(
client.upload(&1).await,
Err(Error::Http(error_response)) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::NOT_IMPLEMENTED);
assert_eq!(error_response.status(), StatusCode::NOT_IMPLEMENTED);
}
);

Expand Down Expand Up @@ -613,7 +613,7 @@ mod tests {
assert_matches!(
client.upload(&1).await,
Err(Error::Http(error_response)) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.status(), StatusCode::BAD_REQUEST);
assert_eq!(
error_response.type_uri().unwrap(),
"urn:ietf:params:ppm:dap:error:invalidMessage"
Expand Down
16 changes: 8 additions & 8 deletions collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1408,7 +1408,7 @@ mod tests {
.await
.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert!(error_response.dap_problem_type().is_none());
});

Expand All @@ -1432,7 +1432,7 @@ mod tests {
.await
.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.type_uri().unwrap(), "http://example.com/test_server_error");
assert!(error_response.dap_problem_type().is_none());
});
Expand Down Expand Up @@ -1461,7 +1461,7 @@ mod tests {
.await
.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.status(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.type_uri().unwrap(), "urn:ietf:params:ppm:dap:error:invalidMessage");
assert_eq!(error_response.detail().unwrap(), "The message type for a response was incorrect or the payload was malformed.");
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::InvalidMessage);
Expand Down Expand Up @@ -1506,7 +1506,7 @@ mod tests {
.unwrap();
let error = collector.poll_once(&job).await.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert!(error_response.dap_problem_type().is_none());
});

Expand All @@ -1528,7 +1528,7 @@ mod tests {

let error = collector.poll_once(&job).await.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.type_uri().unwrap(), "http://example.com/test_server_error");
assert!(error_response.dap_problem_type().is_none());
});
Expand All @@ -1552,7 +1552,7 @@ mod tests {

let error = collector.poll_once(&job).await.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.status(), StatusCode::BAD_REQUEST);
assert_eq!(error_response.type_uri().unwrap(), "urn:ietf:params:ppm:dap:error:invalidMessage");
assert_eq!(error_response.detail().unwrap(), "The message type for a response was incorrect or the payload was malformed.");
assert_eq!(*error_response.dap_problem_type().unwrap(), DapProblemType::InvalidMessage);
Expand Down Expand Up @@ -1701,7 +1701,7 @@ mod tests {
.await;
let error = collector.poll_until_complete(&job).await.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert!(error_response.dap_problem_type().is_none());
});
mock_collection_job_always_fail.assert_async().await;
Expand Down Expand Up @@ -1948,7 +1948,7 @@ mod tests {
.await
.unwrap_err();
assert_matches!(error, Error::Http(error_response) => {
assert_eq!(*error_response.status().unwrap(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.status(), StatusCode::INTERNAL_SERVER_ERROR);
});

mock_error.assert_async().await;
Expand Down
5 changes: 3 additions & 2 deletions core/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ impl HttpErrorResponse {
}

/// The HTTP status code returned by the server.
pub fn status(&self) -> Option<&StatusCode> {
self.problem_details.status.as_ref()
pub fn status(&self) -> StatusCode {
// Unwrap safety: Self::from_response() always populates this field.
self.problem_details.status.unwrap()
}

/// A URI that identifies the problem type.
Expand Down
Loading
Loading