Skip to content

Commit

Permalink
pre-alloc and reused compressed buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Aug 15, 2024
1 parent fc7f9cd commit f017b2f
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 113 deletions.
90 changes: 44 additions & 46 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,17 +385,15 @@ std::string LocalPartitionWriter::nextSpilledFileDir() {
return spilledFileDir;
}

arrow::Status LocalPartitionWriter::openDataFile() {
// open data file output stream
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> LocalPartitionWriter::openFile(const std::string& file) {
std::shared_ptr<arrow::io::FileOutputStream> fout;
ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(dataFile_));
ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(file));
if (options_.bufferedWrite) {
// Output stream buffer is neither partition buffer memory nor ipc memory.
ARROW_ASSIGN_OR_RAISE(dataFileOs_, arrow::io::BufferedOutputStream::Create(16384, pool_, fout));
} else {
dataFileOs_ = fout;
// The 16k bytes is a temporary allocation and will be freed with file close.
// Use default memory pool and count treat the memory as executor memory overhead to avoid unnecessary spill.
return arrow::io::BufferedOutputStream::Create(16384, arrow::default_memory_pool(), fout);
}
return arrow::Status::OK();
return fout;
}

arrow::Status LocalPartitionWriter::clearResource() {
Expand Down Expand Up @@ -467,9 +465,7 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
compressTime_ += spill->compressTime();
} else {
RETURN_NOT_OK(finishSpill(true));
// Open final data file.
// If options_.bufferedWrite is set, it will acquire 16KB memory that can trigger spill.
RETURN_NOT_OK(openDataFile());
ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));

int64_t endInFinalFile = 0;
DLOG(INFO) << "LocalPartitionWriter stopped. Total spills: " << spills_.size();
Expand Down Expand Up @@ -523,14 +519,13 @@ arrow::Status LocalPartitionWriter::requestSpill(bool isFinal) {
std::string spillFile;
std::shared_ptr<arrow::io::OutputStream> os;
if (isFinal) {
RETURN_NOT_OK(openDataFile());
ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
spillFile = dataFile_;
os = dataFileOs_;
useSpillFileAsDataFile_ = true;
} else {
ARROW_ASSIGN_OR_RAISE(spillFile, createTempShuffleFile(nextSpilledFileDir()));
ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
ARROW_ASSIGN_OR_RAISE(os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile));
}
spiller_ = std::make_unique<LocalSpiller>(
os, std::move(spillFile), options_.compressionThreshold, payloadPool_.get(), codec_.get());
Expand All @@ -548,42 +543,14 @@ arrow::Status LocalPartitionWriter::finishSpill(bool close) {
return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::evict(
arrow::Status LocalPartitionWriter::hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
bool reuseBuffers,
bool hasComplexType,
bool isFinal) {
bool hasComplexType) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();

if (evictType == Evict::kSortSpill) {
if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
lastEvictPid_ = -1;
RETURN_NOT_OK(finishSpill(true));
}
RETURN_NOT_OK(requestSpill(isFinal));

auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
auto payload,
inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr));
if (!isFinal) {
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
} else {
if (spills_.size() > 0) {
for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
auto bytesEvicted = totalBytesEvicted_;
RETURN_NOT_OK(mergeSpills(pid));
partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
}
}
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
}
lastEvictPid_ = partitionId;
return arrow::Status::OK();
}

if (evictType == Evict::kSpill) {
RETURN_NOT_OK(requestSpill(false));
ARROW_ASSIGN_OR_RAISE(
Expand All @@ -609,6 +576,38 @@ arrow::Status LocalPartitionWriter::evict(
return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::sortEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
std::shared_ptr<arrow::Buffer> compressed,
bool isFinal) {
if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
lastEvictPid_ = -1;
RETURN_NOT_OK(finishSpill(true));
}
RETURN_NOT_OK(requestSpill(isFinal));

auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
auto payload,
inMemoryPayload->toBlockPayload(
payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, std::move(compressed)));
if (!isFinal) {
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
} else {
if (spills_.size() > 0) {
for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
auto bytesEvicted = totalBytesEvicted_;
RETURN_NOT_OK(mergeSpills(pid));
partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
}
}
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
}
lastEvictPid_ = partitionId;
return arrow::Status::OK();
}

