Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed May 27, 2024
1 parent 2d93b4e commit aee2584
Show file tree
Hide file tree
Showing 20 changed files with 173 additions and 206 deletions.
22 changes: 10 additions & 12 deletions src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ use crate::interpreters::common::create_push_down_filters;
use crate::interpreters::HookOperator;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::locks::LockManager;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -111,21 +110,21 @@ impl Interpreter for DeleteInterpreter {
let is_distributed = !self.ctx.get_cluster().is_empty();

let catalog_name = self.plan.catalog_name.as_str();
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();

let db_name = self.plan.database_name.as_str();
let tbl_name = self.plan.table_name.as_str();
let tbl = catalog
.get_table(&self.ctx.get_tenant(), db_name, tbl_name)
.await?;

// Add table lock.
let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
let lock_guard = self
.ctx
.clone()
.acquire_table_lock(catalog_name, db_name, tbl_name)
.await?;

// refresh table.
let tbl = tbl.refresh(self.ctx.as_ref()).await?;
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();
let tbl = catalog
.get_table(&self.ctx.get_tenant(), db_name, tbl_name)
.await?;

// check mutability
tbl.check_mutable()?;
Expand Down Expand Up @@ -355,7 +354,6 @@ impl DeleteInterpreter {
mutation_kind: MutationKind::Delete,
update_stream_meta: vec![],
merge_meta,
need_lock: false,
deduplicated_label: None,
plan_id: u32::MAX,
}));
Expand Down
19 changes: 10 additions & 9 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ use itertools::Itertools;
use crate::interpreters::common::dml_build_update_stream_req;
use crate::interpreters::HookOperator;
use crate::interpreters::Interpreter;
use crate::locks::LockManager;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -145,7 +144,17 @@ impl MergeIntoInterpreter {
} = &self.plan;
let enable_right_broadcast = *enable_right_broadcast;
let mut columns_set = columns_set.clone();

// Add table lock before execution.
let lock_guard = self
.ctx
.clone()
.acquire_table_lock(catalog, database, table_name)
.await?;

let table = self.ctx.get_table(catalog, database, table_name).await?;
// check mutability
table.check_mutable()?;
let fuse_table = table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
ErrorCode::Unimplemented(format!(
"table {}, engine type {}, does not support MERGE INTO",
Expand All @@ -155,9 +164,6 @@ impl MergeIntoInterpreter {
})?;

let table_info = fuse_table.get_table_info();
// Add table lock before execution.
let table_lock = LockManager::create_table_lock(table_info.clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;

// attentation!! for now we have some strategies:
// 1. target_build_optimization, this is enabled in standalone mode and in this case we don't need rowid column anymore.
Expand Down Expand Up @@ -204,10 +210,6 @@ impl MergeIntoInterpreter {
}
}

// check mutability
let check_table = self.ctx.get_table(catalog, database, table_name).await?;
check_table.check_mutable()?;

let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), meta_data).await?;

let table_name = table_name.clone();
Expand Down Expand Up @@ -485,7 +487,6 @@ impl MergeIntoInterpreter {
mutation_kind: MutationKind::Update,
update_stream_meta: update_stream_meta.clone(),
merge_meta: false,
need_lock: false,
deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? },
plan_id: u32::MAX,
}));
Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ impl ReplaceInterpreter {
mutation_kind: MutationKind::Replace,
update_stream_meta: update_stream_meta.clone(),
merge_meta: false,
need_lock: false,
deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? },
plan_id: u32::MAX,
})));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ use databend_common_exception::Result;
use databend_common_expression::TableSchemaRefExt;
use databend_common_license::license::Feature;
use databend_common_license::license_manager::get_license_manager;
use databend_common_sql::plans::LockTableOption;
use databend_common_sql::plans::RefreshTableIndexPlan;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;

use crate::interpreters::Interpreter;
use crate::locks::LockManager;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;

