From 72c7a0c99ce2b2241e54e5534eb230bdf4a5ff3c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 17 Oct 2024 12:36:56 -0700 Subject: [PATCH] fix --- .../execution/datafusion/shuffle_writer.rs | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index 32c12c8043..81f54a1d89 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -973,8 +973,7 @@ impl ShuffleRepartitioner { &mut self.buffered_partitions, spillfile.path(), self.num_output_partitions, - ) - .await?; + )?; timer.stop(); @@ -1045,7 +1044,7 @@ impl ShuffleRepartitioner { } /// consume the `buffered_partitions` and do spill into a single temp shuffle output file -async fn spill_into( +fn spill_into( buffered_partitions: &mut [PartitionBuffer], path: &Path, num_output_partitions: usize, @@ -1058,25 +1057,22 @@ async fn spill_into( } let path = path.to_owned(); - task::spawn_blocking(move || { - let mut offsets = vec![0; num_output_partitions + 1]; - let mut spill_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(path)?; + let mut offsets = vec![0; num_output_partitions + 1]; + let mut spill_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))?; - for i in 0..num_output_partitions { - offsets[i] = spill_data.stream_position()?; - spill_data.write_all(&output_batches[i])?; - output_batches[i].clear(); - } - // add one extra offset at last to ease partition length computation - offsets[num_output_partitions] = spill_data.stream_position()?; - Ok(offsets) - }) - .await - .map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))? + for i in 0..num_output_partitions { + offsets[i] = spill_data.stream_position()?; + spill_data.write_all(&output_batches[i])?; + output_batches[i].clear(); + } + // add one extra offset at last to ease partition length computation + offsets[num_output_partitions] = spill_data.stream_position()?; + Ok(offsets) } impl Debug for ShuffleRepartitioner {