// FIXME: Remove this code path for local partition writer.
arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) {
rawPartitionLengths_[partitionId] += blockPayload->rawSize();
Expand Down Expand Up @@ -644,8 +643,7 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
if (payloadCache_ && payloadCache_->canSpill()) {
auto beforeSpill = payloadPool_->bytes_allocated();
ARROW_ASSIGN_OR_RAISE(auto spillFile, createTempShuffleFile(nextSpilledFileDir()));
ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile));
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(
spills_.back(),
Expand Down
11 changes: 8 additions & 3 deletions cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ class LocalPartitionWriter : public PartitionWriter {
const std::string& dataFile,
const std::vector<std::string>& localDirs);

arrow::Status evict(
arrow::Status hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
bool reuseBuffers,
bool hasComplexType,
bool hasComplexType) override;

arrow::Status sortEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
std::shared_ptr<arrow::Buffer> compressed,
bool isFinal) override;

arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) override;
Expand Down Expand Up @@ -87,7 +92,7 @@ class LocalPartitionWriter : public PartitionWriter {

std::string nextSpilledFileDir();

arrow::Status openDataFile();
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const std::string& file);

arrow::Status mergeSpills(uint32_t partitionId);

Expand Down
21 changes: 18 additions & 3 deletions cpp/core/shuffle/PartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
namespace gluten {

struct Evict {
enum type { kCache, kSpill, kSortSpill };
enum type { kCache, kSpill };
};

class PartitionWriter : public Reclaimable {
Expand All @@ -42,14 +42,29 @@ class PartitionWriter : public Reclaimable {
virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0;

/// Evict buffers for `partitionId` partition.
virtual arrow::Status evict(
virtual arrow::Status hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
bool reuseBuffers,
bool hasComplexType,
bool hasComplexType) = 0;

virtual arrow::Status sortEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
std::shared_ptr<arrow::Buffer> compressed,
bool isFinal) = 0;

arrow::Result<std::shared_ptr<arrow::Buffer>> getCompressedBuffer(
const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
arrow::MemoryPool* pool) {
if (!codec_) {
return nullptr;
}
auto compressedLength = BlockPayload::maxCompressedLength(buffers, codec_.get());
return arrow::AllocateBuffer(compressedLength, pool);
}

virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) = 0;

uint64_t cachedPayloadSize() {
Expand Down
73 changes: 46 additions & 27 deletions cpp/core/shuffle/Payload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,45 +186,45 @@ arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
const std::vector<bool>* isValidityBuffer,
arrow::MemoryPool* pool,
arrow::util::Codec* codec) {
arrow::util::Codec* codec,
std::shared_ptr<arrow::Buffer> compressed) {
if (payloadType == Payload::Type::kCompressed) {
Timer compressionTime;
compressionTime.start();
// Compress.
// Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ...
const auto metadataLength = sizeof(int64_t) * 2 * buffers.size();
int64_t totalCompressedLength =
std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) {
if (!buffer) {
return sum;
}
return sum + codec->MaxCompressedLen(buffer->size(), buffer->data());
});
const auto maxCompressedLength = metadataLength + totalCompressedLength;
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<arrow::ResizableBuffer> compressed, arrow::AllocateResizableBuffer(maxCompressedLength, pool));

auto output = compressed->mutable_data();
auto maxLength = maxCompressedLength(buffers, codec);
std::shared_ptr<arrow::Buffer> compressedBuffer;
uint8_t* output;
if (compressed) {
ARROW_RETURN_IF(
compressed->size() < maxLength,
arrow::Status::Invalid(
"Compressed buffer length < maxCompressedLength. (", compressed->size(), " vs ", maxLength, ")"));
output = const_cast<uint8_t*>(compressed->data());
} else {
ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool));
output = compressedBuffer->mutable_data();
}

