diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 4383e6489237..e74a4e277d97 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -32,12 +32,12 @@ class LocalPartitionWriter::LocalSpiller { public: LocalSpiller( std::shared_ptr os, - const std::string& spillFile, + std::string spillFile, uint32_t compressionThreshold, arrow::MemoryPool* pool, arrow::util::Codec* codec) : os_(os), - spillFile_(spillFile), + spillFile_(std::move(spillFile)), compressionThreshold_(compressionThreshold), pool_(pool), codec_(codec), @@ -69,13 +69,19 @@ class LocalPartitionWriter::LocalSpiller { return arrow::Status::OK(); } - arrow::Result> finish() { - if (finished_) { - return arrow::Status::Invalid("Calling toBlockPayload() on a finished SpillEvictor."); - } + arrow::Result> finish(bool close) { + ARROW_RETURN_IF(finished_, arrow::Status::Invalid("Calling finish() on a finished LocalSpiller.")); + ARROW_RETURN_IF(os_->closed(), arrow::Status::Invalid("Spill file os has been closed.")); + finished_ = true; - RETURN_NOT_OK(os_->Close()); - diskSpill_->setSpillFile(std::move(spillFile_)); + if (close) { + RETURN_NOT_OK(os_->Close()); + } + // std::cout << "Finish spill. spillTime_: " << spillTime_ << ", compressTime_: " << compressTime_ + // << ", spillFile: " << spillFile_ << std::endl; + diskSpill_->setSpillFile(spillFile_); + diskSpill_->setSpillTime(spillTime_); + diskSpill_->setCompressTime(compressTime_); return std::move(diskSpill_); } @@ -83,14 +89,6 @@ class LocalPartitionWriter::LocalSpiller { return finished_; } - int64_t getSpillTime() const { - return spillTime_; - } - - int64_t getCompressTime() const { - return compressTime_; - } - private: std::shared_ptr os_; std::string spillFile_; @@ -442,9 +440,32 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) { } stopped_ = true; - RETURN_NOT_OK(finishSpill()); + if (useSpillFileAsDataFile_) { + RETURN_NOT_OK(finishSpill(false)); + // The last spill has been written to data file. + auto spill = std::move(spills_.back()); + spills_.pop_back(); + + // Merge the remaining partitions from spills. + if (spills_.size() > 0) { + for (auto pid = lastEvictPid_ + 1; pid < numPartitions_; ++pid) { + auto bytesEvicted = totalBytesEvicted_; + RETURN_NOT_OK(mergeSpills(pid)); + partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; + } + // std::cout << "Stop and merge spills from " << lastEvictPid_ + 1 << " to " << numPartitions_ - 1 + // << ", num spills: " << spills_.size() << std::endl; + } - if (!useSpillFileAsDataFile_) { + for (auto pid = 0; pid < numPartitions_; ++pid) { + while (auto payload = spill->nextPayload(pid)) { + partitionLengths_[pid] += payload->rawSize(); + } + } + writeTime_ = spill->spillTime(); + compressTime_ += spill->compressTime(); + } else { + RETURN_NOT_OK(finishSpill(true)); // Open final data file. // If options_.bufferedWrite is set, it will acquire 16KB memory that can trigger spill. RETURN_NOT_OK(openDataFile()); @@ -473,33 +494,25 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) { ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell()); partitionLengths_[pid] = endInFinalFile - startInFinalFile; } - - for (const auto& spill : spills_) { - for (auto pid = 0; pid < numPartitions_; ++pid) { - if (spill->hasNextPayload(pid)) { - return arrow::Status::Invalid("Merging from spill is not exhausted."); - } - } - } - - ARROW_ASSIGN_OR_RAISE(totalBytesWritten_, dataFileOs_->Tell()); - // Close Final file. Clear buffered resources. RETURN_NOT_OK(clearResource()); - } else { - auto spill = std::move(spills_.back()); + } + + // Check all spills are merged. + auto s = 0; + for (const auto& spill : spills_) { + compressTime_ += spill->compressTime(); + spillTime_ += spill->spillTime(); for (auto pid = 0; pid < numPartitions_; ++pid) { - uint64_t length = 0; - while (auto payload = spill->nextPayload(pid)) { - length += payload->rawSize(); + if (spill->hasNextPayload(pid)) { + return arrow::Status::Invalid( + "Merging from spill " + std::to_string(s) + " is not exhausted. pid: " + std::to_string(pid)); } - partitionLengths_[pid] = length; } - totalBytesWritten_ = std::filesystem::file_size(dataFile_); - writeTime_ = spillTime_; - spillTime_ = 0; - DLOG(INFO) << "Use spill file as data file: " << dataFile_; + ++s; } + spills_.clear(); + // Populate shuffle writer metrics. RETURN_NOT_OK(populateMetrics(metrics)); return arrow::Status::OK(); @@ -508,27 +521,29 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) { arrow::Status LocalPartitionWriter::requestSpill(bool isFinal) { if (!spiller_ || spiller_->finished()) { std::string spillFile; - if (isFinal && useSpillFileAsDataFile()) { + std::shared_ptr os; + if (isFinal) { + RETURN_NOT_OK(openDataFile()); spillFile = dataFile_; + os = dataFileOs_; + useSpillFileAsDataFile_ = true; } else { ARROW_ASSIGN_OR_RAISE(spillFile, createTempShuffleFile(nextSpilledFileDir())); + ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true)); + ARROW_ASSIGN_OR_RAISE(os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw)); } - ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true)); - ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw)); spiller_ = std::make_unique( os, std::move(spillFile), options_.compressionThreshold, payloadPool_.get(), codec_.get()); } return arrow::Status::OK(); } -arrow::Status LocalPartitionWriter::finishSpill() { +arrow::Status LocalPartitionWriter::finishSpill(bool close) { // Finish the spiller. No compression, no spill. if (spiller_ && !spiller_->finished()) { auto spiller = std::move(spiller_); spills_.emplace_back(); - ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish()); - spillTime_ += spiller->getSpillTime(); - compressTime_ += spiller->getCompressTime(); + ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish(close)); } return arrow::Status::OK(); } @@ -543,18 +558,31 @@ arrow::Status LocalPartitionWriter::evict( rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize(); if (evictType == Evict::kSortSpill) { - if (partitionId < lastEvictPid_) { - RETURN_NOT_OK(finishSpill()); + if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) { + lastEvictPid_ = -1; + RETURN_NOT_OK(finishSpill(true)); } - lastEvictPid_ = partitionId; - RETURN_NOT_OK(requestSpill(isFinal)); auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; ARROW_ASSIGN_OR_RAISE( auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr)); - RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); + if (!isFinal) { + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); + } else { + if (spills_.size() > 0) { + for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) { + auto bytesEvicted = totalBytesEvicted_; + RETURN_NOT_OK(mergeSpills(pid)); + partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; + } + // std::cout << "Merge spills from " << lastEvictPid_ + 1 << " to " << partitionId + // << ", num spills: " << spills_.size() << std::endl; + } + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); + } + lastEvictPid_ = partitionId; return arrow::Status::OK(); } @@ -586,8 +614,8 @@ arrow::Status LocalPartitionWriter::evict( arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) { rawPartitionLengths_[partitionId] += blockPayload->rawSize(); - if (partitionId < lastEvictPid_) { - RETURN_NOT_OK(finishSpill()); + if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) { + RETURN_NOT_OK(finishSpill(true)); } lastEvictPid_ = partitionId; @@ -598,7 +626,7 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr< arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) { // Finish last spiller. - RETURN_NOT_OK(finishSpill()); + RETURN_NOT_OK(finishSpill(true)); int64_t reclaimed = 0; // Reclaim memory from payloadCache. @@ -629,7 +657,7 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu // This is not accurate. When the evicted partition buffers are not copied, the merged ones // are resized from the original buffers thus allocated from partitionBufferPool. reclaimed += beforeSpill - payloadPool_->bytes_allocated(); - RETURN_NOT_OK(finishSpill()); + RETURN_NOT_OK(finishSpill(true)); } *actual = reclaimed; return arrow::Status::OK(); @@ -646,18 +674,9 @@ arrow::Status LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metric metrics->totalEvictTime += spillTime_; metrics->totalWriteTime += writeTime_; metrics->totalBytesEvicted += totalBytesEvicted_; - metrics->totalBytesWritten += totalBytesWritten_; + metrics->totalBytesWritten += std::filesystem::file_size(dataFile_); metrics->partitionLengths = std::move(partitionLengths_); metrics->rawPartitionLengths = std::move(rawPartitionLengths_); return arrow::Status::OK(); } - -bool LocalPartitionWriter::useSpillFileAsDataFile() { - if (!payloadCache_ && !merger_ && !spiller_ && spills_.size() == 0) { - useSpillFileAsDataFile_ = true; - return true; - } - return false; -} - } // namespace gluten diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h index a29f04fb748f..efd7b4df3f4f 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -83,7 +83,7 @@ class LocalPartitionWriter : public PartitionWriter { arrow::Status requestSpill(bool isFinal); - arrow::Status finishSpill(); + arrow::Status finishSpill(bool close); std::string nextSpilledFileDir(); @@ -95,8 +95,6 @@ class LocalPartitionWriter : public PartitionWriter { arrow::Status populateMetrics(ShuffleWriterMetrics* metrics); - bool useSpillFileAsDataFile(); - std::string dataFile_; std::vector localDirs_; @@ -113,10 +111,9 @@ class LocalPartitionWriter : public PartitionWriter { std::shared_ptr dataFileOs_; int64_t totalBytesEvicted_{0}; - int64_t totalBytesWritten_{0}; std::vector partitionLengths_; std::vector rawPartitionLengths_; - uint32_t lastEvictPid_{0}; + int32_t lastEvictPid_{-1}; }; } // namespace gluten diff --git a/cpp/core/shuffle/Spill.cc b/cpp/core/shuffle/Spill.cc index 0603b5edfd18..0bbe667ab4d8 100644 --- a/cpp/core/shuffle/Spill.cc +++ b/cpp/core/shuffle/Spill.cc @@ -86,7 +86,23 @@ void Spill::setSpillFile(const std::string& spillFile) { spillFile_ = spillFile; } +void Spill::setSpillTime(int64_t spillTime) { + spillTime_ = spillTime; +} + +void Spill::setCompressTime(int64_t compressTime) { + compressTime_ = compressTime; +} + std::string Spill::spillFile() const { return spillFile_; } + +int64_t Spill::spillTime() const { + return spillTime_; +} + +int64_t Spill::compressTime() const { + return compressTime_; +} } // namespace gluten diff --git a/cpp/core/shuffle/Spill.h b/cpp/core/shuffle/Spill.h index 71cb3d0515e1..7ee60ef299fe 100644 --- a/cpp/core/shuffle/Spill.h +++ b/cpp/core/shuffle/Spill.h @@ -52,8 +52,16 @@ class Spill final { void setSpillFile(const std::string& spillFile); + void setSpillTime(int64_t spillTime); + + void setCompressTime(int64_t compressTime); + std::string spillFile() const; + int64_t spillTime() const; + + int64_t compressTime() const; + private: struct PartitionPayload { uint32_t partitionId{}; @@ -65,6 +73,8 @@ class Spill final { std::list partitionPayloads_{}; std::shared_ptr inputStream_{}; std::string spillFile_; + int64_t spillTime_; + int64_t compressTime_; arrow::io::InputStream* rawIs_; diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index d7db69659d25..c0d9b467d98c 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -205,7 +205,7 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u } } -arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) { +arrow::Status VeloxSortShuffleWriter::maybeSpill(uint32_t nextRows) { if ((uint64_t)offset_ + nextRows > std::numeric_limits::max()) { RETURN_NOT_OK(evictAllPartitions()); } @@ -213,9 +213,12 @@ arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) { } arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { + VELOX_CHECK(offset_ > 0); EvictGuard evictGuard{evictState_}; auto numRecords = offset_; + // offset_ is used for checking spillable data. + offset_ = 0; int32_t begin = 0; { ScopedTimer timer(&sortTime_); @@ -257,7 +260,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { pageCursor_ = 0; // Reset and reallocate array_ to minimal size. Allocate array_ can trigger spill. - offset_ = 0; initArray(); } return arrow::Status::OK(); diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 747593ae457d..69b8b2503095 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -71,7 +71,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows); - arrow::Status maybeSpill(int32_t nextRows); + arrow::Status maybeSpill(uint32_t nextRows); arrow::Status evictAllPartitions();