diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index d7a2f1740141..736c3fbd0184 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -552,7 +552,9 @@ fn hash_join_swap_subrule(
 /// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
 /// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
 /// we swap join sides. Therefore, we do not consider them here.
-fn swap_join_according_to_unboundedness(
+/// This function is crate public so that other modules in this crate can reuse it.
+/// NOTE(review): `pub(crate)` is not visible to downstream projects; use `pub` for that.
+pub(crate) fn swap_join_according_to_unboundedness(
     hash_join: &HashJoinExec,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let partition_mode = hash_join.partition_mode();
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 438d9818475d..54bd63084ece 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -58,7 +58,8 @@ use crate::execution_plan::{boundedness_from_children, EmissionType};
 use crate::expressions::PhysicalSortExpr;
 use crate::joins::utils::{
     build_join_schema, check_join_is_valid, estimate_join_statistics,
-    symmetric_join_output_partitioning, JoinFilter, JoinOn, JoinOnRef,
+    reorder_output_after_swap, symmetric_join_output_partitioning, JoinFilter, JoinOn,
+    JoinOnRef,
 };
 use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use crate::spill::spill_record_batches;
@@ -73,7 +74,7 @@ use futures::{Stream, StreamExt};
 /// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge
 /// join algorithm and applies an optional filter post join. 
Can be used to join arbitrarily large
 /// inputs where one or both of the inputs don't fit in the available memory.
-///
+///
 /// # Join Expressions
 ///
 /// Equi-join predicate (e.g. `<col1> = <col2>`) expressions are represented by [`Self::on`].
@@ -311,6 +312,52 @@ impl SortMergeJoinExec {
             boundedness_from_children([left, right]),
         )
     }
+
+    /// Builds an equivalent join with the left and right inputs exchanged.
+    ///
+    /// The `on` key pairs are flipped, the filter (if any) is swapped via
+    /// [`JoinFilter::swap`], and the join type is replaced by
+    /// `self.join_type().swap()`; sort options and `null_equals_null` are
+    /// carried over unchanged.
+    ///
+    /// For semi and anti joins the new join is returned as is. For all other
+    /// join types the new join is passed through `reorder_output_after_swap`
+    /// together with the original left and right schemas (presumably to
+    /// restore the original output column order -- confirm against the helper).
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error from [`Self::try_new`] or from
+    /// `reorder_output_after_swap`.
+    pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
+        let left = self.left();
+        let right = self.right();
+        let new_join = SortMergeJoinExec::try_new(
+            Arc::clone(right),
+            Arc::clone(left),
+            self.on()
+                .iter()
+                .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
+                .collect::<Vec<_>>(),
+            self.filter().as_ref().map(JoinFilter::swap),
+            self.join_type().swap(),
+            self.sort_options.clone(),
+            self.null_equals_null,
+        )?;
+
+        // TODO: OR this condition with having a built-in projection (like
+        //       ordinary hash join) when we support it.
+        if matches!(
+            self.join_type(),
+            JoinType::LeftSemi
+                | JoinType::RightSemi
+                | JoinType::LeftAnti
+                | JoinType::RightAnti
+        ) {
+            Ok(Arc::new(new_join))
+        } else {
+            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
+        }
+    }
 }
 
 impl DisplayAs for SortMergeJoinExec {