Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Oct 16, 2024
1 parent 3ec4d63 commit 851427f
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion native/core/src/execution/datafusion/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use datafusion::{
},
};
use datafusion_physical_expr::EquivalenceProperties;
use futures::executor::block_on;
use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use simd_adler32::Adler32;
Expand Down Expand Up @@ -1111,7 +1112,11 @@ async fn external_shuffle(
);

while let Some(batch) = input.next().await {
repartitioner.insert_batch(batch?).await?;
// Block on the repartitioner to insert the batch and shuffle the rows
// into the corresponding partition buffer.
// Otherwise, pull the next batch from the input stream might overwrite the
// current batch in the repartitioner.
block_on(repartitioner.insert_batch(batch?))?;
}
repartitioner.shuffle_write().await
}
Expand Down

0 comments on commit 851427f

Please sign in to comment.