-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support non-contiguous put payloads / vectored writes (#5514) #5538
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,8 @@ use crate::client::GetOptionsExt; | |
use crate::multipart::PartId; | ||
use crate::path::DELIMITER; | ||
use crate::{ | ||
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig, | ||
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload, PutResult, Result, | ||
RetryConfig, | ||
}; | ||
use async_trait::async_trait; | ||
use base64::prelude::BASE64_STANDARD; | ||
|
@@ -51,11 +52,14 @@ use reqwest::{ | |
header::{CONTENT_LENGTH, CONTENT_TYPE}, | ||
Client as ReqwestClient, Method, RequestBuilder, Response, | ||
}; | ||
use ring::digest; | ||
use ring::digest::Context; | ||
use serde::{Deserialize, Serialize}; | ||
use snafu::{ResultExt, Snafu}; | ||
use std::sync::Arc; | ||
|
||
const VERSION_HEADER: &str = "x-amz-version-id"; | ||
const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256"; | ||
|
||
/// A specialized `Error` for object store-related errors | ||
#[derive(Debug, Snafu)] | ||
|
@@ -266,7 +270,8 @@ pub(crate) struct Request<'a> { | |
path: &'a Path, | ||
config: &'a S3Config, | ||
builder: RequestBuilder, | ||
payload_sha256: Option<Vec<u8>>, | ||
payload_sha256: Option<digest::Digest>, | ||
payload: Option<PutPayload>, | ||
use_session_creds: bool, | ||
idempotent: bool, | ||
} | ||
|
@@ -286,7 +291,7 @@ impl<'a> Request<'a> { | |
Self { builder, ..self } | ||
} | ||
|
||
pub fn set_idempotent(mut self, idempotent: bool) -> Self { | ||
pub fn idempotent(mut self, idempotent: bool) -> Self { | ||
self.idempotent = idempotent; | ||
self | ||
} | ||
|
@@ -301,10 +306,15 @@ impl<'a> Request<'a> { | |
}, | ||
}; | ||
|
||
let sha = self.payload_sha256.as_ref().map(|x| x.as_ref()); | ||
|
||
let path = self.path.as_ref(); | ||
self.builder | ||
.with_aws_sigv4(credential.authorizer(), self.payload_sha256.as_deref()) | ||
.send_retry_with_idempotency(&self.config.retry_config, self.idempotent) | ||
.with_aws_sigv4(credential.authorizer(), sha) | ||
.retryable(&self.config.retry_config) | ||
.idempotent(self.idempotent) | ||
.payload(self.payload) | ||
.send() | ||
.await | ||
.context(RetrySnafu { path }) | ||
} | ||
|
@@ -333,37 +343,35 @@ impl S3Client { | |
pub fn put_request<'a>( | ||
&'a self, | ||
path: &'a Path, | ||
bytes: Bytes, | ||
payload: PutPayload, | ||
with_encryption_headers: bool, | ||
) -> Request<'a> { | ||
let url = self.config.path_url(path); | ||
let mut builder = self.client.request(Method::PUT, url); | ||
if with_encryption_headers { | ||
builder = builder.headers(self.config.encryption_headers.clone().into()); | ||
} | ||
let mut payload_sha256 = None; | ||
|
||
if let Some(checksum) = self.config.checksum { | ||
let digest = checksum.digest(&bytes); | ||
builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest)); | ||
if checksum == Checksum::SHA256 { | ||
payload_sha256 = Some(digest); | ||
} | ||
} | ||
let mut sha256 = Context::new(&digest::SHA256); | ||
payload.iter().for_each(|x| sha256.update(x)); | ||
let payload_sha256 = sha256.finish(); | ||
|
||
builder = match bytes.is_empty() { | ||
true => builder.header(CONTENT_LENGTH, 0), // Handle empty uploads (#4514) | ||
false => builder.body(bytes), | ||
}; | ||
if let Some(Checksum::SHA256) = self.config.checksum { | ||
builder = builder.header( | ||
"x-amz-checksum-sha256", | ||
BASE64_STANDARD.encode(payload_sha256), | ||
) | ||
} | ||
|
||
if let Some(value) = self.config.client_options.get_content_type(path) { | ||
builder = builder.header(CONTENT_TYPE, value); | ||
} | ||
|
||
Request { | ||
path, | ||
builder, | ||
payload_sha256, | ||
builder: builder.header(CONTENT_LENGTH, payload.content_length()), | ||
payload: Some(payload), | ||
payload_sha256: Some(payload_sha256), | ||
config: &self.config, | ||
use_session_creds: true, | ||
idempotent: false, | ||
|
@@ -446,16 +454,8 @@ impl S3Client { | |
|
||
let mut builder = self.client.request(Method::POST, url); | ||
|
||
// Compute checksum - S3 *requires* this for DeleteObjects requests, so we default to | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cleanup given SHA256 is the only checksum we support and are ever likely to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (and to be clear for anyone else reading this PR -- this code is not exposed publicly so it can be changed in the future if needed as well) |
||
// their algorithm if the user hasn't specified one. | ||
let checksum = self.config.checksum.unwrap_or(Checksum::SHA256); | ||
let digest = checksum.digest(&body); | ||
builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest)); | ||
let payload_sha256 = if checksum == Checksum::SHA256 { | ||
Some(digest) | ||
} else { | ||
None | ||
}; | ||
let digest = digest::digest(&digest::SHA256, &body); | ||
builder = builder.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(digest)); | ||
|
||
// S3 *requires* DeleteObjects to include a Content-MD5 header: | ||
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html | ||
|
@@ -468,8 +468,8 @@ impl S3Client { | |
let response = builder | ||
.header(CONTENT_TYPE, "application/xml") | ||
.body(body) | ||
.with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref()) | ||
.send_retry_with_idempotency(&self.config.retry_config, false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
.with_aws_sigv4(credential.authorizer(), Some(digest.as_ref())) | ||
.send_retry(&self.config.retry_config) | ||
.await | ||
.context(DeleteObjectsRequestSnafu {})? | ||
.bytes() | ||
|
@@ -515,6 +515,7 @@ impl S3Client { | |
builder, | ||
path: from, | ||
config: &self.config, | ||
payload: None, | ||
payload_sha256: None, | ||
use_session_creds: false, | ||
idempotent: false, | ||
|
@@ -530,7 +531,9 @@ impl S3Client { | |
.request(Method::POST, url) | ||
.headers(self.config.encryption_headers.clone().into()) | ||
.with_aws_sigv4(credential.authorizer(), None) | ||
.send_retry_with_idempotency(&self.config.retry_config, true) | ||
.retryable(&self.config.retry_config) | ||
.idempotent(true) | ||
.send() | ||
.await | ||
.context(CreateMultipartRequestSnafu)? | ||
.bytes() | ||
|
@@ -548,14 +551,14 @@ impl S3Client { | |
path: &Path, | ||
upload_id: &MultipartId, | ||
part_idx: usize, | ||
data: Bytes, | ||
data: PutPayload, | ||
) -> Result<PartId> { | ||
let part = (part_idx + 1).to_string(); | ||
|
||
let response = self | ||
.put_request(path, data, false) | ||
.query(&[("partNumber", &part), ("uploadId", upload_id)]) | ||
.set_idempotent(true) | ||
.idempotent(true) | ||
.send() | ||
.await?; | ||
|
||
|
@@ -573,7 +576,7 @@ impl S3Client { | |
// If no parts were uploaded, upload an empty part | ||
// otherwise the completion request will fail | ||
let part = self | ||
.put_part(location, &upload_id.to_string(), 0, Bytes::new()) | ||
.put_part(location, &upload_id.to_string(), 0, PutPayload::default()) | ||
.await?; | ||
vec![part] | ||
} else { | ||
|
@@ -591,7 +594,9 @@ impl S3Client { | |
.query(&[("uploadId", upload_id)]) | ||
.body(body) | ||
.with_aws_sigv4(credential.authorizer(), None) | ||
.send_retry_with_idempotency(&self.config.retry_config, true) | ||
.retryable(&self.config.retry_config) | ||
.idempotent(true) | ||
.send() | ||
.await | ||
.context(CompleteMultipartRequestSnafu)?; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -186,11 +186,7 @@ impl DynamoCommit { | |
to: &Path, | ||
) -> Result<()> { | ||
self.conditional_op(client, to, None, || async { | ||
client | ||
.copy_request(from, to) | ||
.set_idempotent(false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
.send() | ||
.await?; | ||
client.copy_request(from, to).send().await?; | ||
Ok(()) | ||
}) | ||
.await | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are unrelated cleanups right? Or is there a reason
digest::Digest
is preferred overVec<u8>
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It avoids an allocation, but yes an unrelated cleanup