Expand Down Expand Up @@ -58,10 +56,24 @@ impl Interpreter for RefreshTableIndexInterpreter {
.manager
.check_enterprise_enabled(self.ctx.get_license_key(), Feature::InvertedIndex)?;

// Add table lock.
let lock_guard = self
.ctx
.clone()
.acquire_table_lock_with_opt(
&self.plan.catalog,
&self.plan.database,
&self.plan.table,
&self.plan.lock_opt,
)
.await?;

let table = self
.ctx
.get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
.await?;
// check mutability
table.check_mutable()?;

let index_name = self.plan.index_name.clone();
let segment_locs = self.plan.segment_locs.clone();
Expand Down Expand Up @@ -90,19 +102,6 @@ impl Interpreter for RefreshTableIndexInterpreter {
let index_version = index.version.clone();
let index_schema = TableSchemaRefExt::create(index_fields);

// Add table lock if need.
let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?;
let lock_guard = match self.plan.lock_opt {
LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self.ctx.clone()).await?,
LockTableOption::LockWithRetry => table_lock.try_lock(self.ctx.clone()).await?,
LockTableOption::NoLock => None,
};

// refresh table.
let table = table.refresh(self.ctx.as_ref()).await?;
// check mutability
table.check_mutable()?;

let mut build_res = PipelineBuildResult::create();
build_res.main_pipeline.add_lock_guard(lock_guard);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ use databend_enterprise_data_mask_feature::get_datamask_handler;
use databend_storages_common_index::BloomIndex;
use databend_storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS;

use super::common::check_referenced_computed_columns;
use crate::interpreters::common::check_referenced_computed_columns;
use crate::interpreters::Interpreter;
use crate::locks::LockManager;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
Expand All @@ -73,7 +72,7 @@ impl ModifyTableColumnInterpreter {
async fn do_set_data_mask_policy(
&self,
catalog: Arc<dyn Catalog>,
table: &Arc<dyn Table>,
table: Arc<dyn Table>,
column: String,
mask_name: String,
) -> Result<PipelineBuildResult> {
Expand Down Expand Up @@ -140,15 +139,9 @@ impl ModifyTableColumnInterpreter {
// Set data column type.
async fn do_set_data_type(
&self,
table: &Arc<dyn Table>,
table: Arc<dyn Table>,
field_and_comments: &[(TableField, String)],
) -> Result<PipelineBuildResult> {
// Add table lock.
let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
// refresh table.
let table = table.refresh(self.ctx.as_ref()).await?;

let schema = table.schema().as_ref().clone();
let table_info = table.get_table_info();
let mut new_schema = schema.clone();
Expand Down Expand Up @@ -420,15 +413,14 @@ impl ModifyTableColumnInterpreter {
None,
)?;

build_res.main_pipeline.add_lock_guard(lock_guard);
Ok(build_res)
}

// unset data mask policy to a column is a ee feature.
async fn do_unset_data_mask_policy(
&self,
catalog: Arc<dyn Catalog>,
table: &Arc<dyn Table>,
table: Arc<dyn Table>,
column: String,
) -> Result<PipelineBuildResult> {
let license_manager = get_license_manager();
Expand Down Expand Up @@ -474,7 +466,7 @@ impl ModifyTableColumnInterpreter {
async fn do_convert_stored_computed_column(
&self,
catalog: Arc<dyn Catalog>,
table: &Arc<dyn Table>,
table: Arc<dyn Table>,
table_meta: TableMeta,
column: String,
) -> Result<PipelineBuildResult> {
Expand Down Expand Up @@ -553,53 +545,50 @@ impl Interpreter for ModifyTableColumnInterpreter {
let db_name = self.plan.database.as_str();
let tbl_name = self.plan.table.as_str();

let tbl = self
// try add lock table.
let lock_guard = self
.ctx
.get_catalog(catalog_name)
.await?
.clone()
.acquire_table_lock(catalog_name, db_name, tbl_name)
.await?;

let catalog = self.ctx.get_catalog(catalog_name).await?;
let table = catalog
.get_table(&self.ctx.get_tenant(), db_name, tbl_name)
.await
.ok();
.await?;

let table = if let Some(table) = &tbl {
// check mutability
table.check_mutable()?;
table
} else {
return Ok(PipelineBuildResult::create());
};
table.check_mutable()?;

let table_info = table.get_table_info();
let engine = table.engine();
if matches!(engine, VIEW_ENGINE | STREAM_ENGINE) {
return Err(ErrorCode::TableEngineNotSupported(format!(
"{}.{} engine is {} that doesn't support alter",
&self.plan.database, &self.plan.table, engine
db_name, tbl_name, engine
)));
}
if table_info.db_type != DatabaseType::NormalDB {
return Err(ErrorCode::TableEngineNotSupported(format!(
"{}.{} doesn't support alter",
&self.plan.database, &self.plan.table
db_name, tbl_name
)));
}

let catalog = self.ctx.get_catalog(catalog_name).await?;
let table_meta = table.get_table_info().meta.clone();

// NOTICE: if we support modify column data type,
// need to check whether this column is referenced by other computed columns.
match &self.plan.action {
let mut build_res = match &self.plan.action {
ModifyColumnAction::SetMaskingPolicy(column, mask_name) => {
self.do_set_data_mask_policy(catalog, table, column.to_string(), mask_name.clone())
.await
.await?
}
ModifyColumnAction::UnsetMaskingPolicy(column) => {
self.do_unset_data_mask_policy(catalog, table, column.to_string())
.await
.await?
}
ModifyColumnAction::SetDataType(field_and_comment) => {
self.do_set_data_type(table, field_and_comment).await
self.do_set_data_type(table, field_and_comment).await?
}
ModifyColumnAction::ConvertStoredComputedColumn(column) => {
self.do_convert_stored_computed_column(
Expand All @@ -608,8 +597,11 @@ impl Interpreter for ModifyTableColumnInterpreter {
table_meta,
column.to_string(),
)
.await
.await?
}
}
};

build_res.main_pipeline.add_lock_guard(lock_guard);
Ok(build_res)
}
}
Loading

0 comments on commit aee2584

Please sign in to comment.