Skip to content

Commit

Permalink
Deprecate PhysicalSortRequirement::from_sort_exprs and `PhysicalSor…
Browse files Browse the repository at this point in the history
…tRequirement::to_sort_exprs` (#13222)

* Deprecate `PhysicalSortRequirement::from_sort_exprs` and `PhysicalSortRequirement::to_sort_exprs`

* Update for API changes

* fix clippy

* fmt
  • Loading branch information
alamb authored Nov 6, 2024
1 parent 07e8793 commit 6a02384
Show file tree
Hide file tree
Showing 16 changed files with 61 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,6 @@ pub(crate) mod tests {
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::PlanProperties;
Expand Down Expand Up @@ -1496,9 +1495,7 @@ pub(crate) mod tests {
if self.expr.is_empty() {
vec![None]
} else {
vec![Some(PhysicalSortRequirement::from_sort_exprs(
self.expr.iter(),
))]
vec![Some(LexRequirement::from(self.expr.clone()))]
}
}

Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode};

use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Partitioning, PhysicalSortRequirement};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
Expand Down Expand Up @@ -221,7 +221,7 @@ fn replace_with_partial_sort(
// here we're trying to find the common prefix for sorted columns that is required for the
// sort and already satisfied by the given ordering
let child_eq_properties = child.equivalence_properties();
let sort_req = PhysicalSortRequirement::from_sort_exprs(sort_plan.expr());
let sort_req = LexRequirement::from(sort_plan.expr().clone());

let mut common_prefix_length = 0;
while child_eq_properties.ordering_satisfy_requirement(&LexRequirement {
Expand Down Expand Up @@ -275,8 +275,8 @@ fn parallelize_sorts(
{
// Take the initial sort expressions and requirements
let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs);
let sort_exprs = LexOrdering::new(sort_exprs.to_vec());
let sort_reqs = LexRequirement::from(sort_exprs.clone());
let sort_exprs = sort_exprs.clone();

// If there is a connection between a `CoalescePartitionsExec` and a
// global sort that satisfy the requirements (i.e. intermediate
Expand Down
27 changes: 13 additions & 14 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ fn pushdown_sorts_helper(
if is_sort(plan) {
let required_ordering = plan
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)
.cloned()
.map(LexRequirement::from)
.unwrap_or_default();
if !satisfy_parent {
// Make sure this `SortExec` satisfies parent requirements:
Expand Down Expand Up @@ -180,11 +181,12 @@ fn pushdown_requirement_to_children(
RequirementsCompatibility::NonCompatible => Ok(None),
}
} else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let sort_req = PhysicalSortRequirement::from_sort_exprs(
let sort_req = LexRequirement::from(
sort_exec
.properties()
.output_ordering()
.unwrap_or(&LexOrdering::default()),
.cloned()
.unwrap_or(LexOrdering::default()),
);
if sort_exec
.properties()
Expand All @@ -205,10 +207,11 @@ fn pushdown_requirement_to_children(
.iter()
.all(|maintain| *maintain)
{
let output_req = PhysicalSortRequirement::from_sort_exprs(
let output_req = LexRequirement::from(
plan.properties()
.output_ordering()
.unwrap_or(&LexOrdering::default()),
.cloned()
.unwrap_or(LexOrdering::default()),
);
// Push down through operator with fetch when:
// - requirement is aligned with output ordering
Expand All @@ -227,14 +230,12 @@ fn pushdown_requirement_to_children(
} else if is_union(plan) {
// UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
// propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
let req = (!parent_required.is_empty()).then(|| parent_required.clone());
Ok(Some(vec![req; plan.children().len()]))
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
// If the current plan is SortMergeJoinExec
let left_columns_len = smj.left().schema().fields().len();
let parent_required_expr =
PhysicalSortRequirement::to_sort_exprs(parent_required.iter().cloned());
let parent_required_expr = LexOrdering::from(parent_required.clone());
match expr_source_side(
parent_required_expr.as_ref(),
smj.join_type(),
Expand All @@ -251,8 +252,7 @@ fn pushdown_requirement_to_children(
smj.schema().fields.len() - smj.right().schema().fields.len();
let new_right_required =
shift_right_required(parent_required, right_offset)?;
let new_right_required_expr =
PhysicalSortRequirement::to_sort_exprs(new_right_required);
let new_right_required_expr = LexOrdering::from(new_right_required);
try_pushdown_requirements_to_join(
smj,
parent_required,
Expand All @@ -278,8 +278,7 @@ fn pushdown_requirement_to_children(
// Pushing down is not beneficial
Ok(None)
} else if is_sort_preserving_merge(plan) {
let new_ordering =
PhysicalSortRequirement::to_sort_exprs(parent_required.to_vec());
let new_ordering = LexOrdering::from(parent_required.clone());
let mut spm_eqs = plan.equivalence_properties().clone();
// Sort preserving merge will have new ordering, one requirement above is pushed down to its below.
spm_eqs = spm_eqs.with_reorder(new_ordering);
Expand Down Expand Up @@ -412,7 +411,7 @@ fn try_pushdown_requirements_to_join(
let should_pushdown = smj_eqs.ordering_satisfy_requirement(parent_required);
Ok(should_pushdown.then(|| {
let mut required_input_ordering = smj.required_input_ordering();
let new_req = Some(PhysicalSortRequirement::from_sort_exprs(sort_expr));
let new_req = Some(LexRequirement::from(sort_expr.clone()));
match push_side {
JoinSide::Left => {
required_input_ordering[0] = new_req;
Expand Down
8 changes: 2 additions & 6 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ use datafusion_physical_plan::{

use async_trait::async_trait;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};

async fn register_current_csv(
ctx: &SessionContext,
Expand Down Expand Up @@ -419,9 +417,7 @@ impl ExecutionPlan for RequirementsTestExec {
}

fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
let requirement = PhysicalSortRequirement::from_sort_exprs(
self.required_input_ordering.as_ref().iter(),
);
let requirement = LexRequirement::from(self.required_input_ordering.clone());
vec![Some(requirement)]
}

Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::concat_slices;
use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
Expand Down Expand Up @@ -139,12 +139,10 @@ fn try_convert_aggregate_if_better(
aggr_exprs
.into_iter()
.map(|aggr_expr| {
let aggr_sort_exprs = &aggr_expr.order_bys().cloned().unwrap_or_default();
let aggr_sort_exprs = aggr_expr.order_bys().unwrap_or(LexOrdering::empty());
let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
let aggr_sort_reqs =
PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter());
let reverse_aggr_req =
PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_sort_exprs.inner);
let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone());
let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs);

// If the aggregate expression benefits from input ordering, and
// there is an actual ordering enabling this, try to update the
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::tree_node::PlanContext;

Expand All @@ -38,7 +39,7 @@ pub fn add_sort_above<T: Clone + Default>(
sort_requirements: LexRequirement,
fetch: Option<usize>,
) -> PlanContext<T> {
let mut sort_expr = PhysicalSortRequirement::to_sort_exprs(sort_requirements);
let mut sort_expr = LexOrdering::from(sort_requirements);
sort_expr.inner.retain(|sort_expr| {
!node
.plan
Expand Down
29 changes: 6 additions & 23 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,26 +296,14 @@ impl PhysicalSortRequirement {
})
}

/// Returns [`PhysicalSortRequirement`] that requires the exact
/// sort of the [`PhysicalSortExpr`]s in `ordering`
///
/// This method takes `&'a PhysicalSortExpr` to make it easy to
/// use implementing [`ExecutionPlan::required_input_ordering`].
///
/// [`ExecutionPlan::required_input_ordering`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
#[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")]
pub fn from_sort_exprs<'a>(
ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> LexRequirement {
let ordering = ordering.into_iter().cloned().collect();
LexRequirement::from_lex_ordering(ordering)
}

/// Converts an iterator of [`PhysicalSortRequirement`] into a Vec
/// of [`PhysicalSortExpr`]s.
///
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If required ordering is None for an entry
/// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`).
#[deprecated(since = "43.0.0", note = "use LexOrdering::from_lex_requirement")]
pub fn to_sort_exprs(
requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> LexOrdering {
Expand Down Expand Up @@ -416,8 +404,8 @@ impl LexOrdering {
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If required ordering is None for an entry
/// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`).
pub fn from_lex_requirement(requirements: LexRequirement) -> LexOrdering {
requirements
pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
requirement
.into_iter()
.map(PhysicalSortExpr::from)
.collect()
Expand Down Expand Up @@ -541,15 +529,10 @@ impl LexRequirement {
self.inner.push(physical_sort_requirement)
}

/// Create a new [`LexRequirement`] from a vector of [`PhysicalSortExpr`]s.
/// Create a new [`LexRequirement`] from a [`LexOrdering`]
///
/// Returns [`PhysicalSortRequirement`] that requires the exact
/// Returns [`LexRequirement`] that requires the exact
/// sort of the [`PhysicalSortExpr`]s in `ordering`
///
/// This method takes `&'a PhysicalSortExpr` to make it easy to
/// use implementing [`ExecutionPlan::required_input_ordering`].
///
/// [`ExecutionPlan::required_input_ordering`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
Self::new(
ordering
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,11 +475,11 @@ impl EquivalenceGroup {
/// sort expressions.
pub fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
let sort_reqs = LexRequirement::from(sort_exprs.clone());
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs.inner)
LexOrdering::from(normalized_sort_reqs)
}

/// This function applies the `normalize_sort_requirement` function for all
Expand Down
12 changes: 6 additions & 6 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,11 @@ impl EquivalenceProperties {
/// after deduplication.
fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
let sort_reqs = LexRequirement::from(sort_exprs.clone());
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs)
LexOrdering::from(normalized_sort_reqs)
}

/// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
Expand Down Expand Up @@ -454,7 +454,7 @@ impl EquivalenceProperties {
/// orderings.
pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
// Convert the given sort expressions to sort requirements:
let sort_requirements = PhysicalSortRequirement::from_sort_exprs(given.iter());
let sort_requirements = LexRequirement::from(given.clone());
self.ordering_satisfy_requirement(&sort_requirements)
}

Expand Down Expand Up @@ -548,11 +548,11 @@ impl EquivalenceProperties {
rhs: &LexOrdering,
) -> Option<LexOrdering> {
// Convert the given sort expressions to sort requirements:
let lhs = PhysicalSortRequirement::from_sort_exprs(lhs);
let rhs = PhysicalSortRequirement::from_sort_exprs(rhs);
let lhs = LexRequirement::from(lhs.clone());
let rhs = LexRequirement::from(rhs.clone());
let finer = self.get_finer_requirement(&lhs, &rhs);
// Convert the chosen sort requirements back to sort expressions:
finer.map(PhysicalSortRequirement::to_sort_exprs)
finer.map(LexOrdering::from)
}

/// Returns the finer ordering among the requirements `lhs` and `rhs`,
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-optimizer/src/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_physical_plan::{
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
use datafusion_physical_expr::{Distribution, LexRequirement};
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};

Expand Down Expand Up @@ -256,13 +256,13 @@ fn require_top_ordering_helper(
// Therefore; we check the sort expression field of the SortExec to assign the requirements.
let req_ordering = sort_exec.expr();
let req_dist = sort_exec.required_input_distribution()[0].clone();
let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering);
let reqs = LexRequirement::from(req_ordering.clone());
Ok((
Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) as _,
true,
))
} else if let Some(spm) = plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr());
let reqs = LexRequirement::from(spm.expr().clone());
Ok((
Arc::new(OutputRequirementExec::new(
plan,
Expand Down
6 changes: 2 additions & 4 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1074,9 +1074,7 @@ pub fn get_finer_aggregate_exprs_requirement(
);
}

Ok(PhysicalSortRequirement::from_sort_exprs(
requirement.inner.iter(),
))
Ok(LexRequirement::from(requirement))
}

/// Returns physical expressions for arguments to evaluate against a batch.
Expand Down Expand Up @@ -2304,7 +2302,7 @@ mod tests {
&eq_properties,
&AggregateMode::Partial,
)?;
let res = PhysicalSortRequirement::to_sort_exprs(res);
let res = LexOrdering::from(res);
assert_eq!(res, common_requirement);
Ok(())
}
Expand Down
10 changes: 3 additions & 7 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use futures::{Stream, StreamExt};

Expand Down Expand Up @@ -297,12 +297,8 @@ impl ExecutionPlan for SortMergeJoinExec {

fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![
Some(PhysicalSortRequirement::from_sort_exprs(
self.left_sort_exprs.iter(),
)),
Some(PhysicalSortRequirement::from_sort_exprs(
self.right_sort_exprs.iter(),
)),
Some(LexRequirement::from(self.left_sort_exprs.clone())),
Some(LexRequirement::from(self.right_sort_exprs.clone())),
]
}

Expand Down
Loading

0 comments on commit 6a02384

Please sign in to comment.