From 34f87f0d2cbea754e81c23ec5e7d9b130e4f11ea Mon Sep 17 00:00:00 2001 From: "wangxinshuo.db" Date: Sun, 2 Jun 2024 16:34:12 +0800 Subject: [PATCH] accumulate batch when shuffle --- cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc | 16 +++++++++++++++- cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h | 12 ++++++++++++ cpp/velox/shuffle/VeloxShuffleWriter.h | 4 ++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc index daff1370332f1..62618e8b4b3cb 100644 --- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc @@ -295,7 +295,17 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr numRows -= length; } while (numRows); } else { - RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit)); + if (accumulateRows_ + rv->size() < maxBatchSize_) { + accumulateRows_ += rv->size(); + initAccumulateDataset(rv); + accumulateDataset_->append(rv.get()); + } else { + initAccumulateDataset(rv); + accumulateDataset_->append(rv.get()); + RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), memLimit)); + accumulateDataset_ = nullptr; + accumulateRows_ = 0; + } } } return arrow::Status::OK(); @@ -321,6 +331,10 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo } arrow::Status VeloxHashBasedShuffleWriter::stop() { + if (accumulateDataset_ != nullptr) { + RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), kMinMemLimit)); + accumulateRows_ = 0; + } if (options_.partitioning != Partitioning::kSingle) { for (auto pid = 0; pid < numPartitions_; ++pid) { RETURN_NOT_OK(evictPartitionBuffers(pid, false)); diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h index a11f84e952a61..910fe290778db 100644 --- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h @@ -303,6 +303,18 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter { arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit); + void initAccumulateDataset(facebook::velox::RowVectorPtr& rv) { + if (accumulateDataset_) { + return; + } + std::vector children; + for (int i = 0; i < rv->children().size(); ++i) { + children.push_back(nullptr); + } + accumulateDataset_ = + std::make_shared(veloxPool_.get(), rv->type(), nullptr, 0, std::move(children)); + } + BinaryArrayResizeState binaryArrayResizeState_{}; bool hasComplexType_ = false; diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index 104b876162912..2855831c51ae7 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -124,6 +124,10 @@ class VeloxShuffleWriter : public ShuffleWriter { int32_t maxBatchSize_{0}; + uint32_t accumulateRows_{0}; + + facebook::velox::RowVectorPtr accumulateDataset_; + enum EvictState { kEvictable, kUnevictable }; // stat