diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index fe206b488cf8f..a0b0b83fd2eb7 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -385,17 +385,15 @@ std::string LocalPartitionWriter::nextSpilledFileDir() { return spilledFileDir; } -arrow::Status LocalPartitionWriter::openDataFile() { - // open data file output stream +arrow::Result> LocalPartitionWriter::openFile(const std::string& file) { std::shared_ptr fout; - ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(dataFile_)); + ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(file)); if (options_.bufferedWrite) { - // Output stream buffer is neither partition buffer memory nor ipc memory. - ARROW_ASSIGN_OR_RAISE(dataFileOs_, arrow::io::BufferedOutputStream::Create(16384, pool_, fout)); - } else { - dataFileOs_ = fout; + // The 16k bytes is a temporary allocation and will be freed with file close. + // Use default memory pool and count treat the memory as executor memory overhead to avoid unnecessary spill. + return arrow::io::BufferedOutputStream::Create(16384, arrow::default_memory_pool(), fout); } - return arrow::Status::OK(); + return fout; } arrow::Status LocalPartitionWriter::clearResource() { @@ -467,9 +465,7 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) { compressTime_ += spill->compressTime(); } else { RETURN_NOT_OK(finishSpill(true)); - // Open final data file. - // If options_.bufferedWrite is set, it will acquire 16KB memory that can trigger spill. - RETURN_NOT_OK(openDataFile()); + ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_)); int64_t endInFinalFile = 0; DLOG(INFO) << "LocalPartitionWriter stopped. Total spills: " << spills_.size(); @@ -523,14 +519,13 @@ arrow::Status LocalPartitionWriter::requestSpill(bool isFinal) { std::string spillFile; std::shared_ptr os; if (isFinal) { - RETURN_NOT_OK(openDataFile()); + ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_)); spillFile = dataFile_; os = dataFileOs_; useSpillFileAsDataFile_ = true; } else { ARROW_ASSIGN_OR_RAISE(spillFile, createTempShuffleFile(nextSpilledFileDir())); - ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true)); - ARROW_ASSIGN_OR_RAISE(os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw)); + ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile)); } spiller_ = std::make_unique( os, std::move(spillFile), options_.compressionThreshold, payloadPool_.get(), codec_.get()); @@ -548,42 +543,14 @@ arrow::Status LocalPartitionWriter::finishSpill(bool close) { return arrow::Status::OK(); } -arrow::Status LocalPartitionWriter::evict( +arrow::Status LocalPartitionWriter::hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, - bool isFinal) { + bool hasComplexType) { rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize(); - if (evictType == Evict::kSortSpill) { - if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) { - lastEvictPid_ = -1; - RETURN_NOT_OK(finishSpill(true)); - } - RETURN_NOT_OK(requestSpill(isFinal)); - - auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; - ARROW_ASSIGN_OR_RAISE( - auto payload, - inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr)); - if (!isFinal) { - RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); - } else { - if (spills_.size() > 0) { - for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) { - auto bytesEvicted = totalBytesEvicted_; - RETURN_NOT_OK(mergeSpills(pid)); - partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; - } - } - RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); - } - lastEvictPid_ = partitionId; - return arrow::Status::OK(); - } - if (evictType == Evict::kSpill) { RETURN_NOT_OK(requestSpill(false)); ARROW_ASSIGN_OR_RAISE( @@ -609,6 +576,38 @@ arrow::Status LocalPartitionWriter::evict( return arrow::Status::OK(); } +arrow::Status LocalPartitionWriter::sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, + bool isFinal) { + if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) { + lastEvictPid_ = -1; + RETURN_NOT_OK(finishSpill(true)); + } + RETURN_NOT_OK(requestSpill(isFinal)); + + auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; + ARROW_ASSIGN_OR_RAISE( + auto payload, + inMemoryPayload->toBlockPayload( + payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, std::move(compressed))); + if (!isFinal) { + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); + } else { + if (spills_.size() > 0) { + for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) { + auto bytesEvicted = totalBytesEvicted_; + RETURN_NOT_OK(mergeSpills(pid)); + partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; + } + } + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); + } + lastEvictPid_ = partitionId; + return arrow::Status::OK(); +} + // FIXME: Remove this code path for local partition writer. arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) { rawPartitionLengths_[partitionId] += blockPayload->rawSize(); @@ -644,8 +643,7 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu if (payloadCache_ && payloadCache_->canSpill()) { auto beforeSpill = payloadPool_->bytes_allocated(); ARROW_ASSIGN_OR_RAISE(auto spillFile, createTempShuffleFile(nextSpilledFileDir())); - ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true)); - ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw)); + ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile)); spills_.emplace_back(); ARROW_ASSIGN_OR_RAISE( spills_.back(), diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h index 555632fedd5dd..826eb22f7431f 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -35,12 +35,17 @@ class LocalPartitionWriter : public PartitionWriter { const std::string& dataFile, const std::vector& localDirs); - arrow::Status evict( + arrow::Status hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, + bool hasComplexType) override; + + arrow::Status sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, bool isFinal) override; arrow::Status evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) override; @@ -87,7 +92,7 @@ class LocalPartitionWriter : public PartitionWriter { std::string nextSpilledFileDir(); - arrow::Status openDataFile(); + arrow::Result> openFile(const std::string& file); arrow::Status mergeSpills(uint32_t partitionId); diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h index 3774198c996e7..da091bcb02806 100644 --- a/cpp/core/shuffle/PartitionWriter.h +++ b/cpp/core/shuffle/PartitionWriter.h @@ -26,7 +26,7 @@ namespace gluten { struct Evict { - enum type { kCache, kSpill, kSortSpill }; + enum type { kCache, kSpill }; }; class PartitionWriter : public Reclaimable { @@ -42,14 +42,29 @@ class PartitionWriter : public Reclaimable { virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0; /// Evict buffers for `partitionId` partition. - virtual arrow::Status evict( + virtual arrow::Status hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, + bool hasComplexType) = 0; + + virtual arrow::Status sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, bool isFinal) = 0; + arrow::Result> getCompressedBuffer( + const std::vector>& buffers, + arrow::MemoryPool* pool) { + if (!codec_) { + return nullptr; + } + auto compressedLength = BlockPayload::maxCompressedLength(buffers, codec_.get()); + return arrow::AllocateBuffer(compressedLength, pool); + } + virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) = 0; uint64_t cachedPayloadSize() { diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index d0c24e4bcabaf..62e1823fe8745 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -186,29 +186,30 @@ arrow::Result> BlockPayload::fromBuffers( std::vector> buffers, const std::vector* isValidityBuffer, arrow::MemoryPool* pool, - arrow::util::Codec* codec) { + arrow::util::Codec* codec, + std::shared_ptr compressed) { if (payloadType == Payload::Type::kCompressed) { Timer compressionTime; compressionTime.start(); // Compress. - // Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ... - const auto metadataLength = sizeof(int64_t) * 2 * buffers.size(); - int64_t totalCompressedLength = - std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) { - if (!buffer) { - return sum; - } - return sum + codec->MaxCompressedLen(buffer->size(), buffer->data()); - }); - const auto maxCompressedLength = metadataLength + totalCompressedLength; - ARROW_ASSIGN_OR_RAISE( - std::shared_ptr compressed, arrow::AllocateResizableBuffer(maxCompressedLength, pool)); - - auto output = compressed->mutable_data(); + auto maxLength = maxCompressedLength(buffers, codec); + std::shared_ptr compressedBuffer; + uint8_t* output; + if (compressed) { + ARROW_RETURN_IF( + compressed->size() < maxLength, + arrow::Status::Invalid( + "Compressed buffer length < maxCompressedLength. (", compressed->size(), " vs ", maxLength, ")")); + output = const_cast(compressed->data()); + } else { + ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool)); + output = compressedBuffer->mutable_data(); + } + int64_t actualLength = 0; // Compress buffers one by one. for (auto& buffer : buffers) { - auto availableLength = maxCompressedLength - actualLength; + auto availableLength = maxLength - actualLength; // Release buffer after compression. ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(std::move(buffer), output, availableLength, codec)); output += compressedSize; @@ -216,15 +217,14 @@ arrow::Result> BlockPayload::fromBuffers( } ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing compressed buffer out of bound.")); - RETURN_NOT_OK(compressed->Resize(actualLength)); + if (compressed) { + compressedBuffer = std::make_shared(compressed->data(), actualLength); + } else { + RETURN_NOT_OK(std::dynamic_pointer_cast(compressedBuffer)->Resize(actualLength)); + } compressionTime.stop(); - auto payload = std::unique_ptr(new BlockPayload( - Type::kCompressed, - numRows, - std::vector>{compressed}, - isValidityBuffer, - pool, - codec)); + auto payload = std::unique_ptr( + new BlockPayload(Type::kCompressed, numRows, {compressedBuffer}, isValidityBuffer, pool, codec)); payload->setCompressionTime(compressionTime.realTimeUsed()); return payload; } @@ -329,6 +329,21 @@ int64_t BlockPayload::rawSize() { return getBufferSize(buffers_); } +int64_t BlockPayload::maxCompressedLength( + const std::vector>& buffers, + arrow::util::Codec* codec) { + // Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ... + const auto metadataLength = sizeof(int64_t) * 2 * buffers.size(); + int64_t totalCompressedLength = + std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) { + if (!buffer) { + return sum; + } + return sum + codec->MaxCompressedLen(buffer->size(), buffer->data()); + }); + return metadataLength + totalCompressedLength; +} + arrow::Result> InMemoryPayload::merge( std::unique_ptr source, std::unique_ptr append, @@ -404,9 +419,13 @@ arrow::Result> InMemoryPayload::merge( return std::make_unique(mergedRows, isValidityBuffer, std::move(merged)); } -arrow::Result> -InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) { - return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec); +arrow::Result> InMemoryPayload::toBlockPayload( + Payload::Type payloadType, + arrow::MemoryPool* pool, + arrow::util::Codec* codec, + std::shared_ptr compressed) { + return BlockPayload::fromBuffers( + payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec, std::move(compressed)); } arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) { diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h index 1bd8815a4c2a7..ea8c897e96d0d 100644 --- a/cpp/core/shuffle/Payload.h +++ b/cpp/core/shuffle/Payload.h @@ -84,7 +84,8 @@ class BlockPayload final : public Payload { std::vector> buffers, const std::vector* isValidityBuffer, arrow::MemoryPool* pool, - arrow::util::Codec* codec); + arrow::util::Codec* codec, + std::shared_ptr compressed); static arrow::Result>> deserialize( arrow::io::InputStream* inputStream, @@ -93,6 +94,10 @@ class BlockPayload final : public Payload { uint32_t& numRows, int64_t& decompressTime); + static int64_t maxCompressedLength( + const std::vector>& buffers, + arrow::util::Codec* codec); + arrow::Status serialize(arrow::io::OutputStream* outputStream) override; arrow::Result> readBufferAt(uint32_t pos) override; @@ -131,8 +136,11 @@ class InMemoryPayload final : public Payload { arrow::Result> readBufferAt(uint32_t index) override; - arrow::Result> - toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec); + arrow::Result> toBlockPayload( + Payload::Type payloadType, + arrow::MemoryPool* pool, + arrow::util::Codec* codec, + std::shared_ptr compressed = nullptr); arrow::Status copyBuffers(arrow::MemoryPool* pool); diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc index 8f75f999335fe..8a072365a296a 100644 --- a/cpp/core/shuffle/rss/RssPartitionWriter.cc +++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc @@ -49,17 +49,41 @@ arrow::Status RssPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual return arrow::Status::OK(); } -arrow::Status RssPartitionWriter::evict( +arrow::Status RssPartitionWriter::hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, + bool hasComplexType) { + return doEvict(partitionId, std::move(inMemoryPayload), nullptr); +} + +arrow::Status RssPartitionWriter::sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, bool isFinal) { + return doEvict(partitionId, std::move(inMemoryPayload), std::move(compressed)); +} + +arrow::Status RssPartitionWriter::evict(uint32_t partitionId, std::unique_ptr blockPayload, bool) { + rawPartitionLengths_[partitionId] += blockPayload->rawSize(); + ScopedTimer timer(&spillTime_); + ARROW_ASSIGN_OR_RAISE(auto buffer, blockPayload->readBufferAt(0)); + bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, buffer->data_as(), buffer->size()); + return arrow::Status::OK(); +} + +arrow::Status RssPartitionWriter::doEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed) { rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize(); auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; ARROW_ASSIGN_OR_RAISE( - auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr)); + auto payload, + inMemoryPayload->toBlockPayload( + payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, std::move(compressed))); // Copy payload to arrow buffered os. ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_)); RETURN_NOT_OK(payload->serialize(rssBufferOs.get())); @@ -72,12 +96,4 @@ arrow::Status RssPartitionWriter::evict( partitionId, reinterpret_cast(const_cast(buffer->data())), buffer->size()); return arrow::Status::OK(); } - -arrow::Status RssPartitionWriter::evict(uint32_t partitionId, std::unique_ptr blockPayload, bool) { - rawPartitionLengths_[partitionId] += blockPayload->rawSize(); - ScopedTimer timer(&spillTime_); - ARROW_ASSIGN_OR_RAISE(auto buffer, blockPayload->readBufferAt(0)); - bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, buffer->data_as(), buffer->size()); - return arrow::Status::OK(); -} } // namespace gluten diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h index 01602e3369186..dbcbbbb881291 100644 --- a/cpp/core/shuffle/rss/RssPartitionWriter.h +++ b/cpp/core/shuffle/rss/RssPartitionWriter.h @@ -37,12 +37,17 @@ class RssPartitionWriter final : public PartitionWriter { init(); } - arrow::Status evict( + arrow::Status hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, + bool hasComplexType) override; + + arrow::Status sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, bool isFinal) override; arrow::Status evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) override; @@ -54,6 +59,11 @@ class RssPartitionWriter final : public PartitionWriter { private: void init(); + arrow::Status doEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed); + std::shared_ptr rssClient_; std::vector bytesEvicted_; diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index 00d8be16656ef..49ed14245f94d 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -954,7 +954,7 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers( if (!buffers.empty()) { auto payload = std::make_unique(numRows, &isValidityBuffer_, std::move(buffers)); RETURN_NOT_OK( - partitionWriter_->evict(partitionId, std::move(payload), Evict::kCache, reuseBuffers, hasComplexType_, false)); + partitionWriter_->hashEvict(partitionId, std::move(payload), Evict::kCache, reuseBuffers, hasComplexType_)); } return arrow::Status::OK(); } @@ -1114,8 +1114,9 @@ arrow::Status VeloxHashShuffleWriter::reclaimFixedSize(int64_t size, int64_t* ac int64_t reclaimed = 0; if (reclaimed < size) { + auto before = partitionBufferPool_->bytes_allocated(); ARROW_ASSIGN_OR_RAISE(auto cached, evictCachedPayload(size - reclaimed)); - reclaimed += cached; + reclaimed += cached + (before - partitionBufferPool_->bytes_allocated()); } if (reclaimed < size && shrinkPartitionBuffersAfterSpill()) { ARROW_ASSIGN_OR_RAISE(auto shrunken, shrinkPartitionBuffersMinSize(size - reclaimed)); @@ -1365,7 +1366,7 @@ arrow::Result VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6 ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); auto payload = std::make_unique(item.second, &isValidityBuffer_, std::move(buffers)); metrics_.totalBytesToEvict += payload->rawSize(); - RETURN_NOT_OK(partitionWriter_->evict(pid, std::move(payload), Evict::kSpill, false, hasComplexType_, false)); + RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, hasComplexType_)); evicted = beforeEvict - partitionBufferPool_->bytes_allocated(); if (evicted >= size) { break; diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc index d2fb5e7f577ce..4b3475547ac05 100644 --- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc @@ -119,7 +119,8 @@ arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) { auto buffer = bufferOutputStream_->getBuffer(); auto arrowBuffer = std::make_shared(buffer->as(), buffer->size()); ARROW_ASSIGN_OR_RAISE( - auto payload, BlockPayload::fromBuffers(Payload::kRaw, 0, {std::move(arrowBuffer)}, nullptr, nullptr, nullptr)); + auto payload, + BlockPayload::fromBuffers(Payload::kRaw, 0, {std::move(arrowBuffer)}, nullptr, nullptr, nullptr, nullptr)); RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), stopped_)); batch_ = std::make_unique(veloxPool_.get(), serde_.get()); batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_); diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 2bfc4908d2f66..3cb5ab84f7a19 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -106,8 +106,11 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); allocateMinimalArray(); - sortedBuffer_ = facebook::velox::AlignedBuffer::allocate(kSortedBufferSize, veloxPool_.get()); - rawBuffer_ = sortedBuffer_->asMutable(); + ARROW_ASSIGN_OR_RAISE(sortedBuffer_, arrow::AllocateBuffer(kSortedBufferSize, pool_)); + rawBuffer_ = sortedBuffer_->mutable_data(); + ARROW_ASSIGN_OR_RAISE( + compressedBuffer_, + partitionWriter_->getCompressedBuffer({std::make_shared(rawBuffer_, kSortedBufferSize)}, pool_)); return arrow::Status::OK(); } @@ -303,8 +306,7 @@ arrow::Status VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, uint nullptr, std::vector>{std::make_shared(rawBuffer_, rawLength)}); updateSpillMetrics(payload); - RETURN_NOT_OK( - partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), compressedBuffer_, stopped_)); return arrow::Status::OK(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 1626573a7dc6e..e65af72247a69 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -107,8 +107,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // For debug. uint32_t currenPageSize_; - facebook::velox::BufferPtr sortedBuffer_; + std::unique_ptr sortedBuffer_; uint8_t* rawBuffer_; + std::shared_ptr compressedBuffer_; // Row ID -> Partition ID // subscript: The index of row in the current input RowVector diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 7cbfbcd79cc95..59500f4c9dffb 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -588,8 +588,10 @@ TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) { TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) { for (const auto partitioning : {Partitioning::kSingle, Partitioning::kRoundRobin}) { ASSERT_NOT_OK(initShuffleWriterOptions()); - shuffleWriterOptions_.partitioning = partitioning; - shuffleWriterOptions_.bufferSize = 4; + shuffleWriterOptions_.bufferSize = 4096; + // Force compression. + partitionWriterOptions_.compressionThreshold = 0; + partitionWriterOptions_.mergeThreshold = 0; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); auto shuffleWriter = createShuffleWriter(&pool); @@ -597,19 +599,23 @@ TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) { for (int i = 0; i < 10; ++i) { ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); - ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_)); - ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); } + // Reclaim bytes to shrink partition buffer. + int64_t reclaimed = 0; + ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed)); + ASSERT(reclaimed >= 2000); // Trigger spill during stop. - // For single partitioning, spill is triggered by allocating buffered output stream. ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); })); } } TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) { ASSERT_NOT_OK(initShuffleWriterOptions()); - shuffleWriterOptions_.bufferSize = 4; + shuffleWriterOptions_.bufferSize = 4096; + // Force compression. + partitionWriterOptions_.compressionThreshold = 0; + partitionWriterOptions_.mergeThreshold = 0; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); auto shuffleWriter = createShuffleWriter(&pool); @@ -617,6 +623,10 @@ TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) { for (int i = 0; i < 3; ++i) { ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_)); } + // Reclaim bytes to shrink partition buffer. + int64_t reclaimed = 0; + ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed)); + ASSERT(reclaimed >= 2000); // Reclaim from PartitionWriter to free cached bytes. auto payloadSize = shuffleWriter->cachedPayloadSize(); diff --git a/cpp/velox/utils/tests/MemoryPoolUtils.cc b/cpp/velox/utils/tests/MemoryPoolUtils.cc index a595d011952ff..5a0ae03b14963 100644 --- a/cpp/velox/utils/tests/MemoryPoolUtils.cc +++ b/cpp/velox/utils/tests/MemoryPoolUtils.cc @@ -128,6 +128,7 @@ int64_t SelfEvictedMemoryPool::num_allocations() const { arrow::Status SelfEvictedMemoryPool::ensureCapacity(int64_t size) { VELOX_CHECK_NOT_NULL(evictable_); + DLOG(INFO) << "Size: " << size << ", capacity_: " << capacity_ << ", bytes allocated: " << pool_->bytes_allocated(); if (size > capacity_ - pool_->bytes_allocated()) { // Self evict. int64_t actual;