Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make LexOrdering::inner non pub, add comments, update usages #14155

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1313,15 +1313,15 @@ mod tests {
// ok with one column
(
vec![vec![col("string_col").sort(true, false)]],
Ok(vec![LexOrdering {
inner: vec![PhysicalSortExpr {
Ok(vec![LexOrdering::new(
vec![PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}],
}
)
])
),
// ok with two columns, different options
Expand All @@ -1330,16 +1330,16 @@ mod tests {
col("string_col").sort(true, false),
col("int_col").sort(false, true),
]],
Ok(vec![LexOrdering {
inner: vec![
Ok(vec![LexOrdering::new(
vec![
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
.desc()
.nulls_first()
],
}
)
])
),
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,9 +1112,8 @@ mod tests {
))))
.collect::<Vec<_>>(),
));
let sort_order = LexOrdering {
inner: case
.sort
let sort_order = LexOrdering::from(
case.sort
.into_iter()
.map(|expr| {
crate::physical_planner::create_physical_sort_expr(
Expand All @@ -1124,7 +1123,7 @@ mod tests {
)
})
.collect::<Result<Vec<_>>>()?,
};
);

let partitioned_files =
case.files.into_iter().map(From::from).collect::<Vec<_>>();
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ impl MinMaxStatistics {
projected_schema
.project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
);
let min_max_sort_order = LexOrdering {
inner: sort_columns
let min_max_sort_order = LexOrdering::from(
sort_columns
.iter()
.zip(projected_sort_order.iter())
.enumerate()
Expand All @@ -129,7 +129,7 @@ impl MinMaxStatistics {
options: sort.options,
})
.collect::<Vec<_>>(),
};
);

