From 398a42b16353b72c5d35870465b96f7cbdc6f91a Mon Sep 17 00:00:00 2001 From: "wangxinshuo.db" Date: Sun, 2 Jun 2024 16:34:12 +0800 Subject: [PATCH] accumulate batch when shuffle --- cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc | 16 +++++++++++++++- cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h | 9 +++++++++ cpp/velox/shuffle/VeloxShuffleWriter.h | 4 ++++ 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc index daff1370332f..9798e5c83768 100644 --- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc @@ -295,7 +295,17 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr numRows -= length; } while (numRows); } else { - RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit)); + if (accumulateRows_ + rv->size() < 8192) { + accumulateRows_ += rv->size(); + initAccumulateDataset(rv); + accumulateDataset_->append(rv.get()); + } else { + initAccumulateDataset(rv); + accumulateDataset_->append(rv.get()); + RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), memLimit)); + accumulateDataset_ = nullptr; + accumulateRows_ = 0; + } } } return arrow::Status::OK(); @@ -321,6 +331,10 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo } arrow::Status VeloxHashBasedShuffleWriter::stop() { + if (accumulateDataset_ != nullptr) { + RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), kMinMemLimit)); + accumulateRows_ = 0; + } if (options_.partitioning != Partitioning::kSingle) { for (auto pid = 0; pid < numPartitions_; ++pid) { RETURN_NOT_OK(evictPartitionBuffers(pid, false)); diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h index a11f84e952a6..142c7978bdc9 100644 --- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h @@ -303,6 +303,15 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter { arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit); + void initAccumulateDataset(facebook::velox::RowVectorPtr& rv) { + if (accumulateDataset_) { + return; + } + std::vector children(rv->children().size(), nullptr); + accumulateDataset_ = + std::make_shared(veloxPool_.get(), rv->type(), nullptr, 0, std::move(children)); + } + BinaryArrayResizeState binaryArrayResizeState_{}; bool hasComplexType_ = false; diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index 104b87616291..2855831c51ae 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -124,6 +124,10 @@ class VeloxShuffleWriter : public ShuffleWriter { int32_t maxBatchSize_{0}; + uint32_t accumulateRows_{0}; + + facebook::velox::RowVectorPtr accumulateDataset_; + enum EvictState { kEvictable, kUnevictable }; // stat