Skip to content

Commit

Permalink
accumulate batch when shuffle
Browse files: browse the repository at this point in the history
  • Loading branch information
wangxinshuo.db committed Jun 3, 2024
1 parent 26ff58d commit 34f87f0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
16 changes: 15 additions & 1 deletion cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,17 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
numRows -= length;
} while (numRows);
} else {
RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
if (accumulateRows_ + rv->size() < maxBatchSize_) {
accumulateRows_ += rv->size();
initAccumulateDataset(rv);
accumulateDataset_->append(rv.get());
} else {
initAccumulateDataset(rv);
accumulateDataset_->append(rv.get());
RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), memLimit));
accumulateDataset_ = nullptr;
accumulateRows_ = 0;
}
}
}
return arrow::Status::OK();
Expand All @@ -321,6 +331,10 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
}

arrow::Status VeloxHashBasedShuffleWriter::stop() {
if (accumulateDataset_ != nullptr) {
RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), kMinMemLimit));
accumulateRows_ = 0;
}
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
Expand Down
12 changes: 12 additions & 0 deletions cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,18 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {

arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);

/// Lazily creates the empty accumulator `accumulateDataset_`.
///
/// Returns immediately when `accumulateDataset_` is already set. Otherwise constructs a zero-length
/// RowVector using `veloxPool_`, the row type of `rv`, no nulls buffer, and one null child slot per
/// child of `rv`.
///
/// @param rv Batch whose type and child count serve as the template for the accumulator. It is only
///           read here; it is not appended by this function.
void initAccumulateDataset(facebook::velox::RowVectorPtr& rv) {
  if (accumulateDataset_) {
    return;
  }
  // Build all null child slots in one sized construction. This avoids the signed/unsigned
  // comparison of an `int` index against `size()` and the repeated growth of push_back.
  std::vector<facebook::velox::VectorPtr> children(rv->children().size(), nullptr);
  accumulateDataset_ =
      std::make_shared<facebook::velox::RowVector>(veloxPool_.get(), rv->type(), nullptr, 0, std::move(children));
}

BinaryArrayResizeState binaryArrayResizeState_{};

bool hasComplexType_ = false;
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ class VeloxShuffleWriter : public ShuffleWriter {

int32_t maxBatchSize_{0};

// Running row count of the batches buffered in accumulateDataset_. The visible write() path adds each
// buffered batch's size and resets this to 0 after the buffer is handed to partitioningAndDoSplit().
uint32_t accumulateRows_{0};

// Buffer that incoming batches are appended to before being split; created on demand by
// initAccumulateDataset() and null when nothing is buffered.
facebook::velox::RowVectorPtr accumulateDataset_;

enum EvictState { kEvictable, kUnevictable };

// stat
Expand Down

0 comments on commit 34f87f0

Please sign in to comment.