Skip to content

Commit

Permalink
cache partition buffer as payload in the last evict
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Dec 13, 2023
1 parent f5e3c80 commit 30c5e87
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 111 deletions.
40 changes: 38 additions & 2 deletions cpp/core/shuffle/BlockPayload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@

#include "shuffle/BlockPayload.h"

namespace {
static const gluten::BlockPayload::Type kCompressedType = gluten::BlockPayload::kCompressed;
}
namespace gluten {

arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
BlockPayload::Type payloadType,
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
ShuffleWriterOptions* options,
arrow::MemoryPool* pool,
arrow::util::Codec* codec,
bool reuseBuffers) {
if (codec && numRows >= options->compression_threshold) {
if (payloadType == BlockPayload::Type::kCompressed) {
// Compress.
// Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ...
auto metadataLength = sizeof(int64_t) * 2 * buffers.size();
Expand Down Expand Up @@ -68,6 +71,9 @@ arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
}
return std::make_unique<BlockPayload>(Type::kUncompressed, numRows, std::move(copies));
}
if (payloadType == Type::kToBeCompressed) {
return std::make_unique<CompressibleBlockPayload>(Type::kUncompressed, numRows, std::move(buffers), pool, codec);
}
return std::make_unique<BlockPayload>(Type::kUncompressed, numRows, std::move(buffers));
}

Expand Down Expand Up @@ -193,4 +199,34 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> BlockPayload::readCompressedBuffer
compressedLength, compressed->data(), uncompressedLength, const_cast<uint8_t*>(output->data())));
return output;
}

arrow::Status CompressibleBlockPayload::serialize(arrow::io::OutputStream* outputStream) {
RETURN_NOT_OK(outputStream->Write(&kCompressedType, sizeof(Type)));
RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
auto metadataLength = sizeof(int64_t) * 2 * buffers_.size();
int64_t totalCompressedLength =
std::accumulate(buffers_.begin(), buffers_.end(), 0LL, [&](auto sum, const auto& buffer) {
if (!buffer) {
return sum;
}
return sum + codec_->MaxCompressedLen(buffer->size(), buffer->data());
});
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<arrow::ResizableBuffer> compressed,
arrow::AllocateResizableBuffer(metadataLength + totalCompressedLength, pool_));
auto output = compressed->mutable_data();

// Compress buffers one by one.
for (auto& buffer : buffers_) {
auto availableLength = compressed->size() - (output - compressed->data());
RETURN_NOT_OK(compressBuffer(buffer, output, availableLength, codec_));
}

int64_t actualLength = output - compressed->data();
ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing compressed buffer out of bound."));
RETURN_NOT_OK(compressed->Resize(actualLength));

RETURN_NOT_OK(outputStream->Write(std::move(compressed)));
return arrow::Status::OK();
}
} // namespace gluten
23 changes: 20 additions & 3 deletions cpp/core/shuffle/BlockPayload.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ arrow::Status compressBuffer(

class BlockPayload : public Payload {
public:
enum Type : int32_t { kCompressed, kUncompressed };
enum Type : int32_t { kCompressed, kUncompressed, kToBeCompressed };

BlockPayload(BlockPayload::Type type, uint32_t numRows, std::vector<std::shared_ptr<arrow::Buffer>> buffers)
: type_(type), numRows_(numRows), buffers_(std::move(buffers)) {}

static arrow::Result<std::unique_ptr<BlockPayload>> fromBuffers(
BlockPayload::Type payloadType,
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
ShuffleWriterOptions* options,
arrow::MemoryPool* pool,
arrow::util::Codec* codec,
bool reuseBuffers);
Expand Down Expand Up @@ -145,10 +145,27 @@ class BlockPayload : public Payload {
return arrow::Status::OK();
}

private:
protected:
Type type_;
uint32_t numRows_;
std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
};

class CompressibleBlockPayload : public BlockPayload {
public:
CompressibleBlockPayload(
BlockPayload::Type type,
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
arrow::MemoryPool* pool,
arrow::util::Codec* codec)
: BlockPayload(type, numRows, std::move(buffers)), pool_(pool), codec_(codec) {}

arrow::Status serialize(arrow::io::OutputStream* outputStream);

private:
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
};

} // namespace gluten
81 changes: 25 additions & 56 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,15 @@ class CacheEvictor final : public LocalPartitionWriter::LocalEvictor {
ScopedTimer timer(evictTime_);
auto payloads = std::move(partitionCachedPayload_[partitionId]);
partitionCachedPayload_.erase(partitionId);
int32_t numPayloads = 0;

ARROW_ASSIGN_OR_RAISE(auto startInFinalFile, os->Tell());
for (auto& payload : payloads) {
RETURN_NOT_OK(payload->serialize(os));
ARROW_ASSIGN_OR_RAISE(auto spillPos, os->Tell());
DEBUG_OUT << "Partition " << partitionId << " cached payload " << numPayloads++ << " of bytes "
<< spillPos - startInFinalFile << std::endl;
startInFinalFile = spillPos;
}
return arrow::Status::OK();
}
Expand Down Expand Up @@ -198,7 +205,6 @@ arrow::Status LocalPartitionWriter::clearResource() {
// When buffered_write = true, dataFileOs_->Close doesn't release underlying buffer.
dataFileOs_.reset();
spills_.clear();
cachedPartitionBuffers_.clear();
return arrow::Status::OK();
}

