From 6d9cbdd093c582280c8236c8e63363853fe43033 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 7 Aug 2024 06:30:05 +0000 Subject: [PATCH 1/4] fix shuffle spill not reported to spark metric --- cpp/core/jni/JniWrapper.cc | 3 ++- cpp/core/shuffle/LocalPartitionWriter.cc | 7 +++++++ cpp/core/shuffle/LocalPartitionWriter.h | 1 + cpp/core/shuffle/Options.h | 1 + cpp/core/shuffle/ShuffleWriter.cc | 4 ++++ cpp/core/shuffle/ShuffleWriter.h | 2 ++ .../org/apache/gluten/vectorized/GlutenSplitResult.java | 7 +++++++ .../org/apache/spark/shuffle/ColumnarShuffleWriter.scala | 2 ++ 8 files changed, 26 insertions(+), 1 deletion(-) diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 60f367fd72d1..2f0ff20d55f7 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -165,7 +165,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass, "close", "()V"); splitResultClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/GlutenSplitResult;"); - splitResultConstructor = getMethodIdOrError(env, splitResultClass, "", "(JJJJJJJJJ[J[J)V"); + splitResultConstructor = getMethodIdOrError(env, splitResultClass, "", "(JJJJJJJJJJ[J[J)V"); columnarBatchSerializeResultClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/ColumnarBatchSerializeResult;"); @@ -975,6 +975,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap shuffleWriter->totalC2RTime(), shuffleWriter->totalBytesWritten(), shuffleWriter->totalBytesEvicted(), + shuffleWriter->totalBytesToEvict(), shuffleWriter->peakBytesAllocated(), partitionLengthArr, rawPartitionLengthArr); diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 031be791bdf0..0eb913e878bf 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -564,6 +564,7 @@ arrow::Status LocalPartitionWriter::evict( auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr)); if (!isFinal) { + totalBytesToEvict_ += payload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); } else { if (spills_.size() > 0) { @@ -573,6 +574,7 @@ arrow::Status LocalPartitionWriter::evict( partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; } } + totalBytesToEvict_ += payload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); } lastEvictPid_ = partitionId; @@ -583,6 +585,7 @@ arrow::Status LocalPartitionWriter::evict( RETURN_NOT_OK(requestSpill(false)); ARROW_ASSIGN_OR_RAISE( auto payload, inMemoryPayload->toBlockPayload(Payload::kUncompressed, payloadPool_.get(), nullptr)); + totalBytesToEvict_ += payload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); return arrow::Status::OK(); } @@ -614,6 +617,7 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr< RETURN_NOT_OK(requestSpill(stop)); if (!stop) { + totalBytesToEvict_ += blockPayload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); } else { if (spills_.size() > 0) { @@ -623,6 +627,7 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr< partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; } } + totalBytesToEvict_ += blockPayload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); } lastEvictPid_ = partitionId; @@ -656,6 +661,7 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid)); if (merged.has_value()) { RETURN_NOT_OK(requestSpill(false)); + totalBytesToEvict_ += (*merged)->rawSize(); RETURN_NOT_OK(spiller_->spill(pid, std::move(*merged))); } } @@ -678,6 +684,7 @@ arrow::Status LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metric metrics->totalCompressTime += compressTime_; metrics->totalEvictTime += spillTime_; metrics->totalWriteTime += writeTime_; + metrics->totalBytesToEvict += totalBytesToEvict_; metrics->totalBytesEvicted += totalBytesEvicted_; metrics->totalBytesWritten += std::filesystem::file_size(dataFile_); metrics->partitionLengths = std::move(partitionLengths_); diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h index efd7b4df3f4f..555632fedd5d 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -110,6 +110,7 @@ class LocalPartitionWriter : public PartitionWriter { std::vector subDirSelection_; std::shared_ptr dataFileOs_; + int64_t totalBytesToEvict_{0}; int64_t totalBytesEvicted_{0}; std::vector partitionLengths_; std::vector rawPartitionLengths_; diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 11fa037eb5a6..2424ec557742 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -87,6 +87,7 @@ struct PartitionWriterOptions { struct ShuffleWriterMetrics { int64_t totalBytesWritten{0}; int64_t totalBytesEvicted{0}; + int64_t totalBytesToEvict{0}; int64_t totalWriteTime{0}; int64_t totalEvictTime{0}; int64_t totalCompressTime{0}; diff --git a/cpp/core/shuffle/ShuffleWriter.cc b/cpp/core/shuffle/ShuffleWriter.cc index e637d37ffdd8..3eff9a2c821e 100644 --- a/cpp/core/shuffle/ShuffleWriter.cc +++ b/cpp/core/shuffle/ShuffleWriter.cc @@ -55,6 +55,10 @@ int64_t ShuffleWriter::totalBytesEvicted() const { return metrics_.totalBytesEvicted; } +int64_t ShuffleWriter::totalBytesToEvict() const { + return metrics_.totalBytesToEvict; +} + int64_t ShuffleWriter::totalWriteTime() const { return metrics_.totalWriteTime; } diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h index 661112150297..8c79829e00a4 100644 --- a/cpp/core/shuffle/ShuffleWriter.h +++ b/cpp/core/shuffle/ShuffleWriter.h @@ -52,6 +52,8 @@ class ShuffleWriter : public Reclaimable { int64_t totalBytesEvicted() const; + int64_t totalBytesToEvict() const; + int64_t totalWriteTime() const; int64_t totalEvictTime() const; diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java index dbc0d7db5191..335d0983c2e8 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java @@ -17,6 +17,7 @@ package org.apache.gluten.vectorized; public class GlutenSplitResult extends SplitResult { + private final long bytesToEvict; private final long peakBytes; private final long sortTime; private final long c2rTime; @@ -30,6 +31,7 @@ public GlutenSplitResult( long totalC2RTime, long totalBytesWritten, long totalBytesEvicted, + long totalBytesToEvict, // In-memory bytes(uncompressed) before spill. long peakBytes, long[] partitionLengths, long[] rawPartitionLengths) { @@ -42,11 +44,16 @@ public GlutenSplitResult( totalBytesEvicted, partitionLengths, rawPartitionLengths); + this.bytesToEvict = totalBytesEvicted; this.peakBytes = peakBytes; this.sortTime = totalSortTime; this.c2rTime = totalC2RTime; } + public long getBytesToEvict() { + return bytesToEvict; + } + public long getPeakBytes() { return peakBytes; } diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index d62ff1d68d6d..30ba8ec6f264 100644 --- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -210,6 +210,8 @@ class ColumnarShuffleWriter[K, V]( dep.metrics("peakBytes").add(splitResult.getPeakBytes) writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten) writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime) + taskContext.taskMetrics().incMemoryBytesSpilled(splitResult.getBytesToEvict) + taskContext.taskMetrics().incDiskBytesSpilled(splitResult.getTotalBytesSpilled) partitionLengths = splitResult.getPartitionLengths try { From 63bb54727b4a004919351aecc27bfe1373b5d091 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 8 Aug 2024 02:26:11 +0000 Subject: [PATCH 2/4] fix --- cpp/core/shuffle/Payload.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index b8d8274cb782..493a88e99385 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -326,8 +326,9 @@ void BlockPayload::setCompressionTime(int64_t compressionTime) { } uint64_t BlockPayload::rawSize() { - return std::accumulate( - buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer) { return sum + buffer->size(); }); + return std::accumulate(buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer) { + return buffer ? sum + buffer->size() : sum; + }); } arrow::Result> InMemoryPayload::merge( From b211ac6526a891db2c5e36d75e307982e9a65c5d Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 8 Aug 2024 04:10:57 +0000 Subject: [PATCH 3/4] fix wrong spill(memory) size --- cpp/core/shuffle/LocalPartitionWriter.cc | 14 ++++++++------ cpp/core/shuffle/Payload.cc | 5 +++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 0eb913e878bf..39e9f3b417c1 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -183,12 +183,15 @@ class LocalPartitionWriter::PayloadMerger { return merged; } - arrow::Result>> finishForSpill(uint32_t partitionId) { + arrow::Result>> finishForSpill( + uint32_t partitionId, + int64_t& totalBytesToEvict) { // We need to check whether the spill source is from compressing/copying the merged buffers. if ((partitionInMerge_.has_value() && *partitionInMerge_ == partitionId) || !hasMerged(partitionId)) { return std::nullopt; } auto payload = std::move(partitionMergePayload_[partitionId]); + totalBytesToEvict += payload->rawSize(); return payload->toBlockPayload(Payload::kUncompressed, pool_, codec_); } @@ -559,12 +562,12 @@ arrow::Status LocalPartitionWriter::evict( } RETURN_NOT_OK(requestSpill(isFinal)); + totalBytesToEvict_ += inMemoryPayload->rawSize(); auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; ARROW_ASSIGN_OR_RAISE( auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr)); if (!isFinal) { - totalBytesToEvict_ += payload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); } else { if (spills_.size() > 0) { @@ -574,7 +577,6 @@ arrow::Status LocalPartitionWriter::evict( partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; } } - totalBytesToEvict_ += payload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); } lastEvictPid_ = partitionId; @@ -583,9 +585,9 @@ arrow::Status LocalPartitionWriter::evict( if (evictType == Evict::kSpill) { RETURN_NOT_OK(requestSpill(false)); + totalBytesToEvict_ += inMemoryPayload->rawSize(); ARROW_ASSIGN_OR_RAISE( auto payload, inMemoryPayload->toBlockPayload(Payload::kUncompressed, payloadPool_.get(), nullptr)); - totalBytesToEvict_ += payload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); return arrow::Status::OK(); } @@ -607,6 +609,7 @@ arrow::Status LocalPartitionWriter::evict( return arrow::Status::OK(); } +// FIXME: Remove this code path for local partition writer. arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) { rawPartitionLengths_[partitionId] += blockPayload->rawSize(); @@ -658,10 +661,9 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu if (merger_) { auto beforeSpill = payloadPool_->bytes_allocated(); for (auto pid = 0; pid < numPartitions_; ++pid) { - ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid)); + ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid, totalBytesToEvict_)); if (merged.has_value()) { RETURN_NOT_OK(requestSpill(false)); - totalBytesToEvict_ += (*merged)->rawSize(); RETURN_NOT_OK(spiller_->spill(pid, std::move(*merged))); } } diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index 493a88e99385..04c58edaeced 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -440,8 +440,9 @@ arrow::Status InMemoryPayload::copyBuffers(arrow::MemoryPool* pool) { } uint64_t InMemoryPayload::rawSize() { - return std::accumulate( - buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer) { return sum + buffer->size(); }); + return std::accumulate(buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer) { + return buffer ? sum + buffer->size() : sum; + }); } UncompressedDiskBlockPayload::UncompressedDiskBlockPayload( From 9512482b79cde9a2a192dcef567f671c873f3efe Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 8 Aug 2024 07:34:54 +0000 Subject: [PATCH 4/4] fix --- cpp/core/shuffle/LocalPartitionWriter.cc | 13 +++++------ cpp/core/shuffle/Payload.cc | 22 ++++++------------- cpp/core/shuffle/Payload.h | 18 +++++++-------- cpp/core/shuffle/Spill.cc | 2 +- cpp/core/shuffle/Spill.h | 2 +- cpp/core/shuffle/rss/RssPartitionWriter.cc | 2 +- cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 1 + cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 8 +++++++ cpp/velox/shuffle/VeloxSortShuffleWriter.h | 2 ++ .../gluten/vectorized/GlutenSplitResult.java | 2 +- 10 files changed, 36 insertions(+), 36 deletions(-) diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 39e9f3b417c1..fe206b488cf8 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -315,7 +315,8 @@ class LocalPartitionWriter::PayloadCache { std::shared_ptr os, const std::string& spillFile, arrow::MemoryPool* pool, - arrow::util::Codec* codec) { + arrow::util::Codec* codec, + int64_t& totalBytesToEvict) { std::shared_ptr diskSpill = nullptr; ARROW_ASSIGN_OR_RAISE(auto start, os->Tell()); for (uint32_t pid = 0; pid < numPartitions_; ++pid) { @@ -324,6 +325,7 @@ class LocalPartitionWriter::PayloadCache { while (!payloads.empty()) { auto payload = std::move(payloads.front()); payloads.pop_front(); + totalBytesToEvict += payload->rawSize(); // Spill the cached payload to disk. RETURN_NOT_OK(payload->serialize(os.get())); compressTime_ += payload->getCompressTime(); @@ -553,7 +555,7 @@ arrow::Status LocalPartitionWriter::evict( bool reuseBuffers, bool hasComplexType, bool isFinal) { - rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize(); + rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize(); if (evictType == Evict::kSortSpill) { if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) { @@ -562,7 +564,6 @@ arrow::Status LocalPartitionWriter::evict( } RETURN_NOT_OK(requestSpill(isFinal)); - totalBytesToEvict_ += inMemoryPayload->rawSize(); auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; ARROW_ASSIGN_OR_RAISE( auto payload, @@ -585,7 +586,6 @@ arrow::Status LocalPartitionWriter::evict( if (evictType == Evict::kSpill) { RETURN_NOT_OK(requestSpill(false)); - totalBytesToEvict_ += inMemoryPayload->rawSize(); ARROW_ASSIGN_OR_RAISE( auto payload, inMemoryPayload->toBlockPayload(Payload::kUncompressed, payloadPool_.get(), nullptr)); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); @@ -620,7 +620,6 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr< RETURN_NOT_OK(requestSpill(stop)); if (!stop) { - totalBytesToEvict_ += blockPayload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); } else { if (spills_.size() > 0) { @@ -630,7 +629,6 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr< partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; } } - totalBytesToEvict_ += blockPayload->rawSize(); RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); } lastEvictPid_ = partitionId; @@ -650,7 +648,8 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw)); spills_.emplace_back(); ARROW_ASSIGN_OR_RAISE( - spills_.back(), payloadCache_->spillAndClose(os, spillFile, payloadPool_.get(), codec_.get())); + spills_.back(), + payloadCache_->spillAndClose(os, spillFile, payloadPool_.get(), codec_.get(), totalBytesToEvict_)); reclaimed += beforeSpill - payloadPool_->bytes_allocated(); if (reclaimed >= size) { *actual = reclaimed; diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index 04c58edaeced..d0c24e4bcaba 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -325,10 +325,8 @@ void BlockPayload::setCompressionTime(int64_t compressionTime) { compressTime_ = compressionTime; } -uint64_t BlockPayload::rawSize() { - return std::accumulate(buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer) { - return buffer ? sum + buffer->size() : sum; - }); +int64_t BlockPayload::rawSize() { + return getBufferSize(buffers_); } arrow::Result> InMemoryPayload::merge( @@ -419,10 +417,6 @@ arrow::Result> InMemoryPayload::readBufferAt(uint return std::move(buffers_[index]); } -int64_t InMemoryPayload::getBufferSize() const { - return gluten::getBufferSize(buffers_); -} - arrow::Status InMemoryPayload::copyBuffers(arrow::MemoryPool* pool) { for (auto& buffer : buffers_) { if (!buffer) { @@ -439,10 +433,8 @@ arrow::Status InMemoryPayload::copyBuffers(arrow::MemoryPool* pool) { return arrow::Status::OK(); } -uint64_t InMemoryPayload::rawSize() { - return std::accumulate(buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer) { - return buffer ? sum + buffer->size() : sum; - }); +int64_t InMemoryPayload::rawSize() { + return getBufferSize(buffers_); } UncompressedDiskBlockPayload::UncompressedDiskBlockPayload( @@ -515,7 +507,7 @@ arrow::Result> UncompressedDiskBlockPayload::read return buffer; } -uint64_t UncompressedDiskBlockPayload::rawSize() { +int64_t UncompressedDiskBlockPayload::rawSize() { return rawSize_; } @@ -523,7 +515,7 @@ CompressedDiskBlockPayload::CompressedDiskBlockPayload( uint32_t numRows, const std::vector* isValidityBuffer, arrow::io::InputStream*& inputStream, - uint64_t rawSize, + int64_t rawSize, arrow::MemoryPool* /* pool */) : Payload(Type::kCompressed, numRows, isValidityBuffer), inputStream_(inputStream), rawSize_(rawSize) {} @@ -538,7 +530,7 @@ arrow::Result> CompressedDiskBlockPayload::readBu return arrow::Status::Invalid("Cannot read buffer from CompressedDiskBlockPayload."); } -uint64_t CompressedDiskBlockPayload::rawSize() { +int64_t CompressedDiskBlockPayload::rawSize() { return rawSize_; } } // namespace gluten diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h index 0a317d9c3af9..1bd8815a4c2a 100644 --- a/cpp/core/shuffle/Payload.h +++ b/cpp/core/shuffle/Payload.h @@ -38,7 +38,7 @@ class Payload { virtual arrow::Result> readBufferAt(uint32_t index) = 0; - virtual uint64_t rawSize() = 0; + virtual int64_t rawSize() = 0; int64_t getCompressTime() const { return compressTime_; @@ -97,7 +97,7 @@ class BlockPayload final : public Payload { arrow::Result> readBufferAt(uint32_t pos) override; - uint64_t rawSize() override; + int64_t rawSize() override; protected: BlockPayload( @@ -134,11 +134,9 @@ class InMemoryPayload final : public Payload { arrow::Result> toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec); - int64_t getBufferSize() const; - arrow::Status copyBuffers(arrow::MemoryPool* pool); - uint64_t rawSize() override; + int64_t rawSize() override; private: std::vector> buffers_; @@ -159,11 +157,11 @@ class UncompressedDiskBlockPayload final : public Payload { arrow::Status serialize(arrow::io::OutputStream* outputStream) override; - uint64_t rawSize() override; + int64_t rawSize() override; private: arrow::io::InputStream*& inputStream_; - uint64_t rawSize_; + int64_t rawSize_; arrow::MemoryPool* pool_; arrow::util::Codec* codec_; uint32_t readPos_{0}; @@ -177,17 +175,17 @@ class CompressedDiskBlockPayload final : public Payload { uint32_t numRows, const std::vector* isValidityBuffer, arrow::io::InputStream*& inputStream, - uint64_t rawSize, + int64_t rawSize, arrow::MemoryPool* pool); arrow::Status serialize(arrow::io::OutputStream* outputStream) override; arrow::Result> readBufferAt(uint32_t index) override; - uint64_t rawSize() override; + int64_t rawSize() override; private: arrow::io::InputStream*& inputStream_; - uint64_t rawSize_; + int64_t rawSize_; }; } // namespace gluten diff --git a/cpp/core/shuffle/Spill.cc b/cpp/core/shuffle/Spill.cc index 0bbe667ab4d8..d8b9bc7ebf99 100644 --- a/cpp/core/shuffle/Spill.cc +++ b/cpp/core/shuffle/Spill.cc @@ -48,7 +48,7 @@ void Spill::insertPayload( Payload::Type payloadType, uint32_t numRows, const std::vector* isValidityBuffer, - uint64_t rawSize, + int64_t rawSize, arrow::MemoryPool* pool, arrow::util::Codec* codec) { // TODO: Add compression threshold. diff --git a/cpp/core/shuffle/Spill.h b/cpp/core/shuffle/Spill.h index 7ee60ef299fe..9d8d240879f9 100644 --- a/cpp/core/shuffle/Spill.h +++ b/cpp/core/shuffle/Spill.h @@ -46,7 +46,7 @@ class Spill final { Payload::Type payloadType, uint32_t numRows, const std::vector* isValidityBuffer, - uint64_t rawSize, + int64_t rawSize, arrow::MemoryPool* pool, arrow::util::Codec* codec); diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc index 19f178a2cbc7..8f75f999335f 100644 --- a/cpp/core/shuffle/rss/RssPartitionWriter.cc +++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc @@ -56,7 +56,7 @@ arrow::Status RssPartitionWriter::evict( bool reuseBuffers, bool hasComplexType, bool isFinal) { - rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize(); + rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize(); auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; ARROW_ASSIGN_OR_RAISE( auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr)); diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index e165d4a91da8..00d8be16656e 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -1364,6 +1364,7 @@ arrow::Result VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6 auto pid = item.first; ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); auto payload = std::make_unique(item.second, &isValidityBuffer_, std::move(buffers)); + metrics_.totalBytesToEvict += payload->rawSize(); RETURN_NOT_OK(partitionWriter_->evict(pid, std::move(payload), Evict::kSpill, false, hasComplexType_, false)); evicted = beforeEvict - partitionBufferPool_->bytes_allocated(); if (evicted >= size) { diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index d15280c0acd4..7f3e9201f124 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -283,6 +283,7 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ index - begin, nullptr, std::vector>{std::make_shared(rawBuffer_, offset)}); + updateSpillMetrics(payload); RETURN_NOT_OK( partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); begin = index; @@ -296,6 +297,7 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ end - begin, nullptr, std::vector>{std::make_shared(rawBuffer_, offset)}); + updateSpillMetrics(payload); RETURN_NOT_OK( partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); return arrow::Status::OK(); @@ -399,4 +401,10 @@ void VeloxSortShuffleWriter::allocateMinimalArray() { options_.sortBufferInitialSize * sizeof(uint64_t), veloxPool_.get()); setUpArray(std::move(array)); } + +void VeloxSortShuffleWriter::updateSpillMetrics(const std::unique_ptr& payload) { + if (!stopped_) { + metrics_.totalBytesToEvict += payload->rawSize(); + } +} } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 2925b85f8ea5..34fbfd243df4 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -89,6 +89,8 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { void allocateMinimalArray(); + void updateSpillMetrics(const std::unique_ptr& payload); + // Stores compact row id -> row facebook::velox::BufferPtr array_; uint64_t* arrayPtr_; diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java index 335d0983c2e8..3bed6ac794fe 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java @@ -44,7 +44,7 @@ public GlutenSplitResult( totalBytesEvicted, partitionLengths, rawPartitionLengths); - this.bytesToEvict = totalBytesEvicted; + this.bytesToEvict = totalBytesToEvict; this.peakBytes = peakBytes; this.sortTime = totalSortTime; this.c2rTime = totalC2RTime;