diff --git a/native/Cargo.lock b/native/Cargo.lock index 9f436c32c..c9301c6e3 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -892,7 +892,6 @@ dependencies = [ "datafusion-expr", "datafusion-functions-nested", "datafusion-physical-expr", - "either", "flate2", "futures", "half", diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 83a18b5bb..13f6b135f 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -77,7 +77,6 @@ crc32fast = "1.3.2" simd-adler32 = "0.3.7" datafusion-comet-spark-expr = { workspace = true } datafusion-comet-proto = { workspace = true } -either = "1.13.0" [dev-dependencies] pprof = { version = "0.13.0", features = ["flamegraph"] } diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index 46c3d4995..6c3174667 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -55,7 +55,6 @@ use datafusion::{ }, }; use datafusion_physical_expr::EquivalenceProperties; -use either::{Either, Left, Right}; use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use simd_adler32::Adler32; @@ -67,6 +66,14 @@ use crate::{ }; use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes; +/// The status of appending rows to a partition buffer. +enum AppendRowStatus { + /// The difference in memory usage after appending rows + MemDiff(Result), + /// The index of the next row to append + StartIndex(usize), +} + /// The shuffle writer operator maps each input partition to M output partitions based on a /// partitioning scheme. No guarantees are made about the order of the resulting partitions. #[derive(Debug)] @@ -265,14 +272,14 @@ impl PartitionBuffer { indices: &[usize], start_index: usize, time_metric: &Time, - ) -> Either, usize> { + ) -> AppendRowStatus { let mut mem_diff = 0; let mut start = start_index; // lazy init because some partition may be empty let init = self.init_active_if_necessary(); if init.is_err() { - return Right(start); + return AppendRowStatus::StartIndex(start); } mem_diff += init.unwrap(); @@ -289,20 +296,20 @@ impl PartitionBuffer { let mut timer = time_metric.timer(); let flush = self.flush(); if let Err(e) = flush { - return Left(Err(e)); + return AppendRowStatus::MemDiff(Err(e)); } mem_diff += flush.unwrap(); timer.stop(); let init = self.init_active_if_necessary(); if init.is_err() { - return Right(end); + return AppendRowStatus::StartIndex(end); } mem_diff += init.unwrap(); } start = end; } - Left(Ok(mem_diff)) + AppendRowStatus::MemDiff(Ok(mem_diff)) } /// flush active data into frozen bytes @@ -1001,11 +1008,11 @@ impl ShuffleRepartitioner { loop { match output_ret { - Left(l) => { + AppendRowStatus::MemDiff(l) => { mem_diff += l?; break; } - Right(new_start) => { + AppendRowStatus::StartIndex(new_start) => { // Cannot allocate enough memory for the array builders in the partition, // spill partitions and retry. self.spill().await?; @@ -1018,7 +1025,7 @@ impl ShuffleRepartitioner { start_index = new_start; output_ret = output.append_rows(columns, indices, start_index, time_metric); - if let Right(new_start) = output_ret { + if let AppendRowStatus::StartIndex(new_start) = output_ret { if new_start == start_index { // If the start index is not updated, it means that the partition // is still not able to allocate enough memory for the array builders.