Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #417 from abetterinternet/timg/retry-sqs-and-sts
Browse files Browse the repository at this point in the history
facilitator: retries on SQS and STS requests
  • Loading branch information
tgeoghegan authored Feb 17, 2021
2 parents d23c14f + 8aa8d97 commit 441af9f
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 90 deletions.
73 changes: 57 additions & 16 deletions facilitator/src/aws_credentials.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use log::info;
use log::{debug, info};
use rusoto_core::{
credential::EnvironmentProvider,
credential::{
AutoRefreshingProvider, AwsCredentials, ContainerProvider, CredentialsError,
InstanceMetadataProvider, ProfileProvider, ProvideAwsCredentials,
EnvironmentProvider, InstanceMetadataProvider, ProfileProvider, ProvideAwsCredentials,
},
RusotoError, RusotoResult,
};
use rusoto_sts::WebIdentityProvider;
use std::{boxed::Box, time::Duration};
Expand All @@ -16,6 +16,59 @@ pub(crate) fn basic_runtime() -> Result<Runtime> {
Ok(Builder::new().basic_scheduler().enable_all().build()?)
}

/// We attempt AWS API requests up to three times (i.e., two retries)
const MAX_ATTEMPT_COUNT: i32 = 3;

/// Calls the provided closure, retrying up to MAX_ATTEMPT_COUNT times if it
/// fails with RusotoError::HttpDispatch, which indicates a problem sending the
/// request such as the connection getting closed under us.
/// Additionally, in the case where there is an error while fetching credentials
/// before making an actual request, the error will be RusotoError::Credentials,
/// wrapping a rusoto_core::credential::CredentialsError, which can in turn
/// contain an HttpDispatchError! Sadly, CredentialsError does not preserve the
/// structure of the underlying error, just its message, so we must resort to
/// matching on a substring in order to detect it.
pub fn retry_request<F, T, E>(action: &str, mut f: F) -> RusotoResult<T, E>
where
F: FnMut() -> RusotoResult<T, E>,
E: std::fmt::Debug,
{
let mut attempts = 0;
loop {
match f() {
Err(RusotoError::HttpDispatch(err)) => {
attempts += 1;
if attempts >= MAX_ATTEMPT_COUNT {
break Err(RusotoError::HttpDispatch(err));
}
info!(
"failed to {} (will retry {} more times): {}",
action,
MAX_ATTEMPT_COUNT - attempts,
err
);
}
Err(RusotoError::Credentials(err)) if err.message.contains("Error during dispatch") => {
attempts += 1;
if attempts >= MAX_ATTEMPT_COUNT {
break Err(RusotoError::Credentials(err));
}
info!(
"failed to {} (will retry {} more times): {}",
action,
MAX_ATTEMPT_COUNT - attempts,
err
);
}
Err(err) => {
debug!("encountered non retryable error: {:?}", err);
break Err(err);
}
result => break result,
}
}
}

// ------------- Everything below here was copied from rusoto/credential/src/lib.rs in the rusoto repo ---------------------------
// -------------------------------------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -109,19 +162,7 @@ async fn chain_provider_credentials(
if let Ok(creds) = provider.container_provider.credentials().await {
return Ok(creds);
}
match provider.webidp_provider.credentials().await {
Ok(creds) => return Ok(creds),
Err(err) => info!(
"failed to obtain credentials from webidp provider: {:?}",
err
),
}
if let Ok(creds) = provider.instance_metadata_provider.credentials().await {
return Ok(creds);
}
Err(CredentialsError::new(
"Couldn't find AWS credentials in environment, credentials file, or IAM role.",
))
provider.webidp_provider.credentials().await
}

use async_trait::async_trait;
Expand Down
75 changes: 39 additions & 36 deletions facilitator/src/task/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{convert::TryFrom, marker::PhantomData, str::FromStr, time::Duration};
use tokio::runtime::Runtime;

use crate::{
aws_credentials::{basic_runtime, DefaultCredentialsProvider},
aws_credentials::{basic_runtime, retry_request, DefaultCredentialsProvider},
task::{Task, TaskHandle, TaskQueue},
};

