From 9005585fa6f4eb6a4d0cc515b6ad76794c33c626 Mon Sep 17 00:00:00 2001 From: Jagdish Parihar Date: Tue, 5 Nov 2024 20:21:36 +0530 Subject: [PATCH] Deprecate `LexOrderingRef` and `LexRequirementRef` (#13233) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * converted LexOrderingRef to &LexOrdering * using LexOrdering::from_ref fn instead of directly cloning it * using as_ref instread of & * using as_ref * removed commented code * updated cargo lock * updated LexRequirementRef to &LexRequirement * fixed clippy issues * fixed taplo error for cargo.toml in physical-expr-common * removed commented code * fixed clippy errors * fixed clippy error * fixes * removed LexOrdering::from_ref instead using clone and created LexOrdering::empty() fn * Update mod.rs --------- Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com> Co-authored-by: berkaysynnada --- benchmarks/src/sort.rs | 39 ++++--- datafusion-cli/Cargo.lock | 33 +++--- .../physical_plan/file_scan_config.rs | 27 ++--- .../datasource/physical_plan/statistics.rs | 34 +++--- .../enforce_distribution.rs | 8 +- .../src/physical_optimizer/enforce_sorting.rs | 17 +-- .../replace_with_order_preserving_variants.rs | 15 ++- .../src/physical_optimizer/sort_pushdown.rs | 72 +++++++------ .../physical_optimizer/update_aggr_exprs.rs | 19 ++-- .../tests/fuzz_cases/equivalence/utils.rs | 6 +- .../src/accumulator.rs | 4 +- .../functions-aggregate-common/src/utils.rs | 6 +- .../functions-aggregate/benches/count.rs | 4 +- datafusion/functions-aggregate/benches/sum.rs | 4 +- .../functions-aggregate/src/array_agg.rs | 2 +- .../functions-aggregate/src/first_last.rs | 11 +- .../functions-aggregate/src/nth_value.rs | 2 +- datafusion/functions-aggregate/src/stddev.rs | 6 +- datafusion/physical-expr-common/Cargo.toml | 1 + .../physical-expr-common/src/sort_expr.rs | 102 ++++++++++++++---- datafusion/physical-expr-common/src/utils.rs | 4 +- datafusion/physical-expr/src/aggregate.rs | 9 +- .../physical-expr/src/equivalence/class.rs | 9 +- .../physical-expr/src/equivalence/ordering.rs | 7 +- .../src/equivalence/properties.rs | 34 +++--- datafusion/physical-expr/src/lib.rs | 3 +- datafusion/physical-expr/src/utils/mod.rs | 4 +- .../physical-expr/src/window/aggregate.rs | 8 +- .../physical-expr/src/window/built_in.rs | 8 +- .../src/window/sliding_aggregate.rs | 8 +- .../physical-expr/src/window/window_expr.rs | 6 +- .../physical-plan/src/aggregates/mod.rs | 4 +- .../physical-plan/src/aggregates/order/mod.rs | 4 +- .../src/aggregates/order/partial.rs | 4 +- .../physical-plan/src/execution_plan.rs | 12 +-- .../src/joins/stream_join_utils.rs | 6 +- .../src/joins/symmetric_hash_join.rs | 12 +-- datafusion/physical-plan/src/joins/utils.rs | 16 +-- .../physical-plan/src/repartition/mod.rs | 10 +- .../physical-plan/src/sorts/partial_sort.rs | 3 +- datafusion/physical-plan/src/sorts/sort.rs | 36 +++---- .../src/sorts/sort_preserving_merge.rs | 8 +- datafusion/physical-plan/src/sorts/stream.rs | 4 +- .../src/sorts/streaming_merge.rs | 22 +++- .../src/windows/bounded_window_agg_exec.rs | 7 +- datafusion/physical-plan/src/windows/mod.rs | 10 +- .../proto/src/physical_plan/to_proto.rs | 2 +- .../tests/cases/roundtrip_physical_plan.rs | 27 ++--- 48 files changed, 396 insertions(+), 303 deletions(-) diff --git a/benchmarks/src/sort.rs b/benchmarks/src/sort.rs index b2038c432f77..f4b707611cfb 100644 --- a/benchmarks/src/sort.rs +++ b/benchmarks/src/sort.rs @@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt}; use arrow::util::pretty; use datafusion::common::Result; -use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr}; +use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; use datafusion::physical_plan::collect; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::prelude::{SessionConfig, SessionContext}; @@ -70,31 +70,28 @@ impl RunOpt { let sort_cases = vec![ ( "sort utf8", - vec![PhysicalSortExpr { + LexOrdering::new(vec![PhysicalSortExpr { expr: col("request_method", &schema)?, options: Default::default(), - }], + }]), ), ( "sort int", - vec![PhysicalSortExpr { - expr: col("request_bytes", &schema)?, + LexOrdering::new(vec![PhysicalSortExpr { + expr: col("response_bytes", &schema)?, options: Default::default(), - }], + }]), ), ( "sort decimal", - vec![ - // sort decimal - PhysicalSortExpr { - expr: col("decimal_price", &schema)?, - options: Default::default(), - }, - ], + LexOrdering::new(vec![PhysicalSortExpr { + expr: col("decimal_price", &schema)?, + options: Default::default(), + }]), ), ( "sort integer tuple", - vec![ + LexOrdering::new(vec![ PhysicalSortExpr { expr: col("request_bytes", &schema)?, options: Default::default(), @@ -103,11 +100,11 @@ impl RunOpt { expr: col("response_bytes", &schema)?, options: Default::default(), }, - ], + ]), ), ( "sort utf8 tuple", - vec![ + LexOrdering::new(vec![ // sort utf8 tuple PhysicalSortExpr { expr: col("service", &schema)?, @@ -125,11 +122,11 @@ impl RunOpt { expr: col("image", &schema)?, options: Default::default(), }, - ], + ]), ), ( "sort mixed tuple", - vec![ + LexOrdering::new(vec![ PhysicalSortExpr { expr: col("service", &schema)?, options: Default::default(), @@ -142,7 +139,7 @@ impl RunOpt { expr: col("decimal_price", &schema)?, options: Default::default(), }, - ], + ]), ), ]; for (title, expr) in sort_cases { @@ -170,13 +167,13 @@ impl RunOpt { async fn exec_sort( ctx: &SessionContext, - expr: LexOrderingRef<'_>, + expr: &LexOrdering, test_file: &TestParquetFile, debug: bool, ) -> Result<(usize, std::time::Duration)> { let start = Instant::now(); let scan = test_file.create_scan(ctx, None).await?; - let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()), scan)); + let exec = Arc::new(SortExec::new(expr.clone(), scan)); let task_ctx = ctx.task_ctx(); let result = collect(exec, task_ctx).await?; let elapsed = start.elapsed(); diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 541d464d381f..3348adb10386 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -99,9 +99,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8365de52b16c035ff4fcafe0092ba9390540e3e352870ac09933bebcaa2c8c56" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" [[package]] name = "anstyle-parse" @@ -523,9 +523,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.47.0" +version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8776850becacbd3a82a4737a9375ddb5c6832a51379f24443a98e61513f852c" +checksum = "ded855583fa1d22e88fe39fd6062b062376e50a8211989e07cf5e38d52eb3453" dependencies = [ "aws-credential-types", "aws-runtime", @@ -545,9 +545,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.48.0" +version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0007b5b8004547133319b6c4e87193eee2a0bcb3e4c18c75d09febe9dab7b383" +checksum = "9177ea1192e6601ae16c7273385690d88a7ed386a00b74a6bc894d12103cd933" dependencies = [ "aws-credential-types", "aws-runtime", @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.47.0" +version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fffaa356e7f1c725908b75136d53207fa714e348f365671df14e95a60530ad3" +checksum = "823ef553cf36713c97453e2ddff1eb8f62be7f4523544e2a5db64caf80100f0a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -917,9 +917,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.31" +version = "1.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f" +checksum = "67b9470d453346108f93a59222a9a1a5724db32d0a4727b7ab7ace4b4d822dc9" dependencies = [ "jobserver", "libc", @@ -1520,6 +1520,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", + "itertools", "rand", ] @@ -3614,9 +3615,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.85" +version = "2.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" dependencies = [ "proc-macro2", "quote", @@ -3653,18 +3654,18 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "thiserror" -version = "1.0.65" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5" +checksum = "5d171f59dbaa811dbbb1aee1e73db92ec2b122911a48e1390dfe327a821ddede" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.65" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602" +checksum = "b08be0f17bd307950653ce45db00cd31200d82b624b36e181337d9c7d92765b5" dependencies = [ "proc-macro2", "quote", diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 74ab0126a557..6a162c97b666 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -35,7 +35,6 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::Precision; use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics}; use datafusion_physical_expr::LexOrdering; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; use log::warn; @@ -308,7 +307,7 @@ impl FileScanConfig { pub fn split_groups_by_statistics( table_schema: &SchemaRef, file_groups: &[Vec], - sort_order: LexOrderingRef, + sort_order: &LexOrdering, ) -> Result>> { let flattened_files = file_groups.iter().flatten().collect::>(); // First Fit: @@ -1113,17 +1112,19 @@ mod tests { )))) .collect::>(), )); - let sort_order = case - .sort - .into_iter() - .map(|expr| { - crate::physical_planner::create_physical_sort_expr( - &expr, - &DFSchema::try_from(table_schema.as_ref().clone())?, - &ExecutionProps::default(), - ) - }) - .collect::>>()?; + let sort_order = LexOrdering { + inner: case + .sort + .into_iter() + .map(|expr| { + crate::physical_planner::create_physical_sort_expr( + &expr, + &DFSchema::try_from(table_schema.as_ref().clone())?, + &ExecutionProps::default(), + ) + }) + .collect::>>()?, + }; let partitioned_files = case.files.into_iter().map(From::from).collect::>(); diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs b/datafusion/core/src/datasource/physical_plan/statistics.rs index 6af153a731b0..488098e7861c 100644 --- a/datafusion/core/src/datasource/physical_plan/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/statistics.rs @@ -36,7 +36,7 @@ use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion_common::{DataFusionError, Result}; use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr}; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; /// A normalized representation of file min/max statistics that allows for efficient sorting & comparison. /// The min/max values are ordered by [`Self::sort_order`]. @@ -50,7 +50,7 @@ pub(crate) struct MinMaxStatistics { impl MinMaxStatistics { /// Sort order used to sort the statistics #[allow(unused)] - pub fn sort_order(&self) -> LexOrderingRef { + pub fn sort_order(&self) -> &LexOrdering { &self.sort_order } @@ -66,8 +66,8 @@ impl MinMaxStatistics { } pub fn new_from_files<'a>( - projected_sort_order: LexOrderingRef, // Sort order with respect to projected schema - projected_schema: &SchemaRef, // Projected schema + projected_sort_order: &LexOrdering, // Sort order with respect to projected schema + projected_schema: &SchemaRef, // Projected schema projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns) files: impl IntoIterator, ) -> Result { @@ -119,15 +119,17 @@ impl MinMaxStatistics { projected_schema .project(&(sort_columns.iter().map(|c| c.index()).collect::>()))?, ); - let min_max_sort_order = sort_columns - .iter() - .zip(projected_sort_order.iter()) - .enumerate() - .map(|(i, (col, sort))| PhysicalSortExpr { - expr: Arc::new(Column::new(col.name(), i)), - options: sort.options, - }) - .collect::>(); + let min_max_sort_order = LexOrdering { + inner: sort_columns + .iter() + .zip(projected_sort_order.iter()) + .enumerate() + .map(|(i, (col, sort))| PhysicalSortExpr { + expr: Arc::new(Column::new(col.name(), i)), + options: sort.options, + }) + .collect::>(), + }; let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns .iter() @@ -167,7 +169,7 @@ impl MinMaxStatistics { } pub fn new( - sort_order: LexOrderingRef, + sort_order: &LexOrdering, schema: &SchemaRef, min_values: RecordBatch, max_values: RecordBatch, @@ -257,7 +259,7 @@ impl MinMaxStatistics { Ok(Self { min_by_sort_order: min.map_err(|e| e.context("build min rows"))?, max_by_sort_order: max.map_err(|e| e.context("build max rows"))?, - sort_order: LexOrdering::from_ref(sort_order), + sort_order: sort_order.clone(), }) } @@ -278,7 +280,7 @@ impl MinMaxStatistics { } fn sort_columns_from_physical_sort_exprs( - sort_order: LexOrderingRef, + sort_order: &LexOrdering, ) -> Option> { sort_order .iter() diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 6cd902db7244..6863978610db 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -52,12 +52,12 @@ use datafusion_physical_expr::utils::map_columns_before_projection; use datafusion_physical_expr::{ physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef, }; -use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_optimizer::output_requirements::OutputRequirementExec; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec}; use datafusion_physical_plan::ExecutionPlanProperties; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are @@ -936,7 +936,11 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext { let new_plan = if should_preserve_ordering { Arc::new(SortPreservingMergeExec::new( - LexOrdering::from_ref(input.plan.output_ordering().unwrap_or(&[])), + input + .plan + .output_ordering() + .unwrap_or(&LexOrdering::default()) + .clone(), input.plan.clone(), )) as _ } else { diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 7b111cddc6fd..adc3d7cac10c 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -62,7 +62,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode}; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr::{Partitioning, PhysicalSortRequirement}; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -224,9 +224,9 @@ fn replace_with_partial_sort( let sort_req = PhysicalSortRequirement::from_sort_exprs(sort_plan.expr()); let mut common_prefix_length = 0; - while child_eq_properties - .ordering_satisfy_requirement(&sort_req[0..common_prefix_length + 1]) - { + while child_eq_properties.ordering_satisfy_requirement(&LexRequirement { + inner: sort_req[0..common_prefix_length + 1].to_vec(), + }) { common_prefix_length += 1; } if common_prefix_length > 0 { @@ -392,7 +392,10 @@ fn analyze_immediate_sort_removal( let sort_input = sort_exec.input(); // If this sort is unnecessary, we should remove it: if sort_input.equivalence_properties().ordering_satisfy( - sort_exec.properties().output_ordering().unwrap_or_default(), + sort_exec + .properties() + .output_ordering() + .unwrap_or(LexOrdering::empty()), ) { node.plan = if !sort_exec.preserve_partitioning() && sort_input.output_partitioning().partition_count() > 1 @@ -632,10 +635,10 @@ fn remove_corresponding_sort_from_sub_plan( Ok(node) } -/// Converts an [ExecutionPlan] trait object to a [LexOrderingRef] when possible. +/// Converts an [ExecutionPlan] trait object to a [LexOrdering] reference when possible. fn get_sort_exprs( sort_any: &Arc, -) -> Result<(LexOrderingRef, Option)> { +) -> Result<(&LexOrdering, Option)> { if let Some(sort_exec) = sort_any.as_any().downcast_ref::() { Ok((sort_exec.expr(), sort_exec.fetch())) } else if let Some(spm) = sort_any.as_any().downcast_ref::() diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 930ce52e6fa2..c80aea411f57 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -129,11 +129,13 @@ fn plan_with_order_preserving_variants( return Ok(sort_input); } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better { let child = &sort_input.children[0].plan; - if let Some(ordering) = child.output_ordering().map(Vec::from) { + if let Some(ordering) = child.output_ordering() { // When the input of a `CoalescePartitionsExec` has an ordering, // replace it with a `SortPreservingMergeExec` if appropriate: - let spm = - SortPreservingMergeExec::new(LexOrdering::new(ordering), child.clone()); + let spm = SortPreservingMergeExec::new( + LexOrdering::new(ordering.inner.clone()), + child.clone(), + ); sort_input.plan = Arc::new(spm) as _; sort_input.children[0].data = true; return Ok(sort_input); @@ -257,7 +259,12 @@ pub(crate) fn replace_with_order_preserving_variants( if alternate_plan .plan .equivalence_properties() - .ordering_satisfy(requirements.plan.output_ordering().unwrap_or_default()) + .ordering_satisfy( + requirements + .plan + .output_ordering() + .unwrap_or(LexOrdering::empty()), + ) { for child in alternate_plan.children.iter_mut() { child.data = false; diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 1a53077b1fd5..e231e719b25f 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -36,10 +36,8 @@ use datafusion_common::{plan_err, HashSet, JoinSide, Result}; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::collect_columns; -use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement}; -use datafusion_physical_expr_common::sort_expr::{ - LexOrdering, LexOrderingRef, LexRequirement, -}; +use datafusion_physical_expr::PhysicalSortRequirement; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; /// This is a "data class" we use within the [`EnforceSorting`] rule to push /// down [`SortExec`] in the plan. In some cases, we can reduce the total @@ -87,11 +85,12 @@ fn pushdown_sorts_helper( let parent_reqs = requirements .data .ordering_requirement - .as_deref() - .unwrap_or(&[]); + .clone() + .unwrap_or_default(); let satisfy_parent = plan .equivalence_properties() - .ordering_satisfy_requirement(parent_reqs); + .ordering_satisfy_requirement(&parent_reqs); + if is_sort(plan) { let required_ordering = plan .output_ordering() @@ -139,7 +138,7 @@ fn pushdown_sorts_helper( for (child, order) in requirements.children.iter_mut().zip(reqs) { child.data.ordering_requirement = order; } - } else if let Some(adjusted) = pushdown_requirement_to_children(plan, parent_reqs)? { + } else if let Some(adjusted) = pushdown_requirement_to_children(plan, &parent_reqs)? { // Can not satisfy the parent requirements, check whether we can push // requirements down: for (child, order) in requirements.children.iter_mut().zip(adjusted) { @@ -162,14 +161,16 @@ fn pushdown_sorts_helper( fn pushdown_requirement_to_children( plan: &Arc, - parent_required: LexRequirementRef, + parent_required: &LexRequirement, ) -> Result>>> { let maintains_input_order = plan.maintains_input_order(); if is_window(plan) { let required_input_ordering = plan.required_input_ordering(); - let request_child = required_input_ordering[0].as_deref().unwrap_or(&[]); + let request_child = required_input_ordering[0].clone().unwrap_or_default(); let child_plan = plan.children().swap_remove(0); - match determine_children_requirement(parent_required, request_child, child_plan) { + + match determine_children_requirement(parent_required, &request_child, child_plan) + { RequirementsCompatibility::Satisfy => { let req = (!request_child.is_empty()) .then(|| LexRequirement::new(request_child.to_vec())); @@ -180,7 +181,10 @@ fn pushdown_requirement_to_children( } } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { let sort_req = PhysicalSortRequirement::from_sort_exprs( - sort_exec.properties().output_ordering().unwrap_or(&[]), + sort_exec + .properties() + .output_ordering() + .unwrap_or(&LexOrdering::default()), ); if sort_exec .properties() @@ -202,7 +206,9 @@ fn pushdown_requirement_to_children( .all(|maintain| *maintain) { let output_req = PhysicalSortRequirement::from_sort_exprs( - plan.properties().output_ordering().unwrap_or(&[]), + plan.properties() + .output_ordering() + .unwrap_or(&LexOrdering::default()), ); // Push down through operator with fetch when: // - requirement is aligned with output ordering @@ -229,7 +235,11 @@ fn pushdown_requirement_to_children( let left_columns_len = smj.left().schema().fields().len(); let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(parent_required.iter().cloned()); - match expr_source_side(&parent_required_expr, smj.join_type(), left_columns_len) { + match expr_source_side( + parent_required_expr.as_ref(), + smj.join_type(), + left_columns_len, + ) { Some(JoinSide::Left) => try_pushdown_requirements_to_join( smj, parent_required, @@ -275,7 +285,8 @@ fn pushdown_requirement_to_children( spm_eqs = spm_eqs.with_reorder(new_ordering); // Do not push-down through SortPreservingMergeExec when // ordering requirement invalidates requirement of sort preserving merge exec. - if !spm_eqs.ordering_satisfy(plan.output_ordering().unwrap_or_default()) { + if !spm_eqs.ordering_satisfy(&plan.output_ordering().cloned().unwrap_or_default()) + { Ok(None) } else { // Can push-down through SortPreservingMergeExec, because parent requirement is finer @@ -293,7 +304,7 @@ fn pushdown_requirement_to_children( /// Return true if pushing the sort requirements through a node would violate /// the input sorting requirements for the plan fn pushdown_would_violate_requirements( - parent_required: LexRequirementRef, + parent_required: &LexRequirement, child: &dyn ExecutionPlan, ) -> bool { child @@ -319,8 +330,8 @@ fn pushdown_would_violate_requirements( /// - If parent requirements are more specific, push down parent requirements. /// - If they are not compatible, need to add a sort. fn determine_children_requirement( - parent_required: LexRequirementRef, - request_child: LexRequirementRef, + parent_required: &LexRequirement, + request_child: &LexRequirement, child_plan: &Arc, ) -> RequirementsCompatibility { if child_plan @@ -345,8 +356,8 @@ fn determine_children_requirement( fn try_pushdown_requirements_to_join( smj: &SortMergeJoinExec, - parent_required: LexRequirementRef, - sort_expr: LexOrderingRef, + parent_required: &LexRequirement, + sort_expr: &LexOrdering, push_side: JoinSide, ) -> Result>>> { let left_eq_properties = smj.left().equivalence_properties(); @@ -354,13 +365,13 @@ fn try_pushdown_requirements_to_join( let mut smj_required_orderings = smj.required_input_ordering(); let right_requirement = smj_required_orderings.swap_remove(1); let left_requirement = smj_required_orderings.swap_remove(0); - let left_ordering = smj.left().output_ordering().unwrap_or_default(); - let right_ordering = smj.right().output_ordering().unwrap_or_default(); + let left_ordering = &smj.left().output_ordering().cloned().unwrap_or_default(); + let right_ordering = &smj.right().output_ordering().cloned().unwrap_or_default(); + let (new_left_ordering, new_right_ordering) = match push_side { JoinSide::Left => { - let left_eq_properties = left_eq_properties - .clone() - .with_reorder(LexOrdering::from_ref(sort_expr)); + let left_eq_properties = + left_eq_properties.clone().with_reorder(sort_expr.clone()); if left_eq_properties .ordering_satisfy_requirement(&left_requirement.unwrap_or_default()) { @@ -371,9 +382,8 @@ fn try_pushdown_requirements_to_join( } } JoinSide::Right => { - let right_eq_properties = right_eq_properties - .clone() - .with_reorder(LexOrdering::from_ref(sort_expr)); + let right_eq_properties = + right_eq_properties.clone().with_reorder(sort_expr.clone()); if right_eq_properties .ordering_satisfy_requirement(&right_requirement.unwrap_or_default()) { @@ -417,7 +427,7 @@ fn try_pushdown_requirements_to_join( } fn expr_source_side( - required_exprs: LexOrderingRef, + required_exprs: &LexOrdering, join_type: JoinType, left_columns_len: usize, ) -> Option { @@ -469,7 +479,7 @@ fn expr_source_side( } fn shift_right_required( - parent_required: LexRequirementRef, + parent_required: &LexRequirement, left_columns_len: usize, ) -> Result { let new_right_required = parent_required @@ -507,7 +517,7 @@ fn shift_right_required( /// pushed down, `Ok(None)` if not. On error, returns a `Result::Err`. fn handle_custom_pushdown( plan: &Arc, - parent_required: LexRequirementRef, + parent_required: &LexRequirement, maintains_input_order: Vec, ) -> Result>>> { // If there's no requirement from the parent or the plan has no children, return early diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs index d85278556cc4..c9363b00e18f 100644 --- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs +++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs @@ -27,6 +27,7 @@ use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement, }; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::concat_slices; use datafusion_physical_plan::windows::get_ordered_partition_by_indices; @@ -138,7 +139,7 @@ fn try_convert_aggregate_if_better( aggr_exprs .into_iter() .map(|aggr_expr| { - let aggr_sort_exprs = &aggr_expr.order_bys().unwrap_or_default(); + let aggr_sort_exprs = &aggr_expr.order_bys().cloned().unwrap_or_default(); let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs); let aggr_sort_reqs = PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter()); @@ -151,14 +152,20 @@ fn try_convert_aggregate_if_better( // Otherwise, leave it as is. if aggr_expr.order_sensitivity().is_beneficial() && !aggr_sort_reqs.is_empty() { - let reqs = concat_slices(prefix_requirement, &aggr_sort_reqs); + let reqs = LexRequirement { + inner: concat_slices(prefix_requirement, &aggr_sort_reqs), + }; + + let prefix_requirement = LexRequirement { + inner: prefix_requirement.to_vec(), + }; + if eq_properties.ordering_satisfy_requirement(&reqs) { // Existing ordering satisfies the aggregator requirements: aggr_expr.with_beneficial_ordering(true)?.map(Arc::new) - } else if eq_properties.ordering_satisfy_requirement(&concat_slices( - prefix_requirement, - &reverse_aggr_req, - )) { + } else if eq_properties.ordering_satisfy_requirement(&LexRequirement { + inner: concat_slices(&prefix_requirement, &reverse_aggr_req), + }) { // Converting to reverse enables more efficient execution // given the existing ordering (if possible): aggr_expr diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs index 35da8b596380..921332bca539 100644 --- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs +++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs @@ -32,7 +32,7 @@ use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use datafusion_physical_expr::equivalence::{EquivalenceClass, ProjectionMapping}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use itertools::izip; use rand::prelude::*; @@ -465,7 +465,7 @@ pub fn generate_table_for_orderings( // prune out rows that is invalid according to remaining orderings. for ordering in orderings.iter().skip(1) { - let sort_columns = get_sort_columns(&batch, ordering.as_ref())?; + let sort_columns = get_sort_columns(&batch, ordering)?; // Collect sort options and values into separate vectors. let (sort_options, sort_col_values): (Vec<_>, Vec<_>) = sort_columns @@ -530,7 +530,7 @@ fn generate_random_f64_array( // Helper function to get sort columns from a batch fn get_sort_columns( batch: &RecordBatch, - ordering: LexOrderingRef, + ordering: &LexOrdering, ) -> Result> { ordering .iter() diff --git a/datafusion/functions-aggregate-common/src/accumulator.rs b/datafusion/functions-aggregate-common/src/accumulator.rs index 67ada562800b..a230bb028909 100644 --- a/datafusion/functions-aggregate-common/src/accumulator.rs +++ b/datafusion/functions-aggregate-common/src/accumulator.rs @@ -19,7 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::Result; use datafusion_expr_common::accumulator::Accumulator; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use std::sync::Arc; /// [`AccumulatorArgs`] contains information about how an aggregate @@ -52,7 +52,7 @@ pub struct AccumulatorArgs<'a> { /// ``` /// /// If no `ORDER BY` is specified, `ordering_req` will be empty. - pub ordering_req: LexOrderingRef<'a>, + pub ordering_req: &'a LexOrdering, /// Whether the aggregation is running in reverse order pub is_reversed: bool, diff --git a/datafusion/functions-aggregate-common/src/utils.rs b/datafusion/functions-aggregate-common/src/utils.rs index f55e5ec9a41d..e440abe2de69 100644 --- a/datafusion/functions-aggregate-common/src/utils.rs +++ b/datafusion/functions-aggregate-common/src/utils.rs @@ -30,7 +30,7 @@ use arrow::{ }; use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr_common::accumulator::Accumulator; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; +use datafusion_physical_expr_common::sort_expr::LexOrdering; /// Convert scalar values from an accumulator into arrays. pub fn get_accum_scalar_values_as_arrays( @@ -88,7 +88,7 @@ pub fn adjust_output_array(data_type: &DataType, array: ArrayRef) -> Result Vec { @@ -107,7 +107,7 @@ pub fn ordering_fields( } /// Selects the sort option attribute from all the given `PhysicalSortExpr`s. -pub fn get_sort_options(ordering_req: LexOrderingRef) -> Vec { +pub fn get_sort_options(ordering_req: &LexOrdering) -> Vec { ordering_req.iter().map(|item| item.options).collect() } diff --git a/datafusion/functions-aggregate/benches/count.rs b/datafusion/functions-aggregate/benches/count.rs index 1c8266ed5b89..e6b62e6e1856 100644 --- a/datafusion/functions-aggregate/benches/count.rs +++ b/datafusion/functions-aggregate/benches/count.rs @@ -23,7 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator}; use datafusion_functions_aggregate::count::Count; use datafusion_physical_expr::expressions::col; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use std::sync::Arc; fn prepare_accumulator() -> Box { @@ -32,7 +32,7 @@ fn prepare_accumulator() -> Box { return_type: &DataType::Int64, schema: &schema, ignore_nulls: false, - ordering_req: LexOrderingRef::default(), + ordering_req: &LexOrdering::default(), is_reversed: false, name: "COUNT(f)", is_distinct: false, diff --git a/datafusion/functions-aggregate/benches/sum.rs b/datafusion/functions-aggregate/benches/sum.rs index 1e9493280ed2..1c180126a313 100644 --- a/datafusion/functions-aggregate/benches/sum.rs +++ b/datafusion/functions-aggregate/benches/sum.rs @@ -23,7 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator}; use datafusion_functions_aggregate::sum::Sum; use datafusion_physical_expr::expressions::col; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use std::sync::Arc; fn prepare_accumulator(data_type: &DataType) -> Box { @@ -32,7 +32,7 @@ fn prepare_accumulator(data_type: &DataType) -> Box { return_type: data_type, schema: &schema, ignore_nulls: false, - ordering_req: LexOrderingRef::default(), + ordering_req: &LexOrdering::default(), is_reversed: false, name: "SUM(f)", is_distinct: false, diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 7c22c21e38c9..252a07cb11d8 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -135,7 +135,7 @@ impl AggregateUDFImpl for ArrayAgg { OrderSensitiveArrayAggAccumulator::try_new( &data_type, &ordering_dtypes, - LexOrdering::from_ref(acc_args.ordering_req), + acc_args.ordering_req.clone(), acc_args.is_reversed, ) .map(|acc| Box::new(acc) as _) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 0b05713499a9..3ca1422668e0 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -37,7 +37,7 @@ use datafusion_expr::{ ExprFunctionExt, Signature, SortExpr, TypeSignature, Volatility, }; use datafusion_functions_aggregate_common::utils::get_sort_options; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; create_func!(FirstValue, first_value_udaf); @@ -130,7 +130,7 @@ impl AggregateUDFImpl for FirstValue { FirstValueAccumulator::try_new( acc_args.return_type, &ordering_dtypes, - LexOrdering::from_ref(acc_args.ordering_req), + acc_args.ordering_req.clone(), acc_args.ignore_nulls, ) .map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _) @@ -455,7 +455,7 @@ impl AggregateUDFImpl for LastValue { LastValueAccumulator::try_new( acc_args.return_type, &ordering_dtypes, - LexOrdering::from_ref(acc_args.ordering_req), + acc_args.ordering_req.clone(), acc_args.ignore_nulls, ) .map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _) @@ -723,10 +723,7 @@ fn filter_states_according_to_is_set( } /// Combines array refs and their corresponding orderings to construct `SortColumn`s. -fn convert_to_sort_cols( - arrs: &[ArrayRef], - sort_exprs: LexOrderingRef, -) -> Vec { +fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) -> Vec { arrs.iter() .zip(sort_exprs.iter()) .map(|(item, sort_expr)| SortColumn { diff --git a/datafusion/functions-aggregate/src/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs index 5f3a8cf2f161..f3e892fa73d8 100644 --- a/datafusion/functions-aggregate/src/nth_value.rs +++ b/datafusion/functions-aggregate/src/nth_value.rs @@ -133,7 +133,7 @@ impl AggregateUDFImpl for NthValueAgg { n, &data_type, &ordering_dtypes, - LexOrdering::from_ref(acc_args.ordering_req), + acc_args.ordering_req.clone(), ) .map(|acc| Box::new(acc) as _) } diff --git a/datafusion/functions-aggregate/src/stddev.rs b/datafusion/functions-aggregate/src/stddev.rs index 95269ed8217c..b785d8e9859e 100644 --- a/datafusion/functions-aggregate/src/stddev.rs +++ b/datafusion/functions-aggregate/src/stddev.rs @@ -410,7 +410,7 @@ mod tests { use datafusion_expr::AggregateUDF; use datafusion_functions_aggregate_common::utils::get_accum_scalar_values_as_arrays; use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr_common::sort_expr::LexOrderingRef; + use datafusion_physical_expr_common::sort_expr::LexOrdering; use std::sync::Arc; #[test] @@ -462,7 +462,7 @@ mod tests { return_type: &DataType::Float64, schema, ignore_nulls: false, - ordering_req: LexOrderingRef::default(), + ordering_req: &LexOrdering::default(), name: "a", is_distinct: false, is_reversed: false, @@ -473,7 +473,7 @@ mod tests { return_type: &DataType::Float64, schema, ignore_nulls: false, - ordering_req: LexOrderingRef::default(), + ordering_req: &LexOrdering::default(), name: "a", is_distinct: false, is_reversed: false, diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index 45ccb08e52e9..ad27c9d49cf7 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -41,4 +41,5 @@ arrow = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr-common = { workspace = true } hashbrown = { workspace = true } +itertools = { workspace = true } rand = { workspace = true } diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index addf2fbfca0c..f91d583215b3 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -22,7 +22,7 @@ use std::fmt; use std::fmt::{Display, Formatter}; use std::hash::{Hash, Hasher}; use std::ops::{Deref, Index, Range, RangeFrom, RangeTo}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use std::vec::IntoIter; use arrow::compute::kernels::sort::{SortColumn, SortOptions}; @@ -30,6 +30,7 @@ use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_expr_common::columnar_value::ColumnarValue; +use itertools::Itertools; /// Represents Sort operation for a column in a RecordBatch /// @@ -218,7 +219,7 @@ impl From for PhysicalSortExpr { /// If options is `None`, the default sort options `ASC, NULLS LAST` is used. /// /// The default is picked to be consistent with - /// PostgreSQL: + /// PostgreSQL: fn from(value: PhysicalSortRequirement) -> Self { let options = value.options.unwrap_or(SortOptions { descending: false, @@ -309,13 +310,8 @@ impl PhysicalSortRequirement { pub fn from_sort_exprs<'a>( ordering: impl IntoIterator, ) -> LexRequirement { - LexRequirement::new( - ordering - .into_iter() - .cloned() - .map(PhysicalSortRequirement::from) - .collect(), - ) + let ordering = ordering.into_iter().cloned().collect(); + LexRequirement::from_lex_ordering(ordering) } /// Converts an iterator of [`PhysicalSortRequirement`] into a Vec @@ -327,10 +323,8 @@ impl PhysicalSortRequirement { pub fn to_sort_exprs( requirements: impl IntoIterator, ) -> LexOrdering { - requirements - .into_iter() - .map(PhysicalSortExpr::from) - .collect() + let requirements = requirements.into_iter().collect(); + LexOrdering::from_lex_requirement(requirements) } } @@ -352,14 +346,23 @@ pub struct LexOrdering { pub inner: Vec, } +impl AsRef for LexOrdering { + fn as_ref(&self) -> &LexOrdering { + self + } +} + +static EMPTY_ORDER: OnceLock = OnceLock::new(); + impl LexOrdering { // Creates a new [`LexOrdering`] from a vector pub fn new(inner: Vec) -> Self { Self { inner } } - pub fn as_ref(&self) -> LexOrderingRef { - &self.inner + /// Return an empty LexOrdering (no expressions) + pub fn empty() -> &'static LexOrdering { + EMPTY_ORDER.get_or_init(LexOrdering::default) } pub fn capacity(&self) -> usize { @@ -378,10 +381,6 @@ impl LexOrdering { self.inner.extend(iter) } - pub fn from_ref(lex_ordering_ref: LexOrderingRef) -> Self { - Self::new(lex_ordering_ref.to_vec()) - } - pub fn is_empty(&self) -> bool { self.inner.is_empty() } @@ -409,6 +408,36 @@ impl LexOrdering { pub fn truncate(&mut self, len: usize) { self.inner.truncate(len) } + + /// Merge the contents of `other` into `self`, removing duplicates. + pub fn merge(mut self, other: LexOrdering) -> Self { + self.inner = self.inner.into_iter().chain(other).unique().collect(); + self + } + + /// Converts a `LexRequirement` into a `LexOrdering`. + /// + /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` + /// for each entry in the input. If required ordering is None for an entry + /// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`). + pub fn from_lex_requirement(requirements: LexRequirement) -> LexOrdering { + requirements + .into_iter() + .map(PhysicalSortExpr::from) + .collect() + } +} + +impl From> for LexOrdering { + fn from(value: Vec) -> Self { + Self::new(value) + } +} + +impl From for LexOrdering { + fn from(value: LexRequirement) -> Self { + Self::from_lex_requirement(value) + } } impl Deref for LexOrdering { @@ -489,6 +518,7 @@ impl IntoIterator for LexOrdering { ///`LexOrderingRef` is an alias for the type &`[PhysicalSortExpr]`, which represents /// a reference to a lexicographical ordering. +#[deprecated(since = "43.0.0", note = "use &LexOrdering instead")] pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr]; ///`LexRequirement` is an struct containing a `Vec`, which @@ -514,6 +544,30 @@ impl LexRequirement { pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement) { self.inner.push(physical_sort_requirement) } + + /// Create a new [`LexRequirement`] from a vector of [`PhysicalSortExpr`]s. + /// + /// Returns [`PhysicalSortRequirement`] that requires the exact + /// sort of the [`PhysicalSortExpr`]s in `ordering` + /// + /// This method takes `&'a PhysicalSortExpr` to make it easy to + /// use implementing [`ExecutionPlan::required_input_ordering`]. + /// + /// [`ExecutionPlan::required_input_ordering`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering + pub fn from_lex_ordering(ordering: LexOrdering) -> Self { + Self::new( + ordering + .into_iter() + .map(PhysicalSortRequirement::from) + .collect(), + ) + } +} + +impl From for LexRequirement { + fn from(value: LexOrdering) -> Self { + Self::from_lex_ordering(value) + } } impl Deref for LexRequirement { @@ -545,6 +599,16 @@ impl IntoIterator for LexRequirement { } } +impl<'a> IntoIterator for &'a LexOrdering { + type Item = &'a PhysicalSortExpr; + type IntoIter = std::slice::Iter<'a, PhysicalSortExpr>; + + fn into_iter(self) -> Self::IntoIter { + self.inner.iter() + } +} + ///`LexRequirementRef` is an alias for the type &`[PhysicalSortRequirement]`, which /// represents a reference to a lexicographical ordering requirement. +/// #[deprecated(since = "43.0.0", note = "use &LexRequirement instead")] pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement]; diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs index 26293b1a76a2..ffdab6c6d385 100644 --- a/datafusion/physical-expr-common/src/utils.rs +++ b/datafusion/physical-expr-common/src/utils.rs @@ -24,7 +24,7 @@ use datafusion_common::Result; use datafusion_expr_common::sort_properties::ExprProperties; use crate::physical_expr::PhysicalExpr; -use crate::sort_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr}; +use crate::sort_expr::{LexOrdering, PhysicalSortExpr}; use crate::tree_node::ExprContext; /// Represents a [`PhysicalExpr`] node with associated properties (order and @@ -96,7 +96,7 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result { /// Reverses the ORDER BY expression, which is useful during equivalent window /// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into /// 'ORDER BY a DESC, NULLS FIRST'. -pub fn reverse_order_bys(order_bys: LexOrderingRef) -> LexOrdering { +pub fn reverse_order_bys(order_bys: &LexOrdering) -> LexOrdering { order_bys .iter() .map(|e| PhysicalSortExpr::new(e.expr.clone(), !e.options)) diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index e446776affc0..5dc138933430 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -45,7 +45,7 @@ use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs; use datafusion_functions_aggregate_common::accumulator::StateFieldsArgs; use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_expr_common::utils::reverse_order_bys; use datafusion_expr_common::groups_accumulator::GroupsAccumulator; @@ -292,7 +292,7 @@ impl AggregateFunctionExpr { /// Order by requirements for the aggregate function /// By default it is `None` (there is no requirement) /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this - pub fn order_bys(&self) -> Option { + pub fn order_bys(&self) -> Option<&LexOrdering> { if self.ordering_req.is_empty() { return None; } @@ -490,7 +490,10 @@ impl AggregateFunctionExpr { /// These expressions are (1)function arguments, (2) order by expressions. pub fn all_expressions(&self) -> AggregatePhysicalExpressions { let args = self.expressions(); - let order_bys = self.order_bys().unwrap_or_default(); + let order_bys = self + .order_bys() + .cloned() + .unwrap_or_else(LexOrdering::default); let order_by_exprs = order_bys .iter() .map(|sort_expr| Arc::clone(&sort_expr.expr)) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 7305bc1b0a2b..35ff6f685b53 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -21,9 +21,8 @@ use std::sync::Arc; use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; use crate::{ expressions::Column, physical_expr::deduplicate_physical_exprs, - physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, LexOrderingRef, - LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, - PhysicalSortRequirement, + physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, LexRequirement, + PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, }; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -475,7 +474,7 @@ impl EquivalenceGroup { /// This function applies the `normalize_sort_expr` function for all sort /// expressions in `sort_exprs` and returns the corresponding normalized /// sort expressions. - pub fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering { + pub fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering { // Convert sort expressions to sort requirements: let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); // Normalize the requirements: @@ -489,7 +488,7 @@ impl EquivalenceGroup { /// sort requirements. pub fn normalize_sort_requirements( &self, - sort_reqs: LexRequirementRef, + sort_reqs: &LexRequirement, ) -> LexRequirement { collapse_lex_req(LexRequirement::new( sort_reqs diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 838c9800f942..06f85b657e09 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -146,12 +146,7 @@ impl OrderingEquivalenceClass { /// Returns the concatenation of all the orderings. This enables merge /// operations to preserve all equivalent orderings simultaneously. pub fn output_ordering(&self) -> Option { - let output_ordering = self - .orderings - .iter() - .flat_map(|ordering| ordering.as_ref()) - .cloned() - .collect(); + let output_ordering = self.orderings.iter().flatten().cloned().collect(); let output_ordering = collapse_lex_ordering(output_ordering); (!output_ordering.is_empty()).then_some(output_ordering) } diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 55c99e93d040..061e77222f29 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -30,9 +30,8 @@ use crate::equivalence::{ }; use crate::expressions::{with_new_schema, CastExpr, Column, Literal}; use crate::{ - physical_exprs_contains, ConstExpr, LexOrdering, LexOrderingRef, LexRequirement, - LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, - PhysicalSortRequirement, + physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, + PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, }; use arrow_schema::{SchemaRef, SortOptions}; @@ -197,7 +196,7 @@ impl EquivalenceProperties { OrderingEquivalenceClass::new( self.oeq_class .iter() - .map(|ordering| self.normalize_sort_exprs(ordering.as_ref())) + .map(|ordering| self.normalize_sort_exprs(ordering)) .collect(), ) } @@ -408,7 +407,7 @@ impl EquivalenceProperties { /// function would return `vec![a ASC, c ASC]`. Internally, it would first /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result /// after deduplication. - fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering { + fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering { // Convert sort expressions to sort requirements: let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); // Normalize the requirements: @@ -430,10 +429,7 @@ impl EquivalenceProperties { /// function would return `vec![a ASC, c ASC]`. Internally, it would first /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result /// after deduplication. - fn normalize_sort_requirements( - &self, - sort_reqs: LexRequirementRef, - ) -> LexRequirement { + fn normalize_sort_requirements(&self, sort_reqs: &LexRequirement) -> LexRequirement { let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs); let mut constant_exprs = vec![]; constant_exprs.extend( @@ -456,7 +452,7 @@ impl EquivalenceProperties { /// Checks whether the given ordering is satisfied by any of the existing /// orderings. - pub fn ordering_satisfy(&self, given: LexOrderingRef) -> bool { + pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool { // Convert the given sort expressions to sort requirements: let sort_requirements = PhysicalSortRequirement::from_sort_exprs(given.iter()); self.ordering_satisfy_requirement(&sort_requirements) @@ -464,7 +460,7 @@ impl EquivalenceProperties { /// Checks whether the given sort requirements are satisfied by any of the /// existing orderings. - pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) -> bool { + pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool { let mut eq_properties = self.clone(); // First, standardize the given requirement: let normalized_reqs = eq_properties.normalize_sort_requirements(reqs); @@ -525,8 +521,8 @@ impl EquivalenceProperties { /// than the `reference` sort requirements. pub fn requirements_compatible( &self, - given: LexRequirementRef, - reference: LexRequirementRef, + given: &LexRequirement, + reference: &LexRequirement, ) -> bool { let normalized_given = self.normalize_sort_requirements(given); let normalized_reference = self.normalize_sort_requirements(reference); @@ -548,8 +544,8 @@ impl EquivalenceProperties { /// the latter. pub fn get_finer_ordering( &self, - lhs: LexOrderingRef, - rhs: LexOrderingRef, + lhs: &LexOrdering, + rhs: &LexOrdering, ) -> Option { // Convert the given sort expressions to sort requirements: let lhs = PhysicalSortRequirement::from_sort_exprs(lhs); @@ -569,8 +565,8 @@ impl EquivalenceProperties { /// is the latter. pub fn get_finer_requirement( &self, - req1: LexRequirementRef, - req2: LexRequirementRef, + req1: &LexRequirement, + req2: &LexRequirement, ) -> Option { let mut lhs = self.normalize_sort_requirements(req1); let mut rhs = self.normalize_sort_requirements(req2); @@ -606,7 +602,7 @@ impl EquivalenceProperties { pub fn substitute_ordering_component( &self, mapping: &ProjectionMapping, - sort_expr: LexOrderingRef, + sort_expr: &LexOrdering, ) -> Result> { let new_orderings = sort_expr .iter() @@ -656,7 +652,7 @@ impl EquivalenceProperties { let orderings = &self.oeq_class.orderings; let new_order = orderings .iter() - .map(|order| self.substitute_ordering_component(mapping, order.as_ref())) + .map(|order| self.substitute_ordering_component(mapping, order)) .collect::>>()?; let new_order = new_order.into_iter().flatten().collect(); self.oeq_class = OrderingEquivalenceClass::new(new_order); diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index e7c2b4119c5a..405b6bbd69f4 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -54,8 +54,7 @@ pub use physical_expr::{ pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr; pub use datafusion_physical_expr_common::sort_expr::{ - LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalSortExpr, - PhysicalSortRequirement, + LexOrdering, LexRequirement, PhysicalSortExpr, PhysicalSortRequirement, }; pub use planner::{create_physical_expr, create_physical_exprs}; diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 73d744b4b614..30cfecf0e235 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -33,7 +33,7 @@ use datafusion_common::tree_node::{ use datafusion_common::{HashMap, HashSet, Result}; use datafusion_expr::Operator; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use itertools::Itertools; use petgraph::graph::NodeIndex; use petgraph::stable_graph::StableGraph; @@ -244,7 +244,7 @@ pub fn reassign_predicate_columns( } /// Merge left and right sort expressions, checking for duplicates. -pub fn merge_vectors(left: LexOrderingRef, right: LexOrderingRef) -> LexOrdering { +pub fn merge_vectors(left: &LexOrdering, right: &LexOrdering) -> LexOrdering { left.iter() .cloned() .chain(right.iter().cloned()) diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 94960c95e4bb..0c56bdc92985 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -34,7 +34,7 @@ use crate::{reverse_order_bys, PhysicalExpr}; use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Accumulator, WindowFrame}; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; /// A window expr that takes the form of an aggregate function. /// @@ -52,13 +52,13 @@ impl PlainAggregateWindowExpr { pub fn new( aggregate: Arc, partition_by: &[Arc], - order_by: LexOrderingRef, + order_by: &LexOrdering, window_frame: Arc, ) -> Self { Self { aggregate, partition_by: partition_by.to_vec(), - order_by: LexOrdering::from_ref(order_by), + order_by: order_by.clone(), window_frame, } } @@ -124,7 +124,7 @@ impl WindowExpr for PlainAggregateWindowExpr { &self.partition_by } - fn order_by(&self) -> LexOrderingRef { + fn order_by(&self) -> &LexOrdering { self.order_by.as_ref() } diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index 5f6c5e5c2c1b..0f6c3f921892 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -33,7 +33,7 @@ use datafusion_common::utils::evaluate_partition_ranges; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::window_state::{WindowAggState, WindowFrameContext}; use datafusion_expr::WindowFrame; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; /// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`]. #[derive(Debug)] @@ -49,13 +49,13 @@ impl BuiltInWindowExpr { pub fn new( expr: Arc, partition_by: &[Arc], - order_by: LexOrderingRef, + order_by: &LexOrdering, window_frame: Arc, ) -> Self { Self { expr, partition_by: partition_by.to_vec(), - order_by: LexOrdering::from_ref(order_by), + order_by: order_by.clone(), window_frame, } } @@ -118,7 +118,7 @@ impl WindowExpr for BuiltInWindowExpr { &self.partition_by } - fn order_by(&self) -> LexOrderingRef { + fn order_by(&self) -> &LexOrdering { self.order_by.as_ref() } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 1e46baae7b0a..572eb8866a44 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -33,7 +33,7 @@ use crate::window::{ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{Accumulator, WindowFrame}; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; /// A window expr that takes the form of an aggregate function that /// can be incrementally computed over sliding windows. @@ -52,13 +52,13 @@ impl SlidingAggregateWindowExpr { pub fn new( aggregate: Arc, partition_by: &[Arc], - order_by: LexOrderingRef, + order_by: &LexOrdering, window_frame: Arc, ) -> Self { Self { aggregate, partition_by: partition_by.to_vec(), - order_by: LexOrdering::from_ref(order_by), + order_by: order_by.clone(), window_frame, } } @@ -108,7 +108,7 @@ impl WindowExpr for SlidingAggregateWindowExpr { &self.partition_by } - fn order_by(&self) -> LexOrderingRef { + fn order_by(&self) -> &LexOrdering { self.order_by.as_ref() } diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 0f882def4433..828e5ad20625 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -20,7 +20,7 @@ use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; -use crate::{LexOrderingRef, PhysicalExpr}; +use crate::{LexOrdering, PhysicalExpr}; use arrow::array::{new_empty_array, Array, ArrayRef}; use arrow::compute::kernels::sort::SortColumn; @@ -109,7 +109,7 @@ pub trait WindowExpr: Send + Sync + Debug { fn partition_by(&self) -> &[Arc]; /// Expressions that's from the window function's order by clause, empty if absent - fn order_by(&self) -> LexOrderingRef; + fn order_by(&self) -> &LexOrdering; /// Get order by columns, empty if absent fn order_by_columns(&self, batch: &RecordBatch) -> Result> { @@ -344,7 +344,7 @@ pub(crate) fn is_end_bound_safe( window_frame_ctx: &WindowFrameContext, order_bys: &[ArrayRef], most_recent_order_bys: Option<&[ArrayRef]>, - sort_exprs: LexOrderingRef, + sort_exprs: &LexOrdering, idx: usize, ) -> Result { if sort_exprs.is_empty() { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 5ffe797c5c26..a71039b5733b 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -940,7 +940,7 @@ fn get_aggregate_expr_req( return LexOrdering::default(); } - let mut req = LexOrdering::from_ref(aggr_expr.order_bys().unwrap_or_default()); + let mut req = aggr_expr.order_bys().cloned().unwrap_or_default(); // In non-first stage modes, we accumulate data (using `merge_batch`) from // different partitions (i.e. merge partial results). During this merge, we @@ -983,7 +983,7 @@ fn finer_ordering( agg_mode: &AggregateMode, ) -> Option { let aggr_req = get_aggregate_expr_req(aggr_expr, group_by, agg_mode); - eq_properties.get_finer_ordering(existing_req.as_ref(), aggr_req.as_ref()) + eq_properties.get_finer_ordering(existing_req, aggr_req.as_ref()) } /// Concatenates the given slices. diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs b/datafusion/physical-plan/src/aggregates/order/mod.rs index 24846d239591..7d9a50e20ae0 100644 --- a/datafusion/physical-plan/src/aggregates/order/mod.rs +++ b/datafusion/physical-plan/src/aggregates/order/mod.rs @@ -19,7 +19,7 @@ use arrow_array::ArrayRef; use arrow_schema::Schema; use datafusion_common::Result; use datafusion_expr::EmitTo; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use std::mem::size_of; mod full; @@ -45,7 +45,7 @@ impl GroupOrdering { pub fn try_new( input_schema: &Schema, mode: &InputOrderMode, - ordering: LexOrderingRef, + ordering: &LexOrdering, ) -> Result { match mode { InputOrderMode::Linear => Ok(GroupOrdering::None), diff --git a/datafusion/physical-plan/src/aggregates/order/partial.rs b/datafusion/physical-plan/src/aggregates/order/partial.rs index 5cc55dc0d028..5a05b88798ef 100644 --- a/datafusion/physical-plan/src/aggregates/order/partial.rs +++ b/datafusion/physical-plan/src/aggregates/order/partial.rs @@ -21,7 +21,7 @@ use arrow_schema::Schema; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::EmitTo; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use std::mem::size_of; use std::sync::Arc; @@ -107,7 +107,7 @@ impl GroupOrderingPartial { pub fn try_new( input_schema: &Schema, order_indices: &[usize], - ordering: LexOrderingRef, + ordering: &LexOrdering, ) -> Result { assert!(!order_indices.is_empty()); assert!(order_indices.len() <= ordering.len()); diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index d65320dbab68..7220e7594ea6 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -38,7 +38,7 @@ pub use datafusion_physical_expr::{ expressions, udf, Distribution, Partitioning, PhysicalExpr, }; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; -use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, LexRequirement}; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::display::DisplayableExecutionPlan; @@ -443,7 +443,7 @@ pub trait ExecutionPlanProperties { /// For example, `SortExec` (obviously) produces sorted output as does /// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted /// output if its input is sorted as it does not reorder the input rows. - fn output_ordering(&self) -> Option; + fn output_ordering(&self) -> Option<&LexOrdering>; /// Get the [`EquivalenceProperties`] within the plan. /// @@ -474,7 +474,7 @@ impl ExecutionPlanProperties for Arc { self.properties().execution_mode() } - fn output_ordering(&self) -> Option { + fn output_ordering(&self) -> Option<&LexOrdering> { self.properties().output_ordering() } @@ -492,7 +492,7 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan { self.properties().execution_mode() } - fn output_ordering(&self) -> Option { + fn output_ordering(&self) -> Option<&LexOrdering> { self.properties().output_ordering() } @@ -643,8 +643,8 @@ impl PlanProperties { &self.partitioning } - pub fn output_ordering(&self) -> Option { - self.output_ordering.as_deref() + pub fn output_ordering(&self) -> Option<&LexOrdering> { + self.output_ordering.as_ref() } pub fn execution_mode(&self) -> ExecutionMode { diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index f08ce0ea2f0f..cea04ccad3fc 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -40,7 +40,7 @@ use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use hashbrown::raw::RawTable; /// Implementation of `JoinHashMapType` for `PruningJoinHashMap`. @@ -744,8 +744,8 @@ pub fn prepare_sorted_exprs( filter: &JoinFilter, left: &Arc, right: &Arc, - left_sort_exprs: LexOrderingRef, - right_sort_exprs: LexOrderingRef, + left_sort_exprs: &LexOrdering, + right_sort_exprs: &LexOrdering, ) -> Result<(SortedFilterExpr, SortedFilterExpr, ExprIntervalGraph)> { let err = || { datafusion_common::plan_datafusion_err!("Filter does not include the child order") diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index f082bdbdd3f9..84c9f03b07be 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -73,9 +73,7 @@ use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; use ahash::RandomState; -use datafusion_physical_expr_common::sort_expr::{ - LexOrdering, LexOrderingRef, LexRequirement, -}; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use futures::{ready, Stream, StreamExt}; use parking_lot::Mutex; @@ -319,13 +317,13 @@ impl SymmetricHashJoinExec { } /// Get left_sort_exprs - pub fn left_sort_exprs(&self) -> Option { - self.left_sort_exprs.as_deref() + pub fn left_sort_exprs(&self) -> Option<&LexOrdering> { + self.left_sort_exprs.as_ref() } /// Get right_sort_exprs - pub fn right_sort_exprs(&self) -> Option { - self.right_sort_exprs.as_deref() + pub fn right_sort_exprs(&self) -> Option<&LexOrdering> { + self.right_sort_exprs.as_ref() } /// Check if order information covers every column in the filter expression. diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a257119a8b7c..0366c9fa5e46 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -51,7 +51,7 @@ use datafusion_physical_expr::equivalence::add_offset_to_expr; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::{collect_columns, merge_vectors}; use datafusion_physical_expr::{ - LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, + LexOrdering, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, }; use futures::future::{BoxFuture, Shared}; @@ -469,7 +469,7 @@ fn replace_on_columns_of_right_ordering( } fn offset_ordering( - ordering: LexOrderingRef, + ordering: &LexOrdering, join_type: &JoinType, offset: usize, ) -> LexOrdering { @@ -483,14 +483,14 @@ fn offset_ordering( options: sort_expr.options, }) .collect(), - _ => LexOrdering::from_ref(ordering), + _ => ordering.clone(), } } /// Calculate the output ordering of a given join operation. pub fn calculate_join_output_ordering( - left_ordering: LexOrderingRef, - right_ordering: LexOrderingRef, + left_ordering: &LexOrdering, + right_ordering: &LexOrdering, join_type: JoinType, on_columns: &[(PhysicalExprRef, PhysicalExprRef)], left_columns_len: usize, @@ -503,7 +503,7 @@ pub fn calculate_join_output_ordering( if join_type == JoinType::Inner && probe_side == Some(JoinSide::Left) { replace_on_columns_of_right_ordering( on_columns, - &mut LexOrdering::from_ref(right_ordering), + &mut right_ordering.clone(), ) .ok()?; merge_vectors( @@ -512,7 +512,7 @@ pub fn calculate_join_output_ordering( .as_ref(), ) } else { - LexOrdering::from_ref(left_ordering) + left_ordering.clone() } } [false, true] => { @@ -520,7 +520,7 @@ pub fn calculate_join_output_ordering( if join_type == JoinType::Inner && probe_side == Some(JoinSide::Right) { replace_on_columns_of_right_ordering( on_columns, - &mut LexOrdering::from_ref(right_ordering), + &mut right_ordering.clone(), ) .ok()?; merge_vectors( diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 4d0dbc75d40a..1730c7d8dc61 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -29,6 +29,7 @@ use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, }; +use crate::execution_plan::CardinalityEffect; use crate::hash_utils::create_hashes; use crate::metrics::BaselineMetrics; use crate::repartition::distributor_channels::{ @@ -48,10 +49,9 @@ use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; -use crate::execution_plan::CardinalityEffect; use datafusion_common::HashMap; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt}; use log::trace; @@ -503,7 +503,7 @@ impl DisplayAs for RepartitionExec { } if let Some(sort_exprs) = self.sort_exprs() { - write!(f, ", sort_exprs={}", LexOrdering::from_ref(sort_exprs))?; + write!(f, ", sort_exprs={}", sort_exprs.clone())?; } Ok(()) } @@ -572,7 +572,7 @@ impl ExecutionPlan for RepartitionExec { let schema_captured = Arc::clone(&schema); // Get existing ordering to use for merging - let sort_exprs = self.sort_exprs().unwrap_or(&[]).to_owned(); + let sort_exprs = self.sort_exprs().cloned().unwrap_or_default(); let stream = futures::stream::once(async move { let num_input_partitions = input.output_partitioning().partition_count(); @@ -756,7 +756,7 @@ impl RepartitionExec { } /// Return the sort expressions that are used to merge - fn sort_exprs(&self) -> Option<&[PhysicalSortExpr]> { + fn sort_exprs(&self) -> Option<&LexOrdering> { if self.preserve_order { self.input.output_ordering() } else { diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 8f853464c9bd..e69989c1be91 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -72,7 +72,6 @@ use datafusion_common::Result; use datafusion_execution::{RecordBatchStream, TaskContext}; use datafusion_physical_expr::LexOrdering; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; use futures::{ready, Stream, StreamExt}; use log::trace; @@ -159,7 +158,7 @@ impl PartialSortExec { } /// Sort expressions - pub fn expr(&self) -> LexOrderingRef { + pub fn expr(&self) -> &LexOrdering { self.expr.as_ref() } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index d90d0f64ceb4..ce7efce41577 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -52,9 +52,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_physical_expr::LexOrdering; -use datafusion_physical_expr_common::sort_expr::{ - LexOrderingRef, PhysicalSortRequirement, -}; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use crate::execution_plan::CardinalityEffect; use futures::{StreamExt, TryStreamExt}; @@ -344,10 +342,12 @@ impl ExternalSorter { streams.push(stream); } + let expressions: LexOrdering = self.expr.iter().cloned().collect(); + StreamingMergeBuilder::new() .with_streams(streams) .with_schema(Arc::clone(&self.schema)) - .with_expressions(self.expr.to_vec().as_slice()) + .with_expressions(expressions.as_ref()) .with_metrics(self.metrics.baseline.clone()) .with_batch_size(self.batch_size) .with_fetch(self.fetch) @@ -536,10 +536,12 @@ impl ExternalSorter { }) .collect::>()?; + let expressions: LexOrdering = self.expr.iter().cloned().collect(); + StreamingMergeBuilder::new() .with_streams(streams) .with_schema(Arc::clone(&self.schema)) - .with_expressions(self.expr.as_ref()) + .with_expressions(expressions.as_ref()) .with_metrics(metrics) .with_batch_size(self.batch_size) .with_fetch(self.fetch) @@ -561,7 +563,7 @@ impl ExternalSorter { let schema = batch.schema(); let fetch = self.fetch; - let expressions = Arc::clone(&self.expr); + let expressions: LexOrdering = self.expr.iter().cloned().collect(); let stream = futures::stream::once(futures::future::lazy(move |_| { let timer = metrics.elapsed_compute().timer(); let sorted = sort_batch(&batch, &expressions, fetch)?; @@ -603,7 +605,7 @@ impl Debug for ExternalSorter { pub fn sort_batch( batch: &RecordBatch, - expressions: LexOrderingRef, + expressions: &LexOrdering, fetch: Option, ) -> Result { let sort_columns = expressions @@ -762,8 +764,8 @@ impl SortExec { } /// Sort expressions - pub fn expr(&self) -> LexOrderingRef { - self.expr.as_ref() + pub fn expr(&self) -> &LexOrdering { + &self.expr } /// If `Some(fetch)`, limits output to only the first "fetch" items @@ -790,11 +792,10 @@ impl SortExec { preserve_partitioning: bool, ) -> PlanProperties { // Determine execution mode: - let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement( - PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()) - .inner - .as_slice(), - ); + let requirement = LexRequirement::from(sort_exprs); + let sort_satisfied = input + .equivalence_properties() + .ordering_satisfy_requirement(&requirement); let mode = match input.execution_mode() { ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded, ExecutionMode::Bounded => ExecutionMode::Bounded, @@ -803,6 +804,7 @@ impl SortExec { // Calculate equivalence properties; i.e. reset the ordering equivalence // class with the new ordering: + let sort_exprs = LexOrdering::from(requirement); let eq_properties = input .equivalence_properties() .clone() @@ -890,11 +892,7 @@ impl ExecutionPlan for SortExec { let sort_satisfied = self .input .equivalence_properties() - .ordering_satisfy_requirement( - PhysicalSortRequirement::from_sort_exprs(self.expr.iter()) - .inner - .as_slice(), - ); + .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone())); match (sort_satisfied, self.fetch.as_ref()) { (true, Some(fetch)) => Ok(Box::pin(LimitStream::new( diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 9ee0faaa0a44..5c80322afe0c 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -34,9 +34,7 @@ use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalSortRequirement; -use datafusion_physical_expr_common::sort_expr::{ - LexOrdering, LexOrderingRef, LexRequirement, -}; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use log::{debug, trace}; /// Sort preserving merge execution plan @@ -122,8 +120,8 @@ impl SortPreservingMergeExec { } /// Sort expressions - pub fn expr(&self) -> LexOrderingRef { - &self.expr + pub fn expr(&self) -> &LexOrdering { + self.expr.as_ref() } /// Fetch diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 70beb2c4a91b..7c57fdf9baae 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -24,7 +24,7 @@ use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, SortField}; use datafusion_common::Result; use datafusion_execution::memory_pool::MemoryReservation; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use futures::stream::{Fuse, StreamExt}; use std::marker::PhantomData; use std::sync::Arc; @@ -93,7 +93,7 @@ pub struct RowCursorStream { impl RowCursorStream { pub fn try_new( schema: &Schema, - expressions: LexOrderingRef, + expressions: &LexOrdering, streams: Vec, reservation: MemoryReservation, ) -> Result { diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index bd74685eac94..4350235ef47d 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -28,7 +28,7 @@ use arrow::datatypes::{DataType, SchemaRef}; use arrow_array::*; use datafusion_common::{internal_err, Result}; use datafusion_execution::memory_pool::MemoryReservation; -use datafusion_physical_expr_common::sort_expr::LexOrderingRef; +use datafusion_physical_expr_common::sort_expr::LexOrdering; macro_rules! primitive_merge_helper { ($t:ty, $($v:ident),+) => { @@ -51,11 +51,10 @@ macro_rules! merge_helper { }}; } -#[derive(Default)] pub struct StreamingMergeBuilder<'a> { streams: Vec, schema: Option, - expressions: LexOrderingRef<'a>, + expressions: &'a LexOrdering, metrics: Option, batch_size: Option, fetch: Option, @@ -63,6 +62,21 @@ pub struct StreamingMergeBuilder<'a> { enable_round_robin_tie_breaker: bool, } +impl<'a> Default for StreamingMergeBuilder<'a> { + fn default() -> Self { + Self { + streams: vec![], + schema: None, + expressions: LexOrdering::empty(), + metrics: None, + batch_size: None, + fetch: None, + reservation: None, + enable_round_robin_tie_breaker: false, + } + } +} + impl<'a> StreamingMergeBuilder<'a> { pub fn new() -> Self { Self { @@ -81,7 +95,7 @@ impl<'a> StreamingMergeBuilder<'a> { self } - pub fn with_expressions(mut self, expressions: LexOrderingRef<'a>) -> Self { + pub fn with_expressions(mut self, expressions: &'a LexOrdering) -> Self { self.expressions = expressions; self } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index c3e0a4e3897c..602efa54f8da 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1188,7 +1188,6 @@ mod tests { }; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; - use datafusion_physical_expr_common::sort_expr::LexOrderingRef; use futures::future::Shared; use futures::{pin_mut, ready, FutureExt, Stream, StreamExt}; use itertools::Itertools; @@ -1555,7 +1554,7 @@ mod tests { Arc::new(BuiltInWindowExpr::new( last_value_func, &[], - LexOrderingRef::default(), + &LexOrdering::default(), Arc::new(WindowFrame::new_bounds( WindowFrameUnits::Rows, WindowFrameBound::Preceding(ScalarValue::UInt64(None)), @@ -1566,7 +1565,7 @@ mod tests { Arc::new(BuiltInWindowExpr::new( nth_value_func1, &[], - LexOrderingRef::default(), + &LexOrdering::default(), Arc::new(WindowFrame::new_bounds( WindowFrameUnits::Rows, WindowFrameBound::Preceding(ScalarValue::UInt64(None)), @@ -1577,7 +1576,7 @@ mod tests { Arc::new(BuiltInWindowExpr::new( nth_value_func2, &[], - LexOrderingRef::default(), + &LexOrdering::default(), Arc::new(WindowFrame::new_bounds( WindowFrameUnits::Rows, WindowFrameBound::Preceding(ScalarValue::UInt64(None)), diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 217823fb6a0a..da7f6d79e578 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -53,7 +53,7 @@ use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; -use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, LexRequirement}; +use datafusion_physical_expr_common::sort_expr::LexRequirement; pub use window_agg_exec::WindowAggExec; /// Build field from window function and add it into schema @@ -98,7 +98,7 @@ pub fn create_window_expr( name: String, args: &[Arc], partition_by: &[Arc], - order_by: LexOrderingRef, + order_by: &LexOrdering, window_frame: Arc, input_schema: &Schema, ignore_nulls: bool, @@ -139,7 +139,7 @@ pub fn create_window_expr( /// Creates an appropriate [`WindowExpr`] based on the window frame and fn window_expr_from_aggregate_expr( partition_by: &[Arc], - order_by: LexOrderingRef, + order_by: &LexOrdering, window_frame: Arc, aggregate: Arc, ) -> Arc { @@ -497,7 +497,7 @@ pub fn get_best_fitting_window( /// the mode this window operator should work in to accommodate the existing ordering. pub fn get_window_mode( partitionby_exprs: &[Arc], - orderby_keys: LexOrderingRef, + orderby_keys: &LexOrdering, input: &Arc, ) -> Option<(bool, InputOrderMode)> { let input_eqs = input.equivalence_properties().clone(); @@ -699,7 +699,7 @@ mod tests { "count".to_owned(), &[col("a", &schema)?], &[], - LexOrderingRef::default(), + &LexOrdering::default(), Arc::new(WindowFrame::new(None)), schema.as_ref(), false, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 4bf7e353326e..dc94ad075c53 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -53,7 +53,7 @@ pub fn serialize_physical_aggr_expr( ) -> Result { let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?; let ordering_req = match aggr_expr.order_bys() { - Some(order) => LexOrdering::from_ref(order), + Some(order) => order.clone(), None => LexOrdering::default(), }; let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 1e078ee410c6..8c8dcccee376 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -52,8 +52,7 @@ use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility}; use datafusion::physical_expr::expressions::Literal; use datafusion::physical_expr::window::SlidingAggregateWindowExpr; use datafusion::physical_expr::{ - LexOrdering, LexOrderingRef, LexRequirement, PhysicalSortRequirement, - ScalarFunctionExpr, + LexOrdering, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr, }; use datafusion::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, @@ -288,13 +287,15 @@ fn roundtrip_window() -> Result<()> { false, )), &[col("b", &schema)?], - &[PhysicalSortExpr { - expr: col("a", &schema)?, - options: SortOptions { - descending: false, - nulls_first: false, - }, - }], + &LexOrdering{ + inner: vec![PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions { + descending: false, + nulls_first: false, + }, + }] + }, Arc::new(window_frame), )); @@ -308,7 +309,7 @@ fn roundtrip_window() -> Result<()> { .build() .map(Arc::new)?, &[], - LexOrderingRef::default(), + &LexOrdering::default(), Arc::new(WindowFrame::new(None)), )); @@ -328,7 +329,7 @@ fn roundtrip_window() -> Result<()> { let sliding_aggr_window_expr = Arc::new(SlidingAggregateWindowExpr::new( sum_expr, &[], - LexOrderingRef::default(), + &LexOrdering::default(), Arc::new(window_frame), )); @@ -1014,7 +1015,7 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { vec![Arc::new(PlainAggregateWindowExpr::new( aggr_expr.clone(), &[col("author", &schema)?], - LexOrderingRef::default(), + &LexOrdering::default(), Arc::new(WindowFrame::new(None)), ))], filter, @@ -1075,7 +1076,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> { vec![Arc::new(PlainAggregateWindowExpr::new( aggr_expr, &[col("author", &schema)?], - LexOrderingRef::default(), + &LexOrdering::default(), Arc::new(WindowFrame::new(None)), ))], filter,