Skip to content

Commit

Permalink
[VL] Optimize the performance of hash based shuffle by accumulating b…
Browse files Browse the repository at this point in the history
…atches
  • Loading branch information
XinShuoWang authored Jun 11, 2024
1 parent 13babf3 commit ec3e92e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 1 deletion.
16 changes: 15 additions & 1 deletion cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,17 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
numRows -= length;
} while (numRows);
} else {
RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
if (accumulateRows_ + rv->size() < 8192) {
accumulateRows_ += rv->size();
initAccumulateDataset(rv);
accumulateDataset_->append(rv.get());
} else {
initAccumulateDataset(rv);
accumulateDataset_->append(rv.get());
RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), memLimit));
accumulateDataset_ = nullptr;
accumulateRows_ = 0;
}
}
}
return arrow::Status::OK();
Expand All @@ -329,6 +339,10 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
}

arrow::Status VeloxHashBasedShuffleWriter::stop() {
if (accumulateDataset_ != nullptr) {
RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), kMinMemLimit));
accumulateRows_ = 0;
}
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
Expand Down
9 changes: 9 additions & 0 deletions cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,15 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {

arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);

void initAccumulateDataset(facebook::velox::RowVectorPtr& rv) {
if (accumulateDataset_) {
return;
}
std::vector<facebook::velox::VectorPtr> children(rv->children().size(), nullptr);
accumulateDataset_ =
std::make_shared<facebook::velox::RowVector>(veloxPool_.get(), rv->type(), nullptr, 0, std::move(children));
}

BinaryArrayResizeState binaryArrayResizeState_{};

bool hasComplexType_ = false;
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ class VeloxShuffleWriter : public ShuffleWriter {

int32_t maxBatchSize_{0};

uint32_t accumulateRows_{0};

facebook::velox::RowVectorPtr accumulateDataset_;

enum EvictState { kEvictable, kUnevictable };

// stat
Expand Down

0 comments on commit ec3e92e

Please sign in to comment.