Skip to content

Commit

Permalink
Add swap_inputs to SMJ
Browse files Browse the repository at this point in the history
  • Loading branch information
ozankabak committed Jan 2, 2025
1 parent a08dc0a commit fe97223
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,9 @@ fn hash_join_swap_subrule(
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
/// we swap join sides. Therefore, we do not consider them here.
fn swap_join_according_to_unboundedness(
/// This function is crate public as it is useful for downstream projects
/// to implement, or experiment with, their own join selection rules.
pub(crate) fn swap_join_according_to_unboundedness(
hash_join: &HashJoinExec,
) -> Result<Arc<dyn ExecutionPlan>> {
let partition_mode = hash_join.partition_mode();
Expand Down
36 changes: 34 additions & 2 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::joins::utils::{
build_join_schema, check_join_is_valid, estimate_join_statistics,
symmetric_join_output_partitioning, JoinFilter, JoinOn, JoinOnRef,
reorder_output_after_swap, symmetric_join_output_partitioning, JoinFilter, JoinOn,
JoinOnRef,
};
use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use crate::spill::spill_record_batches;
Expand All @@ -73,7 +74,7 @@ use futures::{Stream, StreamExt};
/// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge
/// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large
/// inputs where one or both of the inputs don't fit in the available memory.
///
///
/// # Join Expressions
///
/// Equi-join predicate (e.g. `<col1> = <col2>`) expressions are represented by [`Self::on`].
Expand Down Expand Up @@ -311,6 +312,37 @@ impl SortMergeJoinExec {
boundedness_from_children([left, right]),
)
}

/// Builds a new plan in which the two children of this join trade places.
///
/// The right child becomes the left child (and vice versa), every `on`
/// key pair is mirrored, the optional filter is swapped through
/// [`JoinFilter::swap`], and the join type is replaced by its swapped
/// counterpart. Sort options and null-equality handling are carried over
/// unchanged.
///
/// For semi and anti joins the swapped join is returned as is. For every
/// other join type the swapped join is handed to
/// `reorder_output_after_swap` together with the original left and right
/// schemas (presumably to restore the original output column order; see
/// that helper for the exact contract).
///
/// # Errors
///
/// Propagates any error from [`SortMergeJoinExec::try_new`] or from
/// `reorder_output_after_swap`.
pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
    let (left, right) = (self.left(), self.right());

    // Mirror each equi-join key pair so it lines up with the exchanged children.
    let mirrored_on: Vec<_> = self
        .on()
        .iter()
        .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
        .collect();
    let mirrored_filter = self.filter().as_ref().map(JoinFilter::swap);

    let swapped: Arc<dyn ExecutionPlan> = Arc::new(SortMergeJoinExec::try_new(
        Arc::clone(right),
        Arc::clone(left),
        mirrored_on,
        mirrored_filter,
        self.join_type().swap(),
        self.sort_options.clone(),
        self.null_equals_null,
    )?);

    // TODO: OR this condition with having a built-in projection (like
    // ordinary hash join) when we support it.
    let is_semi_or_anti = matches!(
        self.join_type(),
        JoinType::LeftSemi
            | JoinType::RightSemi
            | JoinType::LeftAnti
            | JoinType::RightAnti
    );
    if is_semi_or_anti {
        return Ok(swapped);
    }

    reorder_output_after_swap(swapped, &left.schema(), &right.schema())
}
}

impl DisplayAs for SortMergeJoinExec {
Expand Down

0 comments on commit fe97223

Please sign in to comment.