Skip to content

Commit

Permalink
fix: Input batch to ShuffleRepartitioner.insert_batch should not be larger than configured batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jun 5, 2024
1 parent b3ba82f commit 2534ed1
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions core/src/execution/datafusion/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,7 @@ async fn external_shuffle(
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let batch_size = context.session_config().batch_size();
let mut repartitioner = ShuffleRepartitioner::new(
partition_id,
output_data_file,
Expand All @@ -947,12 +948,18 @@ async fn external_shuffle(
partitioning,
metrics,
context.runtime_env(),
context.session_config().batch_size(),
batch_size,
);

while let Some(batch) = input.next().await {
let batch = batch?;
repartitioner.insert_batch(batch).await?;
let mut start = 0;
while start < batch.num_rows() {
let end = (start + batch_size).min(batch.num_rows());
let batch = batch.slice(start, end - start);
repartitioner.insert_batch(batch).await?;
start = end;
}
}
repartitioner.shuffle_write().await
}
Expand Down

0 comments on commit 2534ed1

Please sign in to comment.