Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize S3 delete objects requests #5535

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quickwit/quickwit-storage/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl fmt::Display for BulkDeleteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"bulk delete error ({} success(es), {} failure(s), {} unattempted)",
"bulk delete error ({} success(es), {} failure(s), {} unattempted)",
self.successes.len(),
self.failures.len(),
self.unattempted.len()
Expand Down
144 changes: 80 additions & 64 deletions quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ use std::collections::HashMap;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{fmt, io};

use anyhow::{anyhow, Context as AnyhhowContext};
use anyhow::anyhow;
use async_trait::async_trait;
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput;
use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::builders::ObjectIdentifierBuilder;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete};
use aws_sdk_s3::Client as S3Client;
use base64::prelude::{Engine, BASE64_STANDARD};
use futures::{stream, StreamExt};
Expand Down Expand Up @@ -380,35 +381,35 @@ impl S3CompatibleObjectStorage {
fn build_delete_batch_requests<'a>(
&self,
delete_paths: &'a [&'a Path],
) -> anyhow::Result<Vec<(&'a [&'a Path], Delete)>> {
) -> Vec<(Vec<PathBuf>, Delete)> {
#[cfg(test)]
const MAX_NUM_KEYS: usize = 3;

#[cfg(not(test))]
const MAX_NUM_KEYS: usize = 1_000;

let path_chunks = delete_paths.chunks(MAX_NUM_KEYS);
let num_delete_requests = path_chunks.len();
let mut delete_requests: Vec<(&[&Path], Delete)> = Vec::with_capacity(num_delete_requests);

for path_chunk in path_chunks {
let object_ids: Vec<ObjectIdentifier> = path_chunk
.iter()
.map(|path| {
let key = self.key(path);
ObjectIdentifierBuilder::default()
.key(key)
.build()
.context("failed to build object identifier")
})
.collect::<anyhow::Result<_>>()?;
let delete = Delete::builder()
.set_objects(Some(object_ids))
.build()
.context("failed to build delete request")?;
delete_requests.push((path_chunk, delete));
}
Ok(delete_requests)
delete_paths
.chunks(MAX_NUM_KEYS)
.map(|path_chunk| {
let owned_path_chunk: Vec<PathBuf> =
path_chunk.iter().map(|path| path.to_path_buf()).collect();
let object_ids = path_chunk
.iter()
.map(|path| {
let key = self.key(path);
ObjectIdentifierBuilder::default()
.key(key)
.build()
.expect("`key` should bet set")
})
.collect();
let delete = Delete::builder()
.set_objects(Some(object_ids))
.build()
.expect("`objects` should be set");
(owned_path_chunk, delete)
})
.collect()
}

