diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
index 741ca8ab9b40..cc648cf7fdd0 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
@@ -303,7 +303,18 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr
         numRows -= length;
       } while (numRows);
     } else {
-      RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
+      // Small batches are appended to accumulateDataset_ and split together once
+      // at least 8192 rows are pending; stop() flushes any remainder.
+      initAccumulateDataset(rv);
+      accumulateDataset_->append(rv.get());
+      accumulateRows_ += rv->size();
+      if (accumulateRows_ >= 8192) {
+        // Detach the pending rows and reset the counter before splitting so the
+        // accumulation state stays consistent even if the split returns an error.
+        auto pending = std::move(accumulateDataset_);
+        accumulateRows_ = 0;
+        RETURN_NOT_OK(partitioningAndDoSplit(std::move(pending), memLimit));
+      }
     }
   }
   return arrow::Status::OK();
@@ -329,6 +340,12 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
 }
 
 arrow::Status VeloxHashBasedShuffleWriter::stop() {
+  if (accumulateDataset_ != nullptr) {
+    // Flush rows buffered by write() that never reached the row threshold.
+    auto pending = std::move(accumulateDataset_);
+    accumulateRows_ = 0;
+    RETURN_NOT_OK(partitioningAndDoSplit(std::move(pending), kMinMemLimit));
+  }
   if (options_.partitioning != Partitioning::kSingle) {
     for (auto pid = 0; pid < numPartitions_; ++pid) {
       RETURN_NOT_OK(evictPartitionBuffers(pid, false));
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
index a11f84e952a6..142c7978bdc9 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
@@ -303,6 +303,17 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {
 
   arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);
 
+  // Creates the empty RowVector that write() appends small batches to, using the
+  // row type and column count of `rv`. Does nothing if one already exists.
+  void initAccumulateDataset(const facebook::velox::RowVectorPtr& rv) {
+    if (accumulateDataset_) {
+      return;
+    }
+    std::vector<facebook::velox::VectorPtr> children(rv->children().size(), nullptr);
+    accumulateDataset_ = std::make_shared<facebook::velox::RowVector>(
+        veloxPool_.get(), rv->type(), nullptr, 0, std::move(children));
+  }
+
   BinaryArrayResizeState binaryArrayResizeState_{};
 
   bool hasComplexType_ = false;
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h
index 104b87616291..2855831c51ae 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -124,6 +124,10 @@ class VeloxShuffleWriter : public ShuffleWriter {
 
   int32_t maxBatchSize_{0};
 
+  uint32_t accumulateRows_{0};
+
+  facebook::velox::RowVectorPtr accumulateDataset_;
+
   enum EvictState { kEvictable, kUnevictable };
 
   // stat