Skip to content

Commit

Permalink
reduce spill
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 30, 2024
1 parent e8dd172 commit 7e67ecd
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 73 deletions.
149 changes: 84 additions & 65 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ class LocalPartitionWriter::LocalSpiller {
public:
LocalSpiller(
std::shared_ptr<arrow::io::OutputStream> os,
const std::string& spillFile,
std::string spillFile,
uint32_t compressionThreshold,
arrow::MemoryPool* pool,
arrow::util::Codec* codec)
: os_(os),
spillFile_(spillFile),
spillFile_(std::move(spillFile)),
compressionThreshold_(compressionThreshold),
pool_(pool),
codec_(codec),
Expand Down Expand Up @@ -69,28 +69,26 @@ class LocalPartitionWriter::LocalSpiller {
return arrow::Status::OK();
}

arrow::Result<std::shared_ptr<Spill>> finish() {
if (finished_) {
return arrow::Status::Invalid("Calling toBlockPayload() on a finished SpillEvictor.");
}
arrow::Result<std::shared_ptr<Spill>> finish(bool close) {
ARROW_RETURN_IF(finished_, arrow::Status::Invalid("Calling finish() on a finished LocalSpiller."));
ARROW_RETURN_IF(os_->closed(), arrow::Status::Invalid("Spill file os has been closed."));

finished_ = true;
RETURN_NOT_OK(os_->Close());
diskSpill_->setSpillFile(std::move(spillFile_));
if (close) {
RETURN_NOT_OK(os_->Close());
}
// std::cout << "Finish spill. spillTime_: " << spillTime_ << ", compressTime_: " << compressTime_
// << ", spillFile: " << spillFile_ << std::endl;
diskSpill_->setSpillFile(spillFile_);
diskSpill_->setSpillTime(spillTime_);
diskSpill_->setCompressTime(compressTime_);
return std::move(diskSpill_);
}

bool finished() const {
return finished_;
}

int64_t getSpillTime() const {
return spillTime_;
}

int64_t getCompressTime() const {
return compressTime_;
}

private:
std::shared_ptr<arrow::io::OutputStream> os_;
std::string spillFile_;
Expand Down Expand Up @@ -442,9 +440,32 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
}
stopped_ = true;

RETURN_NOT_OK(finishSpill());
if (useSpillFileAsDataFile_) {
RETURN_NOT_OK(finishSpill(false));
// The last spill has been written to data file.
auto spill = std::move(spills_.back());
spills_.pop_back();

// Merge the remaining partitions from spills.
if (spills_.size() > 0) {
for (auto pid = lastEvictPid_ + 1; pid < numPartitions_; ++pid) {
auto bytesEvicted = totalBytesEvicted_;
RETURN_NOT_OK(mergeSpills(pid));
partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
}
// std::cout << "Stop and merge spills from " << lastEvictPid_ + 1 << " to " << numPartitions_ - 1
// << ", num spills: " << spills_.size() << std::endl;
}

if (!useSpillFileAsDataFile_) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
while (auto payload = spill->nextPayload(pid)) {
partitionLengths_[pid] += payload->rawSize();
}
}
writeTime_ = spill->spillTime();
compressTime_ += spill->compressTime();
} else {
RETURN_NOT_OK(finishSpill(true));
// Open final data file.
// If options_.bufferedWrite is set, it will acquire 16KB memory that can trigger spill.
RETURN_NOT_OK(openDataFile());
Expand Down Expand Up @@ -473,33 +494,25 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell());
partitionLengths_[pid] = endInFinalFile - startInFinalFile;
}

for (const auto& spill : spills_) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
if (spill->hasNextPayload(pid)) {
return arrow::Status::Invalid("Merging from spill is not exhausted.");
}
}
}

ARROW_ASSIGN_OR_RAISE(totalBytesWritten_, dataFileOs_->Tell());

// Close Final file. Clear buffered resources.
RETURN_NOT_OK(clearResource());
} else {
auto spill = std::move(spills_.back());
}