int64_t actualLength = 0;
// Compress buffers one by one.
for (auto& buffer : buffers) {
auto availableLength = maxCompressedLength - actualLength;
auto availableLength = maxLength - actualLength;
// Release buffer after compression.
ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(std::move(buffer), output, availableLength, codec));
output += compressedSize;
actualLength += compressedSize;
}

ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing compressed buffer out of bound."));
RETURN_NOT_OK(compressed->Resize(actualLength));
if (compressed) {
compressedBuffer = std::make_shared<arrow::Buffer>(compressed->data(), actualLength);
} else {
RETURN_NOT_OK(std::dynamic_pointer_cast<arrow::ResizableBuffer>(compressedBuffer)->Resize(actualLength));
}
compressionTime.stop();
auto payload = std::unique_ptr<BlockPayload>(new BlockPayload(
Type::kCompressed,
numRows,
std::vector<std::shared_ptr<arrow::Buffer>>{compressed},
isValidityBuffer,
pool,
codec));
auto payload = std::unique_ptr<BlockPayload>(
new BlockPayload(Type::kCompressed, numRows, {compressedBuffer}, isValidityBuffer, pool, codec));
payload->setCompressionTime(compressionTime.realTimeUsed());
return payload;
}
Expand Down Expand Up @@ -329,6 +329,21 @@ int64_t BlockPayload::rawSize() {
return getBufferSize(buffers_);
}

int64_t BlockPayload::maxCompressedLength(
const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
arrow::util::Codec* codec) {
// Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ...
const auto metadataLength = sizeof(int64_t) * 2 * buffers.size();
int64_t totalCompressedLength =
std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) {
if (!buffer) {
return sum;
}
return sum + codec->MaxCompressedLen(buffer->size(), buffer->data());
});
return metadataLength + totalCompressedLength;
}

arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
std::unique_ptr<InMemoryPayload> source,
std::unique_ptr<InMemoryPayload> append,
Expand Down Expand Up @@ -404,9 +419,13 @@ arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer, std::move(merged));
}

arrow::Result<std::unique_ptr<BlockPayload>>
InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) {
return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec);
arrow::Result<std::unique_ptr<BlockPayload>> InMemoryPayload::toBlockPayload(
Payload::Type payloadType,
arrow::MemoryPool* pool,
arrow::util::Codec* codec,
std::shared_ptr<arrow::Buffer> compressed) {
return BlockPayload::fromBuffers(
payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec, std::move(compressed));
}

arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) {
Expand Down
14 changes: 11 additions & 3 deletions cpp/core/shuffle/Payload.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ class BlockPayload final : public Payload {
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
const std::vector<bool>* isValidityBuffer,
arrow::MemoryPool* pool,
arrow::util::Codec* codec);
arrow::util::Codec* codec,
std::shared_ptr<arrow::Buffer> compressed);

static arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> deserialize(
arrow::io::InputStream* inputStream,
Expand All @@ -93,6 +94,10 @@ class BlockPayload final : public Payload {
uint32_t& numRows,
int64_t& decompressTime);

static int64_t maxCompressedLength(
const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
arrow::util::Codec* codec);

arrow::Status serialize(arrow::io::OutputStream* outputStream) override;

arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t pos) override;
Expand Down Expand Up @@ -131,8 +136,11 @@ class InMemoryPayload final : public Payload {

arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index) override;

arrow::Result<std::unique_ptr<BlockPayload>>
toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec);
arrow::Result<std::unique_ptr<BlockPayload>> toBlockPayload(
Payload::Type payloadType,
arrow::MemoryPool* pool,
arrow::util::Codec* codec,
std::shared_ptr<arrow::Buffer> compressed = nullptr);

arrow::Status copyBuffers(arrow::MemoryPool* pool);

Expand Down
Loading

0 comments on commit f017b2f

Please sign in to comment.