diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index d3b2526740fa..2277fd9458ae 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -151,49 +151,3 @@ jobs: run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-unknown-unknown - name: Build wasm32-wasi run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-wasi - - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - - name: Clippy arrow-buffer with all features - run: cargo clippy -p arrow-buffer --all-targets --all-features -- -D warnings - - name: Clippy arrow-data with all features - run: cargo clippy -p arrow-data --all-targets --all-features -- -D warnings - - name: Clippy arrow-schema with all features - run: cargo clippy -p arrow-schema --all-targets --all-features -- -D warnings - - name: Clippy arrow-array with all features - run: cargo clippy -p arrow-array --all-targets --all-features -- -D warnings - - name: Clippy arrow-select with all features - run: cargo clippy -p arrow-select --all-targets --all-features -- -D warnings - - name: Clippy arrow-cast with all features - run: cargo clippy -p arrow-cast --all-targets --all-features -- -D warnings - - name: Clippy arrow-ipc with all features - run: cargo clippy -p arrow-ipc --all-targets --all-features -- -D warnings - - name: Clippy arrow-csv with all features - run: cargo clippy -p arrow-csv --all-targets --all-features -- -D warnings - - name: Clippy arrow-json with all features - run: cargo clippy -p arrow-json --all-targets --all-features -- -D warnings - - name: Clippy arrow-avro with all features - run: cargo clippy -p arrow-avro --all-targets --all-features -- -D warnings - - name: Clippy arrow-string with all features - run: cargo clippy -p arrow-string --all-targets --all-features -- -D warnings - - name: Clippy arrow-ord with all features - run: cargo clippy -p arrow-ord --all-targets --all-features -- -D warnings - - name: Clippy arrow-arith with all features - run: cargo clippy -p arrow-arith --all-targets --all-features -- -D warnings - - name: Clippy arrow-row with all features - run: cargo clippy -p arrow-row --all-targets --all-features -- -D warnings - - name: Clippy arrow with all features - run: cargo clippy -p arrow --all-features --all-targets -- -D warnings - - name: Clippy arrow-integration-test with all features - run: cargo clippy -p arrow-integration-test --all-targets --all-features -- -D warnings - - name: Clippy arrow-integration-testing with all features - run: cargo clippy -p arrow-integration-testing --all-targets --all-features -- -D warnings diff --git a/.github/workflows/arrow_flight.yml b/.github/workflows/arrow_flight.yml index 242e0f2a3b0d..974ceb5e0a3d 100644 --- a/.github/workflows/arrow_flight.yml +++ b/.github/workflows/arrow_flight.yml @@ -75,17 +75,3 @@ jobs: run: ./arrow-flight/regen.sh - name: Verify workspace clean (if this fails, run ./arrow-flight/regen.sh and check in results) run: git diff --exit-code - - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - - name: Run clippy - run: cargo clippy -p arrow-flight --all-targets --all-features -- -D warnings diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 1937fafe3a62..fb7ae4f1a48c 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -129,7 +129,7 @@ jobs: run: | rustup toolchain install ${{ matrix.rust }} rustup default ${{ matrix.rust }} - rustup component add rustfmt clippy + rustup component add rustfmt - name: Cache Cargo uses: actions/cache@v4 with: diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index bdbfc0bec4bb..76694c94bb3e 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -34,39 +34,6 @@ on: - .github/** jobs: - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - defaults: - run: - working-directory: object_store - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - # Run different tests for the library on its own as well as - # all targets to ensure that it still works in the absence of - # features that might be enabled by dev-dependencies of other - # targets. - - name: Run clippy with default features - run: cargo clippy -- -D warnings - - name: Run clippy with aws feature - run: cargo clippy --features aws -- -D warnings - - name: Run clippy with gcp feature - run: cargo clippy --features gcp -- -D warnings - - name: Run clippy with azure feature - run: cargo clippy --features azure -- -D warnings - - name: Run clippy with http feature - run: cargo clippy --features http -- -D warnings - - name: Run clippy with all features - run: cargo clippy --all-features -- -D warnings - - name: Run clippy with all features and all targets - run: cargo clippy --all-features --all-targets -- -D warnings - # test doc links still work # # Note that since object_store is not part of the main workspace, @@ -141,6 +108,7 @@ jobs: echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.3.0)" >> $GITHUB_ENV echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket + aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key") diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml index a4e654892662..37c1b38ed71c 100644 --- a/.github/workflows/parquet.yml +++ b/.github/workflows/parquet.yml @@ -164,17 +164,3 @@ jobs: run: | cd parquet/pytest pytest -v - - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - - name: Run clippy - run: cargo clippy -p parquet --all-targets --all-features -- -D warnings diff --git a/.github/workflows/parquet_derive.yml b/.github/workflows/parquet_derive.yml index d8b02f73a8aa..0cf25b27190f 100644 --- a/.github/workflows/parquet_derive.yml +++ b/.github/workflows/parquet_derive.yml @@ -50,17 +50,3 @@ jobs: uses: ./.github/actions/setup-builder - name: Test run: cargo test -p parquet_derive - - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - - name: Run clippy - run: cargo clippy -p parquet_derive --all-features -- -D warnings diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index ab4da86f504b..faa9d36fd6a9 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -29,7 +29,7 @@ use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, - ListResponse, + ListResponse, PartMetadata, }; use crate::client::GetOptionsExt; use crate::multipart::PartId; @@ -62,6 +62,7 @@ use std::sync::Arc; const VERSION_HEADER: &str = "x-amz-version-id"; const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256"; const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-"; +const ALGORITHM: &str = "x-amz-checksum-algorithm"; /// A specialized `Error` for object store-related errors #[derive(Debug, Snafu)] @@ -349,10 +350,9 @@ impl<'a> Request<'a> { let payload_sha256 = sha256.finish(); if let Some(Checksum::SHA256) = self.config.checksum { - self.builder = self.builder.header( - "x-amz-checksum-sha256", - BASE64_STANDARD.encode(payload_sha256), - ); + self.builder = self + .builder + .header(SHA256_CHECKSUM, BASE64_STANDARD.encode(payload_sha256)); } self.payload_sha256 = Some(payload_sha256); } @@ -534,8 +534,11 @@ impl S3Client { location: &Path, opts: PutMultipartOpts, ) -> Result { - let response = self - .request(Method::POST, location) + let mut reqquest = self.request(Method::POST, location); + if let Some(algorithm) = self.config.checksum { + reqquest = reqquest.header(ALGORITHM, &algorithm.to_string().to_uppercase()); + } + let response = reqquest .query(&[("uploads", "")]) .with_encryption_headers() .with_attributes(opts.attributes) @@ -569,8 +572,21 @@ impl S3Client { .idempotent(true) .send() .await?; + let checksum = response + .headers() + .get(SHA256_CHECKSUM) + .and_then(|v| v.to_str().ok()) + .map(|v| v.to_string()); + + let e_tag = get_etag(response.headers()).context(MetadataSnafu)?; + + let content_id = if self.config.checksum == Some(Checksum::SHA256) { + let meta = PartMetadata { e_tag, checksum }; + quick_xml::se::to_string(&meta).unwrap() + } else { + e_tag + }; - let content_id = get_etag(response.headers()).context(MetadataSnafu)?; Ok(PartId { content_id }) } diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index f5204a5365ed..caf42150cd29 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -416,6 +416,66 @@ mod tests { const NON_EXISTENT_NAME: &str = "nonexistentname"; + #[tokio::test] + async fn write_multipart_file_with_signature() { + maybe_skip_integration!(); + + let store = AmazonS3Builder::from_env() + .with_checksum_algorithm(Checksum::SHA256) + .build() + .unwrap(); + + let str = "test.bin"; + let path = Path::parse(str).unwrap(); + let opts = PutMultipartOpts::default(); + let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); + + upload + .put_part(PutPayload::from(vec![0u8; 10_000_000])) + .await + .unwrap(); + upload + .put_part(PutPayload::from(vec![0u8; 5_000_000])) + .await + .unwrap(); + + let res = upload.complete().await.unwrap(); + assert!(res.e_tag.is_some(), "Should have valid etag"); + + store.delete(&path).await.unwrap(); + } + + #[tokio::test] + async fn write_multipart_file_with_signature_object_lock() { + maybe_skip_integration!(); + + let bucket = "test-object-lock"; + let store = AmazonS3Builder::from_env() + .with_bucket_name(bucket) + .with_checksum_algorithm(Checksum::SHA256) + .build() + .unwrap(); + + let str = "test.bin"; + let path = Path::parse(str).unwrap(); + let opts = PutMultipartOpts::default(); + let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); + + upload + .put_part(PutPayload::from(vec![0u8; 10_000_000])) + .await + .unwrap(); + upload + .put_part(PutPayload::from(vec![0u8; 5_000_000])) + .await + .unwrap(); + + let res = upload.complete().await.unwrap(); + assert!(res.e_tag.is_some(), "Should have valid etag"); + + store.delete(&path).await.unwrap(); + } + #[tokio::test] async fn s3_test() { maybe_skip_integration!(); diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index 61237dc4beab..e091086aabb3 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -98,14 +98,32 @@ pub struct CompleteMultipartUpload { pub part: Vec, } +#[derive(Serialize, Deserialize)] +pub(crate) struct PartMetadata { + pub e_tag: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub checksum: Option, +} + impl From> for CompleteMultipartUpload { fn from(value: Vec) -> Self { let part = value .into_iter() .enumerate() - .map(|(part_number, part)| MultipartPart { - e_tag: part.content_id, - part_number: part_number + 1, + .map(|(part_idx, part)| { + let md = match quick_xml::de::from_str::(&part.content_id) { + Ok(md) => md, + // fallback to old way + Err(_) => PartMetadata { + e_tag: part.content_id.clone(), + checksum: None, + }, + }; + MultipartPart { + e_tag: md.e_tag, + part_number: part_idx + 1, + checksum_sha256: md.checksum, + } }) .collect(); Self { part } @@ -118,6 +136,9 @@ pub struct MultipartPart { pub e_tag: String, #[serde(rename = "PartNumber")] pub part_number: usize, + #[serde(rename = "ChecksumSHA256")] + #[serde(skip_serializing_if = "Option::is_none")] + pub checksum_sha256: Option, } #[derive(Debug, Deserialize)] diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 4df4d8fd46ad..6b686f63edf0 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -220,13 +220,13 @@ impl WriteMultipart { /// Flush final chunk, and await completion of all in-flight requests pub async fn finish(mut self) -> Result { + self.wait_for_capacity(0).await?; + if !self.buffer.is_empty() { let part = std::mem::take(&mut self.buffer); - self.put_part(part.into()) + self.upload.put_part(part.into()).await?; } - self.wait_for_capacity(0).await?; - match self.upload.complete().await { Err(e) => { self.tasks.shutdown().await;