// Check all spills are merged.
auto s = 0;
for (const auto& spill : spills_) {
compressTime_ += spill->compressTime();
spillTime_ += spill->spillTime();
for (auto pid = 0; pid < numPartitions_; ++pid) {
uint64_t length = 0;
while (auto payload = spill->nextPayload(pid)) {
length += payload->rawSize();
if (spill->hasNextPayload(pid)) {
return arrow::Status::Invalid(
"Merging from spill " + std::to_string(s) + " is not exhausted. pid: " + std::to_string(pid));
}
partitionLengths_[pid] = length;
}
totalBytesWritten_ = std::filesystem::file_size(dataFile_);
writeTime_ = spillTime_;
spillTime_ = 0;
DLOG(INFO) << "Use spill file as data file: " << dataFile_;
++s;
}
spills_.clear();

// Populate shuffle writer metrics.
RETURN_NOT_OK(populateMetrics(metrics));
return arrow::Status::OK();
Expand All @@ -508,27 +521,29 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
arrow::Status LocalPartitionWriter::requestSpill(bool isFinal) {
if (!spiller_ || spiller_->finished()) {
std::string spillFile;
if (isFinal && useSpillFileAsDataFile()) {
std::shared_ptr<arrow::io::OutputStream> os;
if (isFinal) {
RETURN_NOT_OK(openDataFile());
spillFile = dataFile_;
os = dataFileOs_;
useSpillFileAsDataFile_ = true;
} else {
ARROW_ASSIGN_OR_RAISE(spillFile, createTempShuffleFile(nextSpilledFileDir()));
ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
ARROW_ASSIGN_OR_RAISE(os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
}
ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
spiller_ = std::make_unique<LocalSpiller>(
os, std::move(spillFile), options_.compressionThreshold, payloadPool_.get(), codec_.get());
}
return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::finishSpill() {
arrow::Status LocalPartitionWriter::finishSpill(bool close) {
// Finish the spiller. No compression, no spill.
if (spiller_ && !spiller_->finished()) {
auto spiller = std::move(spiller_);
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish());
spillTime_ += spiller->getSpillTime();
compressTime_ += spiller->getCompressTime();
ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish(close));
}
return arrow::Status::OK();
}
Expand All @@ -543,18 +558,31 @@ arrow::Status LocalPartitionWriter::evict(
rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize();

if (evictType == Evict::kSortSpill) {
if (partitionId < lastEvictPid_) {
RETURN_NOT_OK(finishSpill());
if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
lastEvictPid_ = -1;
RETURN_NOT_OK(finishSpill(true));
}
lastEvictPid_ = partitionId;

RETURN_NOT_OK(requestSpill(isFinal));

auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
auto payload,
inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr));
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
if (!isFinal) {
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
} else {
if (spills_.size() > 0) {
for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
auto bytesEvicted = totalBytesEvicted_;
RETURN_NOT_OK(mergeSpills(pid));
partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
}
// std::cout << "Merge spills from " << lastEvictPid_ + 1 << " to " << partitionId
// << ", num spills: " << spills_.size() << std::endl;
}
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
}
lastEvictPid_ = partitionId;
return arrow::Status::OK();
}

Expand Down Expand Up @@ -586,8 +614,8 @@ arrow::Status LocalPartitionWriter::evict(
arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) {
rawPartitionLengths_[partitionId] += blockPayload->rawSize();

if (partitionId < lastEvictPid_) {
RETURN_NOT_OK(finishSpill());
if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) {
RETURN_NOT_OK(finishSpill(true));
}
lastEvictPid_ = partitionId;

Expand All @@ -598,7 +626,7 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<

arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
// Finish last spiller.
RETURN_NOT_OK(finishSpill());
RETURN_NOT_OK(finishSpill(true));

int64_t reclaimed = 0;
// Reclaim memory from payloadCache.
Expand Down Expand Up @@ -629,7 +657,7 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
// This is not accurate. When the evicted partition buffers are not copied, the merged ones
// are resized from the original buffers thus allocated from partitionBufferPool.
reclaimed += beforeSpill - payloadPool_->bytes_allocated();
RETURN_NOT_OK(finishSpill());
RETURN_NOT_OK(finishSpill(true));
}
*actual = reclaimed;
return arrow::Status::OK();
Expand All @@ -646,18 +674,9 @@ arrow::Status LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metric
metrics->totalEvictTime += spillTime_;
metrics->totalWriteTime += writeTime_;
metrics->totalBytesEvicted += totalBytesEvicted_;
metrics->totalBytesWritten += totalBytesWritten_;
metrics->totalBytesWritten += std::filesystem::file_size(dataFile_);
metrics->partitionLengths = std::move(partitionLengths_);
metrics->rawPartitionLengths = std::move(rawPartitionLengths_);
return arrow::Status::OK();
}

