From aeeb632e982a18e289c8931b81ef9bdecfb8a0c8 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 25 Jul 2024 02:45:03 +0000 Subject: [PATCH] fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 27 +++++++++++++------ cpp/velox/shuffle/VeloxSortShuffleWriter.h | 2 +- .../VeloxCelebornColumnarShuffleWriter.scala | 4 +-- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 5439b73402f2..f03065eeed39 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -168,7 +168,10 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr ARROW_RETURN_IF( rows == 0, arrow::Status::Invalid("Failed to insert rows. Remaining rows: " + std::to_string(remainingRows))); } + // Spill to avoid offset_ overflow. RETURN_NOT_OK(maybeSpill(rows)); + // Allocate newArray can trigger spill. + growArrayIfNecessary(rows); insertRows(row, rowOffset, rows); rowOffset += rows; } @@ -176,8 +179,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr } void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) { - // Allocate newArray can trigger spill. - growArrayIfNecessary(rows); + VELOX_CHECK(!pages_.empty()); for (auto i = offset; i < offset + rows; ++i) { auto size = row.serialize(i, currentPage_ + pageCursor_); array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size}; @@ -218,18 +220,27 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } RETURN_NOT_OK(evictPartition(pid, begin, cur)); - pageCursor_ = 0; - pages_.clear(); - pageAddresses_.clear(); - + sortedBuffer_ = nullptr; offset_ = 0; array_.clear(); - sortedBuffer_ = nullptr; - if (!stopped_) { + auto numPages = pages_.size(); + while (--numPages) { + pages_.pop_front(); + } + auto& page = pages_.front(); + memset(page->asMutable(), 0, page->size()); + pageAddresses_.resize(1); + pageAddresses_[0] = currentPage_; + pageNumber_ = 0; + pageCursor_ = 0; + // Allocate array_ can trigger spill. array_.resize(initialSize_); + } else { + pages_.clear(); + pageAddresses_.clear(); } return arrow::Status::OK(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 0185662cc0b4..45f54556081d 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -94,7 +94,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { SortArray array_; uint32_t offset_{0}; - std::vector pages_; + std::list pages_; std::vector pageAddresses_; char* currentPage_; uint32_t pageNumber_; diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index f13ef20a59ff..4069f1b44324 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -87,9 +87,9 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) val nativeMetrics = if (isSort) { - dep.metrics("splitTime") - } else { dep.metrics("sortTime") + } else { + dep.metrics("splitTime") } nativeMetrics .add(