From 8aa8d97b4f8bda4f8154f4e407c094faab68fe9e Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Tue, 16 Feb 2021 21:56:37 -0800 Subject: [PATCH] facilitator: retries on SQS and STS requests Analysis of NCI logs shows that requests to SQS can sometimes fail because of DNS lookup failures. Further, S3 requests sometimes fail in the credentials phase due to DNS failures, which doesn't get caught by existing retry mechanisms because of how Rusoto bubbles up errors (see comment in `aws_credentials.rs::retry_request`). We now will retry both SQS client requests, and check for a CredentialsError that _probably_ wraps an HttpDispatchError so we can retry requests to STS. Additionally, to ensure we can accurately detect the failure when using the webidp_provider (which is what NCI does), we short circuit the chain of providers in `ChainProvider::chain_provider_credentials`. --- facilitator/src/aws_credentials.rs | 73 ++++++++++++++++++++++------- facilitator/src/task/sqs.rs | 75 ++++++++++++++++-------------- facilitator/src/transport/s3.rs | 41 ++-------------- 3 files changed, 99 insertions(+), 90 deletions(-) diff --git a/facilitator/src/aws_credentials.rs b/facilitator/src/aws_credentials.rs index 7c6072cff..336206039 100644 --- a/facilitator/src/aws_credentials.rs +++ b/facilitator/src/aws_credentials.rs @@ -1,11 +1,11 @@ use anyhow::Result; -use log::info; +use log::{debug, info}; use rusoto_core::{ - credential::EnvironmentProvider, credential::{ AutoRefreshingProvider, AwsCredentials, ContainerProvider, CredentialsError, - InstanceMetadataProvider, ProfileProvider, ProvideAwsCredentials, + EnvironmentProvider, InstanceMetadataProvider, ProfileProvider, ProvideAwsCredentials, }, + RusotoError, RusotoResult, }; use rusoto_sts::WebIdentityProvider; use std::{boxed::Box, time::Duration}; @@ -16,6 +16,59 @@ pub(crate) fn basic_runtime() -> Result { Ok(Builder::new().basic_scheduler().enable_all().build()?) } +/// We attempt AWS API requests up to three times (i.e., two retries) +const MAX_ATTEMPT_COUNT: i32 = 3; + +/// Calls the provided closure, retrying up to MAX_ATTEMPT_COUNT times if it +/// fails with RusotoError::HttpDispatch, which indicates a problem sending the +/// request such as the connection getting closed under us. +/// Additionally, in the case where there is an error while fetching credentials +/// before making an actual request, the error will be RusotoError::Credentials, +/// wrapping a rusoto_core::credential::CredentialsError, which can in turn +/// contain an HttpDispatchError! Sadly, CredentialsError does not preserve the +/// structure of the underlying error, just its message, so we must resort to +/// matching on a substring in order to detect it. +pub fn retry_request(action: &str, mut f: F) -> RusotoResult +where + F: FnMut() -> RusotoResult, + E: std::fmt::Debug, +{ + let mut attempts = 0; + loop { + match f() { + Err(RusotoError::HttpDispatch(err)) => { + attempts += 1; + if attempts >= MAX_ATTEMPT_COUNT { + break Err(RusotoError::HttpDispatch(err)); + } + info!( + "failed to {} (will retry {} more times): {}", + action, + MAX_ATTEMPT_COUNT - attempts, + err + ); + } + Err(RusotoError::Credentials(err)) if err.message.contains("Error during dispatch") => { + attempts += 1; + if attempts >= MAX_ATTEMPT_COUNT { + break Err(RusotoError::Credentials(err)); + } + info!( + "failed to {} (will retry {} more times): {}", + action, + MAX_ATTEMPT_COUNT - attempts, + err + ); + } + Err(err) => { + debug!("encountered non retryable error: {:?}", err); + break Err(err); + } + result => break result, + } + } +} + // ------------- Everything below here was copied from rusoto/credential/src/lib.rs in the rusoto repo --------------------------- // ------------------------------------------------------------------------------------------------------------------------------- @@ -109,19 +162,7 @@ async fn chain_provider_credentials( if let Ok(creds) = provider.container_provider.credentials().await { return Ok(creds); } - match provider.webidp_provider.credentials().await { - Ok(creds) => return Ok(creds), - Err(err) => info!( - "failed to obtain credentials from webidp provider: {:?}", - err - ), - } - if let Ok(creds) = provider.instance_metadata_provider.credentials().await { - return Ok(creds); - } - Err(CredentialsError::new( - "Couldn't find AWS credentials in environment, credentials file, or IAM role.", - )) + provider.webidp_provider.credentials().await } use async_trait::async_trait; diff --git a/facilitator/src/task/sqs.rs b/facilitator/src/task/sqs.rs index 0959cad80..c332549f4 100644 --- a/facilitator/src/task/sqs.rs +++ b/facilitator/src/task/sqs.rs @@ -9,7 +9,7 @@ use std::{convert::TryFrom, marker::PhantomData, str::FromStr, time::Duration}; use tokio::runtime::Runtime; use crate::{ - aws_credentials::{basic_runtime, DefaultCredentialsProvider}, + aws_credentials::{basic_runtime, retry_request, DefaultCredentialsProvider}, task::{Task, TaskHandle, TaskQueue}, }; @@ -43,24 +43,24 @@ impl TaskQueue for AwsSqsTaskQueue { let client = self.sqs_client()?; - let request = ReceiveMessageRequest { - // Dequeue one task at a time - max_number_of_messages: Some(1), - queue_url: self.queue_url.clone(), - // Long polling. SQS allows us to wait up to 20 seconds. - // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling - wait_time_seconds: Some(20), - // Visibility timeout configures how long SQS will wait for message - // deletion by this client before making a message visible again to - // other queue consumers. We set it to 600s = 10 minutes. - visibility_timeout: Some(600), - ..Default::default() - }; - - let response = self - .runtime - .block_on(client.receive_message(request)) - .context("failed to dequeue message from SQS")?; + let response = retry_request("dequeue SQS message", || { + let request = ReceiveMessageRequest { + // Dequeue one task at a time + max_number_of_messages: Some(1), + queue_url: self.queue_url.clone(), + // Long polling. SQS allows us to wait up to 20 seconds. + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling + wait_time_seconds: Some(20), + // Visibility timeout configures how long SQS will wait for message + // deletion by this client before making a message visible again to + // other queue consumers. We set it to 600s = 10 minutes. + visibility_timeout: Some(600), + ..Default::default() + }; + + self.runtime.block_on(client.receive_message(request)) + }) + .context("failed to dequeue message from SQS")?; let received_messages = match response.messages { Some(ref messages) => messages, @@ -104,15 +104,16 @@ impl TaskQueue for AwsSqsTaskQueue { let client = self.sqs_client()?; - let request = DeleteMessageRequest { - queue_url: self.queue_url.clone(), - receipt_handle: task.acknowledgment_id.clone(), - }; + let response = retry_request("delete/acknowledge message in SQS", || { + let request = DeleteMessageRequest { + queue_url: self.queue_url.clone(), + receipt_handle: task.acknowledgment_id.clone(), + }; + self.runtime.block_on(client.delete_message(request)) + }) + .context("failed to delete/acknowledge message in SQS"); - Ok(self - .runtime - .block_on(client.delete_message(request)) - .context("failed to delete/acknowledge message in SQS")?) + response } fn nacknowledge_task(&mut self, task: TaskHandle) -> Result<()> { @@ -179,15 +180,17 @@ impl AwsSqsTaskQueue { visibility_timeout ))?; - let request = ChangeMessageVisibilityRequest { - queue_url: self.queue_url.clone(), - receipt_handle: task.acknowledgment_id.clone(), - visibility_timeout: timeout, - }; + let response = retry_request("changing message visibility", || { + let request = ChangeMessageVisibilityRequest { + queue_url: self.queue_url.clone(), + receipt_handle: task.acknowledgment_id.clone(), + visibility_timeout: timeout, + }; + self.runtime + .block_on(client.change_message_visibility(request)) + }) + .context("failed to change message visibility message in SQS"); - Ok(self - .runtime - .block_on(client.change_message_visibility(request)) - .context("failed to change message visibility message in SQS")?) + response } } diff --git a/facilitator/src/transport/s3.rs b/facilitator/src/transport/s3.rs index 2e815da18..b1bf288f6 100644 --- a/facilitator/src/transport/s3.rs +++ b/facilitator/src/transport/s3.rs @@ -1,5 +1,5 @@ use crate::{ - aws_credentials::{basic_runtime, DefaultCredentialsProvider}, + aws_credentials::{basic_runtime, retry_request, DefaultCredentialsProvider}, config::{Identity, S3Path}, transport::{Transport, TransportWriter}, Error, @@ -7,10 +7,10 @@ use crate::{ use anyhow::{Context, Result}; use derivative::Derivative; use hyper_rustls::HttpsConnector; -use log::{debug, info}; +use log::info; use rusoto_core::{ credential::{AutoRefreshingProvider, CredentialsError, Secret, Variable}, - ByteStream, Region, RusotoError, RusotoResult, + ByteStream, Region, }; use rusoto_s3::{ AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompletedMultipartUpload, @@ -42,44 +42,9 @@ const METADATA_SERVICE_TOKEN_URL: &str = "http://metadata.google.internal:80/com // via environment variable. const AWS_ACCOUNT_ID_ENVIRONMENT_VARIABLE: &str = "AWS_ACCOUNT_ID"; -/// We attempt AWS API requests up to three times (i.e., two retries) -const MAX_ATTEMPT_COUNT: i32 = 3; - /// ClientProvider allows mocking out a client for testing. type ClientProvider = Box) -> Result>; -/// Calls the provided closure, retrying up to MAX_ATTEMPT_COUNT times if it -/// fails with RusotoError::HttpDispatch, which indicates a problem sending the -/// request such as the connection getting closed under us. -fn retry_request(action: &str, mut f: F) -> RusotoResult -where - F: FnMut() -> RusotoResult, - E: std::fmt::Debug, -{ - let mut attempts = 0; - loop { - match f() { - Err(RusotoError::HttpDispatch(err)) => { - attempts += 1; - if attempts >= MAX_ATTEMPT_COUNT { - break Err(RusotoError::HttpDispatch(err)); - } - info!( - "failed to {} (will retry {} more times): {}", - action, - MAX_ATTEMPT_COUNT - attempts, - err - ); - } - Err(err) => { - debug!("encountered non retryable error: {:?}", err); - break Err(err); - } - result => break result, - } - } -} - /// Implementation of Transport that reads and writes objects from Amazon S3. #[derive(Derivative)] #[derivative(Debug)]