Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #615 from abetterinternet/timg/s3-error-handling
Browse files Browse the repository at this point in the history
facilitator: hack to detect S3 upload failure
  • Loading branch information
tgeoghegan authored Apr 30, 2021
2 parents c8e35c7 + e4a1d27 commit ba24f5f
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 13 deletions.
2 changes: 1 addition & 1 deletion facilitator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ atty = "0.2"
avro-rs = { version = "0.13.0", features = ["snappy"] }
backoff = "0.3.0"
base64 = "0.13.0"
bytes = "1.0.1"
chrono = { version = "0.4", features = ["serde"] }
clap = "2.33.3"
derivative = "2.1.1"
Expand Down Expand Up @@ -52,6 +53,5 @@ xml-rs = "0.8"

[dev-dependencies]
assert_matches = "1.5.0"
bytes = "1.0.1"
mockito = "0.30.0"
serde_test = "1.0"
119 changes: 107 additions & 12 deletions facilitator/src/transport/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use crate::{
Error,
};
use anyhow::{Context, Result};
use bytes::Bytes;
use derivative::Derivative;
use http::{HeaderMap, StatusCode};
use hyper_rustls::HttpsConnector;
use rusoto_core::{ByteStream, Region};
use rusoto_core::{request::BufferedHttpResponse, ByteStream, Region, RusotoError};
use rusoto_s3::{
AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompletedMultipartUpload,
CompletedPart, CreateMultipartUploadRequest, GetObjectRequest, S3Client, UploadPartRequest, S3,
Expand Down Expand Up @@ -337,17 +339,40 @@ impl TransportWriter for MultipartUploadWriter {
retry_request(
&self.logger.new(o!(event::ACTION => "complete upload")),
|| {
self.runtime.block_on(self.client.complete_multipart_upload(
CompleteMultipartUploadRequest {
bucket: self.bucket.to_string(),
key: self.key.to_string(),
upload_id: self.upload_id.clone(),
multipart_upload: Some(CompletedMultipartUpload {
parts: Some(completed_parts.clone()),
}),
..Default::default()
},
))
let output = self
.runtime
.block_on(self.client.complete_multipart_upload(
CompleteMultipartUploadRequest {
bucket: self.bucket.to_string(),
key: self.key.to_string(),
upload_id: self.upload_id.clone(),
multipart_upload: Some(CompletedMultipartUpload {
parts: Some(completed_parts.clone()),
}),
..Default::default()
},
))?;

// Due to an oddity in S3's CompleteMultipartUpload API, some
// failed uploads can cause complete_multipart_upload to return
// Ok(). To work around this, we check if key fields of the
// output are None, and if so, synthesize a RusotoError::Unknown
// wrapping an HTTP response with status code 500 so that our
// retry logic will gracefully try again.
// https://github.com/rusoto/rusoto/issues/1936
if output.location == None
&& output.e_tag == None
&& output.bucket == None
&& output.key == None
{
return Err(RusotoError::Unknown(BufferedHttpResponse {
status: StatusCode::from_u16(500).unwrap(),
headers: HeaderMap::with_capacity(0),
body: Bytes::from_static(b"synthetic HTTP 500 response"),
}));
}

Ok(output)
},
)
.context("error completing upload")?;
Expand Down Expand Up @@ -529,6 +554,76 @@ mod tests {
.expect_err("expected error");
}

#[test]
fn multipart_upload_fails_with_http_200_ok() {
// This test exercises our workaround for a known issue in Rusoto and
// should be removed if/when the issue is resolved.
// https://github.com/rusoto/rusoto/issues/1936
let logger = setup_test_logging();

// Response body format from
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_Operations_Amazon_Simple_Storage_Service.html
let requests = vec![
// Response to CreateMultipartUpload
MockRequestDispatcher::with_status(200)
.with_body(
r#"<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
<Bucket>fake-bucket</Bucket>
<Key>fake-key</Key>
<UploadId>upload-id</UploadId>
</InitiateMultipartUploadResult>"#,
)
.with_request_checker(is_create_multipart_upload_request),
// Well formed response to UploadPart.
MockRequestDispatcher::with_status(200)
.with_request_checker(is_upload_part_request)
.with_header("ETag", "fake-etag"),
// HTTP 200 response to CompleteMultipartUpload that contains an
// error. Should cause us to retry.
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Examples
MockRequestDispatcher::with_status(200)
.with_request_checker(is_complete_multipart_upload_request)
.with_body(
r#"<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>InternalError</Code>
<Message>We encountered an internal error. Please try again.</Message>
<RequestId>656c76696e6727732072657175657374</RequestId>
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
</Error>"#,
),
// Well formed response to CompleteMultipartUpload
MockRequestDispatcher::with_status(200)
.with_request_checker(is_complete_multipart_upload_request)
.with_body(
r#"<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult>
<Location>string</Location>
<Bucket>fake-bucket</Bucket>
<Key>fake-key</Key>
<ETag>fake-etag</ETag>
</CompleteMultipartUploadResult>"#,
),
];

let mut writer = MultipartUploadWriter::new(
String::from(TEST_BUCKET),
String::from(TEST_KEY),
50,
S3Client::new_with(
MultipleMockRequestDispatcher::new(requests),
aws_credentials::Provider::new_mock(),
Region::UsWest2,
),
&logger,
)
.unwrap();

writer.write_all(&[0; 50]).unwrap();
writer.complete_upload().unwrap();
}

#[test]
fn multipart_upload() {
let logger = setup_test_logging();
Expand Down

0 comments on commit ba24f5f

Please sign in to comment.