Skip to content

Commit

Permalink
fix in stop step and format
Browse files: browse the repository at this point in the history
  • Loading branch information
wangxinshuo.db committed Jun 3, 2024
1 parent 82708df commit 742ecb7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
17 changes: 13 additions & 4 deletions cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,19 +294,19 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
offset += length;
numRows -= length;
} while (numRows);
} else if (accumulateBatches_ + rv->size() < maxBatchSize_) {
} else if (accumulateRows_ + rv->size() < maxBatchSize_) {
accumulateDataset_.emplace_back(rv);
accumulateBatches_ += rv->size();
accumulateRows_ += rv->size();
} else {
if (accumulateDataset_.empty()) {
RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
} else {
for(auto v : accumulateDataset_) {
for (auto v : accumulateDataset_) {
rv->append(v.get());
}
RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
accumulateDataset_.clear();
accumulateBatches_ = 0;
accumulateRows_ = 0;
}
}
}
Expand All @@ -333,6 +333,15 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
}

arrow::Status VeloxHashBasedShuffleWriter::stop() {
if (!accumulateDataset_.empty()) {
auto rv = accumulateDataset_[0];
for (int i = 1; i < accumulateDataset_.size(); ++i) {
rv->append(accumulateDataset_[i].get());
}
RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), kMinMemLimit));
accumulateDataset_.clear();
accumulateRows_ = 0;
}
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
Expand Down
3 changes: 0 additions & 3 deletions cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {

template <typename T>
arrow::Status splitFixedType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs) {
ScopedTimer timer(&splitFixedTypeTimer_);
for (auto& pid : partitionUsed_) {
auto dstPidBase = (T*)(dstAddrs[pid] + partitionBufferBase_[pid] * sizeof(T));
auto pos = partition2RowOffsetBase_[pid];
Expand Down Expand Up @@ -402,8 +401,6 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {
facebook::velox::serializer::presto::PrestoVectorSerde serde_;

SplitState splitState_{kInit};

int64_t splitFixedTypeTimer_{0};
}; // class VeloxHashBasedShuffleWriter

} // namespace gluten
2 changes: 1 addition & 1 deletion cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class VeloxShuffleWriter : public ShuffleWriter {

int32_t maxBatchSize_{0};

int32_t accumulateBatches_{0};
uint32_t accumulateRows_{0};

std::vector<facebook::velox::RowVectorPtr> accumulateDataset_;

Expand Down

0 comments on commit 742ecb7

Please sign in to comment.