Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VTX-3411: fix serialisation for multi part struct #48

Merged
merged 14 commits into from
Dec 12, 2023
2 changes: 1 addition & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ walkdir = "2"
# Cloud storage support
base64 = { version = "0.21", default-features = false, features = ["std"], optional = true }
hyper = { version = "0.14", default-features = false, optional = true }
quick-xml = { version = "0.30.0", features = ["serialize", "overlapped-lists"], optional = true }
quick-xml = { git = "https://github.com/tafia/quick-xml.git", rev="db8546a", features = ["serialize", "overlapped-lists"], optional = true }
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the latest version of the library (not released yet) escape level was decreased to Partial, meaning it would not escape " by default

serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, optional = true }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
Expand Down
53 changes: 49 additions & 4 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{
AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_ENCODE_SET, STRICT_PATH_ENCODE_SET,
AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_ENCODE_SET,
STRICT_PATH_ENCODE_SET,
};
use crate::client::get::GetClient;
use crate::client::list::ListClient;
Expand All @@ -41,6 +42,7 @@ use reqwest::{
header::{CONTENT_LENGTH, CONTENT_TYPE},
Client as ReqwestClient, Method, Response, StatusCode,
};
use serde::ser::SerializeStruct;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::collections::HashMap;
Expand Down Expand Up @@ -152,14 +154,24 @@ struct CompleteMultipart {
part: Vec<MultipartPart>,
}

#[derive(Debug, Serialize)]
#[derive(Debug)]
struct MultipartPart {
#[serde(rename = "ETag")]
e_tag: String,
#[serde(rename = "PartNumber")]
part_number: usize,
}

impl Serialize for MultipartPart {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut s = serializer.serialize_struct("Part", 2)?;
s.serialize_field("ETag", format!("\"{}\"", &self.e_tag).as_str())?;
s.serialize_field("PartNumber", &self.part_number)?;
s.end()
}
}

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
Expand Down Expand Up @@ -668,3 +680,36 @@ impl ListClient for S3Client {
fn encode_path(path: &Path) -> PercentEncode<'_> {
utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET)
}

#[cfg(test)]
mod tests {
use crate::aws::client::{CompleteMultipart, MultipartPart};
use quick_xml;

#[test]
fn test_multipart_serialization() {
let request = CompleteMultipart {
part: vec![
MultipartPart {
e_tag: "1".to_string(),
part_number: 1,
},
MultipartPart {
e_tag: "2".to_string(),
part_number: 2,
},
MultipartPart {
e_tag: "3".to_string(),
part_number: 3,
},
],
};

let body = quick_xml::se::to_string(&request).unwrap();

assert_eq!(
body,
r#"<CompleteMultipartUpload><Part><ETag>"1"</ETag><PartNumber>1</PartNumber></Part><Part><ETag>"2"</ETag><PartNumber>2</PartNumber></Part><Part><ETag>"3"</ETag><PartNumber>3</PartNumber></Part></CompleteMultipartUpload>"#
)
}
}
7 changes: 3 additions & 4 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ impl AmazonS3 {
#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
self.client.put_request(location, bytes, &(), None).await.map(|_| ())
self.client.put_request(location, bytes, &(), None).await?;
Ok(())
}

