Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 25, 2024
1 parent 2c46dbc commit aeeb632
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
27 changes: 19 additions & 8 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,16 +168,18 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
ARROW_RETURN_IF(
rows == 0, arrow::Status::Invalid("Failed to insert rows. Remaining rows: " + std::to_string(remainingRows)));
}
// Spill to avoid offset_ overflow.
RETURN_NOT_OK(maybeSpill(rows));
// Allocate newArray can trigger spill.
growArrayIfNecessary(rows);
insertRows(row, rowOffset, rows);
rowOffset += rows;
}
return arrow::Status::OK();
}

void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) {
// Allocate newArray can trigger spill.
growArrayIfNecessary(rows);
VELOX_CHECK(!pages_.empty());
for (auto i = offset; i < offset + rows; ++i) {
auto size = row.serialize(i, currentPage_ + pageCursor_);
array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size};
Expand Down Expand Up @@ -218,18 +220,27 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
}
RETURN_NOT_OK(evictPartition(pid, begin, cur));

pageCursor_ = 0;
pages_.clear();
pageAddresses_.clear();

sortedBuffer_ = nullptr;
offset_ = 0;
array_.clear();

sortedBuffer_ = nullptr;

if (!stopped_) {
auto numPages = pages_.size();
while (--numPages) {
pages_.pop_front();
}
auto& page = pages_.front();
memset(page->asMutable<char>(), 0, page->size());
pageAddresses_.resize(1);
pageAddresses_[0] = currentPage_;
pageNumber_ = 0;
pageCursor_ = 0;

// Allocate array_ can trigger spill.
array_.resize(initialSize_);
} else {
pages_.clear();
pageAddresses_.clear();
}
return arrow::Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
SortArray array_;
uint32_t offset_{0};

std::vector<facebook::velox::BufferPtr> pages_;
std::list<facebook::velox::BufferPtr> pages_;
std::vector<char*> pageAddresses_;
char* currentPage_;
uint32_t pageNumber_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class VeloxCelebornColumnarShuffleWriter[K, V](

dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
val nativeMetrics = if (isSort) {
dep.metrics("splitTime")
} else {
dep.metrics("sortTime")
} else {
dep.metrics("splitTime")
}
nativeMetrics
.add(
Expand Down

0 comments on commit aeeb632

Please sign in to comment.