Skip to content

Commit

Permalink
separate last payload
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Dec 13, 2023
1 parent 30c5e87 commit 0788cbc
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
25 changes: 18 additions & 7 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
if (evictor_) {
RETURN_NOT_OK(evictor_->flushCachedPayloads(pid, dataFileOs_.get()));
}
// Compress and write partition buffers.
if (partitionBufferEvictor_) {
RETURN_NOT_OK(partitionBufferEvictor_->flushCachedPayloads(pid, dataFileOs_.get()));
}
ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell());
if (endInFinalFile != startInFinalFile && options_->write_eos) {
// Write EOS if any payload written.
Expand Down Expand Up @@ -342,18 +346,25 @@ arrow::Status LocalPartitionWriter::evict(
auto payloadType = (codec_ && evictType != Evictor::kFlush && numRows >= options_->compression_threshold)
? BlockPayload::Type::kCompressed
: BlockPayload::Type::kUncompressed;
if (evictType == Evictor::kStop) {
evictType = Evictor::kCache;
if (payloadType == BlockPayload::Type::kCompressed) {
payloadType = BlockPayload::Type::kToBeCompressed;
}
if (evictType == Evictor::kStop && payloadType == BlockPayload::Type::kCompressed) {
payloadType = BlockPayload::Type::kToBeCompressed;
}
ARROW_ASSIGN_OR_RAISE(
auto payload,
BlockPayload::fromBuffers(
payloadType, numRows, std::move(buffers), payloadPool_.get(), codec_ ? codec_.get() : nullptr, reuseBuffers));
RETURN_NOT_OK(requestEvict(evictType));
RETURN_NOT_OK(evictor_->evict(partitionId, std::move(payload)));
if (evictType == Evictor::kStop) {
// Create an empty SpillInfo because the last partition buffers won't be spilled.
if (!partitionBufferEvictor_) {
auto spillInfo = std::make_shared<SpillInfo>("");
ARROW_ASSIGN_OR_RAISE(
partitionBufferEvictor_, LocalEvictor::create(numPartitions_, options_, spillInfo, Evictor::Type::kCache));
}
RETURN_NOT_OK(partitionBufferEvictor_->evict(partitionId, std::move(payload)));
} else {
RETURN_NOT_OK(requestEvict(evictType));
RETURN_NOT_OK(evictor_->evict(partitionId, std::move(payload)));
}
return arrow::Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class LocalPartitionWriter : public PartitionWriter {
bool stopped_{false};
std::shared_ptr<arrow::fs::LocalFileSystem> fs_{nullptr};
std::shared_ptr<LocalEvictor> evictor_{nullptr};
std::shared_ptr<LocalEvictor> partitionBufferEvictor_{nullptr};
std::vector<std::shared_ptr<SpillInfo>> spills_{};

// configured local dirs for spilled file
Expand Down

0 comments on commit 0788cbc

Please sign in to comment.