async fn put_opts(
Expand All @@ -224,9 +225,7 @@ impl ObjectStore for AmazonS3 {
options: PutOptions,
) -> Result<()> {
if options.tags.is_empty() {
self.client
.put_request(location, bytes, &(), None)
.await?;
self.client.put_request(location, bytes, &(), None).await?;
} else {
self.client
.put_request(location, bytes, &(), Some(&options.tags))
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ struct ListResultInternal {
}

fn to_list_result(value: ListResultInternal, prefix: Option<&str>) -> Result<ListResult> {
let prefix = prefix.map(Path::from).unwrap_or_else(Path::default);
let prefix = prefix.map(Path::from).unwrap_or_default();
let common_prefixes = value
.blobs
.blob_prefix
Expand Down
7 changes: 2 additions & 5 deletions object_store/src/azure/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,8 @@ fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> Str
fn canonicalize_header(headers: &HeaderMap) -> String {
let mut names = headers
.iter()
.filter_map(|(k, _)| {
(k.as_str().starts_with("x-ms"))
// TODO remove unwraps
.then(|| (k.as_str(), headers.get(k).unwrap().to_str().unwrap()))
})
.filter(|(k, _)| (k.as_str().starts_with("x-ms")))
.map(|(k, _)| (k.as_str(), headers.get(k).unwrap().to_str().unwrap()))
.collect::<Vec<_>>();
names.sort_unstable();

Expand Down
18 changes: 6 additions & 12 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub enum ClientConfigKey {
/// User-Agent header to be used by this client
UserAgent,
/// PEM-formatted CA certificate
RootCa
RootCa,
}

impl AsRef<str> for ClientConfigKey {
Expand Down Expand Up @@ -160,11 +160,11 @@ impl FromStr for ClientConfigKey {
"proxy_url" => Ok(Self::ProxyUrl),
"timeout" => Ok(Self::Timeout),
"user_agent" => Ok(Self::UserAgent),
"root_ca" => Ok(Self::RootCa),
_ => Err(super::Error::UnknownConfigurationKey {
store: "HTTP",
key: s.into(),
}),
"root_ca" => Ok(Self::RootCa),
}
}
}
Expand Down Expand Up @@ -238,9 +238,7 @@ impl ClientOptions {
ClientConfigKey::UserAgent => {
self.user_agent = Some(ConfigValue::Deferred(value.into()))
}
ClientConfigKey::RootCa => {
self.root_ca = Some(value.into())
}
ClientConfigKey::RootCa => self.root_ca = Some(value.into()),
}
self
}
Expand Down Expand Up @@ -282,9 +280,7 @@ impl ClientOptions {
.as_ref()
.and_then(|v| v.get().ok())
.and_then(|v| v.to_str().ok().map(|s| s.to_string())),
ClientConfigKey::RootCa => self
.root_ca
.clone(),
ClientConfigKey::RootCa => self.root_ca.clone(),
}
}

Expand Down Expand Up @@ -437,7 +433,6 @@ impl ClientOptions {
self
}


/// Set a trusted CA certificate
pub fn with_root_ca(mut self, root_ca: impl Into<String>) -> Self {
self.root_ca = Some(root_ca.into());
Expand Down Expand Up @@ -600,6 +595,7 @@ pub struct StaticCredentialProvider<T> {

impl<T> StaticCredentialProvider<T> {
/// A [`CredentialProvider`] for a static credential of type `T`
#[allow(dead_code)]
pub fn new(credential: T) -> Self {
Self {
credential: Arc::new(credential),
Expand Down Expand Up @@ -811,9 +807,7 @@ mod tests {
user_agent
);
assert_eq!(
builder
.get_config_value(&ClientConfigKey::RootCa)
.unwrap(),
builder.get_config_value(&ClientConfigKey::RootCa).unwrap(),
root_ca
);
}
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ impl PutPart for GCSMultipartUpload {
#[async_trait]
impl ObjectStore for GoogleCloudStorage {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
self.client.put_request(location, bytes, None).await
self.client.put_request(location, bytes).await
}

async fn put_multipart(
Expand Down
9 changes: 6 additions & 3 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,10 @@ pub mod throttle;
mod client;

#[cfg(feature = "cloud")]
pub use client::{backoff::BackoffConfig, retry::RetryConfig, CredentialProvider, ClientConfigKey, ClientOptions};
pub use client::{
backoff::BackoffConfig, retry::RetryConfig, ClientConfigKey, ClientOptions,
CredentialProvider,
};
use std::collections::HashMap;

#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -1255,12 +1258,12 @@ mod tests {

let expected: Vec<_> = files
.iter()
.cloned()
.filter(|x| {
let prefix_match =
prefix.as_ref().map(|p| x.prefix_matches(p)).unwrap_or(true);
prefix_match && x > &offset
prefix_match && (x > &&offset)
})
.cloned()
.collect();

assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use crate::{
BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId,
ObjectMeta, ObjectStore, Path, PutOptions, Result, StreamExt,
ObjectMeta, ObjectStore, Path, PutOptions, Result, StreamExt,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down
6 changes: 3 additions & 3 deletions object_store/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ impl From<Error> for super::Error {
}
}

/// Recognises various URL formats, identifying the relevant [`ObjectStore`](crate::ObjectStore)
/// Recognises various URL formats, identifying the relevant [`ObjectStore`]
#[derive(Debug, Eq, PartialEq)]
enum ObjectStoreScheme {
/// Url corresponding to [`LocalFileSystem`](crate::local::LocalFileSystem)
/// Url corresponding to [`LocalFileSystem`]
Local,
/// Url corresponding to [`InMemory`](crate::memory::InMemory)
/// Url corresponding to [`InMemory`]
Memory,
/// Url corresponding to [`AmazonS3`](crate::aws::AmazonS3)
AmazonS3,
Expand Down
Loading