diff --git a/quickwit/quickwit-storage/src/error.rs b/quickwit/quickwit-storage/src/error.rs
index 482130b8879..9a6fbf5c1e6 100644
--- a/quickwit/quickwit-storage/src/error.rs
+++ b/quickwit/quickwit-storage/src/error.rs
@@ -193,7 +193,7 @@ impl fmt::Display for BulkDeleteError {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(
             f,
-            "bulk delete error ({} success(es), {} failure(s), {} unattempted)",
+            "bulk delete error ({} success(es), {} failure(s), {} unattempted)",
             self.successes.len(),
             self.failures.len(),
             self.unattempted.len()
diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
index b19c18db57d..1a967dcee25 100644
--- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
+++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
@@ -21,19 +21,20 @@
 use std::collections::HashMap;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::{fmt, io};
-use anyhow::{anyhow, Context as AnyhhowContext};
+use anyhow::anyhow;
 use async_trait::async_trait;
 use aws_credential_types::provider::SharedCredentialsProvider;
 use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
 use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
-use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput;
 use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput};
 use aws_sdk_s3::primitives::ByteStream;
 use aws_sdk_s3::types::builders::ObjectIdentifierBuilder;
-use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
+use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete};
 use aws_sdk_s3::Client as S3Client;
 use base64::prelude::{Engine, BASE64_STANDARD};
 use futures::{stream, StreamExt};
@@ -380,35 +381,35 @@ impl S3CompatibleObjectStorage {
     fn build_delete_batch_requests<'a>(
         &self,
         delete_paths: &'a [&'a Path],
-    ) -> anyhow::Result<Vec<(&'a [&'a Path], Delete)>> {
+    ) -> Vec<(Vec<PathBuf>, Delete)> {
         #[cfg(test)]
         const MAX_NUM_KEYS: usize = 3;
 
         #[cfg(not(test))]
         const MAX_NUM_KEYS: usize = 1_000;
 
-        let path_chunks = delete_paths.chunks(MAX_NUM_KEYS);
-        let num_delete_requests = path_chunks.len();
-        let mut delete_requests: Vec<(&[&Path], Delete)> = Vec::with_capacity(num_delete_requests);
-
-        for path_chunk in path_chunks {
-            let object_ids: Vec<ObjectIdentifier> = path_chunk
-                .iter()
-                .map(|path| {
-                    let key = self.key(path);
-                    ObjectIdentifierBuilder::default()
-                        .key(key)
-                        .build()
-                        .context("failed to build object identifier")
-                })
-                .collect::<anyhow::Result<_>>()?;
-            let delete = Delete::builder()
-                .set_objects(Some(object_ids))
-                .build()
-                .context("failed to build delete request")?;
-            delete_requests.push((path_chunk, delete));
-        }
-        Ok(delete_requests)
+        delete_paths
+            .chunks(MAX_NUM_KEYS)
+            .map(|path_chunk| {
+                let owned_path_chunk: Vec<PathBuf> =
+                    path_chunk.iter().map(|path| path.to_path_buf()).collect();
+                let object_ids = path_chunk
+                    .iter()
+                    .map(|path| {
+                        let key = self.key(path);
+                        ObjectIdentifierBuilder::default()
+                            .key(key)
+                            .build()
+                            .expect("`key` should be set")
+                    })
+                    .collect();
+                let delete = Delete::builder()
+                    .set_objects(Some(object_ids))
+                    .build()
+                    .expect("`objects` should be set");
+                (owned_path_chunk, delete)
+            })
+            .collect()
     }
 
     async fn upload_part<'a>(
@@ -617,30 +618,21 @@ impl S3CompatibleObjectStorage {
     /// Bulk delete implementation based on the DeleteObjects API, also called Multi-Object Delete
     /// API: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
     async fn bulk_delete_multi<'a>(&self, paths: &[&'a Path]) -> Result<(), BulkDeleteError> {
-        let _permit = REQUEST_SEMAPHORE.acquire().await;
+        let cancellation_token = CancellationToken::default();
+        let delete_requests: Vec<(Vec<PathBuf>, Delete)> = self.build_delete_batch_requests(paths);
 
-        let delete_requests: Vec<(&[&Path], Delete)> = self
-            .build_delete_batch_requests(paths)
-            .map_err(|error: anyhow::Error| {
-                let unattempted = paths.iter().copied().map(Path::to_path_buf).collect();
-                BulkDeleteError {
-                    error: Some(StorageErrorKind::Internal.with_error(error)),
-                    successes: Default::default(),
-                    failures: Default::default(),
-                    unattempted,
-                }
-            })?;
+        let delete_objects_futures = delete_requests.into_iter().map(|(path_chunk, delete)| {
+            let bucket = self.bucket.clone();
+            let retry_params = self.retry_params;
+            let cancellation_token = cancellation_token.clone();
 
-        let mut error = None;
-        let mut successes = Vec::with_capacity(paths.len());
-        let mut failures = HashMap::new();
-        let mut unattempted = Vec::new();
-
-        let mut delete_requests_it = delete_requests.iter();
+            async move {
+                let _permit = REQUEST_SEMAPHORE.acquire().await;
 
-        for (path_chunk, delete) in &mut delete_requests_it {
-            let delete_objects_res: StorageResult<DeleteObjectsOutput> =
-                aws_retry(&self.retry_params, || async {
+                if cancellation_token.is_cancelled() {
+                    return DeleteObjectsOutcome::Cancelled(path_chunk);
+                }
+                let delete_objects_result = aws_retry(&retry_params, || async {
                     crate::STORAGE_METRICS
                         .object_storage_bulk_delete_requests_total
                         .inc();
@@ -649,16 +641,29 @@ impl S3CompatibleObjectStorage {
                         .start_timer();
                     self.s3_client
                         .delete_objects()
-                        .bucket(self.bucket.clone())
+                        .bucket(bucket.clone())
                         .delete(delete.clone())
                         .send()
                         .await
                 })
-                .await
-                .map_err(Into::into);
-
-            match delete_objects_res {
-                Ok(delete_objects_output) => {
+                .await;
+                if delete_objects_result.is_err() {
+                    cancellation_token.cancel();
+                }
+                DeleteObjectsOutcome::Completed(delete_objects_result, path_chunk)
+            }
+        });
+        let mut delete_objects_outcome_stream =
+            futures::stream::iter(delete_objects_futures).buffer_unordered(10);
+
+        let mut error: Option<StorageError> = None;
+        let mut successes: Vec<PathBuf> = Vec::with_capacity(paths.len());
+        let mut failures: HashMap<PathBuf, DeleteFailure> = HashMap::new();
+        let mut unattempted: Vec<PathBuf> = Vec::new();
+
+        while let Some(delete_objects_outcome) = delete_objects_outcome_stream.next().await {
+            match delete_objects_outcome {
+                DeleteObjectsOutcome::Completed(Ok(delete_objects_output), _) => {
                     if let Some(deleted_objects) = delete_objects_output.deleted {
                         for deleted_object in deleted_objects {
                             if let Some(key) = deleted_object.key {
@@ -688,23 +693,16 @@ impl S3CompatibleObjectStorage {
                         }
                     }
                 }
-                Err(delete_objects_error) => {
-                    error = Some(delete_objects_error);
-                    unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
-                    break;
+                DeleteObjectsOutcome::Completed(Err(delete_objects_error), paths) => {
+                    error = Some(delete_objects_error.into());
+                    unattempted.extend(paths)
                 }
+                DeleteObjectsOutcome::Cancelled(paths) => unattempted.extend(paths),
             }
         }
-
         if error.is_none() && failures.is_empty() {
             return Ok(());
        }
-
-        // Do we have remaining requests?
-        for (path_chunk, _) in delete_requests_it {
-            unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
-        }
-
         Err(BulkDeleteError {
             error,
             successes,
@@ -880,6 +878,24 @@ impl Storage for S3CompatibleObjectStorage {
     }
 }
 
+enum DeleteObjectsOutcome<T> {
+    Completed(T, Vec<PathBuf>),
+    Cancelled(Vec<PathBuf>),
+}
+
+#[derive(Clone, Debug, Default)]
+struct CancellationToken(Arc<AtomicBool>);
+
+impl CancellationToken {
+    fn cancel(&self) {
+        self.0.store(true, Ordering::Release);
+    }
+
+    fn is_cancelled(&self) -> bool {
+        self.0.load(Ordering::Acquire)
+    }
+}
+
 #[cfg(test)]
 mod tests {
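For reference, here is a minimal, self-contained sketch of the pattern this patch introduces: a shared atomic-flag cancellation token combined with `buffer_unordered`, so delete batches run concurrently and batches that have not started yet short-circuit once one fails. The batch IDs, the simulated failure on batch 3, and the concurrency limit of 3 are illustrative only (the patch uses a limit of 10 and carries per-batch paths in `DeleteObjectsOutcome`); the sketch assumes only the `futures` crate.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use futures::StreamExt;

/// Same shape as the token added in the patch: a shared atomic flag.
#[derive(Clone, Default)]
struct CancellationToken(Arc<AtomicBool>);

impl CancellationToken {
    fn cancel(&self) {
        self.0.store(true, Ordering::Release);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

fn main() {
    let token = CancellationToken::default();

    // One future per batch; `buffer_unordered(3)` caps how many run at once.
    let batch_futures = (0..8u32).map(|batch_id| {
        let token = token.clone();
        async move {
            if token.is_cancelled() {
                // Short-circuit, like `DeleteObjectsOutcome::Cancelled`:
                // the batch is reported as unattempted.
                return Err(batch_id);
            }
            // Hypothetical failure on batch 3: cancel the batches that have
            // not started yet, mirroring `cancellation_token.cancel()` after
            // a failed `delete_objects` call.
            if batch_id == 3 {
                token.cancel();
                return Err(batch_id);
            }
            Ok(batch_id)
        }
    });

    let outcomes: Vec<Result<u32, u32>> = futures::executor::block_on(
        futures::stream::iter(batch_futures)
            .buffer_unordered(3)
            .collect(),
    );
    println!("{outcomes:?}");
}
```

Note that the cancellation is cooperative and best-effort: batches that already passed the `is_cancelled` check run to completion, which is why the patch still collects their successes and failures before reporting the remaining chunks as unattempted in `BulkDeleteError`.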