diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 721160f1ed4..735dc321972 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6577,6 +6577,7 @@ dependencies = [ "lru", "md5", "mockall", + "murmurhash32", "once_cell", "opendal", "pin-project", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 6fcf67c2e10..4e34668f3a0 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -152,6 +152,7 @@ matches = "0.1.9" md5 = "0.7" mime_guess = "2.0.4" mockall = "0.11" +murmurhash32 = "0.3" mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "306c0a7" } new_string_template = "1.5.1" nom = "7.1.3" diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 651271d0c61..9c50e55af48 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -26,6 +26,7 @@ use itertools::Itertools; use quickwit_common::get_bool_from_env; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, EnumMap}; +use tracing::warn; /// Lists the storage backends supported by Quickwit. #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] @@ -93,6 +94,9 @@ impl StorageConfigs { } pub fn validate(&self) -> anyhow::Result<()> { + for storage_config in self.0.iter() { + storage_config.validate()?; + } let backends: Vec = self .0 .iter() @@ -216,6 +220,14 @@ impl StorageConfig { _ => None, } } + + pub fn validate(&self) -> anyhow::Result<()> { + if let StorageConfig::S3(config) = self { + config.validate() + } else { + Ok(()) + } + } } impl From for StorageConfig { @@ -313,6 +325,8 @@ impl fmt::Debug for AzureStorageConfig { } } +const MAX_S3_HASH_PREFIX_CARDINALITY: usize = 16usize.pow(3); + #[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct S3StorageConfig { @@ -334,9 +348,30 @@ pub struct S3StorageConfig { pub disable_multi_object_delete: bool, #[serde(default)] pub disable_multipart_upload: bool, + #[serde(default)] + #[serde(skip_serializing_if = "lower_than_2")] + pub hash_prefix_cardinality: usize, +} + +fn lower_than_2(n: &usize) -> bool { + *n < 2 } impl S3StorageConfig { + fn validate(&self) -> anyhow::Result<()> { + if self.hash_prefix_cardinality == 1 { + warn!("A hash prefix of 1 will be ignored"); + } + if self.hash_prefix_cardinality > MAX_S3_HASH_PREFIX_CARDINALITY { + anyhow::bail!( + "hash_prefix_cardinality can take values of at most \ + {MAX_S3_HASH_PREFIX_CARDINALITY}, currently set to {}", + self.hash_prefix_cardinality + ); + } + Ok(()) + } + fn apply_flavor(&mut self) { match self.flavor { Some(StorageBackendFlavor::DigitalOcean) => { @@ -383,7 +418,7 @@ impl S3StorageConfig { } impl fmt::Debug for S3StorageConfig { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("S3StorageConfig") .field("access_key_id", &self.access_key_id) .field( diff --git a/quickwit/quickwit-storage/Cargo.toml b/quickwit/quickwit-storage/Cargo.toml index c883c61c265..385ec152683 100644 --- a/quickwit/quickwit-storage/Cargo.toml +++ b/quickwit/quickwit-storage/Cargo.toml @@ -26,6 +26,7 @@ once_cell = { workspace = true } pin-project = { workspace = true } rand = { workspace = true } regex = { workspace = true } +murmurhash32 = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 86ef692c671..215f09d2d64 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -86,11 +86,13 @@ pub struct S3CompatibleObjectStorage { s3_client: S3Client, uri: Uri, bucket: String, - prefix: PathBuf, + prefix: String, multipart_policy: MultiPartPolicy, retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, + // If 0, we don't have any prefix + hash_prefix_cardinality: usize, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -99,6 +101,7 @@ impl fmt::Debug for S3CompatibleObjectStorage { .debug_struct("S3CompatibleObjectStorage") .field("bucket", &self.bucket) .field("prefix", &self.prefix) + .field("hash_prefix_cardinality", &self.hash_prefix_cardinality) .finish() } } @@ -181,11 +184,12 @@ impl S3CompatibleObjectStorage { s3_client, uri: uri.clone(), bucket, - prefix, + prefix: prefix.to_string_lossy().to_string(), multipart_policy: MultiPartPolicy::default(), retry_params, disable_multi_object_delete, disable_multipart_upload, + hash_prefix_cardinality: s3_storage_config.hash_prefix_cardinality, }) } @@ -193,7 +197,7 @@ impl S3CompatibleObjectStorage { /// /// This method overrides any existing prefix. (It does NOT /// append the argument to any existing prefix.) - pub fn with_prefix(self, prefix: PathBuf) -> Self { + pub fn with_prefix(self, prefix: String) -> Self { Self { s3_client: self.s3_client, uri: self.uri, @@ -203,6 +207,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, + hash_prefix_cardinality: self.hash_prefix_cardinality, } } @@ -262,12 +267,49 @@ async fn compute_md5(mut read: T) -> io::Resu } } } +const HEX_ALPHABET: [u8; 16] = *b"0123456789abcdef"; +const UNINITIALIZED_HASH_PREFIX: &str = "00000000"; + +fn build_key(prefix: &str, relative_path: &str, hash_prefix_cardinality: usize) -> String { + let mut key = String::with_capacity( + UNINITIALIZED_HASH_PREFIX.len() + 1 + prefix.len() + 1 + relative_path.len(), + ); + if hash_prefix_cardinality > 1 { + key.push_str(UNINITIALIZED_HASH_PREFIX); + key.push('/'); + } + key.push_str(prefix); + if key.as_bytes().last().copied() != Some(b'/') { + key.push('/'); + } + key.push_str(relative_path); + // We then set up the prefix. + if hash_prefix_cardinality > 1 { + let key_without_prefix = &key.as_bytes()[UNINITIALIZED_HASH_PREFIX.len() + 1..]; + let mut prefix_hash: usize = + murmurhash32::murmurhash3(key_without_prefix) as usize % hash_prefix_cardinality; + unsafe { + let prefix_buf: &mut [u8] = &mut key.as_bytes_mut()[..UNINITIALIZED_HASH_PREFIX.len()]; + for prefix_byte in prefix_buf { + let hex: u8 = HEX_ALPHABET[(prefix_hash % 16) as usize]; + *prefix_byte = hex; + if prefix_hash < 16 { + break; + } + prefix_hash /= 16; + } + } + } + key +} impl S3CompatibleObjectStorage { fn key(&self, relative_path: &Path) -> String { - // FIXME: This may not work on Windows. - let key_path = self.prefix.join(relative_path); - key_path.to_string_lossy().to_string() + build_key( + &self.prefix, + relative_path.to_string_lossy().as_ref(), + self.hash_prefix_cardinality, + ) } fn relative_path(&self, key: &str) -> PathBuf { @@ -945,13 +987,13 @@ mod tests { let s3_client = S3Client::new(&sdk_config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let mut s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: false, @@ -962,7 +1004,7 @@ mod tests { PathBuf::from("indexes/foo") ); - s3_storage.prefix = PathBuf::from("indexes"); + s3_storage.prefix = "indexes".to_string(); assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1000,13 +1042,13 @@ mod tests { let s3_client = S3Client::from_conf(config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: true, @@ -1041,13 +1083,13 @@ mod tests { let s3_client = S3Client::from_conf(config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: false, @@ -1123,13 +1165,13 @@ mod tests { let s3_client = S3Client::from_conf(config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: false, @@ -1216,13 +1258,13 @@ mod tests { let s3_client = S3Client::from_conf(config); let uri = Uri::for_test("s3://bucket/indexes"); let bucket = "bucket".to_string(); - let prefix = PathBuf::new(); let s3_storage = S3CompatibleObjectStorage { s3_client, uri, bucket, - prefix, + prefix: String::new(), + hash_prefix_cardinality: 0, multipart_policy: MultiPartPolicy::default(), retry_params: RetryParams::for_test(), disable_multi_object_delete: false, @@ -1233,4 +1275,19 @@ mod tests { .await .unwrap(); } + + #[test] + fn test_build_key() { + assert_eq!(build_key("hello", "coucou", 0), "hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 0), "hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 1), "hello/coucou"); + assert_eq!(build_key("hello", "coucou", 1), "hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 2), "10000000/hello/coucou"); + assert_eq!(build_key("hello", "coucou", 2), "10000000/hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 16), "d0000000/hello/coucou"); + assert_eq!(build_key("hello", "coucou", 16), "d0000000/hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 17), "50000000/hello/coucou"); + assert_eq!(build_key("hello", "coucou", 17), "50000000/hello/coucou"); + assert_eq!(build_key("hello/", "coucou", 70), "f0000000/hello/coucou"); + } } diff --git a/quickwit/quickwit-storage/tests/s3_storage.rs b/quickwit/quickwit-storage/tests/s3_storage.rs index 07533b8c692..889115e1446 100644 --- a/quickwit/quickwit-storage/tests/s3_storage.rs +++ b/quickwit/quickwit-storage/tests/s3_storage.rs @@ -69,7 +69,7 @@ pub mod s3_storage_test_suite { S3CompatibleObjectStorage::from_uri(&s3_storage_config, &storage_uri) .await .unwrap() - .with_prefix(PathBuf::from("test-s3-compatible-storage")); + .with_prefix("test-s3-compatible-storage".to_string()); quickwit_storage::storage_test_single_part_upload(&mut object_storage) .await