diff --git a/src/query/service/src/interpreters/interpreter_explain.rs b/src/query/service/src/interpreters/interpreter_explain.rs index fbf195de57f6..74b8bb121bdc 100644 --- a/src/query/service/src/interpreters/interpreter_explain.rs +++ b/src/query/service/src/interpreters/interpreter_explain.rs @@ -30,7 +30,10 @@ use databend_common_pipeline_core::ExecutionInfo; use databend_common_sql::binder::ExplainConfig; use databend_common_sql::executor::format_partial_tree; use databend_common_sql::optimizer::ColumnSet; +use databend_common_sql::plans::Insert; +use databend_common_sql::plans::InsertInputSource; use databend_common_sql::plans::Mutation; +use databend_common_sql::plans::Replace; use databend_common_sql::BindContext; use databend_common_sql::MetadataRef; use databend_common_storages_result_cache::gen_result_cache_key; @@ -53,6 +56,8 @@ use crate::sessions::QueryContext; use crate::sql::executor::PhysicalPlan; use crate::sql::executor::PhysicalPlanBuilder; use crate::sql::optimizer::SExpr; +use crate::sql::plans::CopyIntoTablePlan; +use crate::sql::plans::InsertValue; use crate::sql::plans::Plan; pub struct ExplainInterpreter { @@ -89,8 +94,8 @@ impl Interpreter for ExplainInterpreter { self.explain_query(s_expr, metadata, bind_context, formatted_ast) .await? } - Plan::Insert(insert_plan) => insert_plan.explain(self.config.verbose).await?, - Plan::Replace(replace_plan) => replace_plan.explain(self.config.verbose).await?, + Plan::Insert(insert_plan) => self.explain_insert(&*insert_plan).await?, + Plan::Replace(replace_plan) => self.explain_replace(&*replace_plan).await?, Plan::CreateTable(plan) => match &plan.as_select { Some(box Plan::Query { s_expr, @@ -482,4 +487,162 @@ impl ExplainInterpreter { let formatted_plan = StringType::from_data(line_split_result); Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])]) } + + #[async_backtrace::framed] + pub async fn explain_insert(&self, insert: &Insert) -> Result> { + let Insert { + catalog, + database, + table, + schema, + overwrite, + source, + // table_info only used create table as select. + table_info: _, + } = insert; + + let table_name = format!("{}.{}.{}", catalog, database, table); + let inserted_columns = schema + .fields + .iter() + .map(|field| format!("{}.{} (#{})", table, field.name, field.column_id)) + .collect::>() + .join(","); + + let children = vec![ + FormatTreeNode::new(format!("table: {table_name}")), + FormatTreeNode::new(format!("inserted columns: [{inserted_columns}]")), + FormatTreeNode::new(format!("overwrite: {overwrite}")), + ]; + + let formatted_plan = self + .format_insert_source("InsertPlan", source, children) + .await?; + + Ok(vec![DataBlock::concat(&formatted_plan)?]) + } + + #[async_backtrace::framed] + pub async fn explain_replace(&self, replace: &Replace) -> Result> { + let Replace { + catalog, + database, + table, + source, + on_conflict_fields, + .. + } = replace; + + let table_name = format!("{}.{}.{}", catalog, database, table); + let on_columns = on_conflict_fields + .iter() + .map(|field| format!("{}.{} (#{})", table, field.name, field.column_id)) + .collect::>() + .join(","); + + let children = vec![ + FormatTreeNode::new(format!("table: {table_name}")), + FormatTreeNode::new(format!("on columns: [{on_columns}]")), + ]; + + let formatted_plan = self + .format_insert_source("ReplacePlan", source, children) + .await?; + + Ok(vec![DataBlock::concat(&formatted_plan)?]) + } + + pub(crate) async fn format_insert_source( + &self, + plan_name: &str, + source: &InsertInputSource, + mut children: Vec, + ) -> Result> { + let (plan_name, sub_plan) = match source { + InsertInputSource::SelectPlan(plan) => { + if let Plan::Query { + s_expr, + metadata, + bind_context, + .. + } = &**plan + { + let plan_name = format!("{plan_name} (subquery)"); + let sub_plan = self + .explain_query(&s_expr, metadata, &*bind_context, &None) + .await?; + (plan_name, sub_plan) + } else { + (String::new(), vec![]) + } + } + InsertInputSource::Values(values) => match values { + InsertValue::Values { .. } => { + let plan_name = format!("{plan_name} (values):"); + (plan_name, vec![]) + } + InsertValue::RawValues { .. } => { + let plan_name = format!("{plan_name} (rawvalues):"); + (plan_name, vec![]) + } + }, + InsertInputSource::Stage(plan) => match *plan.clone() { + Plan::CopyIntoTable(copy_plan) => { + let plan_name = format!("{plan_name} (stage):"); + let CopyIntoTablePlan { + no_file_to_copy, + from_attachment, + required_values_schema, + required_source_schema, + write_mode, + validation_mode, + force, + stage_table_info, + enable_distributed, + .. + } = &*copy_plan; + let required_values_schema = required_values_schema + .fields() + .iter() + .map(|field| field.name().to_string()) + .collect::>() + .join(","); + let required_source_schema = required_source_schema + .fields() + .iter() + .map(|field| field.name().to_string()) + .collect::>() + .join(","); + let stage_node = vec![ + FormatTreeNode::new(format!("no_file_to_copy: {no_file_to_copy}")), + FormatTreeNode::new(format!("from_attachment: {from_attachment}")), + FormatTreeNode::new(format!( + "required_values_schema: [{required_values_schema}]" + )), + FormatTreeNode::new(format!( + "required_source_schema: [{required_source_schema}]" + )), + FormatTreeNode::new(format!("write_mode: {write_mode}")), + FormatTreeNode::new(format!("validation_mode: {validation_mode}")), + FormatTreeNode::new(format!("force: {force}")), + FormatTreeNode::new(format!("stage_table_info: {stage_table_info}")), + FormatTreeNode::new(format!("enable_distributed: {enable_distributed}")), + ]; + children.extend(stage_node); + (plan_name, vec![]) + } + _ => unreachable!("plan in InsertInputSource::Stag must be CopyIntoTable"), + }, + }; + + let plan = FormatTreeNode::with_children(plan_name, children).format_pretty()?; + let mut result = vec![DataBlock::new_from_columns(vec![StringType::from_data( + plan.lines().collect(), + )])]; + + if !sub_plan.is_empty() { + result.extend(sub_plan); + } + Ok(result) + } } diff --git a/src/query/sql/src/planner/plans/insert.rs b/src/query/sql/src/planner/plans/insert.rs index f20a2c0830ee..f9fbbb3c97e1 100644 --- a/src/query/sql/src/planner/plans/insert.rs +++ b/src/query/sql/src/planner/plans/insert.rs @@ -14,11 +14,7 @@ use std::sync::Arc; -use databend_common_ast::ast::FormatTreeNode; -use databend_common_expression::types::StringType; -use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; -use databend_common_expression::FromData; use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; use databend_common_meta_app::schema::TableInfo; @@ -27,7 +23,6 @@ use serde::Deserialize; use serde::Serialize; use super::Plan; -use crate::plans::CopyIntoTablePlan; #[derive(Clone, Debug, EnumAsInner)] pub enum InsertInputSource { @@ -76,134 +71,6 @@ impl Insert { pub fn has_select_plan(&self) -> bool { matches!(&self.source, InsertInputSource::SelectPlan(_)) } - - #[async_backtrace::framed] - pub async fn explain( - &self, - verbose: bool, - ) -> databend_common_exception::Result> { - let mut result = vec![]; - - let Insert { - catalog, - database, - table, - schema, - overwrite, - // table_info only used create table as select. - table_info: _, - source, - } = self; - - let table_name = format!("{}.{}.{}", catalog, database, table); - let inserted_columns = schema - .fields - .iter() - .map(|field| format!("{}.{} (#{})", table, field.name, field.column_id)) - .collect::>() - .join(","); - - let children = vec![ - FormatTreeNode::new(format!("table: {table_name}")), - FormatTreeNode::new(format!("inserted columns: [{inserted_columns}]")), - FormatTreeNode::new(format!("overwrite: {overwrite}")), - ]; - - let formatted_plan = format_insert_source("InsertPlan", source, verbose, children)?; - - let line_split_result: Vec<&str> = formatted_plan.lines().collect(); - let formatted_plan = StringType::from_data(line_split_result); - result.push(DataBlock::new_from_columns(vec![formatted_plan])); - Ok(vec![DataBlock::concat(&result)?]) - } -} - -pub(crate) fn format_insert_source( - plan_name: &str, - source: &InsertInputSource, - verbose: bool, - mut children: Vec, -) -> databend_common_exception::Result { - match source { - InsertInputSource::SelectPlan(plan) => { - if let Plan::Query { - s_expr, metadata, .. - } = &**plan - { - let metadata = &*metadata.read(); - let sub_tree = s_expr.to_format_tree(metadata, verbose)?; - children.push(sub_tree); - - return Ok(FormatTreeNode::with_children( - format!("{plan_name} (subquery):"), - children, - ) - .format_pretty()?); - } - Ok(String::new()) - } - InsertInputSource::Values(values) => match values { - InsertValue::Values { .. } => Ok(FormatTreeNode::with_children( - format!("{plan_name} (values):"), - children, - ) - .format_pretty()?), - InsertValue::RawValues { .. } => Ok(FormatTreeNode::with_children( - format!("{plan_name} (rawvalues):"), - children, - ) - .format_pretty()?), - }, - InsertInputSource::Stage(plan) => match *plan.clone() { - Plan::CopyIntoTable(copy_plan) => { - let CopyIntoTablePlan { - no_file_to_copy, - from_attachment, - required_values_schema, - required_source_schema, - write_mode, - validation_mode, - force, - stage_table_info, - enable_distributed, - .. - } = &*copy_plan; - let required_values_schema = required_values_schema - .fields() - .iter() - .map(|field| field.name().to_string()) - .collect::>() - .join(","); - let required_source_schema = required_source_schema - .fields() - .iter() - .map(|field| field.name().to_string()) - .collect::>() - .join(","); - let stage_node = vec![ - FormatTreeNode::new(format!("no_file_to_copy: {no_file_to_copy}")), - FormatTreeNode::new(format!("from_attachment: {from_attachment}")), - FormatTreeNode::new(format!( - "required_values_schema: [{required_values_schema}]" - )), - FormatTreeNode::new(format!( - "required_source_schema: [{required_source_schema}]" - )), - FormatTreeNode::new(format!("write_mode: {write_mode}")), - FormatTreeNode::new(format!("validation_mode: {validation_mode}")), - FormatTreeNode::new(format!("force: {force}")), - FormatTreeNode::new(format!("stage_table_info: {stage_table_info}")), - FormatTreeNode::new(format!("enable_distributed: {enable_distributed}")), - ]; - children.extend(stage_node); - Ok( - FormatTreeNode::with_children(format!("{plan_name} (stage):"), children) - .format_pretty()?, - ) - } - _ => unreachable!("plan in InsertInputSource::Stag must be CopyIntoTable"), - }, - } } impl std::fmt::Debug for Insert { diff --git a/src/query/sql/src/planner/plans/replace.rs b/src/query/sql/src/planner/plans/replace.rs index 6582cafde9df..23ecbb90776b 100644 --- a/src/query/sql/src/planner/plans/replace.rs +++ b/src/query/sql/src/planner/plans/replace.rs @@ -15,17 +15,12 @@ use std::sync::Arc; use databend_common_ast::ast::Expr; -use databend_common_ast::ast::FormatTreeNode; -use databend_common_expression::types::StringType; -use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; -use databend_common_expression::FromData; use databend_common_expression::TableField; use databend_common_expression::TableSchemaRef; use databend_common_meta_types::MetaId; use databend_common_pipeline_core::LockGuard; -use super::insert::format_insert_source; use crate::plans::InsertInputSource; #[derive(Clone)] @@ -59,41 +54,6 @@ impl Replace { pub fn has_select_plan(&self) -> bool { matches!(&self.source, InsertInputSource::SelectPlan(_)) } - - #[async_backtrace::framed] - pub async fn explain( - &self, - verbose: bool, - ) -> databend_common_exception::Result> { - let mut result = vec![]; - - let Replace { - catalog, - database, - table, - source, - on_conflict_fields, - .. - } = self; - - let table_name = format!("{}.{}.{}", catalog, database, table); - let on_columns = on_conflict_fields - .iter() - .map(|field| format!("{}.{} (#{})", table, field.name, field.column_id)) - .collect::>() - .join(","); - - let children = vec![ - FormatTreeNode::new(format!("table: {table_name}")), - FormatTreeNode::new(format!("on columns: [{on_columns}]")), - ]; - - let formatted_plan = format_insert_source("ReplacePlan", source, verbose, children)?; - let line_split_result: Vec<&str> = formatted_plan.lines().collect(); - let formatted_plan = StringType::from_data(line_split_result); - result.push(DataBlock::new_from_columns(vec![formatted_plan])); - Ok(vec![DataBlock::concat(&result)?]) - } } impl std::fmt::Debug for Replace {