diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 5423af384a3b..6c9ae24816e7 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -16,12 +16,15 @@
 // under the License.
 
 use std::ops::Deref;
+use std::sync::Arc;
 
-use super::*;
+use datafusion::config::ConfigOptions;
 use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};
+
+use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
 use datafusion_physical_optimizer::output_requirements::OutputRequirements;
 use datafusion_physical_optimizer::test_utils::{
@@ -31,22 +34,48 @@ use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion_physical_plan::expressions::col;
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::joins::utils::JoinOn;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::tree_node::PlanContext;
+use datafusion_physical_plan::union::UnionExec;
+
 use datafusion_physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::ScalarValue;
-use datafusion_expr::Operator;
-use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
+use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
 use datafusion_physical_expr::{
     expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
 };
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
+use datafusion_physical_optimizer::enforce_distribution::*;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion_physical_plan::execution_plan::ExecutionPlan;
+use datafusion_physical_plan::projection::ProjectionExec;
 use datafusion_physical_plan::PlanProperties;
+use std::fmt::Debug;
+
+use datafusion_common::error::Result;
+
+use arrow::compute::SortOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+
+use datafusion_physical_plan::ExecutionPlanProperties;
+
+type DistributionContext = PlanContext<bool>;
+
+/// Keeps track of parent required key orderings.
+type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
 
 /// Models operators like BoundedWindowExec that require an input
 /// ordering but is easy to construct
+///
 #[derive(Debug)]
 struct SortRequiredExec {
     input: Arc<dyn ExecutionPlan>,
@@ -133,8 +162,8 @@ impl ExecutionPlan for SortRequiredExec {
     fn execute(
         &self,
         _partition: usize,
-        _context: Arc<TaskContext>,
-    ) -> Result<SendableRecordBatchStream> {
+        _context: Arc<datafusion_execution::TaskContext>,
+    ) -> Result<datafusion_execution::SendableRecordBatchStream> {
         unreachable!();
     }
 
@@ -340,7 +369,6 @@ fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
     ));
     Arc::new(FilterExec::try_new(predicate, input).unwrap())
 }
-
 fn sort_exec(
     sort_exprs: LexOrdering,
     input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index be4c6667df3f..611106941d59 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -278,7 +278,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
 /// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
 /// 5) For other types of operators, by default, pushdown the parent requirements to children.
 ///
-fn adjust_input_keys_ordering(
+pub fn adjust_input_keys_ordering(
     mut requirements: PlanWithKeyRequirements,
 ) -> Result<Transformed<PlanWithKeyRequirements>> {
     let plan = Arc::clone(&requirements.plan);
@@ -427,7 +427,7 @@ fn adjust_input_keys_ordering(
     Ok(Transformed::yes(requirements))
 }
 
-fn reorder_partitioned_join_keys(
+pub fn reorder_partitioned_join_keys(
     mut join_plan: PlanWithKeyRequirements,
     on: &[(PhysicalExprRef, PhysicalExprRef)],
     sort_options: &[SortOptions],
@@ -465,7 +465,7 @@ where
     Ok(join_plan)
 }
 
-fn reorder_aggregate_keys(
+pub fn reorder_aggregate_keys(
     mut agg_node: PlanWithKeyRequirements,
     agg_exec: &AggregateExec,
 ) -> Result<PlanWithKeyRequirements> {
@@ -600,7 +600,7 @@ fn shift_right_required(
 /// The Bottom-Up approach will be useful in future if we plan to support storage partition-wised Joins.
 /// In that case, the datasources/tables might be pre-partitioned and we can't adjust the key ordering of the datasources
 /// and then can't apply the Top-Down reordering process.
-pub(crate) fn reorder_join_keys_to_inputs(
+pub fn reorder_join_keys_to_inputs(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let plan_any = plan.as_any();
@@ -1150,7 +1150,7 @@ fn get_repartition_requirement_status(
 /// operators to satisfy distribution requirements. Since this function
 /// takes care of such requirements, we should avoid manually adding data
 /// exchange operators in other places.
-fn ensure_distribution(
+pub fn ensure_distribution(
     dist_context: DistributionContext,
     config: &ConfigOptions,
 ) -> Result<Transformed<DistributionContext>> {
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index 4fb26b950419..fed7bf07b34f 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -34,4 +34,4 @@ pub mod test_utils;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
 pub use optimizer::PhysicalOptimizerRule;
-mod utils;
+pub mod utils;
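The visibility changes in `datafusion/physical-optimizer/src/enforce_distribution.rs` exist so the relocated tests in `datafusion/core/tests` can call these helpers from outside the crate. As a rough sketch of what that enables (not part of this diff), the snippet below drives the now-public `ensure_distribution` directly, mirroring the `ensure_distribution_helper` pattern the tests use. The `run_ensure_distribution` helper name, the `EmptyExec` input plan, and the target partition count are illustrative assumptions; the real tests build richer plans (scans, joins, aggregates) and assert on the formatted plan text.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_physical_optimizer::enforce_distribution::ensure_distribution;
use datafusion_physical_plan::displayable;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::execution_plan::ExecutionPlan;
use datafusion_physical_plan::tree_node::PlanContext;

/// Hypothetical helper following the pattern used by the relocated tests:
/// wrap the plan in a `PlanContext<bool>` (the `DistributionContext` alias
/// above), run `ensure_distribution`, and unwrap the possibly rewritten plan.
fn run_ensure_distribution(
    plan: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    let ctx = PlanContext::new_default(plan);
    let mut config = ConfigOptions::new();
    config.execution.target_partitions = target_partitions;
    ensure_distribution(ctx, &config).map(|t| t.data.plan)
}

fn main() -> Result<()> {
    // A trivial single-node plan for illustration; a leaf node like this is
    // returned unchanged, while multi-child plans may gain repartitioning.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
    let optimized = run_ensure_distribution(plan, 8)?;
    println!("{}", displayable(optimized.as_ref()).indent(true));
    Ok(())
}
```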