bool LocalPartitionWriter::useSpillFileAsDataFile() {
if (!payloadCache_ && !merger_ && !spiller_ && spills_.size() == 0) {
useSpillFileAsDataFile_ = true;
return true;
}
return false;
}

} // namespace gluten
7 changes: 2 additions & 5 deletions cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class LocalPartitionWriter : public PartitionWriter {

arrow::Status requestSpill(bool isFinal);

arrow::Status finishSpill();
arrow::Status finishSpill(bool close);

std::string nextSpilledFileDir();

Expand All @@ -95,8 +95,6 @@ class LocalPartitionWriter : public PartitionWriter {

arrow::Status populateMetrics(ShuffleWriterMetrics* metrics);

bool useSpillFileAsDataFile();

std::string dataFile_;
std::vector<std::string> localDirs_;

Expand All @@ -113,10 +111,9 @@ class LocalPartitionWriter : public PartitionWriter {
std::shared_ptr<arrow::io::OutputStream> dataFileOs_;

int64_t totalBytesEvicted_{0};
int64_t totalBytesWritten_{0};
std::vector<int64_t> partitionLengths_;
std::vector<int64_t> rawPartitionLengths_;

uint32_t lastEvictPid_{0};
int32_t lastEvictPid_{-1};
};
} // namespace gluten
16 changes: 16 additions & 0 deletions cpp/core/shuffle/Spill.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,23 @@ void Spill::setSpillFile(const std::string& spillFile) {
spillFile_ = spillFile;
}

void Spill::setSpillTime(int64_t spillTime) {
spillTime_ = spillTime;
}

void Spill::setCompressTime(int64_t compressTime) {
compressTime_ = compressTime;
}

std::string Spill::spillFile() const {
return spillFile_;
}

int64_t Spill::spillTime() const {
return spillTime_;
}

int64_t Spill::compressTime() const {
return compressTime_;
}
} // namespace gluten
10 changes: 10 additions & 0 deletions cpp/core/shuffle/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,16 @@ class Spill final {

void setSpillFile(const std::string& spillFile);

void setSpillTime(int64_t spillTime);

void setCompressTime(int64_t compressTime);

std::string spillFile() const;

int64_t spillTime() const;

int64_t compressTime() const;

private:
struct PartitionPayload {
uint32_t partitionId{};
Expand All @@ -65,6 +73,8 @@ class Spill final {
std::list<PartitionPayload> partitionPayloads_{};
std::shared_ptr<arrow::io::MemoryMappedFile> inputStream_{};
std::string spillFile_;
int64_t spillTime_;
int64_t compressTime_;

arrow::io::InputStream* rawIs_;

Expand Down
6 changes: 4 additions & 2 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,20 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u
}
}

arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) {
arrow::Status VeloxSortShuffleWriter::maybeSpill(uint32_t nextRows) {
if ((uint64_t)offset_ + nextRows > std::numeric_limits<uint32_t>::max()) {
RETURN_NOT_OK(evictAllPartitions());
}
return arrow::Status::OK();
}

arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
VELOX_CHECK(offset_ > 0);
EvictGuard evictGuard{evictState_};

auto numRecords = offset_;
// offset_ is used for checking spillable data.
offset_ = 0;
int32_t begin = 0;
{
ScopedTimer timer(&sortTime_);
Expand Down Expand Up @@ -257,7 +260,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
pageCursor_ = 0;

// Reset and reallocate array_ to minimal size. Allocate array_ can trigger spill.
offset_ = 0;
initArray();
}
return arrow::Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

void insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows);

arrow::Status maybeSpill(int32_t nextRows);
arrow::Status maybeSpill(uint32_t nextRows);

arrow::Status evictAllPartitions();

Expand Down

0 comments on commit 7e67ecd

Please sign in to comment.