let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ fn plan_with_order_preserving_variants(
if let Some(ordering) = child.output_ordering() {
// When the input of a `CoalescePartitionsExec` has an ordering,
// replace it with a `SortPreservingMergeExec` if appropriate:
let spm = SortPreservingMergeExec::new(
LexOrdering::new(ordering.inner.clone()),
Arc::clone(child),
);
let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child));
sort_input.plan = Arc::new(spm) as _;
sort_input.children[0].data = true;
return Ok(sort_input);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn add_sort_above<T: Clone + Default>(
fetch: Option<usize>,
) -> PlanContext<T> {
let mut sort_expr = LexOrdering::from(sort_requirements);
sort_expr.inner.retain(|sort_expr| {
sort_expr.retain(|sort_expr| {
!node
.plan
.equivalence_properties()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/equivalence/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ pub fn generate_table_for_orderings(

assert!(!orderings.is_empty());
// Sort the inner vectors by their lengths (longest first)
orderings.sort_by_key(|v| std::cmp::Reverse(v.inner.len()));
orderings.sort_by_key(|v| std::cmp::Reverse(v.len()));

let arrays = schema
.fields
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ async fn run_window_test(
options: SortOptions::default(),
})
}
for order_by_expr in &orderby_exprs.inner {
for order_by_expr in &orderby_exprs {
if !sort_keys.contains(order_by_expr) {
sort_keys.push(order_by_expr.clone())
}
Expand Down
58 changes: 48 additions & 10 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,13 @@ fn to_str(options: &SortOptions) -> &str {

///`LexOrdering` contains a `Vec<PhysicalSortExpr>`, which represents
/// a lexicographical ordering.
///
/// For example, `vec![a ASC, b DESC]` represents a lexicographical ordering
/// that first sorts by column `a` in ascending order, then by column `b` in
/// descending order.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct LexOrdering {
pub inner: Vec<PhysicalSortExpr>,
inner: Vec<PhysicalSortExpr>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the core change of this PR -- to make this field non pub. All the rest of the changes follow from there

}

impl AsRef<LexOrdering> for LexOrdering {
Expand All @@ -337,7 +341,7 @@ impl AsRef<LexOrdering> for LexOrdering {
}

impl LexOrdering {
// Creates a new [`LexOrdering`] from a vector
/// Creates a new [`LexOrdering`] from a vector
pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
Self { inner }
}
Expand All @@ -348,46 +352,61 @@ impl LexOrdering {
&EMPTY_ORDER
}

/// Returns the number of elements that can be stored in the LexOrdering
/// without reallocating.
pub fn capacity(&self) -> usize {
self.inner.capacity()
}

/// Clears the LexOrdering, removing all elements.
pub fn clear(&mut self) {
self.inner.clear()
}

/// Returns `true` if the LexOrdering contains `expr`
pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
self.inner.contains(expr)
}

/// Add all elements from `iter` to the LexOrdering.
pub fn extend<I: IntoIterator<Item = PhysicalSortExpr>>(&mut self, iter: I) {
self.inner.extend(iter)
}

/// Remove all elements from the LexOrdering where `f` evaluates to `false`.
pub fn retain<F>(&mut self, f: F)
where
F: FnMut(&PhysicalSortExpr) -> bool,
{
self.inner.retain(f)
}

/// Returns `true` if the LexOrdering contains no elements.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
/// Returns an iterator over each `&PhysicalSortExpr` in the LexOrdering.
pub fn iter(&self) -> core::slice::Iter<PhysicalSortExpr> {
self.inner.iter()
}

/// Returns the number of elements in the LexOrdering.
pub fn len(&self) -> usize {
self.inner.len()
}

/// Removes the last element from the LexOrdering and returns it, or `None` if it is empty.
pub fn pop(&mut self) -> Option<PhysicalSortExpr> {
self.inner.pop()
}

/// Appends an element to the back of the LexOrdering.
pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) {
self.inner.push(physical_sort_expr)
}

pub fn retain(&mut self, f: impl FnMut(&PhysicalSortExpr) -> bool) {
self.inner.retain(f)
}

/// Truncates the LexOrdering, keeping only the first `len` elements.
pub fn truncate(&mut self, len: usize) {
self.inner.truncate(len)
}
Expand All @@ -400,9 +419,12 @@ impl LexOrdering {

/// Converts a `LexRequirement` into a `LexOrdering`.
///
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If required ordering is None for an entry
/// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`).
/// This function converts [`PhysicalSortRequirement`] to [`PhysicalSortExpr`]
/// for each entry in the input.
///
/// If the required ordering is `None` for an entry in `requirement`, the
/// default ordering `ASC, NULLS LAST` is used (see
/// [`PhysicalSortExpr::from`]).
pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
requirement
.into_iter()
Expand All @@ -425,6 +447,15 @@ impl LexOrdering {
}
output
}

/// Transforms each `PhysicalSortExpr` in the `LexOrdering`
/// in place using the provided closure `f`.
pub fn transform<F>(&mut self, f: F)
where
F: FnMut(&mut PhysicalSortExpr),
{
self.inner.iter_mut().for_each(f);
}
}

impl From<Vec<PhysicalSortExpr>> for LexOrdering {
Expand All @@ -439,6 +470,13 @@ impl From<LexRequirement> for LexOrdering {
}
}

/// Convert a `LexOrdering` into a `Arc[<PhysicalSortExpr>]` for fast copies
impl From<LexOrdering> for Arc<[PhysicalSortExpr]> {
fn from(value: LexOrdering) -> Self {
value.inner.into()
}
}

impl Deref for LexOrdering {
type Target = [PhysicalSortExpr];

Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl OrderingEquivalenceClass {
for idx in 0..n_ordering {
// Calculate cross product index
let idx = outer_idx * n_ordering + idx;
self.orderings[idx].inner.extend(ordering.iter().cloned());
self.orderings[idx].extend(ordering.iter().cloned());
}
}
self
Expand All @@ -217,9 +217,9 @@ impl OrderingEquivalenceClass {
/// ordering equivalence class.
pub fn add_offset(&mut self, offset: usize) {
for ordering in self.orderings.iter_mut() {
for sort_expr in ordering.inner.iter_mut() {
ordering.transform(|sort_expr| {
sort_expr.expr = add_offset_to_expr(Arc::clone(&sort_expr.expr), offset);
}
})
}
}

Expand Down
11 changes: 4 additions & 7 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ impl EquivalenceProperties {
let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
// Prune out constant expressions
output_ordering
.inner
.retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr));
(!output_ordering.is_empty()).then_some(output_ordering)
}
Expand Down Expand Up @@ -697,7 +696,6 @@ impl EquivalenceProperties {
// Generate all valid orderings, given substituted expressions.
let res = new_orderings
.into_iter()
.map(|ordering| ordering.inner)
.multi_cartesian_product()
.map(LexOrdering::new)
.collect::<Vec<_>>();
Expand Down Expand Up @@ -1221,7 +1219,6 @@ impl EquivalenceProperties {
let mut new_orderings = vec![];
for ordering in self.oeq_class {
let new_ordering = ordering
.inner
.into_iter()
.map(|mut sort_expr| {
sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
Expand Down Expand Up @@ -1507,7 +1504,7 @@ fn generate_dependency_orderings(
.map(|prefixes| {
prefixes
.into_iter()
.flat_map(|ordering| ordering.inner.clone())
.flat_map(|ordering| ordering.clone())
.collect()
})
.collect::<Vec<_>>()
Expand Down Expand Up @@ -2177,8 +2174,8 @@ impl UnionEquivalentOrderingBuilder {
existing_constants: &[ConstExpr],
) -> Option<LexOrdering> {
let mut augmented_ordering = LexOrdering::default();
let mut sort_expr_iter = ordering.inner.iter().peekable();
let mut existing_sort_expr_iter = existing_ordering.inner.iter().peekable();
let mut sort_expr_iter = ordering.iter().peekable();
let mut existing_sort_expr_iter = existing_ordering.iter().peekable();

// walk in parallel down the two orderings, trying to match them up
while sort_expr_iter.peek().is_some() || existing_sort_expr_iter.peek().is_some()
Expand Down Expand Up @@ -2758,7 +2755,7 @@ mod tests {
let leading_orderings = eq_properties
.oeq_class()
.iter()
.flat_map(|ordering| ordering.inner.first().cloned())
.flat_map(|ordering| ordering.first().cloned())
.collect::<Vec<_>>();
let expr_props = eq_properties.get_expr_properties(Arc::clone(&expr));
let err_msg = format!(
Expand Down
7 changes: 4 additions & 3 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ fn replace_on_columns_of_right_ordering(
right_ordering: &mut LexOrdering,
) -> Result<()> {
for (left_col, right_col) in on_columns {
for item in right_ordering.inner.iter_mut() {
right_ordering.transform(|item| {
let new_expr = Arc::clone(&item.expr)
.transform(|e| {
if e.eq(right_col) {
Expand All @@ -465,9 +465,10 @@ fn replace_on_columns_of_right_ordering(
Ok(Transformed::no(e))
}
})
.data()?;
.data()
.expect("closure is infallible");
item.expr = new_expr;
}
});
}
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl MemoryExec {
let fields = self.schema.fields();
let ambiguous_column = sort_information
.iter()
.flat_map(|ordering| ordering.inner.clone())
.flat_map(|ordering| ordering.clone())
.flat_map(|expr| collect_columns(&expr.expr))
.find(|col| {
fields
Expand Down Expand Up @@ -660,8 +660,8 @@ mod memory_exec_tests {
.try_with_sort_information(sort_information)?;

assert_eq!(
mem_exec.properties().output_ordering().unwrap().to_vec(),
expected_output_order.inner
mem_exec.properties().output_ordering().unwrap(),
&expected_output_order
);
let eq_properties = mem_exec.properties().equivalence_properties();
assert!(eq_properties.oeq_class().contains(&sort1));
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl ExternalSorter {
in_mem_batches: vec![],
in_mem_batches_sorted: true,
spills: vec![],
expr: expr.inner.into(),
expr: expr.into(),
metrics,
fetch,
reservation,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/topk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl TopK {
let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
.register(&runtime.memory_pool);

let expr: Arc<[PhysicalSortExpr]> = expr.inner.into();
let expr: Arc<[PhysicalSortExpr]> = expr.into();

let sort_fields: Vec<_> = expr
.iter()
Expand Down
Loading
Loading