Skip to content

Commit

Permalink
chore: make VACUUM TEMPORARY FILES killable (#16751)
Browse files Browse the repository at this point in the history
* refactor: move `AbortChecker` into databend-common-catalog

* make `VACUUM TEMPORARY FILES` killable
  • Loading branch information
dantengsky authored Nov 4, 2024
1 parent 694bf74 commit 67c5cce
Show file tree
Hide file tree
Showing 15 changed files with 79 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/common/io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mod geography;
pub mod geometry;
mod position;
mod stat_buffer;

pub mod wkb;

pub use bitmap::deserialize_bitmap;
Expand Down
2 changes: 1 addition & 1 deletion src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use chrono::DateTime;
use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::AbortChecker;
use databend_common_expression::BlockThresholds;
use databend_common_expression::ColumnId;
use databend_common_expression::RemoteExpr;
Expand Down Expand Up @@ -53,6 +52,7 @@ use crate::plan::ReclusterParts;
use crate::plan::StreamColumn;
use crate::statistics::BasicColumnStatistics;
use crate::table_args::TableArgs;
use crate::table_context::AbortChecker;
use crate::table_context::TableContext;

#[async_trait::async_trait]
Expand Down
12 changes: 6 additions & 6 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use databend_common_base::base::ProgressValues;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_exception::ResultExt;
use databend_common_expression::AbortChecker;
use databend_common_expression::BlockThresholds;
use databend_common_expression::CheckAbort;
use databend_common_expression::DataBlock;
use databend_common_expression::Expr;
use databend_common_expression::FunctionContext;
Expand Down Expand Up @@ -206,10 +204,6 @@ pub trait TableContext: Send + Sync {
this: S,
}
impl<S: TableContext + ?Sized> CheckAbort for Checker<Arc<S>> {
fn is_aborting(&self) -> bool {
self.this.as_ref().check_aborting().is_err()
}

fn try_check_aborting(&self) -> Result<()> {
self.this.check_aborting().with_context(|| "query aborted")
}
Expand Down Expand Up @@ -385,3 +379,9 @@ pub trait TableContext: Send + Sync {
fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool;
fn get_shared_settings(&self) -> Arc<Settings>;
}

pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

pub trait CheckAbort {
fn try_check_aborting(&self) -> Result<()>;
}
4 changes: 3 additions & 1 deletion src/query/ee/src/storages/fuse/operations/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use chrono::DateTime;
use chrono::Utc;
use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_storages_fuse::FuseTable;
Expand Down Expand Up @@ -54,11 +55,12 @@ impl VacuumHandler for RealVacuumHandler {

async fn do_vacuum_temporary_files(
&self,
abort_checker: AbortChecker,
temporary_dir: String,
retain: Option<Duration>,
vacuum_limit: usize,
) -> Result<usize> {
do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit).await
do_vacuum_temporary_files(abort_checker, temporary_dir, retain, vacuum_limit).await
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use databend_common_catalog::table_context::AbortChecker;
use databend_common_exception::Result;
use databend_common_storage::DataOperator;
use futures_util::stream;
Expand All @@ -31,6 +32,7 @@ const DEFAULT_RETAIN_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 3);

#[async_backtrace::framed]
pub async fn do_vacuum_temporary_files(
abort_checker: AbortChecker,
temporary_dir: String,
retain: Option<Duration>,
limit: usize,
Expand Down Expand Up @@ -66,6 +68,7 @@ pub async fn do_vacuum_temporary_files(
let mut batch_size = 0;

while let Some(de) = ds.try_next().await? {
abort_checker.try_check_aborting()?;
if de.path() == temporary_dir {
continue;
}
Expand All @@ -81,6 +84,7 @@ pub async fn do_vacuum_temporary_files(
};

vacuum_finished_query(
&abort_checker,
start_time,
&mut removed_temp_files,
&mut total_cleaned_size,
Expand Down Expand Up @@ -159,6 +163,7 @@ pub async fn do_vacuum_temporary_files(
}

async fn vacuum_finished_query(
abort_checker: &AbortChecker,
total_instant: Instant,
removed_temp_files: &mut usize,
total_cleaned_size: &mut usize,
Expand All @@ -184,6 +189,7 @@ async fn vacuum_finished_query(
let mut remove_temp_files_path = Vec::with_capacity(1001);

while let Some(de) = ds.try_next().await? {
abort_checker.try_check_aborting()?;
if de.path() == parent.path() {
continue;
}
Expand Down
53 changes: 50 additions & 3 deletions src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::sync::Arc;
use std::time::Duration;

use databend_common_base::base::tokio;
use databend_common_catalog::table_context::CheckAbort;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
Expand Down Expand Up @@ -125,8 +127,41 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
let size = operator.list_with("test_dir/").recursive(true).await?.len();
assert!((3..=4).contains(&size));

struct NoAbort;
struct AbortRightNow;
impl CheckAbort for NoAbort {
fn try_check_aborting(&self) -> Result<()> {
Ok(())
}
}

impl CheckAbort for AbortRightNow {
fn try_check_aborting(&self) -> Result<()> {
Err(ErrorCode::AbortedQuery(""))
}
}

// check abort

let r = do_vacuum_temporary_files(
Arc::new(AbortRightNow),
"test_dir/".to_string(),
Some(Duration::from_secs(2)),
1,
)
.await;

assert!(r.is_err_and(|e| e.code() == ErrorCode::ABORTED_QUERY));

let no_abort = Arc::new(NoAbort);
tokio::time::sleep(Duration::from_secs(2)).await;
do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 1).await?;
do_vacuum_temporary_files(
no_abort.clone(),
"test_dir/".to_string(),
Some(Duration::from_secs(2)),
1,
)
.await?;

let size = operator.list("test_dir/").await?.len();
assert!((2..=3).contains(&size));
Expand All @@ -137,12 +172,24 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
.write("test_dir/test5/finished", vec![1, 2])
.await?;

do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 2).await?;
do_vacuum_temporary_files(
no_abort.clone(),
"test_dir/".to_string(),
Some(Duration::from_secs(2)),
2,
)
.await?;
let size = operator.list("test_dir/").await?.len();
assert!((2..=3).contains(&size));

tokio::time::sleep(Duration::from_secs(3)).await;
do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(3)), 1000).await?;
do_vacuum_temporary_files(
no_abort.clone(),
"test_dir/".to_string(),
Some(Duration::from_secs(3)),
1000,
)
.await?;

dbg!(operator.list_with("test_dir/").await?);

Expand Down
5 changes: 4 additions & 1 deletion src/query/ee_features/vacuum_handler/src/vacuum_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use chrono::DateTime;
use chrono::Utc;
use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_storages_fuse::FuseTable;
Expand Down Expand Up @@ -48,6 +49,7 @@ pub trait VacuumHandler: Sync + Send {

async fn do_vacuum_temporary_files(
&self,
abort_checker: AbortChecker,
temporary_dir: String,
retain: Option<Duration>,
vacuum_limit: usize,
Expand Down Expand Up @@ -91,12 +93,13 @@ impl VacuumHandlerWrapper {
#[async_backtrace::framed]
pub async fn do_vacuum_temporary_files(
&self,
abort_checker: AbortChecker,
temporary_dir: String,
retain: Option<Duration>,
vacuum_limit: usize,
) -> Result<usize> {
self.handler
.do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit)
.do_vacuum_temporary_files(abort_checker, temporary_dir, retain, vacuum_limit)
.await
}
}
Expand Down
9 changes: 0 additions & 9 deletions src/query/expression/src/kernels/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;

use crate::types::DataType;
Expand All @@ -24,13 +22,6 @@ use crate::Scalar;
use crate::SortCompare;
use crate::Value;

pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

pub trait CheckAbort {
fn is_aborting(&self) -> bool;
fn try_check_aborting(&self) -> Result<()>;
}

#[derive(Clone)]
pub struct SortColumnDescription {
pub offset: usize,
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/hook/vacuum_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
{
let handler = get_vacuum_handler();

let abort_checker = query_ctx.clone().get_abort_checker();
let _ = GlobalIORuntime::instance().block_on(async move {
let removed_files = handler
.do_vacuum_temporary_files(
abort_checker,
spill_prefix.clone(),
Some(Duration::from_secs(0)),
vacuum_limit as usize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl Interpreter for VacuumTemporaryFilesInterpreter {
let temporary_files_prefix = query_spill_prefix(self.ctx.get_tenant().tenant_name(), "");
let removed_files = handler
.do_vacuum_temporary_files(
self.ctx.clone().get_abort_checker(),
temporary_files_prefix,
self.plan.retain,
self.plan.limit.map(|x| x as usize).unwrap_or(usize::MAX),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async fn test_fuse_navigate() -> Result<()> {
ctx.get_current_session()
.force_kill_query(ErrorCode::AbortedQuery("mission aborted"));
let checker = ctx.clone().get_abort_checker();
assert!(checker.is_aborting());
assert!(checker.try_check_aborting().is_err());
let res = fuse_table
.navigate_to_time_point(loc, instant, ctx.get_abort_checker())
.await;
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ use databend_common_catalog::catalog_kind::CATALOG_DEFAULT;
use databend_common_catalog::table::NavigationPoint;
use databend_common_catalog::table::Table;
use databend_common_catalog::table::TimeNavigation;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::is_stream_column;
use databend_common_expression::type_check::check_number;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::AbortChecker;
use databend_common_expression::ConstantFolder;
use databend_common_expression::DataField;
use databend_common_expression::FunctionContext;
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ use databend_common_catalog::table::ColumnStatisticsProvider;
use databend_common_catalog::table::CompactionLimits;
use databend_common_catalog::table::NavigationDescriptor;
use databend_common_catalog::table::TimeNavigation;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::AbortChecker;
use databend_common_expression::BlockThresholds;
use databend_common_expression::ColumnId;
use databend_common_expression::RemoteExpr;
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ use databend_common_catalog::plan::StreamTablePart;
use databend_common_catalog::table::NavigationPoint;
use databend_common_catalog::table::Table;
use databend_common_catalog::table::TableStatistics;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::decimal::Decimal128Type;
use databend_common_expression::AbortChecker;
use databend_common_expression::FromData;
use databend_common_expression::RemoteExpr;
use databend_common_expression::Scalar;
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/navigate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use std::sync::Arc;
use chrono::DateTime;
use chrono::Utc;
use databend_common_catalog::table::NavigationPoint;
use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_exception::ResultExt;
use databend_common_expression::AbortChecker;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableStatistics;
use databend_storages_common_cache::LoadParams;
Expand Down

0 comments on commit 67c5cce

Please sign in to comment.