Expand Down Expand Up @@ -252,35 +258,20 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
writeTimer.start();

int64_t endInFinalFile = 0;
auto cachedPartitionBuffersIter = cachedPartitionBuffers_.begin();
// Iterator over pid.
for (auto pid = 0; pid < numPartitions_; ++pid) {
// Record start offset.
auto startInFinalFile = endInFinalFile;
// Iterator over all spilled files.
RETURN_NOT_OK(mergeSpills(pid));
#ifdef GLUTEN_PRINT_DEBUG
ARROW_ASSIGN_OR_RAISE(auto spillPos, dataFileOs_->Tell());
DEBUG_OUT << "Partition " << pid << " spilled from file of bytes " << spillPos - startInFinalFile << std::endl;
#endif
// Write cached batches.
if (evictor_) {
RETURN_NOT_OK(evictor_->flushCachedPayloads(pid, dataFileOs_.get()));
}
// Compress and write the last payload.
// Stop the timer to prevent counting the compression time into write time.
if (cachedPartitionBuffersIter != cachedPartitionBuffers_.end() &&
std::get<0>(*cachedPartitionBuffersIter) == pid) {
writeTimer.stop();
ARROW_ASSIGN_OR_RAISE(
auto payload,
BlockPayload::fromBuffers(
std::get<1>(*cachedPartitionBuffersIter),
std::move(std::get<2>(*cachedPartitionBuffersIter)),
options_,
payloadPool_.get(),
codec_ ? codec_.get() : nullptr,
false));
writeTimer.start();
RETURN_NOT_OK(payload->serialize(dataFileOs_.get()));
cachedPartitionBuffersIter++;
}
ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell());
if (endInFinalFile != startInFinalFile && options_->write_eos) {
// Write EOS if any payload written.
Expand All @@ -302,10 +293,6 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
}
RETURN_NOT_OK(fs_->DeleteFile(spill->spilledFile));
}
// Check if all partition buffers are merged.
ARROW_RETURN_IF(
cachedPartitionBuffersIter != cachedPartitionBuffers_.end(),
arrow::Status::Invalid("Not all partition buffers are merged."));

