Skip to content

Commit

Permalink
Prevent take_optimizable from discarding arbitrary plan node
Browse files Browse the repository at this point in the history
`take_optimizable` started from inspecting top level plan node (it
should be final aggregation) and then descended trying to find matching
partial aggregation. When doing so, it would ignore any single-source
nodes it passes by. As a result, `AggregateStatistics` could change the
plan in an undesired manner.
  • Loading branch information
findepi committed Oct 21, 2024
1 parent b42d9b8 commit 7f67ab7
Showing 1 changed file with 7 additions and 80 deletions.
87 changes: 7 additions & 80 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,14 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>>
if !final_agg_exec.mode().is_first_stage()
&& final_agg_exec.group_expr().is_empty()
{
let mut child = Arc::clone(final_agg_exec.input());
loop {
if let Some(partial_agg_exec) =
child.as_any().downcast_ref::<AggregateExec>()
let child = final_agg_exec.input();
if let Some(partial_agg_exec) = child.as_any().downcast_ref::<AggregateExec>()
{
if partial_agg_exec.mode().is_first_stage()
&& partial_agg_exec.group_expr().is_empty()
&& partial_agg_exec.filter_expr().iter().all(|e| e.is_none())
{
if partial_agg_exec.mode().is_first_stage()
&& partial_agg_exec.group_expr().is_empty()
&& partial_agg_exec.filter_expr().iter().all(|e| e.is_none())
{
return Some(child);
}
}
if let [childrens_child] = child.children().as_slice() {
child = Arc::clone(childrens_child);
} else {
break;
return Some(Arc::clone(child));
}
}
}
Expand Down Expand Up @@ -167,7 +159,6 @@ mod tests {
use datafusion_expr_common::operator::Operator;

use datafusion_physical_plan::aggregates::PhysicalGroupBy;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::common;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::memory::MemoryExec;
Expand Down Expand Up @@ -362,70 +353,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_count_partial_indirect_child() -> Result<()> {
let source = mock_data()?;
let schema = source.schema();
let agg = TestAggregate::new_count_star();

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
)?;

// We introduce an intermediate optimization step between the partial and final aggregtator
let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg));

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;

assert_count_optim_success(final_agg, agg).await?;

Ok(())
}

#[tokio::test]
async fn test_count_partial_with_nulls_indirect_child() -> Result<()> {
let source = mock_data()?;
let schema = source.schema();
let agg = TestAggregate::new_count_column(&schema);

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
)?;

// We introduce an intermediate optimization step between the partial and final aggregtator
let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg));

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;

assert_count_optim_success(final_agg, agg).await?;

Ok(())
}

#[tokio::test]
async fn test_count_inexact_stat() -> Result<()> {
let source = mock_data()?;
Expand Down

0 comments on commit 7f67ab7

Please sign in to comment.