Expand Down Expand Up @@ -43,24 +43,24 @@ impl<T: Task> TaskQueue<T> for AwsSqsTaskQueue<T> {

let client = self.sqs_client()?;

let request = ReceiveMessageRequest {
// Dequeue one task at a time
max_number_of_messages: Some(1),
queue_url: self.queue_url.clone(),
// Long polling. SQS allows us to wait up to 20 seconds.
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling
wait_time_seconds: Some(20),
// Visibility timeout configures how long SQS will wait for message
// deletion by this client before making a message visible again to
// other queue consumers. We set it to 600s = 10 minutes.
visibility_timeout: Some(600),
..Default::default()
};

let response = self
.runtime
.block_on(client.receive_message(request))
.context("failed to dequeue message from SQS")?;
let response = retry_request("dequeue SQS message", || {
let request = ReceiveMessageRequest {
// Dequeue one task at a time
max_number_of_messages: Some(1),
queue_url: self.queue_url.clone(),
// Long polling. SQS allows us to wait up to 20 seconds.
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling
wait_time_seconds: Some(20),
// Visibility timeout configures how long SQS will wait for message
// deletion by this client before making a message visible again to
// other queue consumers. We set it to 600s = 10 minutes.
visibility_timeout: Some(600),
..Default::default()
};

self.runtime.block_on(client.receive_message(request))
})
.context("failed to dequeue message from SQS")?;

let received_messages = match response.messages {
Some(ref messages) => messages,
Expand Down Expand Up @@ -104,15 +104,16 @@ impl<T: Task> TaskQueue<T> for AwsSqsTaskQueue<T> {

let client = self.sqs_client()?;

let request = DeleteMessageRequest {
queue_url: self.queue_url.clone(),
receipt_handle: task.acknowledgment_id.clone(),
};
let response = retry_request("delete/acknowledge message in SQS", || {
let request = DeleteMessageRequest {
queue_url: self.queue_url.clone(),
receipt_handle: task.acknowledgment_id.clone(),
};
self.runtime.block_on(client.delete_message(request))
})
.context("failed to delete/acknowledge message in SQS");

Ok(self
.runtime
.block_on(client.delete_message(request))
.context("failed to delete/acknowledge message in SQS")?)
response
}

fn nacknowledge_task(&mut self, task: TaskHandle<T>) -> Result<()> {
Expand Down Expand Up @@ -179,15 +180,17 @@ impl<T: Task> AwsSqsTaskQueue<T> {
visibility_timeout
))?;

let request = ChangeMessageVisibilityRequest {
queue_url: self.queue_url.clone(),
receipt_handle: task.acknowledgment_id.clone(),
visibility_timeout: timeout,
};
let response = retry_request("changing message visibility", || {
let request = ChangeMessageVisibilityRequest {
queue_url: self.queue_url.clone(),
receipt_handle: task.acknowledgment_id.clone(),
visibility_timeout: timeout,
};
self.runtime
.block_on(client.change_message_visibility(request))
})
.context("failed to change message visibility message in SQS");

Ok(self
.runtime
.block_on(client.change_message_visibility(request))
.context("failed to change message visibility message in SQS")?)
response
}
}
41 changes: 3 additions & 38 deletions facilitator/src/transport/s3.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::{
aws_credentials::{basic_runtime, DefaultCredentialsProvider},
aws_credentials::{basic_runtime, retry_request, DefaultCredentialsProvider},
config::{Identity, S3Path},
transport::{Transport, TransportWriter},
Error,
};
use anyhow::{Context, Result};
use derivative::Derivative;
use hyper_rustls::HttpsConnector;
use log::{debug, info};
use log::info;
use rusoto_core::{
credential::{AutoRefreshingProvider, CredentialsError, Secret, Variable},
ByteStream, Region, RusotoError, RusotoResult,
ByteStream, Region,
};
use rusoto_s3::{
AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompletedMultipartUpload,
Expand Down Expand Up @@ -42,44 +42,9 @@ const METADATA_SERVICE_TOKEN_URL: &str = "http://metadata.google.internal:80/com
// via environment variable.
const AWS_ACCOUNT_ID_ENVIRONMENT_VARIABLE: &str = "AWS_ACCOUNT_ID";

/// We attempt AWS API requests up to three times (i.e., two retries)
const MAX_ATTEMPT_COUNT: i32 = 3;

/// ClientProvider allows mocking out a client for testing.
type ClientProvider = Box<dyn Fn(&Region, Option<String>) -> Result<S3Client>>;

/// Calls the provided closure, retrying up to MAX_ATTEMPT_COUNT times if it
/// fails with RusotoError::HttpDispatch, which indicates a problem sending the
/// request such as the connection getting closed under us.
fn retry_request<F, T, E>(action: &str, mut f: F) -> RusotoResult<T, E>
where
F: FnMut() -> RusotoResult<T, E>,
E: std::fmt::Debug,
{
let mut attempts = 0;
loop {
match f() {
Err(RusotoError::HttpDispatch(err)) => {
attempts += 1;
if attempts >= MAX_ATTEMPT_COUNT {
break Err(RusotoError::HttpDispatch(err));
}
info!(
"failed to {} (will retry {} more times): {}",
action,
MAX_ATTEMPT_COUNT - attempts,
err
);
}
Err(err) => {
debug!("encountered non retryable error: {:?}", err);
break Err(err);
}
result => break result,
}
}
}

/// Implementation of Transport that reads and writes objects from Amazon S3.
#[derive(Derivative)]
#[derivative(Debug)]
Expand Down

0 comments on commit 441af9f

Please sign in to comment.