async fn upload_part<'a>(
Expand Down Expand Up @@ -617,30 +618,21 @@ impl S3CompatibleObjectStorage {
/// Bulk delete implementation based on the DeleteObjects API, also called Multi-Object Delete
/// API: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
async fn bulk_delete_multi<'a>(&self, paths: &[&'a Path]) -> Result<(), BulkDeleteError> {
let _permit = REQUEST_SEMAPHORE.acquire().await;
let cancellation_token = CancellationToken::default();
let delete_requests: Vec<(Vec<PathBuf>, Delete)> = self.build_delete_batch_requests(paths);

let delete_requests: Vec<(&[&Path], Delete)> = self
.build_delete_batch_requests(paths)
.map_err(|error: anyhow::Error| {
let unattempted = paths.iter().copied().map(Path::to_path_buf).collect();
BulkDeleteError {
error: Some(StorageErrorKind::Internal.with_error(error)),
successes: Default::default(),
failures: Default::default(),
unattempted,
}
})?;
let delete_objects_futures = delete_requests.into_iter().map(|(path_chunk, delete)| {
let bucket = self.bucket.clone();
let retry_params = self.retry_params;
let cancellation_token = cancellation_token.clone();
Comment on lines +625 to +627
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of how this code is written (structured concurrency rather than the more usual `tokio::spawn`), `bucket` and `cancellation_token` need not be cloned — references are enough (and `CancellationToken` could be made to not have an inner `Arc`).

I don't know whether it should be done that way, but it certainly can be.


let mut error = None;
let mut successes = Vec::with_capacity(paths.len());
let mut failures = HashMap::new();
let mut unattempted = Vec::new();

let mut delete_requests_it = delete_requests.iter();
async move {
let _permit = REQUEST_SEMAPHORE.acquire().await;

for (path_chunk, delete) in &mut delete_requests_it {
let delete_objects_res: StorageResult<DeleteObjectsOutput> =
aws_retry(&self.retry_params, || async {
if cancellation_token.is_cancelled() {
return DeleteObjectsOutcome::Cancelled(path_chunk);
}
let delete_objects_result = aws_retry(&retry_params, || async {
crate::STORAGE_METRICS
.object_storage_bulk_delete_requests_total
.inc();
Expand All @@ -649,16 +641,29 @@ impl S3CompatibleObjectStorage {
.start_timer();
self.s3_client
.delete_objects()
.bucket(self.bucket.clone())
.bucket(bucket.clone())
.delete(delete.clone())
.send()
.await
})
.await
.map_err(Into::into);

match delete_objects_res {
Ok(delete_objects_output) => {
.await;
if delete_objects_result.is_err() {
cancellation_token.cancel();
}
DeleteObjectsOutcome::Completed(delete_objects_result, path_chunk)
}
});
let mut delete_objects_outcome_stream =
futures::stream::iter(delete_objects_futures).buffer_unordered(10);

let mut error: Option<StorageError> = None;
let mut successes: Vec<PathBuf> = Vec::with_capacity(paths.len());
let mut failures: HashMap<PathBuf, DeleteFailure> = HashMap::new();
let mut unattempted: Vec<PathBuf> = Vec::new();

while let Some(delete_objects_outcome) = delete_objects_outcome_stream.next().await {
match delete_objects_outcome {
DeleteObjectsOutcome::Completed(Ok(delete_objects_output), _) => {
if let Some(deleted_objects) = delete_objects_output.deleted {
for deleted_object in deleted_objects {
if let Some(key) = deleted_object.key {
Expand Down Expand Up @@ -688,23 +693,16 @@ impl S3CompatibleObjectStorage {
}
}
}
Err(delete_objects_error) => {
error = Some(delete_objects_error);
unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
break;
DeleteObjectsOutcome::Completed(Err(delete_objects_error), paths) => {
error = Some(delete_objects_error.into());
unattempted.extend(paths)
}
DeleteObjectsOutcome::Cancelled(paths) => unattempted.extend(paths),
}
}

if error.is_none() && failures.is_empty() {
return Ok(());
}

// Do we have remaining requests?
for (path_chunk, _) in delete_requests_it {
unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
}

Err(BulkDeleteError {
error,
successes,
Expand Down Expand Up @@ -880,6 +878,24 @@ impl Storage for S3CompatibleObjectStorage {
}
}

/// Outcome of one chunked `DeleteObjects` request issued by `bulk_delete_multi`.
///
/// `T` is the (possibly failed) result of the request; the `Vec<PathBuf>` in each
/// variant lists the paths covered by that chunk so they can be attributed to
/// successes, failures, or the unattempted set.
enum DeleteObjectsOutcome<T> {
// The request was actually sent; `T` carries its result.
Completed(T, Vec<PathBuf>),
// The request was skipped because an earlier chunk failed and cancelled the run;
// its paths are reported as unattempted.
Cancelled(Vec<PathBuf>),
}

/// Minimal one-shot cancellation flag shared (via clones) across the concurrent
/// delete-objects futures: the first request that fails sets the flag so the
/// remaining, not-yet-started requests are skipped and reported as unattempted.
#[derive(Clone, Debug, Default)]
struct CancellationToken(Arc<AtomicBool>);

impl CancellationToken {
    /// Marks the token as cancelled. Idempotent: later calls are no-ops.
    fn cancel(&self) {
        // Relaxed suffices: the flag is a standalone signal and does not publish
        // or synchronize any other memory (per review feedback on this code).
        self.0.store(true, Ordering::Relaxed);
    }

    /// Returns `true` once `cancel` has been called on this token or any clone of it.
    fn is_cancelled(&self) -> bool {
        // Relaxed for the same reason as in `cancel`: no cross-memory ordering needed.
        self.0.load(Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {

Expand Down
Loading