writeTimer.stop();
writeTime_ = writeTimer.realTimeUsed();
Expand Down Expand Up @@ -352,21 +339,21 @@ arrow::Status LocalPartitionWriter::evict(
bool reuseBuffers,
Evictor::Type evictType) {
rawPartitionLengths_[partitionId] += getBufferSize(buffers);
if (evictType == Evictor::Type::kStop) {
cachedPartitionBuffers_.emplace_back(partitionId, numRows, std::move(buffers));
} else {
ARROW_ASSIGN_OR_RAISE(
auto payload,
BlockPayload::fromBuffers(
numRows,
std::move(buffers),
options_,
payloadPool_.get(),
(codec_ && evictType == Evictor::kCache) ? codec_.get() : nullptr,
reuseBuffers));
RETURN_NOT_OK(requestEvict(evictType));
RETURN_NOT_OK(evictor_->evict(partitionId, std::move(payload)));
auto payloadType = (codec_ && evictType != Evictor::kFlush && numRows >= options_->compression_threshold)
? BlockPayload::Type::kCompressed
: BlockPayload::Type::kUncompressed;
if (evictType == Evictor::kStop) {
evictType = Evictor::kCache;
if (payloadType == BlockPayload::Type::kCompressed) {
payloadType = BlockPayload::Type::kToBeCompressed;
}
}
ARROW_ASSIGN_OR_RAISE(
auto payload,
BlockPayload::fromBuffers(
payloadType, numRows, std::move(buffers), payloadPool_.get(), codec_ ? codec_.get() : nullptr, reuseBuffers));
RETURN_NOT_OK(requestEvict(evictType));
RETURN_NOT_OK(evictor_->evict(partitionId, std::move(payload)));
return arrow::Status::OK();
}

Expand All @@ -380,22 +367,4 @@ arrow::Status LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metric
metrics->rawPartitionLengths = std::move(rawPartitionLengths_);
return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::evictFixedSize(int64_t size, int64_t* actual) {
auto beforeShrink = options_->memory_pool->bytes_allocated();
for (auto& item : cachedPartitionBuffers_) {
auto& buffers = std::get<2>(item);
for (auto& buffer : buffers) {
if (!buffer) {
continue;
}
if (auto parent = std::dynamic_pointer_cast<arrow::ResizableBuffer>(buffer->parent())) {
RETURN_NOT_OK(parent->Resize(buffer->size()));
}
}
}
*actual = beforeShrink - options_->memory_pool->bytes_allocated();
return arrow::Status::OK();
}

} // namespace gluten
4 changes: 0 additions & 4 deletions cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ class LocalPartitionWriter : public PartitionWriter {
/// it will shrink partition buffers to free more memory.
arrow::Status stop(ShuffleWriterMetrics* metrics) override;

arrow::Status evictFixedSize(int64_t size, int64_t* actual) override;

class LocalEvictor;

private:
Expand Down Expand Up @@ -116,7 +114,5 @@ class LocalPartitionWriter : public PartitionWriter {
int64_t totalBytesWritten_{0};
std::vector<int64_t> partitionLengths_;
std::vector<int64_t> rawPartitionLengths_;
// Partition id, num rows, partition buffers.
std::vector<std::tuple<uint32_t, uint32_t, std::vector<std::shared_ptr<arrow::Buffer>>>> cachedPartitionBuffers_;
};
} // namespace gluten
2 changes: 1 addition & 1 deletion cpp/core/shuffle/PartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Evictor {
int64_t evictTime_{0};
};

class PartitionWriter : public Evictable {
class PartitionWriter {
public:
PartitionWriter(uint32_t numPartitions, ShuffleWriterOptions* options)
: numPartitions_(numPartitions), options_(options) {
Expand Down
5 changes: 5 additions & 0 deletions cpp/core/shuffle/Utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,8 @@ arrow::Status gluten::writeEos(arrow::io::OutputStream* os, int64_t* bytes) {
*bytes = kSizeOfEos;
return arrow::Status::OK();
}

std::shared_ptr<arrow::Buffer> gluten::zeroLengthNullBuffer() {
static std::shared_ptr<arrow::Buffer> kNullBuffer = std::make_shared<arrow::Buffer>(nullptr, 0);
return kNullBuffer;
}
2 changes: 2 additions & 0 deletions cpp/core/shuffle/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(
const std::shared_ptr<arrow::Schema> writeSchema,
arrow::MemoryPool* pool);

std::shared_ptr<arrow::Buffer> zeroLengthNullBuffer();

} // namespace gluten
9 changes: 3 additions & 6 deletions cpp/core/shuffle/rss/CelebornPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,13 @@ arrow::Status CelebornPartitionWriter::evict(
Evictor::Type evictType) {
rawPartitionLengths_[partitionId] += getBufferSize(buffers);
ScopedTimer timer(evictTime_);
auto payloadType = (codec_ && numRows >= options_->compression_threshold) ? BlockPayload::Type::kCompressed
: BlockPayload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
auto payload,
BlockPayload::fromBuffers(
numRows, std::move(buffers), options_, payloadPool_.get(), codec_ ? codec_.get() : nullptr, false));
payloadType, numRows, std::move(buffers), payloadPool_.get(), codec_ ? codec_.get() : nullptr, false));
RETURN_NOT_OK(evictor_->evict(partitionId, std::move(payload)));
return arrow::Status::OK();
}

arrow::Status CelebornPartitionWriter::evictFixedSize(int64_t size, int64_t* actual) {
*actual = 0;
return arrow::Status::OK();
}
} // namespace gluten
2 changes: 0 additions & 2 deletions cpp/core/shuffle/rss/CelebornPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ class CelebornPartitionWriter final : public RemotePartitionWriter {

arrow::Status stop(ShuffleWriterMetrics* metrics) override;

arrow::Status evictFixedSize(int64_t size, int64_t* actual) override;

private:
void init();

Expand Down
Loading

0 comments on commit 30c5e87

Please sign in to comment.