Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(query): improve insert and replace explain #16432

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 165 additions & 2 deletions src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ use databend_common_pipeline_core::ExecutionInfo;
use databend_common_sql::binder::ExplainConfig;
use databend_common_sql::executor::format_partial_tree;
use databend_common_sql::optimizer::ColumnSet;
use databend_common_sql::plans::Insert;
use databend_common_sql::plans::InsertInputSource;
use databend_common_sql::plans::Mutation;
use databend_common_sql::plans::Replace;
use databend_common_sql::BindContext;
use databend_common_sql::MetadataRef;
use databend_common_storages_result_cache::gen_result_cache_key;
Expand All @@ -53,6 +56,8 @@ use crate::sessions::QueryContext;
use crate::sql::executor::PhysicalPlan;
use crate::sql::executor::PhysicalPlanBuilder;
use crate::sql::optimizer::SExpr;
use crate::sql::plans::CopyIntoTablePlan;
use crate::sql::plans::InsertValue;
use crate::sql::plans::Plan;

pub struct ExplainInterpreter {
Expand Down Expand Up @@ -89,8 +94,8 @@ impl Interpreter for ExplainInterpreter {
self.explain_query(s_expr, metadata, bind_context, formatted_ast)
.await?
}
Plan::Insert(insert_plan) => insert_plan.explain(self.config.verbose).await?,
Plan::Replace(replace_plan) => replace_plan.explain(self.config.verbose).await?,
Plan::Insert(insert_plan) => self.explain_insert(&*insert_plan).await?,
Plan::Replace(replace_plan) => self.explain_replace(&*replace_plan).await?,
Plan::CreateTable(plan) => match &plan.as_select {
Some(box Plan::Query {
s_expr,
Expand Down Expand Up @@ -482,4 +487,162 @@ impl ExplainInterpreter {
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

#[async_backtrace::framed]
pub async fn explain_insert(&self, insert: &Insert) -> Result<Vec<DataBlock>> {
let Insert {
catalog,
database,
table,
schema,
overwrite,
source,
// table_info only used create table as select.
table_info: _,
} = insert;

let table_name = format!("{}.{}.{}", catalog, database, table);
let inserted_columns = schema
.fields
.iter()
.map(|field| format!("{}.{} (#{})", table, field.name, field.column_id))
.collect::<Vec<_>>()
.join(",");

let children = vec![
FormatTreeNode::new(format!("table: {table_name}")),
FormatTreeNode::new(format!("inserted columns: [{inserted_columns}]")),
FormatTreeNode::new(format!("overwrite: {overwrite}")),
];

let formatted_plan = self
.format_insert_source("InsertPlan", source, children)
.await?;

Ok(vec![DataBlock::concat(&formatted_plan)?])
}

#[async_backtrace::framed]
pub async fn explain_replace(&self, replace: &Replace) -> Result<Vec<DataBlock>> {
let Replace {
catalog,
database,
table,
source,
on_conflict_fields,
..
} = replace;

let table_name = format!("{}.{}.{}", catalog, database, table);
let on_columns = on_conflict_fields
.iter()
.map(|field| format!("{}.{} (#{})", table, field.name, field.column_id))
.collect::<Vec<_>>()
.join(",");

let children = vec![
FormatTreeNode::new(format!("table: {table_name}")),
FormatTreeNode::new(format!("on columns: [{on_columns}]")),
];

let formatted_plan = self
.format_insert_source("ReplacePlan", source, children)
.await?;

Ok(vec![DataBlock::concat(&formatted_plan)?])
}

pub(crate) async fn format_insert_source(
&self,
plan_name: &str,
source: &InsertInputSource,
mut children: Vec<FormatTreeNode>,
) -> Result<Vec<DataBlock>> {
let (plan_name, sub_plan) = match source {
InsertInputSource::SelectPlan(plan) => {
if let Plan::Query {
s_expr,
metadata,
bind_context,
..
} = &**plan
{
let plan_name = format!("{plan_name} (subquery)");
let sub_plan = self
.explain_query(&s_expr, metadata, &*bind_context, &None)
.await?;
(plan_name, sub_plan)
} else {
(String::new(), vec![])
}
}
InsertInputSource::Values(values) => match values {
InsertValue::Values { .. } => {
let plan_name = format!("{plan_name} (values):");
(plan_name, vec![])
}
InsertValue::RawValues { .. } => {
let plan_name = format!("{plan_name} (rawvalues):");
(plan_name, vec![])
}
},
InsertInputSource::Stage(plan) => match *plan.clone() {
Plan::CopyIntoTable(copy_plan) => {
let plan_name = format!("{plan_name} (stage):");
let CopyIntoTablePlan {
no_file_to_copy,
from_attachment,
required_values_schema,
required_source_schema,
write_mode,
validation_mode,
force,
stage_table_info,
enable_distributed,
..
} = &*copy_plan;
let required_values_schema = required_values_schema
.fields()
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
.join(",");
let required_source_schema = required_source_schema
.fields()
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
.join(",");
let stage_node = vec![
FormatTreeNode::new(format!("no_file_to_copy: {no_file_to_copy}")),
FormatTreeNode::new(format!("from_attachment: {from_attachment}")),
FormatTreeNode::new(format!(
"required_values_schema: [{required_values_schema}]"
)),
FormatTreeNode::new(format!(
"required_source_schema: [{required_source_schema}]"
)),
FormatTreeNode::new(format!("write_mode: {write_mode}")),
FormatTreeNode::new(format!("validation_mode: {validation_mode}")),
FormatTreeNode::new(format!("force: {force}")),
FormatTreeNode::new(format!("stage_table_info: {stage_table_info}")),
FormatTreeNode::new(format!("enable_distributed: {enable_distributed}")),
];
children.extend(stage_node);
(plan_name, vec![])
}
_ => unreachable!("plan in InsertInputSource::Stag must be CopyIntoTable"),
},
};

let plan = FormatTreeNode::with_children(plan_name, children).format_pretty()?;
let mut result = vec![DataBlock::new_from_columns(vec![StringType::from_data(
plan.lines().collect(),
)])];

if !sub_plan.is_empty() {
result.extend(sub_plan);
}
Ok(result)
}
}
133 changes: 0 additions & 133 deletions src/query/sql/src/planner/plans/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@

use std::sync::Arc;

use databend_common_ast::ast::FormatTreeNode;
use databend_common_expression::types::StringType;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_expression::TableSchemaRef;
use databend_common_meta_app::schema::TableInfo;
Expand All @@ -27,7 +23,6 @@ use serde::Deserialize;
use serde::Serialize;

use super::Plan;
use crate::plans::CopyIntoTablePlan;

#[derive(Clone, Debug, EnumAsInner)]
pub enum InsertInputSource {
Expand Down Expand Up @@ -76,134 +71,6 @@ impl Insert {
pub fn has_select_plan(&self) -> bool {
matches!(&self.source, InsertInputSource::SelectPlan(_))
}

#[async_backtrace::framed]
pub async fn explain(
&self,
verbose: bool,
) -> databend_common_exception::Result<Vec<DataBlock>> {
let mut result = vec![];

let Insert {
catalog,
database,
table,
schema,
overwrite,
// table_info only used create table as select.
table_info: _,
source,
} = self;

let table_name = format!("{}.{}.{}", catalog, database, table);
let inserted_columns = schema
.fields
.iter()
.map(|field| format!("{}.{} (#{})", table, field.name, field.column_id))
.collect::<Vec<_>>()
.join(",");

let children = vec![
FormatTreeNode::new(format!("table: {table_name}")),
FormatTreeNode::new(format!("inserted columns: [{inserted_columns}]")),
FormatTreeNode::new(format!("overwrite: {overwrite}")),
];

let formatted_plan = format_insert_source("InsertPlan", source, verbose, children)?;

let line_split_result: Vec<&str> = formatted_plan.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
result.push(DataBlock::new_from_columns(vec![formatted_plan]));
Ok(vec![DataBlock::concat(&result)?])
}
}

pub(crate) fn format_insert_source(
plan_name: &str,
source: &InsertInputSource,
verbose: bool,
mut children: Vec<FormatTreeNode>,
) -> databend_common_exception::Result<String> {
match source {
InsertInputSource::SelectPlan(plan) => {
if let Plan::Query {
s_expr, metadata, ..
} = &**plan
{
let metadata = &*metadata.read();
let sub_tree = s_expr.to_format_tree(metadata, verbose)?;
children.push(sub_tree);

return Ok(FormatTreeNode::with_children(
format!("{plan_name} (subquery):"),
children,
)
.format_pretty()?);
}
Ok(String::new())
}
InsertInputSource::Values(values) => match values {
InsertValue::Values { .. } => Ok(FormatTreeNode::with_children(
format!("{plan_name} (values):"),
children,
)
.format_pretty()?),
InsertValue::RawValues { .. } => Ok(FormatTreeNode::with_children(
format!("{plan_name} (rawvalues):"),
children,
)
.format_pretty()?),
},
InsertInputSource::Stage(plan) => match *plan.clone() {
Plan::CopyIntoTable(copy_plan) => {
let CopyIntoTablePlan {
no_file_to_copy,
from_attachment,
required_values_schema,
required_source_schema,
write_mode,
validation_mode,
force,
stage_table_info,
enable_distributed,
..
} = &*copy_plan;
let required_values_schema = required_values_schema
.fields()
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
.join(",");
let required_source_schema = required_source_schema
.fields()
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
.join(",");
let stage_node = vec![
FormatTreeNode::new(format!("no_file_to_copy: {no_file_to_copy}")),
FormatTreeNode::new(format!("from_attachment: {from_attachment}")),
FormatTreeNode::new(format!(
"required_values_schema: [{required_values_schema}]"
)),
FormatTreeNode::new(format!(
"required_source_schema: [{required_source_schema}]"
)),
FormatTreeNode::new(format!("write_mode: {write_mode}")),
FormatTreeNode::new(format!("validation_mode: {validation_mode}")),
FormatTreeNode::new(format!("force: {force}")),
FormatTreeNode::new(format!("stage_table_info: {stage_table_info}")),
FormatTreeNode::new(format!("enable_distributed: {enable_distributed}")),
];
children.extend(stage_node);
Ok(
FormatTreeNode::with_children(format!("{plan_name} (stage):"), children)
.format_pretty()?,
)
}
_ => unreachable!("plan in InsertInputSource::Stag must be CopyIntoTable"),
},
}
}

impl std::fmt::Debug for Insert {
Expand Down
Loading
Loading