From 3a9f404ea147cc4ee72f5d009673ee0d8fdbba0f Mon Sep 17 00:00:00 2001 From: Sky Fan <3374614481@qq.com> Date: Fri, 13 Dec 2024 08:50:02 +0800 Subject: [PATCH] feat: support vacuum leaked table data (#17022) * feat: support vacuum leaked table data * regen golden file --- src/query/ast/src/ast/statements/table.rs | 4 ++ src/query/ast/src/parser/statement.rs | 5 +- src/query/ast/tests/it/testdata/stmt.txt | 5 ++ .../interpreter_vacuum_drop_tables.rs | 67 +++++++++++++++++++ src/query/sql/src/planner/binder/ddl/table.rs | 1 + src/query/sql/src/planner/plans/ddl/table.rs | 1 + 6 files changed, 81 insertions(+), 2 deletions(-) diff --git a/src/query/ast/src/ast/statements/table.rs b/src/query/ast/src/ast/statements/table.rs index 57792449eebe..7efc87fa6b8f 100644 --- a/src/query/ast/src/ast/statements/table.rs +++ b/src/query/ast/src/ast/statements/table.rs @@ -774,6 +774,7 @@ pub struct VacuumDropTableOption { // Some(true) means dry run with summary option pub dry_run: Option, pub limit: Option, + pub force: bool, } impl Display for VacuumDropTableOption { @@ -787,6 +788,9 @@ impl Display for VacuumDropTableOption { if let Some(limit) = self.limit { write!(f, " LIMIT {}", limit)?; } + if self.force { + write!(f, " FORCE")?; + } Ok(()) } } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index ad8ebaea488f..8f5aa6d31829 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -3777,11 +3777,12 @@ pub fn literal_duration(i: Input) -> IResult { pub fn vacuum_drop_table_option(i: Input) -> IResult { alt((map( rule! { - (DRY ~ ^RUN ~ SUMMARY?)? ~ (LIMIT ~ #literal_u64)? + (DRY ~ ^RUN ~ SUMMARY?)? ~ (LIMIT ~ #literal_u64)? ~ FORCE? }, - |(opt_dry_run, opt_limit)| VacuumDropTableOption { + |(opt_dry_run, opt_limit, opt_force)| VacuumDropTableOption { dry_run: opt_dry_run.map(|dry_run| dry_run.2.is_some()), limit: opt_limit.map(|(_, limit)| limit as usize), + force: opt_force.is_some(), }, ),))(i) } diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 87f5b50ad42b..678365bbd6c1 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -13016,6 +13016,7 @@ VacuumDropTable( option: VacuumDropTableOption { dry_run: None, limit: None, + force: false, }, }, ) @@ -13035,6 +13036,7 @@ VacuumDropTable( false, ), limit: None, + force: false, }, }, ) @@ -13054,6 +13056,7 @@ VacuumDropTable( true, ), limit: None, + force: false, }, }, ) @@ -13080,6 +13083,7 @@ VacuumDropTable( option: VacuumDropTableOption { dry_run: None, limit: None, + force: false, }, }, ) @@ -13108,6 +13112,7 @@ VacuumDropTable( limit: Some( 10, ), + force: false, }, }, ) diff --git a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs index 1aad2e680b70..4ca3f8000770 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs @@ -31,8 +31,10 @@ use databend_common_meta_app::schema::DroppedId; use databend_common_meta_app::schema::GcDroppedTableReq; use databend_common_meta_app::schema::ListDroppedTableReq; use databend_common_sql::plans::VacuumDropTablePlan; +use databend_common_storage::DataOperator; use databend_common_storages_view::view_table::VIEW_ENGINE; use databend_enterprise_vacuum_handler::get_vacuum_handler; +use futures_util::TryStreamExt; use log::info; use crate::interpreters::Interpreter; @@ -116,6 +118,11 @@ impl Interpreter for VacuumDropTablesInterpreter { LicenseManagerSwitch::instance() .check_enterprise_enabled(self.ctx.get_license_key(), Vacuum)?; + if self.plan.option.force { + self.vacuum_drop_tables_force().await?; + return Ok(PipelineBuildResult::create()); + } + let ctx = self.ctx.clone(); let duration = Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64); @@ -133,6 +140,7 @@ impl Interpreter for VacuumDropTablesInterpreter { }; let tenant = self.ctx.get_tenant(); + let (tables, drop_ids) = catalog .get_drop_table_infos(ListDroppedTableReq::new4( &tenant, @@ -275,3 +283,62 @@ impl Interpreter for VacuumDropTablesInterpreter { } } } + +impl VacuumDropTablesInterpreter { + async fn vacuum_drop_tables_force(&self) -> Result<()> { + let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?; + let op = DataOperator::instance().operator(); + let databases = match self.plan.database.is_empty() { + true => catalog.list_databases(&self.ctx.get_tenant()).await?, + false => { + let database = catalog + .get_database(&self.ctx.get_tenant(), &self.plan.database) + .await?; + vec![database] + } + }; + + for database in databases { + if database.name() == "system" || database.name() == "information_schema" { + continue; + } + let db_id = database.get_db_info().database_id.db_id; + info!( + "vacuum drop table force from db name: {}, id: {}", + database.name(), + db_id + ); + let mut lister = op.lister_with(&db_id.to_string()).recursive(true).await?; + let mut paths = vec![]; + let mut orphan_paths = vec![]; + while let Some(entry) = lister.try_next().await? { + paths.push(entry.path().to_string()); + } + let tables_in_meta = database.list_tables_history().await?; + let table_ids_in_meta = tables_in_meta + .iter() + .map(|t| t.get_id()) + .collect::>(); + for path in paths { + let Some(table_id) = path.split('/').nth(1) else { + info!("can not parse table id from path: {}", path); + continue; + }; + let Some(table_id) = table_id.parse::().ok() else { + info!("can not parse table id from path: {}", path); + continue; + }; + if !table_ids_in_meta.contains(&table_id) { + orphan_paths.push(path); + } + } + info!( + "orphan_paths summary: {:?}", + orphan_paths.iter().take(100).collect::>() + ); + op.remove(orphan_paths).await?; + } + + Ok(()) + } +} diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 656b25e74e68..9d5782480e77 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -1271,6 +1271,7 @@ impl Binder { VacuumDropTableOption { dry_run: option.dry_run, limit: option.limit, + force: option.force, } }; Ok(Plan::VacuumDropTable(Box::new(VacuumDropTablePlan { diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index 6a1739741a59..9a45f098d3b2 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -188,6 +188,7 @@ pub struct VacuumDropTableOption { // Some(true) means dry run with summary option pub dry_run: Option, pub limit: Option, + pub force: bool, } #[derive(Debug, Clone)]