From 3a8c04c6886214aa0acc7aeee7fb47867512f498 Mon Sep 17 00:00:00 2001 From: codedump Date: Sat, 18 Jan 2025 21:27:30 +0800 Subject: [PATCH 1/2] fix: fix issue 10326, add missing columns into list directory,instead of traversing the plan looking for projection node --- datafusion/expr/src/logical_plan/builder.rs | 41 +++++++-------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index c7cff3ac26b1..67b6c326a31b 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -575,44 +575,31 @@ impl LogicalPlanBuilder { missing_cols: &IndexSet, is_distinct: bool, ) -> Result { - match curr_plan { - LogicalPlan::Projection(Projection { - input, - mut expr, - schema: _, - }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => { + let inputs = curr_plan.inputs(); + let mut exprs = curr_plan.expressions(); + for input in inputs { + if missing_cols.iter().all(|c| input.schema().has_column(c)) { let mut missing_exprs = missing_cols .iter() - .map(|c| normalize_col(Expr::Column(c.clone()), &input)) + .map(|c| normalize_col(Expr::Column(c.clone()), input)) .collect::>>()?; // Do not let duplicate columns to be added, some of the // missing_cols may be already present but without the new // projected alias. - missing_exprs.retain(|e| !expr.contains(e)); + missing_exprs.retain(|e| !exprs.contains(e)); if is_distinct { - Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?; + Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &exprs)?; } - expr.extend(missing_exprs); - project(Arc::unwrap_or_clone(input), expr) - } - _ => { - let is_distinct = - is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_)); - let new_inputs = curr_plan - .inputs() - .into_iter() - .map(|input_plan| { - Self::add_missing_columns( - (*input_plan).clone(), - missing_cols, - is_distinct, - ) - }) - .collect::>>()?; - curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs) + exprs.extend(missing_exprs); } } + let inputs = curr_plan + .inputs() + .into_iter() + .map(|input| input.to_owned()) + .collect::>(); + curr_plan.with_new_exprs(exprs, inputs) } fn ambiguous_distinct_check( From 11acffef9f7a6dfed98cbe1f914ee532ca6445bc Mon Sep 17 00:00:00 2001 From: codedump Date: Thu, 23 Jan 2025 21:15:02 +0800 Subject: [PATCH 2/2] fix: fix issue 10326, do ambiguous_distinct_check in select --- datafusion/expr/src/logical_plan/builder.rs | 40 ++++++--- datafusion/sql/src/select.rs | 94 ++++++++++++++++++++- 2 files changed, 117 insertions(+), 17 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 67b6c326a31b..78a2da9855c2 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -577,21 +577,33 @@ impl LogicalPlanBuilder { ) -> Result { let inputs = curr_plan.inputs(); let mut exprs = curr_plan.expressions(); + let is_distinct = is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_)); for input in inputs { - if missing_cols.iter().all(|c| input.schema().has_column(c)) { - let mut missing_exprs = missing_cols - .iter() - .map(|c| normalize_col(Expr::Column(c.clone()), input)) - .collect::>>()?; - - // Do not let duplicate columns to be added, some of the - // missing_cols may be already present but without the new - // projected alias. - missing_exprs.retain(|e| !exprs.contains(e)); - if is_distinct { - Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &exprs)?; + if let LogicalPlan::Projection(Projection { + input, + expr: _, + schema: _, + }) = input + { + if missing_cols.iter().all(|c| input.schema().has_column(c)) { + let mut missing_exprs = missing_cols + .iter() + .map(|c| normalize_col(Expr::Column(c.clone()), input)) + .collect::>>()?; + + // Do not let duplicate columns to be added, some of the + // missing_cols may be already present but without the new + // projected alias. + missing_exprs.retain(|e| !exprs.contains(e)); + if is_distinct { + Self::ambiguous_distinct_check( + &missing_exprs, + missing_cols, + &exprs, + )?; + } + exprs.extend(missing_exprs); } - exprs.extend(missing_exprs); } } let inputs = curr_plan @@ -602,7 +614,7 @@ impl LogicalPlanBuilder { curr_plan.with_new_exprs(exprs, inputs) } - fn ambiguous_distinct_check( + pub fn ambiguous_distinct_check( missing_exprs: &[Expr], missing_cols: &IndexSet, projection_exprs: &[Expr], diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index b160c9d8748d..52b0177e6d44 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -25,7 +25,7 @@ use crate::utils::{ }; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{not_impl_err, plan_err, Result}; +use datafusion_common::{not_impl_err, plan_err, Column, Result}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ @@ -36,10 +36,10 @@ use datafusion_expr::utils::{ }; use datafusion_expr::{ qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter, - GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, + GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, Projection, }; -use indexmap::IndexMap; +use indexmap::{IndexMap, IndexSet}; use sqlparser::ast::{ Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, WildcardAdditionalOptions, WindowType, @@ -264,6 +264,10 @@ impl SqlToRel<'_, S> { } }?; + if let LogicalPlan::Distinct(_) = &plan { + Self::ambiguous_distinct_project_check(&plan, &order_by_rex)?; + } + // DISTRIBUTE BY let plan = if !select.distribute_by.is_empty() { let x = select @@ -287,6 +291,90 @@ impl SqlToRel<'_, S> { self.order_by(plan, order_by_rex) } + /// Adding a new column is not correct if there is a `Distinct` + /// node, which produces only distinct values of its + /// inputs. Adding a new column to its input will result in + /// potentially different results than with the original column. + /// + /// For example, if the input is like: + /// + /// Distinct(A, B) + /// + /// If the input looks like + /// + /// a | b | c + /// --+---+--- + /// 1 | 2 | 3 + /// 1 | 2 | 4 + /// + /// Distinct (A, B) --> (1,2) + /// + /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4) + /// (which will appear as a (1, 2), (1, 2) if a and b are projected + /// + /// See for more details + fn ambiguous_distinct_project_check( + plan: &LogicalPlan, + order_by: &[datafusion_expr::expr::Sort], + ) -> Result<()> { + let schema = plan.schema(); + // Collect sort columns that are missing in the input plan's schema + let mut missing_cols: IndexSet = IndexSet::new(); + order_by.iter().try_for_each::<_, Result<()>>(|sort| { + let columns = sort.expr.column_refs(); + + missing_cols.extend( + columns + .into_iter() + .filter(|c| !schema.has_column(c)) + .cloned(), + ); + + Ok(()) + })?; + + if missing_cols.is_empty() { + return Ok(()); + } + Self::do_ambiguous_distinct_project_check(plan, &missing_cols) + } + + fn do_ambiguous_distinct_project_check( + plan: &LogicalPlan, + missing_cols: &IndexSet, + ) -> Result<()> { + for input in plan.inputs() { + if let LogicalPlan::Projection(Projection { + input, + expr, + schema: _, + .. + }) = input + { + if missing_cols.iter().all(|c| input.schema().has_column(c)) { + let mut missing_exprs = missing_cols + .iter() + .map(|c| normalize_col(Expr::Column(c.clone()), input)) + .collect::>>()?; + + // Do not let duplicate columns to be added, some of the + // missing_cols may be already present but without the new + // projected alias. + missing_exprs.retain(|e| !expr.contains(e)); + LogicalPlanBuilder::ambiguous_distinct_check( + &missing_exprs, + missing_cols, + expr, + )?; + } + } else { + Self::do_ambiguous_distinct_project_check(input, missing_cols)?; + } + } + + Ok(()) + } + /// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection pub(super) fn try_process_unnest( &self,