diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs
index 32eab48b99425..7e1ae829d5b18 100644
--- a/src/query/service/src/interpreters/interpreter_delete.rs
+++ b/src/query/service/src/interpreters/interpreter_delete.rs
@@ -68,7 +68,6 @@ use crate::interpreters::common::create_push_down_filters;
 use crate::interpreters::HookOperator;
 use crate::interpreters::Interpreter;
 use crate::interpreters::SelectInterpreter;
-use crate::locks::LockManager;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelinePullingExecutor;
 use crate::pipelines::PipelineBuildResult;
@@ -111,21 +110,21 @@ impl Interpreter for DeleteInterpreter {
         let is_distributed = !self.ctx.get_cluster().is_empty();
 
         let catalog_name = self.plan.catalog_name.as_str();
-        let catalog = self.ctx.get_catalog(catalog_name).await?;
-        let catalog_info = catalog.info();
-
         let db_name = self.plan.database_name.as_str();
         let tbl_name = self.plan.table_name.as_str();
-        let tbl = catalog
-            .get_table(&self.ctx.get_tenant(), db_name, tbl_name)
-            .await?;
 
         // Add table lock.
-        let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
-        let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
+        let lock_guard = self
+            .ctx
+            .clone()
+            .acquire_table_lock(catalog_name, db_name, tbl_name)
+            .await?;
 
-        // refresh table.
-        let tbl = tbl.refresh(self.ctx.as_ref()).await?;
+        let catalog = self.ctx.get_catalog(catalog_name).await?;
+        let catalog_info = catalog.info();
+        let tbl = catalog
+            .get_table(&self.ctx.get_tenant(), db_name, tbl_name)
+            .await?;
 
         // check mutability
         tbl.check_mutable()?;
@@ -355,7 +354,6 @@ impl DeleteInterpreter {
             mutation_kind: MutationKind::Delete,
             update_stream_meta: vec![],
             merge_meta,
-            need_lock: false,
             deduplicated_label: None,
             plan_id: u32::MAX,
         }));
diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs
index ee409461a2989..f58f2766c5948 100644
--- a/src/query/service/src/interpreters/interpreter_merge_into.rs
+++ b/src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -60,7 +60,6 @@ use itertools::Itertools;
 use crate::interpreters::common::dml_build_update_stream_req;
 use crate::interpreters::HookOperator;
 use crate::interpreters::Interpreter;
-use crate::locks::LockManager;
 use crate::pipelines::PipelineBuildResult;
 use crate::schedulers::build_query_pipeline_without_render_result_set;
 use crate::sessions::QueryContext;
@@ -145,7 +144,17 @@ impl MergeIntoInterpreter {
         } = &self.plan;
         let enable_right_broadcast = *enable_right_broadcast;
         let mut columns_set = columns_set.clone();
+
+        // Add table lock before execution.
+        let lock_guard = self
+            .ctx
+            .clone()
+            .acquire_table_lock(catalog, database, table_name)
+            .await?;
+
         let table = self.ctx.get_table(catalog, database, table_name).await?;
+        // check mutability
+        table.check_mutable()?;
         let fuse_table = table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
             ErrorCode::Unimplemented(format!(
                 "table {}, engine type {}, does not support MERGE INTO",
@@ -155,9 +164,6 @@ impl MergeIntoInterpreter {
         })?;
 
         let table_info = fuse_table.get_table_info();
-        // Add table lock before execution.
-        let table_lock = LockManager::create_table_lock(table_info.clone())?;
-        let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
 
         // attentation!! for now we have some strategies:
         // 1. target_build_optimization, this is enabled in standalone mode and in this case we don't need rowid column anymore.
@@ -204,10 +210,6 @@ impl MergeIntoInterpreter {
             }
         }
 
-        // check mutability
-        let check_table = self.ctx.get_table(catalog, database, table_name).await?;
-        check_table.check_mutable()?;
-
         let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), meta_data).await?;
 
         let table_name = table_name.clone();
@@ -485,7 +487,6 @@ impl MergeIntoInterpreter {
             mutation_kind: MutationKind::Update,
             update_stream_meta: update_stream_meta.clone(),
             merge_meta: false,
-            need_lock: false,
             deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? },
             plan_id: u32::MAX,
         }));
diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs
index 9cb90a64d5fd4..a2f040de0845e 100644
--- a/src/query/service/src/interpreters/interpreter_replace.rs
+++ b/src/query/service/src/interpreters/interpreter_replace.rs
@@ -335,7 +335,6 @@ impl ReplaceInterpreter {
             mutation_kind: MutationKind::Replace,
             update_stream_meta: update_stream_meta.clone(),
             merge_meta: false,
-            need_lock: false,
             deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? },
             plan_id: u32::MAX,
         })));
diff --git a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs
index daa39cb4c5cee..a45453dc64228 100644
--- a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs
+++ b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs
@@ -20,13 +20,11 @@
 use databend_common_exception::Result;
 use databend_common_expression::TableSchemaRefExt;
 use databend_common_license::license::Feature;
 use databend_common_license::license_manager::get_license_manager;
-use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::RefreshTableIndexPlan;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;
 
 use crate::interpreters::Interpreter;
-use crate::locks::LockManager;
 use crate::pipelines::PipelineBuildResult;
 use crate::sessions::QueryContext;
@@ -58,10 +56,24 @@ impl Interpreter for RefreshTableIndexInterpreter {
             .manager
             .check_enterprise_enabled(self.ctx.get_license_key(), Feature::InvertedIndex)?;
 
+        // Add table lock.
+        let lock_guard = self
+            .ctx
+            .clone()
+            .acquire_table_lock_with_opt(
+                &self.plan.catalog,
+                &self.plan.database,
+                &self.plan.table,
+                &self.plan.lock_opt,
+            )
+            .await?;
+
         let table = self
             .ctx
             .get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
            .await?;
+        // check mutability
+        table.check_mutable()?;
 
         let index_name = self.plan.index_name.clone();
         let segment_locs = self.plan.segment_locs.clone();
@@ -90,19 +102,6 @@ impl Interpreter for RefreshTableIndexInterpreter {
         let index_version = index.version.clone();
         let index_schema = TableSchemaRefExt::create(index_fields);
 
-        // Add table lock if need.
-        let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?;
-        let lock_guard = match self.plan.lock_opt {
-            LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self.ctx.clone()).await?,
-            LockTableOption::LockWithRetry => table_lock.try_lock(self.ctx.clone()).await?,
-            LockTableOption::NoLock => None,
-        };
-
-        // refresh table.
-        let table = table.refresh(self.ctx.as_ref()).await?;
-        // check mutability
-        table.check_mutable()?;
-
         let mut build_res = PipelineBuildResult::create();
         build_res.main_pipeline.add_lock_guard(lock_guard);
 
diff --git a/src/query/service/src/interpreters/interpreter_table_modify_column.rs b/src/query/service/src/interpreters/interpreter_table_modify_column.rs
index 4372419f0ef8e..d444a5ea1c528 100644
--- a/src/query/service/src/interpreters/interpreter_table_modify_column.rs
+++ b/src/query/service/src/interpreters/interpreter_table_modify_column.rs
@@ -51,9 +51,8 @@ use databend_enterprise_data_mask_feature::get_datamask_handler;
 use databend_storages_common_index::BloomIndex;
 use databend_storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS;
 
-use super::common::check_referenced_computed_columns;
+use crate::interpreters::common::check_referenced_computed_columns;
 use crate::interpreters::Interpreter;
-use crate::locks::LockManager;
 use crate::pipelines::PipelineBuildResult;
 use crate::schedulers::build_query_pipeline_without_render_result_set;
 use crate::sessions::QueryContext;
@@ -73,7 +72,7 @@ impl ModifyTableColumnInterpreter {
     async fn do_set_data_mask_policy(
         &self,
         catalog: Arc<dyn Catalog>,
-        table: &Arc<dyn Table>,
+        table: Arc<dyn Table>,
         column: String,
         mask_name: String,
     ) -> Result<PipelineBuildResult> {
@@ -140,15 +139,9 @@ impl ModifyTableColumnInterpreter {
     // Set data column type.
     async fn do_set_data_type(
         &self,
-        table: &Arc<dyn Table>,
+        table: Arc<dyn Table>,
         field_and_comments: &[(TableField, String)],
     ) -> Result<PipelineBuildResult> {
-        // Add table lock.
-        let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?;
-        let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
-        // refresh table.
-        let table = table.refresh(self.ctx.as_ref()).await?;
-
         let schema = table.schema().as_ref().clone();
         let table_info = table.get_table_info();
         let mut new_schema = schema.clone();
@@ -420,7 +413,6 @@ impl ModifyTableColumnInterpreter {
             None,
         )?;
 
-        build_res.main_pipeline.add_lock_guard(lock_guard);
         Ok(build_res)
     }
 
@@ -428,7 +420,7 @@ impl ModifyTableColumnInterpreter {
     async fn do_unset_data_mask_policy(
         &self,
         catalog: Arc<dyn Catalog>,
-        table: &Arc<dyn Table>,
+        table: Arc<dyn Table>,
         column: String,
     ) -> Result<PipelineBuildResult> {
         let license_manager = get_license_manager();
@@ -474,7 +466,7 @@ impl ModifyTableColumnInterpreter {
     async fn do_convert_stored_computed_column(
         &self,
         catalog: Arc<dyn Catalog>,
-        table: &Arc<dyn Table>,
+        table: Arc<dyn Table>,
         table_meta: TableMeta,
         column: String,
     ) -> Result<PipelineBuildResult> {
@@ -553,53 +545,50 @@ impl Interpreter for ModifyTableColumnInterpreter {
         let db_name = self.plan.database.as_str();
         let tbl_name = self.plan.table.as_str();
 
-        let tbl = self
+        // try add lock table.
+        let lock_guard = self
             .ctx
-            .get_catalog(catalog_name)
-            .await?
+            .clone()
+            .acquire_table_lock(catalog_name, db_name, tbl_name)
+            .await?;
+
+        let catalog = self.ctx.get_catalog(catalog_name).await?;
+        let table = catalog
             .get_table(&self.ctx.get_tenant(), db_name, tbl_name)
-            .await
-            .ok();
+            .await?;
 
-        let table = if let Some(table) = &tbl {
-            // check mutability
-            table.check_mutable()?;
-            table
-        } else {
-            return Ok(PipelineBuildResult::create());
-        };
+        table.check_mutable()?;
 
         let table_info = table.get_table_info();
         let engine = table.engine();
         if matches!(engine, VIEW_ENGINE | STREAM_ENGINE) {
             return Err(ErrorCode::TableEngineNotSupported(format!(
                 "{}.{} engine is {} that doesn't support alter",
-                &self.plan.database, &self.plan.table, engine
+                db_name, tbl_name, engine
             )));
         }
         if table_info.db_type != DatabaseType::NormalDB {
             return Err(ErrorCode::TableEngineNotSupported(format!(
                 "{}.{} doesn't support alter",
-                &self.plan.database, &self.plan.table
+                db_name, tbl_name
             )));
         }
 
-        let catalog = self.ctx.get_catalog(catalog_name).await?;
         let table_meta = table.get_table_info().meta.clone();
 
         // NOTICE: if we support modify column data type,
         // need to check whether this column is referenced by other computed columns.
-        match &self.plan.action {
+        let mut build_res = match &self.plan.action {
             ModifyColumnAction::SetMaskingPolicy(column, mask_name) => {
                 self.do_set_data_mask_policy(catalog, table, column.to_string(), mask_name.clone())
-                    .await
+                    .await?
             }
             ModifyColumnAction::UnsetMaskingPolicy(column) => {
                 self.do_unset_data_mask_policy(catalog, table, column.to_string())
-                    .await
+                    .await?
             }
             ModifyColumnAction::SetDataType(field_and_comment) => {
-                self.do_set_data_type(table, field_and_comment).await
+                self.do_set_data_type(table, field_and_comment).await?
             }
             ModifyColumnAction::ConvertStoredComputedColumn(column) => {
                 self.do_convert_stored_computed_column(
@@ -608,8 +597,11 @@ impl Interpreter for ModifyTableColumnInterpreter {
                     table_meta,
                     column.to_string(),
                 )
-                .await
+                .await?
             }
-        }
+        };
+
+        build_res.main_pipeline.add_lock_guard(lock_guard);
+        Ok(build_res)
     }
 }
diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs
index 26fd7944a86a6..8b71de7dc1cd5 100644
--- a/src/query/service/src/interpreters/interpreter_table_optimize.rs
+++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs
@@ -21,7 +21,6 @@ use databend_common_catalog::plan::PartInfoType;
 use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::table::CompactTarget;
 use databend_common_catalog::table::CompactionLimits;
-use databend_common_catalog::table::Table;
 use databend_common_catalog::table::TableExt;
 use databend_common_exception::Result;
 use databend_common_meta_app::schema::CatalogInfo;
@@ -33,7 +32,6 @@ use databend_common_sql::executor::physical_plans::Exchange;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::executor::PhysicalPlan;
-use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::OptimizeTableAction;
 use databend_common_sql::plans::OptimizeTablePlan;
 use databend_common_storages_factory::NavigationPoint;
@@ -43,7 +41,6 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
 use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterClusteringHistory;
-use crate::locks::LockManager;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::PipelineBuildResult;
@@ -78,20 +75,14 @@ impl Interpreter for OptimizeTableInterpreter {
         let plan = self.plan.clone();
         let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
-        let tenant = self.ctx.get_tenant();
-        let table = catalog
-            .get_table(&tenant, &self.plan.database, &self.plan.table)
-            .await?;
-        // check mutability
-        table.check_mutable()?;
 
         match self.plan.action.clone() {
             OptimizeTableAction::CompactBlocks(limit) => {
-                self.build_pipeline(catalog, table, CompactTarget::Blocks(limit), false)
+                self.build_pipeline(catalog, CompactTarget::Blocks(limit), false)
                     .await
             }
             OptimizeTableAction::CompactSegments => {
-                self.build_pipeline(catalog, table, CompactTarget::Segments, false)
+                self.build_pipeline(catalog, CompactTarget::Segments, false)
                     .await
             }
             OptimizeTableAction::Purge(point) => {
@@ -99,7 +90,7 @@
                 Ok(PipelineBuildResult::create())
             }
             OptimizeTableAction::All => {
-                self.build_pipeline(catalog, table, CompactTarget::Blocks(None), true)
+                self.build_pipeline(catalog, CompactTarget::Blocks(None), true)
                     .await
             }
         }
@@ -142,7 +133,6 @@ impl OptimizeTableInterpreter {
             mutation_kind: MutationKind::Compact,
             update_stream_meta: vec![],
             merge_meta,
-            need_lock: false,
             deduplicated_label: None,
             plan_id: u32::MAX,
         })))
@@ -151,20 +141,26 @@ impl OptimizeTableInterpreter {
     async fn build_pipeline(
         &self,
         catalog: Arc<dyn Catalog>,
-        mut table: Arc<dyn Table>,
         target: CompactTarget,
         need_purge: bool,
     ) -> Result<PipelineBuildResult> {
         let tenant = self.ctx.get_tenant();
-        let table_info = table.get_table_info().clone();
+        let lock_guard = self
+            .ctx
+            .clone()
+            .acquire_table_lock_with_opt(
+                &self.plan.catalog,
+                &self.plan.database,
+                &self.plan.table,
+                &self.plan.lock_opt,
+            )
+            .await?;
 
-        // check if the table is locked.
-        let table_lock = LockManager::create_table_lock(table_info.clone())?;
-        let lock_guard = match self.plan.lock_opt {
-            LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self.ctx.clone()).await?,
-            LockTableOption::LockWithRetry => table_lock.try_lock(self.ctx.clone()).await?,
-            LockTableOption::NoLock => None,
-        };
+        let mut table = catalog
+            .get_table(&tenant, &self.plan.database, &self.plan.table)
+            .await?;
+        // check mutability
+        table.check_mutable()?;
 
         let compaction_limits = match target {
             CompactTarget::Segments => {
@@ -191,7 +187,7 @@
             let mut compact_pipeline = if let Some((parts, snapshot)) = res {
                 let physical_plan = Self::build_physical_plan(
                     parts,
-                    table_info,
+                    table.get_table_info().clone(),
                     snapshot,
                     catalog_info,
                     compact_is_distributed,
@@ -243,7 +239,6 @@
                     mutator.remained_blocks,
                     mutator.removed_segment_indexes,
                    mutator.removed_segment_summary,
-                    false,
                 )?;
 
                 build_res =
@@ -303,6 +298,8 @@ async fn purge(
     let table = catalog
         .get_table(&ctx.get_tenant(), &plan.database, &plan.table)
         .await?;
+    // check mutability
+    table.check_mutable()?;
 
     let keep_latest = true;
     let res = table
diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs
index 316081ecc79cc..51fc511ab062b 100644
--- a/src/query/service/src/interpreters/interpreter_table_recluster.rs
+++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs
@@ -19,7 +19,6 @@ use std::time::SystemTime;
 use databend_common_catalog::plan::Filters;
 use databend_common_catalog::plan::PushDownInfo;
 use databend_common_catalog::table::TableExt;
-use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::type_check::check_function;
 use databend_common_functions::BUILTIN_FUNCTIONS;
@@ -40,8 +39,6 @@ use log::warn;
 
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterClusteringHistory;
-use crate::locks::LockExt;
-use crate::locks::LockManager;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::PipelineBuildResult;
@@ -107,12 +104,6 @@ impl Interpreter for ReclusterTableInterpreter {
         let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
         let tenant = self.ctx.get_tenant();
-        let mut table = catalog
-            .get_table(&tenant, &self.plan.database, &self.plan.table)
-            .await?;
-
-        // check mutability
-        table.check_mutable()?;
 
         let mut times = 0;
         let mut block_count = 0;
 
@@ -127,16 +118,18 @@ impl Interpreter for ReclusterTableInterpreter {
             return Err(err);
         }
 
-        let table_info = table.get_table_info().clone();
+        // try add lock table.
+        let lock_guard = self
+            .ctx
+            .clone()
+            .acquire_table_lock(&self.plan.catalog, &self.plan.database, &self.plan.table)
+            .await?;
 
-        // check if the table is locked.
-        let table_lock = LockManager::create_table_lock(table_info.clone())?;
-        if !table_lock.wait_lock_expired(catalog.clone()).await? {
-            return Err(ErrorCode::TableAlreadyLocked(format!(
-                "table '{}' is locked, please retry recluster later",
-                self.plan.table
-            )));
-        }
+        let table = catalog
+            .get_table(&tenant, &self.plan.database, &self.plan.table)
+            .await?;
+        // check mutability
+        table.check_mutable()?;
 
         let fuse_table = FuseTable::try_from_table(table.as_ref())?;
         let mutator = fuse_table
@@ -153,17 +146,17 @@ impl Interpreter for ReclusterTableInterpreter {
             block_count += mutator.recluster_blocks_count;
             let physical_plan = build_recluster_physical_plan(
                 mutator.tasks,
-                table_info,
+                table.get_table_info().clone(),
                 catalog.info(),
                 mutator.snapshot,
                 mutator.remained_blocks,
                 mutator.removed_segment_indexes,
                 mutator.removed_segment_summary,
-                true,
             )?;
 
             let mut build_res =
                 build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
+            build_res.main_pipeline.add_lock_guard(lock_guard);
             assert!(build_res.main_pipeline.is_complete_pipeline()?);
             build_res.set_max_threads(max_threads);
 
@@ -201,11 +194,6 @@ impl Interpreter for ReclusterTableInterpreter {
                 );
                 break;
             }
-
-            // refresh table.
-            table = catalog
-                .get_table(&tenant, &self.plan.database, &self.plan.table)
-                .await?;
         }
 
         if block_count != 0 {
@@ -231,7 +219,6 @@ pub fn build_recluster_physical_plan(
     remained_blocks: Vec<Arc<BlockMeta>>,
     removed_segment_indexes: Vec<usize>,
     removed_segment_summary: Statistics,
-    need_lock: bool,
 ) -> Result<PhysicalPlan> {
     let is_distributed = tasks.len() > 1;
     let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
@@ -260,7 +247,6 @@ pub fn build_recluster_physical_plan(
         removed_segment_indexes,
         removed_segment_summary,
         plan_id: u32::MAX,
-        need_lock,
     }));
     plan.adjust_plan_id(&mut 0);
     Ok(plan)
diff --git a/src/query/service/src/interpreters/interpreter_table_truncate.rs b/src/query/service/src/interpreters/interpreter_table_truncate.rs
index 6c2ac941be574..56d41b271cc84 100644
--- a/src/query/service/src/interpreters/interpreter_table_truncate.rs
+++ b/src/query/service/src/interpreters/interpreter_table_truncate.rs
@@ -18,10 +18,8 @@ use databend_common_catalog::table::TableExt;
 use databend_common_config::GlobalConfig;
 use databend_common_exception::Result;
 use databend_common_sql::plans::TruncateTablePlan;
-use databend_common_storages_fuse::FuseTable;
 
 use crate::interpreters::Interpreter;
-use crate::locks::LockManager;
 use crate::pipelines::PipelineBuildResult;
 use crate::servers::flight::v1::packets::Packet;
 use crate::servers::flight::v1::packets::TruncateTablePacket;
@@ -72,23 +70,20 @@ impl Interpreter for TruncateTableInterpreter {
     #[async_backtrace::framed]
     #[minitrace::trace]
     async fn execute2(&self) -> Result<PipelineBuildResult> {
+        // try add lock table.
+        let lock_guard = self
+            .ctx
+            .clone()
+            .acquire_table_lock(&self.catalog_name, &self.database_name, &self.table_name)
+            .await?;
+
         let table = self
             .ctx
             .get_table(&self.catalog_name, &self.database_name, &self.table_name)
             .await?;
-
         // check mutability
         table.check_mutable()?;
 
-        // Add table lock.
-        let maybe_fuse_table = FuseTable::try_from_table(table.as_ref()).is_ok();
-        let lock_guard = if maybe_fuse_table {
-            let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?;
-            table_lock.try_lock(self.ctx.clone()).await?
-        } else {
-            None
-        };
-
         if self.proxy_to_cluster && table.broadcast_truncate_to_cluster() {
             let settings = self.ctx.get_settings();
             let timeout = settings.get_flight_client_timeout()?;
diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs
index 8339a25d35bd5..7e17b4d38c722 100644
--- a/src/query/service/src/interpreters/interpreter_update.rs
+++ b/src/query/service/src/interpreters/interpreter_update.rs
@@ -54,7 +54,6 @@ use crate::interpreters::interpreter_delete::replace_subquery;
 use crate::interpreters::interpreter_delete::subquery_filter;
 use crate::interpreters::HookOperator;
 use crate::interpreters::Interpreter;
-use crate::locks::LockManager;
 use crate::pipelines::PipelineBuildResult;
 use crate::schedulers::build_query_pipeline_without_render_result_set;
 use crate::sessions::QueryContext;
@@ -95,17 +94,15 @@ impl Interpreter for UpdateInterpreter {
         }
 
         let catalog_name = self.plan.catalog.as_str();
-        let catalog = self.ctx.get_catalog(catalog_name).await?;
-
         let db_name = self.plan.database.as_str();
         let tbl_name = self.plan.table.as_str();
-        let tbl = catalog
-            .get_table(&self.ctx.get_tenant(), db_name, tbl_name)
-            .await?;
 
         // Add table lock.
-        let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
-        let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
+        let lock_guard = self
+            .ctx
+            .clone()
+            .acquire_table_lock(catalog_name, db_name, tbl_name)
+            .await?;
 
         // build physical plan.
         let physical_plan = self.get_physical_plan().await?;
@@ -147,8 +144,6 @@ impl UpdateInterpreter {
         let tbl = catalog
             .get_table(&self.ctx.get_tenant(), db_name, tbl_name)
             .await?;
-        // refresh table.
-        let tbl = tbl.refresh(self.ctx.as_ref()).await?;
 
         // check mutability
         tbl.check_mutable()?;
@@ -328,7 +323,6 @@ impl UpdateInterpreter {
             mutation_kind: MutationKind::Update,
             update_stream_meta: vec![],
             merge_meta,
-            need_lock: false,
             deduplicated_label: unsafe { ctx.get_settings().get_deduplicate_label()? },
             plan_id: u32::MAX,
         }));
diff --git a/src/query/service/src/locks/table_lock/mod.rs b/src/query/service/src/locks/table_lock/mod.rs
index 9d38234341f61..d8e5069c933d4 100644
--- a/src/query/service/src/locks/table_lock/mod.rs
+++ b/src/query/service/src/locks/table_lock/mod.rs
@@ -56,20 +56,10 @@ impl Lock for TableLock {
     }
 
     async fn try_lock(&self, ctx: Arc<dyn TableContext>) -> Result<Option<LockGuard>> {
-        let enabled_table_lock = ctx.get_settings().get_enable_table_lock().unwrap_or(false);
-        if enabled_table_lock {
-            self.lock_mgr.try_lock(ctx, self).await
-        } else {
-            Ok(None)
-        }
+        self.lock_mgr.try_lock(ctx, self).await
     }
 
     async fn try_lock_no_retry(&self, ctx: Arc<dyn TableContext>) -> Result<Option<LockGuard>> {
-        let enabled_table_lock = ctx.get_settings().get_enable_table_lock().unwrap_or(false);
-        if enabled_table_lock {
-            self.lock_mgr.try_lock_no_retry(ctx, self).await
-        } else {
-            Ok(None)
-        }
+        self.lock_mgr.try_lock_no_retry(ctx, self).await
     }
 }
diff --git a/src/query/service/src/pipelines/builders/builder_commit.rs b/src/query/service/src/pipelines/builders/builder_commit.rs
index 82ad4a74054b2..6359727b47b98 100644
--- a/src/query/service/src/pipelines/builders/builder_commit.rs
+++ b/src/query/service/src/pipelines/builders/builder_commit.rs
@@ -24,7 +24,6 @@ use databend_common_storages_fuse::operations::TableMutationAggregator;
 use databend_common_storages_fuse::operations::TransformMergeCommitMeta;
 use databend_common_storages_fuse::FuseTable;
 
-use crate::locks::LockManager;
 use crate::pipelines::PipelineBuilder;
 
 impl PipelineBuilder {
@@ -66,11 +65,6 @@ impl PipelineBuilder {
         }
 
         let snapshot_gen = MutationGenerator::new(plan.snapshot.clone(), plan.mutation_kind);
-        let lock = if plan.need_lock {
-            Some(LockManager::create_table_lock(plan.table_info.clone())?)
-        } else {
-            None
-        };
         self.main_pipeline.add_sink(|input| {
             CommitSink::try_create(
                 table,
@@ -80,7 +74,6 @@
                 snapshot_gen.clone(),
                 input,
                 None,
-                lock.clone(),
                 None,
                 plan.deduplicated_label.clone(),
             )
diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs
index 44a156fd18563..43ac9f5f46170 100644
--- a/src/query/service/src/pipelines/builders/builder_recluster.rs
+++ b/src/query/service/src/pipelines/builders/builder_recluster.rs
@@ -39,7 +38,6 @@ use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;
 
-use crate::locks::LockManager;
 use crate::pipelines::builders::SortPipelineBuilder;
 use crate::pipelines::processors::TransformAddStreamColumns;
 use crate::pipelines::PipelineBuilder;
@@ -229,13 +228,6 @@ impl PipelineBuilder {
                 let snapshot_gen =
                     MutationGenerator::new(recluster_sink.snapshot.clone(), MutationKind::Recluster);
-                let lock = if recluster_sink.need_lock {
-                    Some(LockManager::create_table_lock(
-                        recluster_sink.table_info.clone(),
-                    )?)
-                } else {
-                    None
-                };
                 self.main_pipeline.add_sink(|input| {
                     CommitSink::try_create(
                         table,
@@ -245,7 +237,6 @@
                         snapshot_gen.clone(),
                         input,
                         None,
-                        lock.clone(),
                         None,
                         None,
                     )
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index ab66f2483d53b..a7277ee64db55 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -75,7 +75,9 @@ use databend_common_meta_app::tenant::Tenant;
 use databend_common_metrics::storage::*;
 use databend_common_pipeline_core::processors::PlanProfile;
 use databend_common_pipeline_core::InputError;
+use databend_common_pipeline_core::LockGuard;
 use databend_common_settings::Settings;
+use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::IndexType;
 use databend_common_storage::CopyStatus;
 use databend_common_storage::DataOperator;
@@ -102,6 +104,7 @@ use xorf::BinaryFuse16;
 
 use crate::catalogs::Catalog;
 use crate::clusters::Cluster;
+use crate::locks::LockManager;
 use crate::pipelines::executor::PipelineExecutor;
 use crate::servers::flight::v1::exchange::DataExchangeManager;
 use crate::sessions::query_affect::QueryAffect;
@@ -315,6 +318,59 @@ impl QueryContext {
     pub fn clear_tables_cache(&self) {
         self.shared.clear_tables_cache()
     }
+
+    pub async fn acquire_table_lock(
+        self: Arc<Self>,
+        catalog_name: &str,
+        db_name: &str,
+        tbl_name: &str,
+    ) -> Result<Option<LockGuard>> {
+        let enabled_table_lock = self.get_settings().get_enable_table_lock().unwrap_or(false);
+        if !enabled_table_lock {
+            return Ok(None);
+        }
+
+        let catalog = self.get_catalog(catalog_name).await?;
+        let tbl = catalog
+            .get_table(&self.get_tenant(), db_name, tbl_name)
+            .await?;
+        if tbl.engine() != "FUSE" {
+            return Ok(None);
+        }
+
+        // Add table lock.
+        let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
+        table_lock.try_lock(self).await
+    }
+
+    pub async fn acquire_table_lock_with_opt(
+        self: Arc<Self>,
+        catalog_name: &str,
+        db_name: &str,
+        tbl_name: &str,
+        lock_opt: &LockTableOption,
+    ) -> Result<Option<LockGuard>> {
+        let enabled_table_lock = self.get_settings().get_enable_table_lock().unwrap_or(false);
+        if !enabled_table_lock {
+            return Ok(None);
+        }
+
+        let catalog = self.get_catalog(catalog_name).await?;
+        let tbl = catalog
+            .get_table(&self.get_tenant(), db_name, tbl_name)
+            .await?;
+        if tbl.engine() != "FUSE" {
+            return Ok(None);
+        }
+
+        // Add table lock.
+        let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
+        match lock_opt {
+            LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self).await,
+            LockTableOption::LockWithRetry => table_lock.try_lock(self).await,
+            LockTableOption::NoLock => Ok(None),
+        }
+    }
 }
 
 #[async_trait::async_trait]
diff --git a/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs b/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs
index 86ea6dd5783da..c345865e0d75e 100644
--- a/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs
@@ -34,6 +34,5 @@ pub struct CommitSink {
     pub mutation_kind: MutationKind,
     pub update_stream_meta: Vec<UpdateStreamMetaReq>,
     pub merge_meta: bool,
-    pub need_lock: bool,
     pub deduplicated_label: Option<String>,
 }
diff --git a/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs b/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs
index b58f763eca1d2..f9fd7c1742859 100644
--- a/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs
@@ -34,5 +34,4 @@ pub struct ReclusterSink {
     pub remained_blocks: Vec<Arc<BlockMeta>>,
     pub removed_segment_indexes: Vec<usize>,
     pub removed_segment_summary: Statistics,
-    pub need_lock: bool,
 }
diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs
index b9d617c7479ef..ab9948736c404 100644
--- a/src/query/storages/fuse/src/operations/commit.rs
+++ b/src/query/storages/fuse/src/operations/commit.rs
@@ -104,7 +104,6 @@ impl FuseTable {
                 snapshot_gen.clone(),
                 input,
                 None,
-                None,
                 prev_snapshot_id,
                 deduplicated_label.clone(),
             )
diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs
index ad26a0aef5e0a..4def9bbf6ff38 100644
--- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs
+++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs
@@ -19,7 +19,6 @@ use std::time::Instant;
 
 use backoff::backoff::Backoff;
 use backoff::ExponentialBackoff;
-use databend_common_catalog::lock::Lock;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table::TableExt;
 use databend_common_catalog::table_context::TableContext;
@@ -35,7 +34,6 @@ use databend_common_pipeline_core::processors::Event;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::Processor;
 use databend_common_pipeline_core::processors::ProcessorPtr;
-use databend_common_pipeline_core::LockGuard;
 use databend_storages_common_table_meta::meta::ClusterKey;
 use databend_storages_common_table_meta::meta::Location;
 use databend_storages_common_table_meta::meta::SnapshotId;
@@ -59,7 +57,6 @@ use crate::FuseTable;
 enum State {
     None,
     FillDefault,
-    TryLock,
     RefreshTable,
     GenerateSnapshot {
         previous: Option<Arc<TableSnapshot>>,
@@ -94,8 +91,6 @@ pub struct CommitSink<F: SnapshotGenerator> {
     backoff: ExponentialBackoff,
 
     new_segment_locs: Vec<Location>,
-    lock_guard: Option<LockGuard>,
-    lock: Option<Arc<dyn Lock>>,
     start_time: Instant,
     prev_snapshot_id: Option<SnapshotId>,
@@ -116,7 +111,6 @@ where F: SnapshotGenerator + Send + 'static
         snapshot_gen: F,
         input: Arc<InputPort>,
         max_retry_elapsed: Option<Duration>,
-        lock: Option<Arc<dyn Lock>>,
         prev_snapshot_id: Option<SnapshotId>,
         deduplicated_label: Option<String>,
     ) -> Result<ProcessorPtr> {
@@ -129,14 +123,12 @@ where F: SnapshotGenerator + Send + 'static
             table: Arc::new(table.clone()),
             copied_files,
             snapshot_gen,
-            lock_guard: None,
             purge,
             backoff: ExponentialBackoff::default(),
             retries: 0,
             max_retry_elapsed,
             input,
             new_segment_locs: vec![],
-            lock,
             start_time: Instant::now(),
             prev_snapshot_id,
             change_tracking: table.change_tracking_enabled(),
@@ -179,11 +171,7 @@ where F: SnapshotGenerator + Send + 'static
             self.snapshot_gen
                 .set_conflict_resolve_context(meta.conflict_resolve_context);
 
-            if self.lock.is_some() {
-                self.state = State::TryLock;
-            } else {
-                self.state = State::FillDefault;
-            }
+            self.state = State::FillDefault;
             Ok(Event::Async)
         }
 
@@ -242,8 +230,6 @@ where F: SnapshotGenerator + Send + 'static
         }
 
         if matches!(self.state, State::Finish) {
-            // release the lock manually.
-            std::mem::take(&mut self.lock_guard);
             return Ok(Event::Finished);
         }
 
@@ -364,15 +350,6 @@ where F: SnapshotGenerator + Send + 'static
                 };
             }
 
-            State::TryLock => match self.lock.as_ref().unwrap().try_lock(self.ctx.clone()).await {
-                Ok(guard) => {
-                    self.lock_guard = guard;
-                    self.state = State::FillDefault;
-                }
-                Err(e) => {
-                    self.state = State::Abort(e);
-                }
-            },
             State::TryCommit {
                 data,
                 snapshot,
diff --git a/src/query/storages/fuse/src/operations/inverted_index.rs b/src/query/storages/fuse/src/operations/inverted_index.rs
index ac9611850c216..72f018d1a087f 100644
--- a/src/query/storages/fuse/src/operations/inverted_index.rs
+++ b/src/query/storages/fuse/src/operations/inverted_index.rs
@@ -107,8 +107,11 @@ impl FuseTable {
             MetaReaders::segment_info_reader(self.get_operator(), table_schema.clone());
 
         // If no segment locations are specified, iterates through all segments
-        let segment_locs = if let Some(segment_locs) = &segment_locs {
-            segment_locs.clone()
+        let segment_locs = if let Some(segment_locs) = segment_locs {
+            segment_locs
+                .into_iter()
+                .filter(|s| snapshot.segments.contains(s))
+                .collect()
         } else {
             snapshot.segments.clone()
        };
diff --git a/src/query/storages/fuse/src/operations/truncate.rs b/src/query/storages/fuse/src/operations/truncate.rs
index b7657f04c003d..f3397cc6e2571 100644
--- a/src/query/storages/fuse/src/operations/truncate.rs
+++ b/src/query/storages/fuse/src/operations/truncate.rs
@@ -67,7 +67,6 @@ impl FuseTable {
             snapshot_gen.clone(),
             input,
             None,
-            None,
             prev_snapshot_id,
             None,
         )
diff --git a/tests/cloud_control_server/simple_server.py b/tests/cloud_control_server/simple_server.py
index 0ef3de850ee90..f2371ae0c46fc 100644
--- a/tests/cloud_control_server/simple_server.py
+++ b/tests/cloud_control_server/simple_server.py
@@ -44,9 +44,9 @@ def load_data_from_json():
             notification_history_data = json.load(f)
             notification_history = notification_pb2.NotificationHistory()
             json_format.ParseDict(notification_history_data, notification_history)
-            NOTIFICATION_HISTORY_DB[
-                notification_history.name
-            ] = notification_history
+            NOTIFICATION_HISTORY_DB[notification_history.name] = (
+                notification_history
+            )
 
 
 def create_task_request_to_task(id, create_task_request):
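
Taken together, the interpreter changes above converge on a single pattern: acquire the lock
through the new QueryContext helper first, fetch the table only afterwards (so the explicit
`refresh` calls can be dropped), then hand the guard to the pipeline that executes the
mutation. The following is a condensed, illustrative sketch of that pattern, not part of the
patch itself: the wrapper function is hypothetical, while `acquire_table_lock`, `get_table`,
`check_mutable`, `add_lock_guard`, and `PipelineBuildResult::create` are the helpers the diff
introduces or uses.

    use std::sync::Arc;

    use databend_common_exception::Result;

    use crate::pipelines::PipelineBuildResult;
    use crate::sessions::QueryContext;

    // Hypothetical wrapper: the shape shared by DELETE, UPDATE, TRUNCATE, MERGE INTO,
    // OPTIMIZE and RECLUSTER after this patch.
    async fn lock_then_build(
        ctx: Arc<QueryContext>,
        catalog: &str,
        db: &str,
        tbl: &str,
    ) -> Result<PipelineBuildResult> {
        // 1. Lock before reading table meta, so the snapshot the mutation plans
        //    against cannot be swapped out by a concurrent writer. The helper
        //    returns Ok(None) when `enable_table_lock` is off or the engine is
        //    not FUSE, so callers no longer need those checks themselves.
        let lock_guard = ctx.clone().acquire_table_lock(catalog, db, tbl).await?;

        // 2. Only now fetch the table; the pre-patch code fetched first and then
        //    had to call `refresh` after locking.
        let table = ctx.get_table(catalog, db, tbl).await?;
        table.check_mutable()?;

        // 3. Attach the guard to the pipeline: it is held for the lifetime of the
        //    executing pipeline and released on drop, which replaces the manual
        //    lock handling removed from CommitSink.
        let mut build_res = PipelineBuildResult::create();
        build_res.main_pipeline.add_lock_guard(lock_guard);
        Ok(build_res)
    }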