Skip to content

Commit

Permalink
Merge SortMergeJoin filtered batches into bigger batches
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead committed Jan 17, 2025
1 parent 405324d commit 9f089cb
Showing 1 changed file with 5 additions and 0 deletions.
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,19 +1056,24 @@ impl Stream for SortMergeJoinStream {
{
self.freeze_all()?;

// If the join is filtered and there are joined tuples waiting
// to be filtered
if !self
.staging_output_record_batches
.batches
.is_empty()
{
// Apply filter on joined tuples and get filtered batch
let out_filtered_batch =
self.filter_joined_batch()?;

// Append filtered batch to the output buffer
self.output = concat_batches(
&self.schema(),
vec![&self.output, &out_filtered_batch],
)?;

// Send to output if the output buffer surpassed the `batch_size`
if self.output.num_rows() >= self.batch_size {
let record_batch = std::mem::replace(
&mut self.output,
Expand Down

0 comments on commit 9f089cb

Please sign in to comment.