From 965c5e50516f8f5b8364db1e2d896d9e351ef6a7 Mon Sep 17 00:00:00 2001 From: Evgeny Maruschenko Date: Fri, 29 Sep 2023 12:07:14 +1000 Subject: [PATCH 1/7] Add naive implementation of eliminate_nested_union --- .../optimizer/src/eliminate_nested_union.rs | 136 ++++++++++++++++++ datafusion/optimizer/src/lib.rs | 1 + 2 files changed, 137 insertions(+) create mode 100644 datafusion/optimizer/src/eliminate_nested_union.rs diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs new file mode 100644 index 000000000000..5bcf2f9aa530 --- /dev/null +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule [TODO] +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; +use datafusion_expr::logical_plan::{LogicalPlan, Union}; + +use crate::optimizer::ApplyOrder; +use std::sync::Arc; + +#[derive(Default)] +/// [TODO] add description +pub struct EliminateNestedUnion; + +impl EliminateNestedUnion { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for EliminateNestedUnion { + fn try_optimize( + &self, + plan: &LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result> { + match plan { + LogicalPlan::Union(union) => { + let Union { inputs, schema } = union; + + let inputs = inputs + .into_iter() + .flat_map(|plan| match Arc::as_ref(plan) { + LogicalPlan::Union(Union { inputs, .. }) => inputs.clone(), + _ => vec![Arc::clone(plan)], + }) + .map(|plan| Ok(plan)) + .collect::>>()?; + + let schema = schema.clone(); + + Ok(Some(LogicalPlan::Union(Union { inputs, schema }))) + } + _ => Ok(None), + } + } + + fn name(&self) -> &str { + "eliminate_nested_union" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::BottomUp) + } +} + +#[cfg(test)] +mod tests { + use datafusion_expr::Union; + + use super::*; + use crate::test::*; + + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq(Arc::new(EliminateNestedUnion::new()), plan, expected) + } + + #[test] + fn eliminate_nothing() -> Result<()> { + let table_1 = test_table_scan_with_name("table_1")?; + let table_2 = test_table_scan_with_name("table_2")?; + + let schema = table_1.schema().clone(); + + let plan = LogicalPlan::Union(Union { + inputs: vec![Arc::new(table_1), Arc::new(table_2)], + schema, + }); + + let expected = "\ + Union\ + \n TableScan: table_1\ + \n TableScan: table_2"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn eliminate_nested_union() -> Result<()> { + let table_1 = Arc::new(test_table_scan_with_name("table_1")?); + let table_2 = Arc::new(test_table_scan_with_name("table_2")?); + let table_3 = Arc::new(test_table_scan_with_name("table_3")?); + let table_4 = Arc::new(test_table_scan_with_name("table_4")?); + + let schema = table_1.schema().clone(); + + let plan = LogicalPlan::Union(Union { + inputs: vec![ + table_1, + Arc::new(LogicalPlan::Union(Union { + inputs: vec![ + table_2, + Arc::new(LogicalPlan::Union(Union { + inputs: vec![table_3, table_4], + schema: schema.clone(), + })), + ], + schema: schema.clone(), + })), + ], + schema: schema.clone(), + }); + let expected = "\ + Union\ + \n TableScan: table_1\ + \n TableScan: table_2\ + \n TableScan: table_3\ + \n TableScan: table_4"; + assert_optimized_plan_equal(&plan, expected) + } +} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 1d12ca7e3950..b29ebe25bbb7 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -24,6 +24,7 @@ pub mod eliminate_duplicated_expr; pub mod eliminate_filter; pub mod eliminate_join; pub mod eliminate_limit; +pub mod eliminate_nested_union; pub mod eliminate_outer_join; pub mod eliminate_project; pub mod extract_equijoin_predicate; From 71967537b30dc8cc51eafd8bafd58c9953ed18b9 Mon Sep 17 00:00:00 2001 From: Evgeny Maruschenko Date: Fri, 29 Sep 2023 14:19:26 +1000 Subject: [PATCH 2/7] Remove union optimization from LogicalPlanBuilder::union --- datafusion/expr/src/logical_plan/builder.rs | 66 +++++------ .../optimizer/src/eliminate_nested_union.rs | 104 ++++++++++-------- datafusion/optimizer/src/optimizer.rs | 2 + 3 files changed, 92 insertions(+), 80 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 6171d43b37f5..023f82056c90 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -54,6 +54,7 @@ use std::any::Any; use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; +use std::iter::zip; use std::sync::Arc; /// Default table name for unnamed table @@ -1196,39 +1197,36 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result>>()? - .to_dfschema()?; + })?; + + Ok(DFField::new( + left_field.qualifier().cloned(), + left_field.name(), + data_type, + nullable, + )) + }) + .collect::>>()? + .to_dfschema()?; let inputs = vec![left_plan, right_plan] .into_iter() - .flat_map(|p| match p { - LogicalPlan::Union(Union { inputs, .. }) => inputs, - other_plan => vec![Arc::new(other_plan)], - }) .map(|p| { let plan = coerce_plan_expr_for_schema(&p, &union_schema)?; match plan { @@ -1596,7 +1594,7 @@ mod tests { } #[test] - fn plan_builder_union_combined_single_union() -> Result<()> { + fn plan_builder_union() -> Result<()> { let plan = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?; @@ -1609,9 +1607,11 @@ mod tests { // output has only one union let expected = "Union\ - \n TableScan: employee_csv projection=[state, salary]\ - \n TableScan: employee_csv projection=[state, salary]\ - \n TableScan: employee_csv projection=[state, salary]\ + \n Union\ + \n Union\ + \n TableScan: employee_csv projection=[state, salary]\ + \n TableScan: employee_csv projection=[state, salary]\ + \n TableScan: employee_csv projection=[state, salary]\ \n TableScan: employee_csv projection=[state, salary]"; assert_eq!(expected, format!("{plan:?}")); @@ -1620,7 +1620,7 @@ mod tests { } #[test] - fn plan_builder_union_distinct_combined_single_union() -> Result<()> { + fn plan_builder_union_distinct() -> Result<()> { let plan = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?; diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index 5bcf2f9aa530..83096e966f62 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -15,16 +15,20 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule [TODO] +//! Optimizer rule to replace nested unions to single union. use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; -use datafusion_expr::logical_plan::{LogicalPlan, Union}; +use datafusion_expr::{ + builder::project_with_column_index, + expr_rewriter::coerce_plan_expr_for_schema, + logical_plan::{LogicalPlan, Projection, Union}, +}; use crate::optimizer::ApplyOrder; use std::sync::Arc; #[derive(Default)] -/// [TODO] add description +/// An optimization rule that replaces nested unions with a single union. pub struct EliminateNestedUnion; impl EliminateNestedUnion { @@ -38,24 +42,39 @@ impl OptimizerRule for EliminateNestedUnion { fn try_optimize( &self, plan: &LogicalPlan, - config: &dyn OptimizerConfig, + _config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Union(union) => { let Union { inputs, schema } = union; + let union_schema = schema.clone(); + let inputs = inputs .into_iter() .flat_map(|plan| match Arc::as_ref(plan) { LogicalPlan::Union(Union { inputs, .. }) => inputs.clone(), _ => vec![Arc::clone(plan)], }) - .map(|plan| Ok(plan)) + .map(|plan| { + let plan = coerce_plan_expr_for_schema(&plan, &union_schema)?; + match plan { + LogicalPlan::Projection(Projection { + expr, input, .. + }) => Ok(Arc::new(project_with_column_index( + expr, + input, + union_schema.clone(), + )?)), + _ => Ok(Arc::new(plan)), + } + }) .collect::>>()?; - let schema = schema.clone(); - - Ok(Some(LogicalPlan::Union(Union { inputs, schema }))) + Ok(Some(LogicalPlan::Union(Union { + inputs, + schema: union_schema, + }))) } _ => Ok(None), } @@ -72,10 +91,18 @@ impl OptimizerRule for EliminateNestedUnion { #[cfg(test)] mod tests { - use datafusion_expr::Union; - use super::*; use crate::test::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_expr::logical_plan::table_scan; + + fn schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ]) + } fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq(Arc::new(EliminateNestedUnion::new()), plan, expected) @@ -83,54 +110,37 @@ mod tests { #[test] fn eliminate_nothing() -> Result<()> { - let table_1 = test_table_scan_with_name("table_1")?; - let table_2 = test_table_scan_with_name("table_2")?; - - let schema = table_1.schema().clone(); + let plan_builder = table_scan(Some("table"), &schema(), None)?; - let plan = LogicalPlan::Union(Union { - inputs: vec![Arc::new(table_1), Arc::new(table_2)], - schema, - }); + let plan = plan_builder + .clone() + .union(plan_builder.clone().build()?)? + .build()?; let expected = "\ Union\ - \n TableScan: table_1\ - \n TableScan: table_2"; + \n TableScan: table\ + \n TableScan: table"; assert_optimized_plan_equal(&plan, expected) } #[test] fn eliminate_nested_union() -> Result<()> { - let table_1 = Arc::new(test_table_scan_with_name("table_1")?); - let table_2 = Arc::new(test_table_scan_with_name("table_2")?); - let table_3 = Arc::new(test_table_scan_with_name("table_3")?); - let table_4 = Arc::new(test_table_scan_with_name("table_4")?); - - let schema = table_1.schema().clone(); - - let plan = LogicalPlan::Union(Union { - inputs: vec![ - table_1, - Arc::new(LogicalPlan::Union(Union { - inputs: vec![ - table_2, - Arc::new(LogicalPlan::Union(Union { - inputs: vec![table_3, table_4], - schema: schema.clone(), - })), - ], - schema: schema.clone(), - })), - ], - schema: schema.clone(), - }); + let plan_builder = table_scan(Some("table"), &schema(), None)?; + + let plan = plan_builder + .clone() + .union(plan_builder.clone().build()?)? + .union(plan_builder.clone().build()?)? + .union(plan_builder.clone().build()?)? + .build()?; + let expected = "\ Union\ - \n TableScan: table_1\ - \n TableScan: table_2\ - \n TableScan: table_3\ - \n TableScan: table_4"; + \n TableScan: table\ + \n TableScan: table\ + \n TableScan: table\ + \n TableScan: table"; assert_optimized_plan_equal(&plan, expected) } } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index d3bdd47c5cb3..747db44b0661 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -24,6 +24,7 @@ use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr; use crate::eliminate_filter::EliminateFilter; use crate::eliminate_join::EliminateJoin; use crate::eliminate_limit::EliminateLimit; +use crate::eliminate_nested_union::EliminateNestedUnion; use crate::eliminate_outer_join::EliminateOuterJoin; use crate::eliminate_project::EliminateProjection; use crate::extract_equijoin_predicate::ExtractEquijoinPredicate; @@ -227,6 +228,7 @@ impl Optimizer { Arc::new(DecorrelatePredicateSubquery::new()), Arc::new(ScalarSubqueryToJoin::new()), Arc::new(ExtractEquijoinPredicate::new()), + Arc::new(EliminateNestedUnion::new()), // simplify expressions does not simplify expressions in subqueries, so we // run it again after running the optimizations that potentially converted // subqueries to joins From 63645b215e2a7b332e73d063f8771580ecc1ce32 Mon Sep 17 00:00:00 2001 From: Evgeny Maruschenko Date: Fri, 29 Sep 2023 16:58:50 +1000 Subject: [PATCH 3/7] Fix propagate_union_children_different_schema test --- datafusion/optimizer/src/propagate_empty_relation.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 4de7596b329c..d12ea3d19d29 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -182,6 +182,7 @@ fn empty_child(plan: &LogicalPlan) -> Result> { #[cfg(test)] mod tests { use crate::eliminate_filter::EliminateFilter; + use crate::eliminate_nested_union::EliminateNestedUnion; use crate::optimizer::Optimizer; use crate::test::{ assert_optimized_plan_eq, test_table_scan, test_table_scan_fields, @@ -209,6 +210,7 @@ mod tests { fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} let optimizer = Optimizer::with_rules(vec![ Arc::new(EliminateFilter::new()), + Arc::new(EliminateNestedUnion::new()), Arc::new(PropagateEmptyRelation::new()), ]); let config = &mut OptimizerContext::new() From 879d0f3fc961e1cd3b12c08938081993616d52e6 Mon Sep 17 00:00:00 2001 From: Evgeny Maruschenko Date: Fri, 29 Sep 2023 18:08:43 +1000 Subject: [PATCH 4/7] Add implementation of eliminate_one_union --- datafusion/expr/src/logical_plan/builder.rs | 1 - .../optimizer/src/eliminate_one_union.rs | 125 ++++++++++++++++++ datafusion/optimizer/src/lib.rs | 1 + datafusion/optimizer/src/optimizer.rs | 2 + .../optimizer/src/propagate_empty_relation.rs | 31 ++--- datafusion/optimizer/src/test/mod.rs | 19 +++ 6 files changed, 158 insertions(+), 21 deletions(-) create mode 100644 datafusion/optimizer/src/eliminate_one_union.rs diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 023f82056c90..3d62bcf55d6c 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1605,7 +1605,6 @@ mod tests { .union(plan.build()?)? .build()?; - // output has only one union let expected = "Union\ \n Union\ \n Union\ diff --git a/datafusion/optimizer/src/eliminate_one_union.rs b/datafusion/optimizer/src/eliminate_one_union.rs new file mode 100644 index 000000000000..679c81f46aca --- /dev/null +++ b/datafusion/optimizer/src/eliminate_one_union.rs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule to eliminate one union. +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; +use datafusion_expr::logical_plan::{LogicalPlan, Union}; + +use crate::optimizer::ApplyOrder; + +#[derive(Default)] +/// An optimization rule that eliminates union with one element. +pub struct EliminateOneUnion; + +impl EliminateOneUnion { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for EliminateOneUnion { + fn try_optimize( + &self, + plan: &LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + match plan { + LogicalPlan::Union(union) if union.inputs.len() == 1 => { + let Union { inputs, schema: _ } = union; + + Ok(inputs.first().map(|input| input.as_ref().clone())) + } + _ => Ok(None), + } + } + + fn name(&self) -> &str { + "eliminate_one_union" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::eliminate_filter::EliminateFilter; + use crate::propagate_empty_relation::PropagateEmptyRelation; + use crate::test::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::ScalarValue; + use datafusion_expr::{logical_plan::table_scan, Expr}; + use std::sync::Arc; + + fn schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ]) + } + + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq_with_rules( + vec![ + Arc::new(EliminateFilter::new()), + Arc::new(PropagateEmptyRelation::new()), + Arc::new(EliminateOneUnion::new()), + ], + plan, + expected, + ) + } + + #[test] + fn eliminate_nothing() -> Result<()> { + let plan_builder = table_scan(Some("table"), &schema(), None)?; + + let plan = plan_builder + .clone() + .union(plan_builder.clone().build()?)? + .build()?; + + let expected = "\ + Union\ + \n TableScan: table\ + \n TableScan: table"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn eliminate_nested_union() -> Result<()> { + let plan_builder = table_scan(Some("table"), &schema(), None)?; + + let plan = plan_builder + .clone() + .union( + plan_builder + .clone() + .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? + .build()?, + )? + .build()?; + + let expected = "TableScan: table"; + assert_optimized_plan_equal(&plan, expected) + } +} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index b29ebe25bbb7..ede0ac5c7164 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -25,6 +25,7 @@ pub mod eliminate_filter; pub mod eliminate_join; pub mod eliminate_limit; pub mod eliminate_nested_union; +pub mod eliminate_one_union; pub mod eliminate_outer_join; pub mod eliminate_project; pub mod extract_equijoin_predicate; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 747db44b0661..a1db9dcc171d 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -25,6 +25,7 @@ use crate::eliminate_filter::EliminateFilter; use crate::eliminate_join::EliminateJoin; use crate::eliminate_limit::EliminateLimit; use crate::eliminate_nested_union::EliminateNestedUnion; +use crate::eliminate_one_union::EliminateOneUnion; use crate::eliminate_outer_join::EliminateOuterJoin; use crate::eliminate_project::EliminateProjection; use crate::extract_equijoin_predicate::ExtractEquijoinPredicate; @@ -243,6 +244,7 @@ impl Optimizer { Arc::new(PropagateEmptyRelation::new()), Arc::new(FilterNullJoinKeys::default()), Arc::new(EliminateOuterJoin::new()), + Arc::new(EliminateOneUnion::new()), // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit Arc::new(PushDownLimit::new()), Arc::new(PushDownFilter::new()), diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index d12ea3d19d29..ec567d000d13 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -183,12 +183,10 @@ fn empty_child(plan: &LogicalPlan) -> Result> { mod tests { use crate::eliminate_filter::EliminateFilter; use crate::eliminate_nested_union::EliminateNestedUnion; - use crate::optimizer::Optimizer; use crate::test::{ - assert_optimized_plan_eq, test_table_scan, test_table_scan_fields, - test_table_scan_with_name, + assert_optimized_plan_eq, assert_optimized_plan_eq_with_rules, test_table_scan, + test_table_scan_fields, test_table_scan_with_name, }; - use crate::OptimizerContext; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{Column, DFField, DFSchema, ScalarValue}; use datafusion_expr::logical_plan::table_scan; @@ -207,22 +205,15 @@ mod tests { plan: &LogicalPlan, expected: &str, ) -> Result<()> { - fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} - let optimizer = Optimizer::with_rules(vec![ - Arc::new(EliminateFilter::new()), - Arc::new(EliminateNestedUnion::new()), - Arc::new(PropagateEmptyRelation::new()), - ]); - let config = &mut OptimizerContext::new() - .with_max_passes(1) - .with_skip_failing_rules(false); - let optimized_plan = optimizer - .optimize(plan, config, observe) - .expect("failed to optimize plan"); - let formatted_plan = format!("{optimized_plan:?}"); - assert_eq!(formatted_plan, expected); - assert_eq!(plan.schema(), optimized_plan.schema()); - Ok(()) + assert_optimized_plan_eq_with_rules( + vec![ + Arc::new(EliminateFilter::new()), + Arc::new(EliminateNestedUnion::new()), + Arc::new(PropagateEmptyRelation::new()), + ], + &plan, + expected, + ) } #[test] diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 7d334a80b682..3eac2317b849 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -169,6 +169,25 @@ pub fn assert_optimized_plan_eq( Ok(()) } +pub fn assert_optimized_plan_eq_with_rules( + rules: Vec>, + plan: &LogicalPlan, + expected: &str, +) -> Result<()> { + fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} + let config = &mut OptimizerContext::new() + .with_max_passes(1) + .with_skip_failing_rules(false); + let optimizer = Optimizer::with_rules(rules); + let optimized_plan = optimizer + .optimize(plan, config, observe) + .expect("failed to optimize plan"); + let formatted_plan = format!("{optimized_plan:?}"); + assert_eq!(formatted_plan, expected); + assert_eq!(plan.schema(), optimized_plan.schema()); + Ok(()) +} + pub fn assert_optimized_plan_eq_display_indent( rule: Arc, plan: &LogicalPlan, From ace9c6d84e34a5bd8dee78e9c7ac41e6ead0341b Mon Sep 17 00:00:00 2001 From: Evgeny Maruschenko Date: Tue, 3 Oct 2023 10:26:27 +1000 Subject: [PATCH 5/7] Simplified eliminate_nested_union test --- .../optimizer/src/eliminate_one_union.rs | 37 ++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_one_union.rs b/datafusion/optimizer/src/eliminate_one_union.rs index 679c81f46aca..c343e2576c04 100644 --- a/datafusion/optimizer/src/eliminate_one_union.rs +++ b/datafusion/optimizer/src/eliminate_one_union.rs @@ -61,12 +61,13 @@ impl OptimizerRule for EliminateOneUnion { #[cfg(test)] mod tests { use super::*; - use crate::eliminate_filter::EliminateFilter; - use crate::propagate_empty_relation::PropagateEmptyRelation; use crate::test::*; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::ScalarValue; - use datafusion_expr::{logical_plan::table_scan, Expr}; + use datafusion_common::ToDFSchema; + use datafusion_expr::{ + expr_rewriter::coerce_plan_expr_for_schema, + logical_plan::{table_scan, Union}, + }; use std::sync::Arc; fn schema() -> Schema { @@ -79,11 +80,7 @@ mod tests { fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq_with_rules( - vec![ - Arc::new(EliminateFilter::new()), - Arc::new(PropagateEmptyRelation::new()), - Arc::new(EliminateOneUnion::new()), - ], + vec![Arc::new(EliminateOneUnion::new())], plan, expected, ) @@ -107,19 +104,17 @@ mod tests { #[test] fn eliminate_nested_union() -> Result<()> { - let plan_builder = table_scan(Some("table"), &schema(), None)?; - - let plan = plan_builder - .clone() - .union( - plan_builder - .clone() - .filter(Expr::Literal(ScalarValue::Boolean(Some(false))))? - .build()?, - )? - .build()?; + let table_plan = coerce_plan_expr_for_schema( + &table_scan(Some("table"), &schema(), None)?.build()?, + &schema().to_dfschema()?, + )?; + let schema = table_plan.schema().clone(); + let single_union_plan = LogicalPlan::Union(Union { + inputs: vec![Arc::new(table_plan)], + schema, + }); let expected = "TableScan: table"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_equal(&single_union_plan, expected) } } From bd6c4d4cffac339e274d8543bbb316a16cc68bf1 Mon Sep 17 00:00:00 2001 From: Evgeny Maruschenko Date: Tue, 3 Oct 2023 12:42:31 +1000 Subject: [PATCH 6/7] Fix --- .../optimizer/src/eliminate_nested_union.rs | 125 +++++++++++++----- .../optimizer/src/eliminate_one_union.rs | 6 +- datafusion/optimizer/src/optimizer.rs | 5 +- datafusion/sql/tests/sql_integration.rs | 18 --- .../sqllogictest/test_files/explain.slt | 4 + 5 files changed, 104 insertions(+), 54 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index 83096e966f62..7fd097489a42 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -18,13 +18,10 @@ //! Optimizer rule to replace nested unions to single union. use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; -use datafusion_expr::{ - builder::project_with_column_index, - expr_rewriter::coerce_plan_expr_for_schema, - logical_plan::{LogicalPlan, Projection, Union}, -}; +use datafusion_expr::logical_plan::{LogicalPlan, Union}; use crate::optimizer::ApplyOrder; +use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema; use std::sync::Arc; #[derive(Default)] @@ -44,36 +41,27 @@ impl OptimizerRule for EliminateNestedUnion { plan: &LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result> { + // TODO: Add optimization for nested distinct unions. match plan { - LogicalPlan::Union(union) => { - let Union { inputs, schema } = union; - - let union_schema = schema.clone(); - + LogicalPlan::Union(Union { inputs, schema }) => { let inputs = inputs .into_iter() - .flat_map(|plan| match Arc::as_ref(plan) { - LogicalPlan::Union(Union { inputs, .. }) => inputs.clone(), - _ => vec![Arc::clone(plan)], - }) - .map(|plan| { - let plan = coerce_plan_expr_for_schema(&plan, &union_schema)?; - match plan { - LogicalPlan::Projection(Projection { - expr, input, .. - }) => Ok(Arc::new(project_with_column_index( - expr, - input, - union_schema.clone(), - )?)), - _ => Ok(Arc::new(plan)), - } + .flat_map(|plan| match plan.as_ref() { + LogicalPlan::Union(Union { inputs, schema }) => inputs + .into_iter() + .map(|plan| { + Arc::new( + coerce_plan_expr_for_schema(plan, schema).unwrap(), + ) + }) + .collect::>(), + _ => vec![plan.clone()], }) - .collect::>>()?; + .collect::>(); Ok(Some(LogicalPlan::Union(Union { inputs, - schema: union_schema, + schema: schema.clone(), }))) } _ => Ok(None), @@ -94,13 +82,13 @@ mod tests { use super::*; use crate::test::*; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_expr::logical_plan::table_scan; + use datafusion_expr::{col, logical_plan::table_scan}; fn schema() -> Schema { Schema::new(vec![ Field::new("id", DataType::Int32, false), Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Int32, false), + Field::new("value", DataType::Float64, false), ]) } @@ -143,4 +131,81 @@ mod tests { \n TableScan: table"; assert_optimized_plan_equal(&plan, expected) } + + // We don't need to use project_with_column_index in logical optimizer, + // after LogicalPlanBuilder::union, we already have all equal expression aliases + #[test] + fn eliminate_nested_union_with_projection() -> Result<()> { + let plan_builder = table_scan(Some("table"), &schema(), None)?; + + let plan = plan_builder + .clone() + .union( + plan_builder + .clone() + .project(vec![col("id").alias("table_id"), col("key"), col("value")])? + .build()?, + )? + .union( + plan_builder + .clone() + .project(vec![col("id").alias("_id"), col("key"), col("value")])? + .build()?, + )? + .build()?; + + let expected = "Union\ + \n TableScan: table\ + \n Projection: table.id AS id, table.key, table.value\ + \n TableScan: table\ + \n Projection: table.id AS id, table.key, table.value\ + \n TableScan: table"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn eliminate_nested_union_with_type_cast_projection() -> Result<()> { + let table_1 = table_scan( + Some("table_1"), + &Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ]), + None, + )?; + + let table_2 = table_scan( + Some("table_1"), + &Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Float32, false), + ]), + None, + )?; + + let table_3 = table_scan( + Some("table_1"), + &Schema::new(vec![ + Field::new("id", DataType::Int16, false), + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Float32, false), + ]), + None, + )?; + + let plan = table_1 + .union(table_2.build()?)? + .union(table_3.build()?)? + .build()?; + + let expected = "Union\ + \n TableScan: table_1\ + \n Projection: CAST(table_1.id AS Int64) AS id, table_1.key, CAST(table_1.value AS Float64) AS value\ + \n TableScan: table_1\ + \n Projection: CAST(table_1.id AS Int64) AS id, table_1.key, CAST(table_1.value AS Float64) AS value\ + \n TableScan: table_1"; + assert_optimized_plan_equal(&plan, expected) + } } diff --git a/datafusion/optimizer/src/eliminate_one_union.rs b/datafusion/optimizer/src/eliminate_one_union.rs index c343e2576c04..70ee490346ff 100644 --- a/datafusion/optimizer/src/eliminate_one_union.rs +++ b/datafusion/optimizer/src/eliminate_one_union.rs @@ -40,9 +40,7 @@ impl OptimizerRule for EliminateOneUnion { _config: &dyn OptimizerConfig, ) -> Result> { match plan { - LogicalPlan::Union(union) if union.inputs.len() == 1 => { - let Union { inputs, schema: _ } = union; - + LogicalPlan::Union(Union { inputs, .. }) if inputs.len() == 1 => { Ok(inputs.first().map(|input| input.as_ref().clone())) } _ => Ok(None), @@ -103,7 +101,7 @@ mod tests { } #[test] - fn eliminate_nested_union() -> Result<()> { + fn eliminate_one_union() -> Result<()> { let table_plan = coerce_plan_expr_for_schema( &table_scan(Some("table"), &schema(), None)?.build()?, &schema().to_dfschema()?, diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index a1db9dcc171d..5231dc869875 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -222,6 +222,7 @@ impl Optimizer { /// Create a new optimizer using the recommended list of rules pub fn new() -> Self { let rules: Vec> = vec![ + Arc::new(EliminateNestedUnion::new()), Arc::new(SimplifyExpressions::new()), Arc::new(UnwrapCastInComparison::new()), Arc::new(ReplaceDistinctWithAggregate::new()), @@ -229,7 +230,6 @@ impl Optimizer { Arc::new(DecorrelatePredicateSubquery::new()), Arc::new(ScalarSubqueryToJoin::new()), Arc::new(ExtractEquijoinPredicate::new()), - Arc::new(EliminateNestedUnion::new()), // simplify expressions does not simplify expressions in subqueries, so we // run it again after running the optimizations that potentially converted // subqueries to joins @@ -242,9 +242,10 @@ impl Optimizer { Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), Arc::new(PropagateEmptyRelation::new()), + // Must be after PropagateEmptyRelation + Arc::new(EliminateOneUnion::new()), Arc::new(FilterNullJoinKeys::default()), Arc::new(EliminateOuterJoin::new()), - Arc::new(EliminateOneUnion::new()), // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit Arc::new(PushDownLimit::new()), Arc::new(PushDownFilter::new()), diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index f4ffea06b7a1..e97b7cd38c10 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2053,24 +2053,6 @@ fn union_all() { quick_test(sql, expected); } -#[test] -fn union_4_combined_in_one() { - let sql = "SELECT order_id from orders - UNION ALL SELECT order_id FROM orders - UNION ALL SELECT order_id FROM orders - UNION ALL SELECT order_id FROM orders"; - let expected = "Union\ - \n Projection: orders.order_id\ - \n TableScan: orders\ - \n Projection: orders.order_id\ - \n TableScan: orders\ - \n Projection: orders.order_id\ - \n TableScan: orders\ - \n Projection: orders.order_id\ - \n TableScan: orders"; - quick_test(sql, expected); -} - #[test] fn union_with_different_column_names() { let sql = "SELECT order_id from orders UNION ALL SELECT customer_id FROM orders"; diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 23055cd978ff..0e190a6acd62 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -184,6 +184,7 @@ logical_plan after inline_table_scan SAME TEXT AS ABOVE logical_plan after type_coercion SAME TEXT AS ABOVE logical_plan after count_wildcard_rule SAME TEXT AS ABOVE analyzed_logical_plan SAME TEXT AS ABOVE +logical_plan after eliminate_nested_union SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE @@ -200,6 +201,7 @@ logical_plan after eliminate_cross_join SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_limit SAME TEXT AS ABOVE logical_plan after propagate_empty_relation SAME TEXT AS ABOVE +logical_plan after eliminate_one_union SAME TEXT AS ABOVE logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE @@ -213,6 +215,7 @@ Projection: simple_explain_test.a, simple_explain_test.b, simple_explain_test.c --TableScan: simple_explain_test projection=[a, b, c] logical_plan after eliminate_projection TableScan: simple_explain_test projection=[a, b, c] logical_plan after push_down_limit SAME TEXT AS ABOVE +logical_plan after eliminate_nested_union SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE @@ -229,6 +232,7 @@ logical_plan after eliminate_cross_join SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_limit SAME TEXT AS ABOVE logical_plan after propagate_empty_relation SAME TEXT AS ABOVE +logical_plan after eliminate_one_union SAME TEXT AS ABOVE logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE From 1d5e66aefab47774ab6b68de8c6c0846a806d2a0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 10 Oct 2023 10:05:07 -0400 Subject: [PATCH 7/7] clippy --- datafusion/optimizer/src/eliminate_nested_union.rs | 4 ++-- datafusion/optimizer/src/propagate_empty_relation.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index 7fd097489a42..e22c73e5794d 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -45,10 +45,10 @@ impl OptimizerRule for EliminateNestedUnion { match plan { LogicalPlan::Union(Union { inputs, schema }) => { let inputs = inputs - .into_iter() + .iter() .flat_map(|plan| match plan.as_ref() { LogicalPlan::Union(Union { inputs, schema }) => inputs - .into_iter() + .iter() .map(|plan| { Arc::new( coerce_plan_expr_for_schema(plan, schema).unwrap(), diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index ec567d000d13..040b69fc8bf3 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -211,7 +211,7 @@ mod tests { Arc::new(EliminateNestedUnion::new()), Arc::new(PropagateEmptyRelation::new()), ], - &plan, + plan, expected, ) }