From fcf4aa4cfb258903daab0029277ddfed1883bd90 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Mon, 9 Dec 2024 11:09:03 -0700 Subject: [PATCH] Fix multipart uploads with checksums on object locked buckets (#6794) Fix multipart uploads with checksums on object locked buckets (#6794) --- .github/workflows/object_store.yml | 1 + object_store/src/aws/client.rs | 39 +++++++++++++++---- object_store/src/aws/mod.rs | 60 ++++++++++++++++++++++++++++++ object_store/src/client/s3.rs | 27 ++++++++++++-- 4 files changed, 116 insertions(+), 11 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 59501b5addfe..93f809aaabd4 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -141,6 +141,7 @@ jobs: echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket + aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key") diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 47249685b7bb..81015e82b39c 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -29,7 +29,7 @@ use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult, - InitiateMultipartUploadResult, ListResponse, + InitiateMultipartUploadResult, ListResponse, PartMetadata, }; use crate::client::GetOptionsExt; use crate::multipart::PartId; @@ -62,6 +62,7 @@ use std::sync::Arc; const VERSION_HEADER: &str = "x-amz-version-id"; const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256"; const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-"; +const ALGORITHM: &str = "x-amz-checksum-algorithm"; /// A specialized `Error` for object store-related errors #[derive(Debug, Snafu)] @@ -390,10 +391,9 @@ impl<'a> Request<'a> { let payload_sha256 = sha256.finish(); if let Some(Checksum::SHA256) = self.config.checksum { - self.builder = self.builder.header( - "x-amz-checksum-sha256", - BASE64_STANDARD.encode(payload_sha256), - ); + self.builder = self + .builder + .header(SHA256_CHECKSUM, BASE64_STANDARD.encode(payload_sha256)); } self.payload_sha256 = Some(payload_sha256); } @@ -617,8 +617,15 @@ impl S3Client { location: &Path, opts: PutMultipartOpts, ) -> Result { - let response = self - .request(Method::POST, location) + let mut request = self.request(Method::POST, location); + if let Some(algorithm) = self.config.checksum { + match algorithm { + Checksum::SHA256 => { + request = request.header(ALGORITHM, "SHA256"); + } + } + } + let response = request .query(&[("uploads", "")]) .with_encryption_headers() .with_attributes(opts.attributes) @@ -669,8 +676,13 @@ impl S3Client { request = request.with_encryption_headers(); } let response = request.send().await?; + let checksum_sha256 = response + .headers() + .get(SHA256_CHECKSUM) + .and_then(|v| v.to_str().ok()) + .map(|v| v.to_string()); - let content_id = match is_copy { + let e_tag = match is_copy { false => get_etag(response.headers()).context(MetadataSnafu)?, true => { let response = response @@ -682,6 +694,17 @@ impl S3Client { response.e_tag } }; + + let content_id = if self.config.checksum == Some(Checksum::SHA256) { + let meta = PartMetadata { + e_tag, + checksum_sha256, + }; + quick_xml::se::to_string(&meta).unwrap() + } else { + e_tag + }; + Ok(PartId { content_id }) } diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index d7c8c9b546eb..a7f9264a6815 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -493,6 +493,66 @@ mod tests { const NON_EXISTENT_NAME: &str = "nonexistentname"; + #[tokio::test] + async fn write_multipart_file_with_signature() { + maybe_skip_integration!(); + + let store = AmazonS3Builder::from_env() + .with_checksum_algorithm(Checksum::SHA256) + .build() + .unwrap(); + + let str = "test.bin"; + let path = Path::parse(str).unwrap(); + let opts = PutMultipartOpts::default(); + let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); + + upload + .put_part(PutPayload::from(vec![0u8; 10_000_000])) + .await + .unwrap(); + upload + .put_part(PutPayload::from(vec![0u8; 5_000_000])) + .await + .unwrap(); + + let res = upload.complete().await.unwrap(); + assert!(res.e_tag.is_some(), "Should have valid etag"); + + store.delete(&path).await.unwrap(); + } + + #[tokio::test] + async fn write_multipart_file_with_signature_object_lock() { + maybe_skip_integration!(); + + let bucket = "test-object-lock"; + let store = AmazonS3Builder::from_env() + .with_bucket_name(bucket) + .with_checksum_algorithm(Checksum::SHA256) + .build() + .unwrap(); + + let str = "test.bin"; + let path = Path::parse(str).unwrap(); + let opts = PutMultipartOpts::default(); + let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); + + upload + .put_part(PutPayload::from(vec![0u8; 10_000_000])) + .await + .unwrap(); + upload + .put_part(PutPayload::from(vec![0u8; 5_000_000])) + .await + .unwrap(); + + let res = upload.complete().await.unwrap(); + assert!(res.e_tag.is_some(), "Should have valid etag"); + + store.delete(&path).await.unwrap(); + } + #[tokio::test] async fn s3_test() { maybe_skip_integration!(); diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index dba752cb1251..7fe956b2376e 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -106,14 +106,32 @@ pub(crate) struct CompleteMultipartUpload { pub part: Vec, } +#[derive(Serialize, Deserialize)] +pub(crate) struct PartMetadata { + pub e_tag: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub checksum_sha256: Option, +} + impl From> for CompleteMultipartUpload { fn from(value: Vec) -> Self { let part = value .into_iter() .enumerate() - .map(|(part_number, part)| MultipartPart { - e_tag: part.content_id, - part_number: part_number + 1, + .map(|(part_idx, part)| { + let md = match quick_xml::de::from_str::(&part.content_id) { + Ok(md) => md, + // fallback to old way + Err(_) => PartMetadata { + e_tag: part.content_id.clone(), + checksum_sha256: None, + }, + }; + MultipartPart { + e_tag: md.e_tag, + part_number: part_idx + 1, + checksum_sha256: md.checksum_sha256, + } }) .collect(); Self { part } @@ -126,6 +144,9 @@ pub(crate) struct MultipartPart { pub e_tag: String, #[serde(rename = "PartNumber")] pub part_number: usize, + #[serde(rename = "ChecksumSHA256")] + #[serde(skip_serializing_if = "Option::is_none")] + pub checksum_sha256: Option, } #[derive(Debug, Deserialize)]