Skip to content

Commit

Permalink
fix: ci tests
Browse files Browse the repository at this point in the history
  • Loading branch information
logan-keede committed Jan 21, 2025
1 parent 3d78ef2 commit 9d19863
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
40 changes: 34 additions & 6 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
// under the License.

use std::ops::Deref;
use std::sync::Arc;

use super::*;
use datafusion::config::ConfigOptions;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};

use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
use datafusion_physical_optimizer::test_utils::{
Expand All @@ -31,22 +34,48 @@ use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::expressions::col;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::utils::JoinOn;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::union::UnionExec;

use datafusion_physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::{
expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_optimizer::enforce_distribution::*;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use datafusion_physical_plan::execution_plan::ExecutionPlan;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::PlanProperties;
use std::fmt::Debug;

use datafusion_common::error::Result;

use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};

use datafusion_physical_plan::ExecutionPlanProperties;

/// A [`PlanContext`] specialized with a `bool` payload.
// NOTE(review): the meaning of the flag is not visible in this chunk; presumably it
// mirrors the alias of the same name in `enforce_distribution.rs` — confirm there.
type DistributionContext = PlanContext<bool>;

/// Keeps track of parent required key orderings: a [`PlanContext`] whose
/// payload is a list of physical expressions (`Vec<Arc<dyn PhysicalExpr>>`).
type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;

/// Models operators like BoundedWindowExec that require an input
/// ordering, but is easy to construct.
///
#[derive(Debug)]
struct SortRequiredExec {
input: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -133,8 +162,8 @@ impl ExecutionPlan for SortRequiredExec {
fn execute(
&self,
_partition: usize,
_context: Arc<crate::execution::context::TaskContext>,
) -> Result<crate::physical_plan::SendableRecordBatchStream> {
_context: Arc<datafusion::execution::context::TaskContext>,
) -> Result<datafusion_physical_plan::SendableRecordBatchStream> {
unreachable!();
}

Expand Down Expand Up @@ -340,7 +369,6 @@ fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
));
Arc::new(FilterExec::try_new(predicate, input).unwrap())
}

fn sort_exec(
sort_exprs: LexOrdering,
input: Arc<dyn ExecutionPlan>,
Expand Down
10 changes: 5 additions & 5 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
/// 4) If the current plan is a Projection, transform the requirements to the columns before the Projection and push down the requirements
/// 5) For other types of operators, by default, push down the parent requirements to children.
///
fn adjust_input_keys_ordering(
pub fn adjust_input_keys_ordering(
mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
let plan = Arc::clone(&requirements.plan);
Expand Down Expand Up @@ -427,7 +427,7 @@ fn adjust_input_keys_ordering(
Ok(Transformed::yes(requirements))
}

fn reorder_partitioned_join_keys<F>(
pub fn reorder_partitioned_join_keys<F>(
mut join_plan: PlanWithKeyRequirements,
on: &[(PhysicalExprRef, PhysicalExprRef)],
sort_options: &[SortOptions],
Expand Down Expand Up @@ -465,7 +465,7 @@ where
Ok(join_plan)
}

fn reorder_aggregate_keys(
pub fn reorder_aggregate_keys(
mut agg_node: PlanWithKeyRequirements,
agg_exec: &AggregateExec,
) -> Result<PlanWithKeyRequirements> {
Expand Down Expand Up @@ -600,7 +600,7 @@ fn shift_right_required(
/// The Bottom-Up approach will be useful in the future if we plan to support storage partition-wise joins.
/// In that case, the datasources/tables might be pre-partitioned, so we can't adjust the key ordering of the datasources
/// and therefore can't apply the Top-Down reordering process.
pub(crate) fn reorder_join_keys_to_inputs(
pub fn reorder_join_keys_to_inputs(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan_any = plan.as_any();
Expand Down Expand Up @@ -1150,7 +1150,7 @@ fn get_repartition_requirement_status(
/// operators to satisfy distribution requirements. Since this function
/// takes care of such requirements, we should avoid manually adding data
/// exchange operators in other places.
fn ensure_distribution(
pub fn ensure_distribution(
dist_context: DistributionContext,
config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ pub mod test_utils;
pub mod topk_aggregation;
pub mod update_aggr_exprs;
pub use optimizer::PhysicalOptimizerRule;
mod utils;
pub mod utils;

0 comments on commit 9d19863

Please sign in to comment.