diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 3ce72aeaa261..5ef803859a66 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -272,6 +272,10 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) { if (evictor_) { RETURN_NOT_OK(evictor_->flushCachedPayloads(pid, dataFileOs_.get())); } + // Compress and write partition buffers. + if (partitionBufferEvictor_) { + RETURN_NOT_OK(partitionBufferEvictor_->flushCachedPayloads(pid, dataFileOs_.get())); + } ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell()); if (endInFinalFile != startInFinalFile && options_->write_eos) { // Write EOS if any payload written. @@ -342,18 +346,25 @@ arrow::Status LocalPartitionWriter::evict( auto payloadType = (codec_ && evictType != Evictor::kFlush && numRows >= options_->compression_threshold) ? BlockPayload::Type::kCompressed : BlockPayload::Type::kUncompressed; - if (evictType == Evictor::kStop) { - evictType = Evictor::kCache; - if (payloadType == BlockPayload::Type::kCompressed) { - payloadType = BlockPayload::Type::kToBeCompressed; - } + if (evictType == Evictor::kStop && payloadType == BlockPayload::Type::kCompressed) { + payloadType = BlockPayload::Type::kToBeCompressed; } ARROW_ASSIGN_OR_RAISE( auto payload, BlockPayload::fromBuffers( payloadType, numRows, std::move(buffers), payloadPool_.get(), codec_ ? codec_.get() : nullptr, reuseBuffers)); - RETURN_NOT_OK(requestEvict(evictType)); - RETURN_NOT_OK(evictor_->evict(partitionId, std::move(payload))); + if (evictType == Evictor::kStop) { + // Create an empty SpillInfo because the last partition buffers won't be spilled. + if (!partitionBufferEvictor_) { + auto spillInfo = std::make_shared(""); + ARROW_ASSIGN_OR_RAISE( + partitionBufferEvictor_, LocalEvictor::create(numPartitions_, options_, spillInfo, Evictor::Type::kCache)); + } + RETURN_NOT_OK(partitionBufferEvictor_->evict(partitionId, std::move(payload))); + } else { + RETURN_NOT_OK(requestEvict(evictType)); + RETURN_NOT_OK(evictor_->evict(partitionId, std::move(payload))); + } return arrow::Status::OK(); } diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h index 78dd32454eb8..698b239b77a5 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -103,6 +103,7 @@ class LocalPartitionWriter : public PartitionWriter { bool stopped_{false}; std::shared_ptr fs_{nullptr}; std::shared_ptr evictor_{nullptr}; + std::shared_ptr partitionBufferEvictor_{nullptr}; std::vector> spills_{}; // configured local dirs for spilled file