Commit f44b423

fix
viirya committed Oct 17, 2024
1 parent 67f86ae commit f44b423
Showing 1 changed file with 48 additions and 28 deletions.
native/core/src/execution/datafusion/shuffle_writer.rs (76 changes: 48 additions & 28 deletions)
@@ -55,11 +55,10 @@ use datafusion::{
},
};
use datafusion_physical_expr::EquivalenceProperties;
use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::executor::block_on;
use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use simd_adler32::Adler32;
use tokio::task;

use crate::{
common::bit::ceil,
@@ -68,6 +67,7 @@ use crate::{
use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;

/// The status of appending rows to a partition buffer.
#[derive(Debug)]
enum AppendRowStatus {
/// The difference in memory usage after appending rows
MemDiff(Result<isize>),
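The enum's other variant, StartIndex(usize), appears in the retry loop further down: an append either completes and reports a memory delta, or stops and reports the row index at which to resume after a spill. A self-contained toy model of that protocol, with simplified types and a plain Vec standing in for the typed array builders:

// Toy model of the append-status protocol (not the Comet types).
enum AppendRowStatus {
    MemDiff(isize),
    StartIndex(usize),
}

// Append rows starting at `start`; stop once `out` reaches `capacity`.
fn append_rows(rows: &[i64], start: usize, capacity: usize, out: &mut Vec<i64>) -> AppendRowStatus {
    for (i, &row) in rows.iter().enumerate().skip(start) {
        if out.len() >= capacity {
            // Caller is expected to spill, free memory, and retry from `i`.
            return AppendRowStatus::StartIndex(i);
        }
        out.push(row);
    }
    AppendRowStatus::MemDiff(((rows.len() - start) * 8) as isize)
}

The retry loop below also checks whether the returned index equals the index it resumed from, presumably so a partition that can make no progress at all does not retry forever.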
@@ -291,7 +291,13 @@ impl PartitionBuffer {
.iter_mut()
.zip(columns)
.for_each(|(builder, column)| {
append_columns(builder, column, &indices[start..end], column.data_type(), partition_id);
append_columns(
builder,
column,
&indices[start..end],
column.data_type(),
partition_id,
);
});
self.num_active_rows += end - start;
if self.num_active_rows >= self.batch_size {
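append_columns (its body is outside this hunk) receives column.data_type(), so it plausibly dispatches on the type to copy the selected row indices into the matching typed builder. A hedged, single-type sketch of what one arm of such a dispatch can look like, using arrow-rs directly:

use arrow::array::{Array, ArrayRef, Int32Array, Int32Builder};

// Hypothetical Int32-only arm: copy the rows named by `indices`
// from `column` into `builder`, preserving nulls.
fn append_int32(builder: &mut Int32Builder, column: &ArrayRef, indices: &[usize]) {
    let src = column
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("caller guarantees an Int32 column");
    for &i in indices {
        if src.is_null(i) {
            builder.append_null();
        } else {
            builder.append_value(src.value(i));
        }
    }
}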
@@ -732,6 +738,7 @@ impl ShuffleRepartitioner {
async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
let mut start = 0;
while start < batch.num_rows() {
println!("start: {}, batch.num_rows(): {}", start, batch.num_rows());
let end = (start + self.batch_size).min(batch.num_rows());
let batch = batch.slice(start, end - start);
self.partitioning_batch(batch).await?;
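For reference, the chunking pattern this loop implements, written standalone (the `start = end` advance falls outside the hunk; this sketch assumes the conventional form):

use arrow::record_batch::RecordBatch;

// Split a batch into zero-copy slices of at most `batch_size` rows.
fn chunks_of(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < batch.num_rows() {
        let end = (start + batch_size).min(batch.num_rows());
        chunks.push(batch.slice(start, end - start)); // slice shares buffers, no copy
        start = end;
    }
    chunks
}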
@@ -824,6 +831,7 @@ impl ShuffleRepartitioner {
.enumerate()
.filter(|(_, (start, end))| start < end)
{
println!("partition id: {}", partition_id);
mem_diff += self
.append_rows_to_partition(
input.columns(),
@@ -832,6 +840,8 @@ impl ShuffleRepartitioner {
)
.await?;

println!("mem_diff: {}", mem_diff);

if mem_diff > 0 {
let mem_increase = mem_diff as usize;
if self.reservation.try_grow(mem_increase).is_err() {
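The reservation here is DataFusion's MemoryReservation; what happens inside the is_err() branch is elided by the diff view, but the general grow-or-spill use of that API looks roughly like the following (pool size and numbers are invented for illustration):

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

// Illustrative only: a failed try_grow is the signal to release buffered
// state (e.g. by spilling to disk) before retrying the reservation.
fn grow_or_free(needed: usize) -> Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let mut reservation = MemoryConsumer::new("shuffle_writer_demo").register(&pool);
    if reservation.try_grow(needed).is_err() {
        // a real operator would spill its buffers here
        reservation.free();
        reservation.try_grow(needed)?;
    }
    Ok(())
}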
@@ -1001,8 +1011,7 @@ impl ShuffleRepartitioner {
&mut self.buffered_partitions,
spillfile.path(),
self.num_output_partitions,
)
.await?;
)?;

timer.stop();

@@ -1033,9 +1042,11 @@ impl ShuffleRepartitioner {
// If the range of indices is not big enough, just append the rows to the
// active array builders instead of adding them directly as a record batch.
let mut start_index: usize = 0;
let mut output_ret = output.append_rows(columns, indices, start_index, time_metric, partition_id);
let mut output_ret =
output.append_rows(columns, indices, start_index, time_metric, partition_id);

loop {
println!("output_ret: {:?}", output_ret);
match output_ret {
AppendRowStatus::MemDiff(l) => {
mem_diff += l?;
@@ -1046,13 +1057,24 @@ impl ShuffleRepartitioner {
// spill partitions and retry.
self.spill().await?;

println!(
"partition_id: {}, new_start: {}, start_index: {}",
partition_id, new_start, start_index
);

let output = &mut self.buffered_partitions[partition_id];
output.reservation.free();

let time_metric = self.metrics.baseline.elapsed_compute();

start_index = new_start;
output_ret = output.append_rows(columns, indices, start_index, time_metric, partition_id);
output_ret = output.append_rows(
columns,
indices,
start_index,
time_metric,
partition_id,
);

if let AppendRowStatus::StartIndex(new_start) = output_ret {
if new_start == start_index {
@@ -1073,7 +1095,7 @@ }
}

/// consume the `buffered_partitions` and do spill into a single temp shuffle output file
async fn spill_into(
fn spill_into(
buffered_partitions: &mut [PartitionBuffer],
path: &Path,
num_output_partitions: usize,
@@ -1086,25 +1108,22 @@ async fn spill_into(
}
let path = path.to_owned();

task::spawn_blocking(move || {
let mut offsets = vec![0; num_output_partitions + 1];
let mut spill_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)?;
let mut offsets = vec![0; num_output_partitions + 1];
let mut spill_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))?;

for i in 0..num_output_partitions {
offsets[i] = spill_data.stream_position()?;
spill_data.write_all(&output_batches[i])?;
output_batches[i].clear();
}
// add one extra offset at the end to ease partition length computation
offsets[num_output_partitions] = spill_data.stream_position()?;
Ok(offsets)
})
.await
.map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))?
for i in 0..num_output_partitions {
offsets[i] = spill_data.stream_position()?;
spill_data.write_all(&output_batches[i])?;
output_batches[i].clear();
}
// add one extra offset at the end to ease partition length computation
offsets[num_output_partitions] = spill_data.stream_position()?;
Ok(offsets)
}
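The trailing offset written above lets a reader recover every partition's byte length with a single subtraction per pair of adjacent offsets, for example:

// Given n + 1 offsets, lengths[i] = offsets[i + 1] - offsets[i].
fn partition_lengths(offsets: &[u64]) -> Vec<u64> {
    offsets.windows(2).map(|w| w[1] - w[0]).collect()
}

// e.g. offsets [0, 10, 10, 25] give lengths [10, 0, 15]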

impl Debug for ShuffleRepartitioner {
@@ -1139,8 +1158,11 @@ async fn external_shuffle(
context.session_config().batch_size(),
);

let mut id = 0;
while let Some(batch) = input.next().await {
println!("id: {}", id);
block_on(repartitioner.insert_batch(batch?))?;
id += 1;
}
repartitioner.shuffle_write().await
}
@@ -1634,15 +1656,13 @@ mod test {

#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
#[cfg(not(target_os = "macos"))] // GitHub macOS runner fails with "Too many open files".
fn test_large_number_of_partitions() {
shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
}

#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
#[cfg(not(target_os = "macos"))] // GitHub macOS runner fails with "Too many open files".
fn test_large_number_of_partitions_spilling() {
shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
}
