Skip to content

Commit

Permalink
feat: support vacuum leaked table data (#17022)
Browse files Browse the repository at this point in the history
* feat: support vacuum leaked table data

* regen golden file
  • Loading branch information
SkyFan2002 authored Dec 13, 2024
1 parent 811c639 commit 3a9f404
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/query/ast/src/ast/statements/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ pub struct VacuumDropTableOption {
// Some(true) means dry run with summary option
pub dry_run: Option<bool>,
pub limit: Option<usize>,
pub force: bool,
}

impl Display for VacuumDropTableOption {
Expand All @@ -787,6 +788,9 @@ impl Display for VacuumDropTableOption {
if let Some(limit) = self.limit {
write!(f, " LIMIT {}", limit)?;
}
if self.force {
write!(f, " FORCE")?;
}
Ok(())
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3777,11 +3777,12 @@ pub fn literal_duration(i: Input) -> IResult<Duration> {
pub fn vacuum_drop_table_option(i: Input) -> IResult<VacuumDropTableOption> {
alt((map(
rule! {
(DRY ~ ^RUN ~ SUMMARY?)? ~ (LIMIT ~ #literal_u64)?
(DRY ~ ^RUN ~ SUMMARY?)? ~ (LIMIT ~ #literal_u64)? ~ FORCE?
},
|(opt_dry_run, opt_limit)| VacuumDropTableOption {
|(opt_dry_run, opt_limit, opt_force)| VacuumDropTableOption {
dry_run: opt_dry_run.map(|dry_run| dry_run.2.is_some()),
limit: opt_limit.map(|(_, limit)| limit as usize),
force: opt_force.is_some(),
},
),))(i)
}
Expand Down
5 changes: 5 additions & 0 deletions src/query/ast/tests/it/testdata/stmt.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13016,6 +13016,7 @@ VacuumDropTable(
option: VacuumDropTableOption {
dry_run: None,
limit: None,
force: false,
},
},
)
Expand All @@ -13035,6 +13036,7 @@ VacuumDropTable(
false,
),
limit: None,
force: false,
},
},
)
Expand All @@ -13054,6 +13056,7 @@ VacuumDropTable(
true,
),
limit: None,
force: false,
},
},
)
Expand All @@ -13080,6 +13083,7 @@ VacuumDropTable(
option: VacuumDropTableOption {
dry_run: None,
limit: None,
force: false,
},
},
)
Expand Down Expand Up @@ -13108,6 +13112,7 @@ VacuumDropTable(
limit: Some(
10,
),
force: false,
},
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ use databend_common_meta_app::schema::DroppedId;
use databend_common_meta_app::schema::GcDroppedTableReq;
use databend_common_meta_app::schema::ListDroppedTableReq;
use databend_common_sql::plans::VacuumDropTablePlan;
use databend_common_storage::DataOperator;
use databend_common_storages_view::view_table::VIEW_ENGINE;
use databend_enterprise_vacuum_handler::get_vacuum_handler;
use futures_util::TryStreamExt;
use log::info;

use crate::interpreters::Interpreter;
Expand Down Expand Up @@ -116,6 +118,11 @@ impl Interpreter for VacuumDropTablesInterpreter {
LicenseManagerSwitch::instance()
.check_enterprise_enabled(self.ctx.get_license_key(), Vacuum)?;

if self.plan.option.force {
self.vacuum_drop_tables_force().await?;
return Ok(PipelineBuildResult::create());
}

let ctx = self.ctx.clone();
let duration = Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64);

Expand All @@ -133,6 +140,7 @@ impl Interpreter for VacuumDropTablesInterpreter {
};

let tenant = self.ctx.get_tenant();

let (tables, drop_ids) = catalog
.get_drop_table_infos(ListDroppedTableReq::new4(
&tenant,
Expand Down Expand Up @@ -275,3 +283,62 @@ impl Interpreter for VacuumDropTablesInterpreter {
}
}
}

impl VacuumDropTablesInterpreter {
async fn vacuum_drop_tables_force(&self) -> Result<()> {
let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?;
let op = DataOperator::instance().operator();
let databases = match self.plan.database.is_empty() {
true => catalog.list_databases(&self.ctx.get_tenant()).await?,
false => {
let database = catalog
.get_database(&self.ctx.get_tenant(), &self.plan.database)
.await?;
vec![database]
}
};

for database in databases {
if database.name() == "system" || database.name() == "information_schema" {
continue;
}
let db_id = database.get_db_info().database_id.db_id;
info!(
"vacuum drop table force from db name: {}, id: {}",
database.name(),
db_id
);
let mut lister = op.lister_with(&db_id.to_string()).recursive(true).await?;
let mut paths = vec![];
let mut orphan_paths = vec![];
while let Some(entry) = lister.try_next().await? {
paths.push(entry.path().to_string());
}
let tables_in_meta = database.list_tables_history().await?;
let table_ids_in_meta = tables_in_meta
.iter()
.map(|t| t.get_id())
.collect::<HashSet<_>>();
for path in paths {
let Some(table_id) = path.split('/').nth(1) else {
info!("can not parse table id from path: {}", path);
continue;
};
let Some(table_id) = table_id.parse::<u64>().ok() else {
info!("can not parse table id from path: {}", path);
continue;
};
if !table_ids_in_meta.contains(&table_id) {
orphan_paths.push(path);
}
}
info!(
"orphan_paths summary: {:?}",
orphan_paths.iter().take(100).collect::<Vec<_>>()
);
op.remove(orphan_paths).await?;
}

Ok(())
}
}
1 change: 1 addition & 0 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,7 @@ impl Binder {
VacuumDropTableOption {
dry_run: option.dry_run,
limit: option.limit,
force: option.force,
}
};
Ok(Plan::VacuumDropTable(Box::new(VacuumDropTablePlan {
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/plans/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ pub struct VacuumDropTableOption {
// Some(true) means dry run with summary option
pub dry_run: Option<bool>,
pub limit: Option<usize>,
pub force: bool,
}

#[derive(Debug, Clone)]
Expand Down

0 comments on commit 3a9f404

Please sign in to comment.