Skip to content

Commit

Permalink
chore(query): improve insert and replace explain
Browse files Browse the repository at this point in the history
  • Loading branch information
Dousir9 committed Sep 10, 2024
1 parent 9ac1368 commit ecd2556
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 175 deletions.
167 changes: 165 additions & 2 deletions src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ use databend_common_pipeline_core::ExecutionInfo;
use databend_common_sql::binder::ExplainConfig;
use databend_common_sql::executor::format_partial_tree;
use databend_common_sql::optimizer::ColumnSet;
use databend_common_sql::plans::Insert;
use databend_common_sql::plans::InsertInputSource;
use databend_common_sql::plans::Mutation;
use databend_common_sql::plans::Replace;
use databend_common_sql::BindContext;
use databend_common_sql::MetadataRef;
use databend_common_storages_result_cache::gen_result_cache_key;
Expand All @@ -53,6 +56,8 @@ use crate::sessions::QueryContext;
use crate::sql::executor::PhysicalPlan;
use crate::sql::executor::PhysicalPlanBuilder;
use crate::sql::optimizer::SExpr;
use crate::sql::plans::CopyIntoTablePlan;
use crate::sql::plans::InsertValue;
use crate::sql::plans::Plan;

pub struct ExplainInterpreter {
Expand Down Expand Up @@ -89,8 +94,8 @@ impl Interpreter for ExplainInterpreter {
self.explain_query(s_expr, metadata, bind_context, formatted_ast)
.await?
}
Plan::Insert(insert_plan) => insert_plan.explain(self.config.verbose).await?,
Plan::Replace(replace_plan) => replace_plan.explain(self.config.verbose).await?,
Plan::Insert(insert_plan) => self.explain_insert(&*insert_plan).await?,
Plan::Replace(replace_plan) => self.explain_replace(&*replace_plan).await?,
Plan::CreateTable(plan) => match &plan.as_select {
Some(box Plan::Query {
s_expr,
Expand Down Expand Up @@ -482,4 +487,162 @@ impl ExplainInterpreter {
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

#[async_backtrace::framed]
pub async fn explain_insert(&self, insert: &Insert) -> Result<Vec<DataBlock>> {
let Insert {
catalog,
database,
table,
schema,
overwrite,
source,
// table_info only used create table as select.
table_info: _,
} = insert;

let table_name = format!("{}.{}.{}", catalog, database, table);
let inserted_columns = schema
.fields
.iter()
.map(|field| format!("{}.{} (#{})", table, field.name, field.column_id))
.collect::<Vec<_>>()
.join(",");

let children = vec![
FormatTreeNode::new(format!("table: {table_name}")),
FormatTreeNode::new(format!("inserted columns: [{inserted_columns}]")),
FormatTreeNode::new(format!("overwrite: {overwrite}")),
];

let formatted_plan = self
.format_insert_source("InsertPlan", source, children)
.await?;

Ok(vec![DataBlock::concat(&formatted_plan)?])
}

#[async_backtrace::framed]
pub async fn explain_replace(&self, replace: &Replace) -> Result<Vec<DataBlock>> {
let Replace {
catalog,
database,
table,
source,
on_conflict_fields,
..
} = replace;

let table_name = format!("{}.{}.{}", catalog, database, table);
let on_columns = on_conflict_fields
.iter()
.map(|field| format!("{}.{} (#{})", table, field.name, field.column_id))
.collect::<Vec<_>>()
.join(",");

let children = vec![
FormatTreeNode::new(format!("table: {table_name}")),
FormatTreeNode::new(format!("on columns: [{on_columns}]")),
];

let formatted_plan = self
.format_insert_source("ReplacePlan", source, children)
.await?;

Ok(vec![DataBlock::concat(&formatted_plan)?])
}

pub(crate) async fn format_insert_source(
&self,
plan_name: &str,
source: &InsertInputSource,
mut children: Vec<FormatTreeNode>,
) -> Result<Vec<DataBlock>> {
let (plan_name, sub_plan) = match source {
InsertInputSource::SelectPlan(plan) => {
if let Plan::Query {
s_expr,
metadata,
bind_context,
..
} = &**plan
{
let plan_name = format!("{plan_name} (subquery)");
let sub_plan = self
.explain_query(&s_expr, metadata, &*bind_context, &None)
.await?;
(plan_name, sub_plan)
} else {
(String::new(), vec![])
}
}
InsertInputSource::Values(values) => match values {
InsertValue::Values { .. } => {
let plan_name = format!("{plan_name} (values):");
(plan_name, vec![])
}
InsertValue::RawValues { .. } => {
let plan_name = format!("{plan_name} (rawvalues):");
(plan_name, vec![])
}
},
InsertInputSource::Stage(plan) => match *plan.clone() {
Plan::CopyIntoTable(copy_plan) => {
let plan_name = format!("{plan_name} (stage):");
let CopyIntoTablePlan {
no_file_to_copy,
from_attachment,
required_values_schema,
required_source_schema,
write_mode,
validation_mode,
force,
stage_table_info,
enable_distributed,
..
} = &*copy_plan;
let required_values_schema = required_values_schema
.fields()
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
.join(",");
let required_source_schema = required_source_schema
.fields()
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
.join(",");
let stage_node = vec![
FormatTreeNode::new(format!("no_file_to_copy: {no_file_to_copy}")),
FormatTreeNode::new(format!("from_attachment: {from_attachment}")),
FormatTreeNode::new(format!(
"required_values_schema: [{required_values_schema}]"
)),
FormatTreeNode::new(format!(
"required_source_schema: [{required_source_schema}]"
)),
FormatTreeNode::new(format!("write_mode: {write_mode}")),
FormatTreeNode::new(format!("validation_mode: {validation_mode}")),
FormatTreeNode::new(format!("force: {force}")),
FormatTreeNode::new(format!("stage_table_info: {stage_table_info}")),
FormatTreeNode::new(format!("enable_distributed: {enable_distributed}")),
];
children.extend(stage_node);
(plan_name, vec![])
}
_ => unreachable!("plan in InsertInputSource::Stag must be CopyIntoTable"),
},
};

let plan = FormatTreeNode::with_children(plan_name, children).format_pretty()?;
let mut result = vec![DataBlock::new_from_columns(vec![StringType::from_data(
plan.lines().collect(),
)])];

if !sub_plan.is_empty() {
result.extend(sub_plan);
}
Ok(result)
}
}
133 changes: 0 additions & 133 deletions src/query/sql/src/planner/plans/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@

use std::sync::Arc;

use databend_common_ast::ast::FormatTreeNode;
use databend_common_expression::types::StringType;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_expression::TableSchemaRef;
use databend_common_meta_app::schema::TableInfo;
Expand All @@ -27,7 +23,6 @@ use serde::Deserialize;
use serde::Serialize;

use super::Plan;
use crate::plans::CopyIntoTablePlan;

#[derive(Clone, Debug, EnumAsInner)]
pub enum InsertInputSource {
Expand Down Expand Up @@ -76,134 +71,6 @@ impl Insert {
pub fn has_select_plan(&self) -> bool {
matches!(&self.source, InsertInputSource::SelectPlan(_))
}

#[async_backtrace::framed]
pub async fn explain(
&self,
verbose: bool,
) -> databend_common_exception::Result<Vec<DataBlock>> {
let mut result = vec![];

let Insert {
catalog,
database,
table,
schema,
overwrite,
// table_info only used create table as select.
table_info: _,
source,
} = self;

let table_name = format!("{}.{}.{}", catalog, database, table);
let inserted_columns = schema
.fields
.iter()
.map(|field| format!("{}.{} (#{})", table, field.name, field.column_id))
.collect::<Vec<_>>()
.join(",");

let children = vec![
FormatTreeNode::new(format!("table: {table_name}")),
FormatTreeNode::new(format!("inserted columns: [{inserted_columns}]")),
FormatTreeNode::new(format!("overwrite: {overwrite}")),
];

let formatted_plan = format_insert_source("InsertPlan", source, verbose, children)?;

let line_split_result: Vec<&str> = formatted_plan.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
result.push(DataBlock::new_from_columns(vec![formatted_plan]));
Ok(vec![DataBlock::concat(&result)?])
}
}

pub(crate) fn format_insert_source(
plan_name: &str,
source: &InsertInputSource,
verbose: bool,
mut children: Vec<FormatTreeNode>,
) -> databend_common_exception::Result<String> {
match source {
InsertInputSource::SelectPlan(plan) => {
if let Plan::Query {
s_expr, metadata, ..
} = &**plan
{
let metadata = &*metadata.read();
let sub_tree = s_expr.to_format_tree(metadata, verbose)?;
children.push(sub_tree);

return Ok(FormatTreeNode::with_children(
format!("{plan_name} (subquery):"),
children,
)
.format_pretty()?);
}
Ok(String::new())
}
InsertInputSource::Values(values) => match values {
InsertValue::Values { .. } => Ok(FormatTreeNode::with_children(
format!("{plan_name} (values):"),
children,
)
.format_pretty()?),
InsertValue::RawValues { .. } => Ok(FormatTreeNode::with_children(
format!("{plan_name} (rawvalues):"),
children,
)
.format_pretty()?),
},
InsertInputSource::Stage(plan) => match *plan.clone() {
Plan::CopyIntoTable(copy_plan) => {
let CopyIntoTablePlan {
no_file_to_copy,
from_attachment,
required_values_schema,
required_source_schema,
write_mode,
validation_mode,
force,
stage_table_info,
enable_distributed,
..
} = &*copy_plan;
let required_values_schema = required_values_schema
.fields()
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
.join(",");
let required_source_schema = required_source_schema
.fields()
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
.join(",");
let stage_node = vec![
FormatTreeNode::new(format!("no_file_to_copy: {no_file_to_copy}")),
FormatTreeNode::new(format!("from_attachment: {from_attachment}")),
FormatTreeNode::new(format!(
"required_values_schema: [{required_values_schema}]"
)),
FormatTreeNode::new(format!(
"required_source_schema: [{required_source_schema}]"
)),
FormatTreeNode::new(format!("write_mode: {write_mode}")),
FormatTreeNode::new(format!("validation_mode: {validation_mode}")),
FormatTreeNode::new(format!("force: {force}")),
FormatTreeNode::new(format!("stage_table_info: {stage_table_info}")),
FormatTreeNode::new(format!("enable_distributed: {enable_distributed}")),
];
children.extend(stage_node);
Ok(
FormatTreeNode::with_children(format!("{plan_name} (stage):"), children)
.format_pretty()?,
)
}
_ => unreachable!("plan in InsertInputSource::Stag must be CopyIntoTable"),
},
}
}

impl std::fmt::Debug for Insert {
Expand Down
Loading

0 comments on commit ecd2556

Please sign in to comment.