diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 53af168508ba1..2f8158d527b6e 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -773,6 +773,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jstring codecJstr, jstring codecBackendJstr, jint compressionLevel, + jint compressionBufferSize, jint compressionThreshold, jstring compressionModeJstr, jint sortBufferInitialSize, @@ -804,6 +805,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe .startPartitionId = startPartitionId, .shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr)), .sortBufferInitialSize = sortBufferInitialSize, + .compressionBufferSize = compressionBufferSize, .useRadixSort = static_cast(useRadixSort)}; // Build PartitionWriterOptions. diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index fe206b488cf8f..f0edfa257357c 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -385,17 +385,15 @@ std::string LocalPartitionWriter::nextSpilledFileDir() { return spilledFileDir; } -arrow::Status LocalPartitionWriter::openDataFile() { - // open data file output stream +arrow::Result> LocalPartitionWriter::openFile(const std::string& file) { std::shared_ptr fout; - ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(dataFile_)); + ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(file)); if (options_.bufferedWrite) { - // Output stream buffer is neither partition buffer memory nor ipc memory. - ARROW_ASSIGN_OR_RAISE(dataFileOs_, arrow::io::BufferedOutputStream::Create(16384, pool_, fout)); - } else { - dataFileOs_ = fout; + // The 16k bytes is a temporary allocation and will be freed with file close. + // Use default memory pool and count treat the memory as executor memory overhead to avoid unnecessary spill. + return arrow::io::BufferedOutputStream::Create(16384, arrow::default_memory_pool(), fout); } - return arrow::Status::OK(); + return fout; } arrow::Status LocalPartitionWriter::clearResource() { @@ -467,9 +465,7 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) { compressTime_ += spill->compressTime(); } else { RETURN_NOT_OK(finishSpill(true)); - // Open final data file. - // If options_.bufferedWrite is set, it will acquire 16KB memory that can trigger spill. - RETURN_NOT_OK(openDataFile()); + ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_)); int64_t endInFinalFile = 0; DLOG(INFO) << "LocalPartitionWriter stopped. Total spills: " << spills_.size(); @@ -523,14 +519,13 @@ arrow::Status LocalPartitionWriter::requestSpill(bool isFinal) { std::string spillFile; std::shared_ptr os; if (isFinal) { - RETURN_NOT_OK(openDataFile()); + ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_)); spillFile = dataFile_; os = dataFileOs_; useSpillFileAsDataFile_ = true; } else { ARROW_ASSIGN_OR_RAISE(spillFile, createTempShuffleFile(nextSpilledFileDir())); - ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true)); - ARROW_ASSIGN_OR_RAISE(os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw)); + ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile)); } spiller_ = std::make_unique( os, std::move(spillFile), options_.compressionThreshold, payloadPool_.get(), codec_.get()); @@ -548,42 +543,14 @@ arrow::Status LocalPartitionWriter::finishSpill(bool close) { return arrow::Status::OK(); } -arrow::Status LocalPartitionWriter::evict( +arrow::Status LocalPartitionWriter::hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, - bool isFinal) { + bool hasComplexType) { rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize(); - if (evictType == Evict::kSortSpill) { - if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) { - lastEvictPid_ = -1; - RETURN_NOT_OK(finishSpill(true)); - } - RETURN_NOT_OK(requestSpill(isFinal)); - - auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; - ARROW_ASSIGN_OR_RAISE( - auto payload, - inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr)); - if (!isFinal) { - RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); - } else { - if (spills_.size() > 0) { - for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) { - auto bytesEvicted = totalBytesEvicted_; - RETURN_NOT_OK(mergeSpills(pid)); - partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; - } - } - RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); - } - lastEvictPid_ = partitionId; - return arrow::Status::OK(); - } - if (evictType == Evict::kSpill) { RETURN_NOT_OK(requestSpill(false)); ARROW_ASSIGN_OR_RAISE( @@ -609,6 +576,40 @@ arrow::Status LocalPartitionWriter::evict( return arrow::Status::OK(); } +arrow::Status LocalPartitionWriter::sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, + bool isFinal) { + rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize(); + + if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) { + lastEvictPid_ = -1; + RETURN_NOT_OK(finishSpill(true)); + } + RETURN_NOT_OK(requestSpill(isFinal)); + + auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; + ARROW_ASSIGN_OR_RAISE( + auto payload, + inMemoryPayload->toBlockPayload( + payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, std::move(compressed))); + if (!isFinal) { + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); + } else { + if (spills_.size() > 0) { + for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) { + auto bytesEvicted = totalBytesEvicted_; + RETURN_NOT_OK(mergeSpills(pid)); + partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; + } + } + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload))); + } + lastEvictPid_ = partitionId; + return arrow::Status::OK(); +} + // FIXME: Remove this code path for local partition writer. arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) { rawPartitionLengths_[partitionId] += blockPayload->rawSize(); @@ -644,8 +645,7 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu if (payloadCache_ && payloadCache_->canSpill()) { auto beforeSpill = payloadPool_->bytes_allocated(); ARROW_ASSIGN_OR_RAISE(auto spillFile, createTempShuffleFile(nextSpilledFileDir())); - ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true)); - ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw)); + ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile)); spills_.emplace_back(); ARROW_ASSIGN_OR_RAISE( spills_.back(), diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h index 555632fedd5dd..826eb22f7431f 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -35,12 +35,17 @@ class LocalPartitionWriter : public PartitionWriter { const std::string& dataFile, const std::vector& localDirs); - arrow::Status evict( + arrow::Status hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, + bool hasComplexType) override; + + arrow::Status sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, bool isFinal) override; arrow::Status evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) override; @@ -87,7 +92,7 @@ class LocalPartitionWriter : public PartitionWriter { std::string nextSpilledFileDir(); - arrow::Status openDataFile(); + arrow::Result> openFile(const std::string& file); arrow::Status mergeSpills(uint32_t partitionId); diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 2424ec557742b..a3dc9f6260b0c 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -30,6 +30,7 @@ static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20; static constexpr int64_t kDefaultPushMemoryThreshold = 4096; static constexpr int32_t kDefaultNumSubDirs = 64; static constexpr int32_t kDefaultCompressionThreshold = 100; +static constexpr int32_t kDefaultCompressionBufferSize = 32 * 1024; static const std::string kDefaultCompressionTypeStr = "lz4"; static constexpr int32_t kDefaultBufferAlignment = 64; static constexpr double kDefaultBufferReallocThreshold = 0.25; @@ -62,6 +63,7 @@ struct ShuffleWriterOptions { // Sort shuffle writer. int32_t sortBufferInitialSize = kDefaultSortBufferSize; + int32_t compressionBufferSize = kDefaultCompressionBufferSize; bool useRadixSort = kDefaultUseRadixSort; }; diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h index 3774198c996e7..3a44d38365816 100644 --- a/cpp/core/shuffle/PartitionWriter.h +++ b/cpp/core/shuffle/PartitionWriter.h @@ -26,7 +26,7 @@ namespace gluten { struct Evict { - enum type { kCache, kSpill, kSortSpill }; + enum type { kCache, kSpill }; }; class PartitionWriter : public Reclaimable { @@ -42,14 +42,26 @@ class PartitionWriter : public Reclaimable { virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0; /// Evict buffers for `partitionId` partition. - virtual arrow::Status evict( + virtual arrow::Status hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, + bool hasComplexType) = 0; + + virtual arrow::Status sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, bool isFinal) = 0; + std::optional getCompressedBufferLength(const std::vector>& buffers) { + if (!codec_) { + return std::nullopt; + } + return BlockPayload::maxCompressedLength(buffers, codec_.get()); + } + virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) = 0; uint64_t cachedPayloadSize() { diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index d0c24e4bcabaf..62e1823fe8745 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -186,29 +186,30 @@ arrow::Result> BlockPayload::fromBuffers( std::vector> buffers, const std::vector* isValidityBuffer, arrow::MemoryPool* pool, - arrow::util::Codec* codec) { + arrow::util::Codec* codec, + std::shared_ptr compressed) { if (payloadType == Payload::Type::kCompressed) { Timer compressionTime; compressionTime.start(); // Compress. - // Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ... - const auto metadataLength = sizeof(int64_t) * 2 * buffers.size(); - int64_t totalCompressedLength = - std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) { - if (!buffer) { - return sum; - } - return sum + codec->MaxCompressedLen(buffer->size(), buffer->data()); - }); - const auto maxCompressedLength = metadataLength + totalCompressedLength; - ARROW_ASSIGN_OR_RAISE( - std::shared_ptr compressed, arrow::AllocateResizableBuffer(maxCompressedLength, pool)); - - auto output = compressed->mutable_data(); + auto maxLength = maxCompressedLength(buffers, codec); + std::shared_ptr compressedBuffer; + uint8_t* output; + if (compressed) { + ARROW_RETURN_IF( + compressed->size() < maxLength, + arrow::Status::Invalid( + "Compressed buffer length < maxCompressedLength. (", compressed->size(), " vs ", maxLength, ")")); + output = const_cast(compressed->data()); + } else { + ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool)); + output = compressedBuffer->mutable_data(); + } + int64_t actualLength = 0; // Compress buffers one by one. for (auto& buffer : buffers) { - auto availableLength = maxCompressedLength - actualLength; + auto availableLength = maxLength - actualLength; // Release buffer after compression. ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(std::move(buffer), output, availableLength, codec)); output += compressedSize; @@ -216,15 +217,14 @@ arrow::Result> BlockPayload::fromBuffers( } ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing compressed buffer out of bound.")); - RETURN_NOT_OK(compressed->Resize(actualLength)); + if (compressed) { + compressedBuffer = std::make_shared(compressed->data(), actualLength); + } else { + RETURN_NOT_OK(std::dynamic_pointer_cast(compressedBuffer)->Resize(actualLength)); + } compressionTime.stop(); - auto payload = std::unique_ptr(new BlockPayload( - Type::kCompressed, - numRows, - std::vector>{compressed}, - isValidityBuffer, - pool, - codec)); + auto payload = std::unique_ptr( + new BlockPayload(Type::kCompressed, numRows, {compressedBuffer}, isValidityBuffer, pool, codec)); payload->setCompressionTime(compressionTime.realTimeUsed()); return payload; } @@ -329,6 +329,21 @@ int64_t BlockPayload::rawSize() { return getBufferSize(buffers_); } +int64_t BlockPayload::maxCompressedLength( + const std::vector>& buffers, + arrow::util::Codec* codec) { + // Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ... + const auto metadataLength = sizeof(int64_t) * 2 * buffers.size(); + int64_t totalCompressedLength = + std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) { + if (!buffer) { + return sum; + } + return sum + codec->MaxCompressedLen(buffer->size(), buffer->data()); + }); + return metadataLength + totalCompressedLength; +} + arrow::Result> InMemoryPayload::merge( std::unique_ptr source, std::unique_ptr append, @@ -404,9 +419,13 @@ arrow::Result> InMemoryPayload::merge( return std::make_unique(mergedRows, isValidityBuffer, std::move(merged)); } -arrow::Result> -InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) { - return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec); +arrow::Result> InMemoryPayload::toBlockPayload( + Payload::Type payloadType, + arrow::MemoryPool* pool, + arrow::util::Codec* codec, + std::shared_ptr compressed) { + return BlockPayload::fromBuffers( + payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec, std::move(compressed)); } arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) { diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h index 1bd8815a4c2a7..ea8c897e96d0d 100644 --- a/cpp/core/shuffle/Payload.h +++ b/cpp/core/shuffle/Payload.h @@ -84,7 +84,8 @@ class BlockPayload final : public Payload { std::vector> buffers, const std::vector* isValidityBuffer, arrow::MemoryPool* pool, - arrow::util::Codec* codec); + arrow::util::Codec* codec, + std::shared_ptr compressed); static arrow::Result>> deserialize( arrow::io::InputStream* inputStream, @@ -93,6 +94,10 @@ class BlockPayload final : public Payload { uint32_t& numRows, int64_t& decompressTime); + static int64_t maxCompressedLength( + const std::vector>& buffers, + arrow::util::Codec* codec); + arrow::Status serialize(arrow::io::OutputStream* outputStream) override; arrow::Result> readBufferAt(uint32_t pos) override; @@ -131,8 +136,11 @@ class InMemoryPayload final : public Payload { arrow::Result> readBufferAt(uint32_t index) override; - arrow::Result> - toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec); + arrow::Result> toBlockPayload( + Payload::Type payloadType, + arrow::MemoryPool* pool, + arrow::util::Codec* codec, + std::shared_ptr compressed = nullptr); arrow::Status copyBuffers(arrow::MemoryPool* pool); diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc index 8f75f999335fe..8a072365a296a 100644 --- a/cpp/core/shuffle/rss/RssPartitionWriter.cc +++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc @@ -49,17 +49,41 @@ arrow::Status RssPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual return arrow::Status::OK(); } -arrow::Status RssPartitionWriter::evict( +arrow::Status RssPartitionWriter::hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, + bool hasComplexType) { + return doEvict(partitionId, std::move(inMemoryPayload), nullptr); +} + +arrow::Status RssPartitionWriter::sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, bool isFinal) { + return doEvict(partitionId, std::move(inMemoryPayload), std::move(compressed)); +} + +arrow::Status RssPartitionWriter::evict(uint32_t partitionId, std::unique_ptr blockPayload, bool) { + rawPartitionLengths_[partitionId] += blockPayload->rawSize(); + ScopedTimer timer(&spillTime_); + ARROW_ASSIGN_OR_RAISE(auto buffer, blockPayload->readBufferAt(0)); + bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, buffer->data_as(), buffer->size()); + return arrow::Status::OK(); +} + +arrow::Status RssPartitionWriter::doEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed) { rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize(); auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed; ARROW_ASSIGN_OR_RAISE( - auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr)); + auto payload, + inMemoryPayload->toBlockPayload( + payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, std::move(compressed))); // Copy payload to arrow buffered os. ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_)); RETURN_NOT_OK(payload->serialize(rssBufferOs.get())); @@ -72,12 +96,4 @@ arrow::Status RssPartitionWriter::evict( partitionId, reinterpret_cast(const_cast(buffer->data())), buffer->size()); return arrow::Status::OK(); } - -arrow::Status RssPartitionWriter::evict(uint32_t partitionId, std::unique_ptr blockPayload, bool) { - rawPartitionLengths_[partitionId] += blockPayload->rawSize(); - ScopedTimer timer(&spillTime_); - ARROW_ASSIGN_OR_RAISE(auto buffer, blockPayload->readBufferAt(0)); - bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, buffer->data_as(), buffer->size()); - return arrow::Status::OK(); -} } // namespace gluten diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h index 01602e3369186..dbcbbbb881291 100644 --- a/cpp/core/shuffle/rss/RssPartitionWriter.h +++ b/cpp/core/shuffle/rss/RssPartitionWriter.h @@ -37,12 +37,17 @@ class RssPartitionWriter final : public PartitionWriter { init(); } - arrow::Status evict( + arrow::Status hashEvict( uint32_t partitionId, std::unique_ptr inMemoryPayload, Evict::type evictType, bool reuseBuffers, - bool hasComplexType, + bool hasComplexType) override; + + arrow::Status sortEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed, bool isFinal) override; arrow::Status evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop) override; @@ -54,6 +59,11 @@ class RssPartitionWriter final : public PartitionWriter { private: void init(); + arrow::Status doEvict( + uint32_t partitionId, + std::unique_ptr inMemoryPayload, + std::shared_ptr compressed); + std::shared_ptr rssClient_; std::vector bytesEvicted_; diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index 00d8be16656ef..49ed14245f94d 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -954,7 +954,7 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers( if (!buffers.empty()) { auto payload = std::make_unique(numRows, &isValidityBuffer_, std::move(buffers)); RETURN_NOT_OK( - partitionWriter_->evict(partitionId, std::move(payload), Evict::kCache, reuseBuffers, hasComplexType_, false)); + partitionWriter_->hashEvict(partitionId, std::move(payload), Evict::kCache, reuseBuffers, hasComplexType_)); } return arrow::Status::OK(); } @@ -1114,8 +1114,9 @@ arrow::Status VeloxHashShuffleWriter::reclaimFixedSize(int64_t size, int64_t* ac int64_t reclaimed = 0; if (reclaimed < size) { + auto before = partitionBufferPool_->bytes_allocated(); ARROW_ASSIGN_OR_RAISE(auto cached, evictCachedPayload(size - reclaimed)); - reclaimed += cached; + reclaimed += cached + (before - partitionBufferPool_->bytes_allocated()); } if (reclaimed < size && shrinkPartitionBuffersAfterSpill()) { ARROW_ASSIGN_OR_RAISE(auto shrunken, shrinkPartitionBuffersMinSize(size - reclaimed)); @@ -1365,7 +1366,7 @@ arrow::Result VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6 ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); auto payload = std::make_unique(item.second, &isValidityBuffer_, std::move(buffers)); metrics_.totalBytesToEvict += payload->rawSize(); - RETURN_NOT_OK(partitionWriter_->evict(pid, std::move(payload), Evict::kSpill, false, hasComplexType_, false)); + RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, hasComplexType_)); evicted = beforeEvict - partitionBufferPool_->bytes_allocated(); if (evicted >= size) { break; diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc index d2fb5e7f577ce..4b3475547ac05 100644 --- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc @@ -119,7 +119,8 @@ arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) { auto buffer = bufferOutputStream_->getBuffer(); auto arrowBuffer = std::make_shared(buffer->as(), buffer->size()); ARROW_ASSIGN_OR_RAISE( - auto payload, BlockPayload::fromBuffers(Payload::kRaw, 0, {std::move(arrowBuffer)}, nullptr, nullptr, nullptr)); + auto payload, + BlockPayload::fromBuffers(Payload::kRaw, 0, {std::move(arrowBuffer)}, nullptr, nullptr, nullptr, nullptr)); RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), stopped_)); batch_ = std::make_unique(veloxPool_.get(), serde_.get()); batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_); diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 3966857b9e9d0..4eec461e6532d 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -314,7 +314,7 @@ std::shared_ptr VeloxHashShuffleReaderDeserializer::next() { uint32_t numRows; GLUTEN_ASSIGN_OR_THROW( auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_)); - if (numRows == 0) { + if (arrowBuffers.empty()) { // Reach EOS. return nullptr; } @@ -333,7 +333,7 @@ std::shared_ptr VeloxHashShuffleReaderDeserializer::next() { while (!merged_ || merged_->numRows() < batchSize_) { GLUTEN_ASSIGN_OR_THROW( arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_)); - if (numRows == 0) { + if (arrowBuffers.empty()) { reachEos_ = true; break; } @@ -403,16 +403,22 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::next() { GLUTEN_ASSIGN_OR_THROW( auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_)); - if (numRows == 0) { + if (arrowBuffers.empty()) { reachEos_ = true; if (cachedRows_ > 0) { return deserializeToBatch(); } return nullptr; } - auto buffer = std::move(arrowBuffers[0]); - cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer)); - cachedRows_ += numRows; + + if (numRows > 0) { + auto buffer = std::move(arrowBuffers[0]); + cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer)); + cachedRows_ += numRows; + } else { + // numRows = 0 indicates that we read a segment of a large row. + readLargeRow(arrowBuffers); + } } return deserializeToBatch(); } @@ -451,6 +457,35 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo return std::make_shared(std::move(rowVector)); } +void VeloxSortShuffleReaderDeserializer::readLargeRow(std::vector>& arrowBuffers) { + // Cache the read segment. + std::vector> buffers; + auto rowSize = *reinterpret_cast(const_cast(arrowBuffers[0]->data())); + RowSizeType bufferSize = arrowBuffers[0]->size(); + buffers.emplace_back(std::move(arrowBuffers[0])); + // Read and cache the remaining segments. + uint32_t numRows; + while (bufferSize < rowSize) { + GLUTEN_ASSIGN_OR_THROW( + arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_)); + VELOX_DCHECK_EQ(numRows, 0); + bufferSize += arrowBuffers[0]->size(); + buffers.emplace_back(std::move(arrowBuffers[0])); + } + VELOX_CHECK_EQ(bufferSize, rowSize); + // Merge all segments. + GLUTEN_ASSIGN_OR_THROW(std::shared_ptr rowBuffer, arrow::AllocateBuffer(rowSize, arrowPool_)); + RowSizeType bytes = 0; + auto* dst = rowBuffer->mutable_data(); + for (const auto& buffer : buffers) { + VELOX_DCHECK_NOT_NULL(buffer); + gluten::fastCopy(dst + bytes, buffer->data(), buffer->size()); + bytes += buffer->size(); + } + cachedInputs_.emplace_back(1, wrapInBufferViewAsOwner(rowBuffer->data(), rowSize, rowBuffer)); + cachedRows_++; +} + class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public facebook::velox::GlutenByteInputStream { public: VeloxInputStream(std::shared_ptr input, facebook::velox::BufferPtr buffer); diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index 2be913aa13a72..f7ff05c5d13eb 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -83,6 +83,8 @@ class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator { private: std::shared_ptr deserializeToBatch(); + void readLargeRow(std::vector>& arrowBuffers); + std::shared_ptr in_; std::shared_ptr schema_; std::shared_ptr codec_; diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 2bfc4908d2f66..55aa739e7a0d5 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -32,8 +32,6 @@ constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1; constexpr uint32_t kPartitionIdStartByteIndex = 5; constexpr uint32_t kPartitionIdEndByteIndex = 7; -constexpr uint32_t kSortedBufferSize = 1 * 1024 * 1024; - uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) { // |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) | return (uint64_t)partitionId << 40 | (uint64_t)pageNumber << 27 | offsetInPage; @@ -106,8 +104,17 @@ arrow::Status VeloxSortShuffleWriter::init() { options_.partitioning == Partitioning::kSingle, arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition.")); allocateMinimalArray(); - sortedBuffer_ = facebook::velox::AlignedBuffer::allocate(kSortedBufferSize, veloxPool_.get()); - rawBuffer_ = sortedBuffer_->asMutable(); + // In Spark, sortedBuffer_ memory and compressionBuffer_ memory are pre-allocated and counted into executor + // memory overhead. To align with Spark, we use arrow::default_memory_pool() to avoid counting these memory in Gluten. + ARROW_ASSIGN_OR_RAISE( + sortedBuffer_, arrow::AllocateBuffer(options_.compressionBufferSize, arrow::default_memory_pool())); + rawBuffer_ = sortedBuffer_->mutable_data(); + auto compressedBufferLength = partitionWriter_->getCompressedBufferLength( + {std::make_shared(rawBuffer_, options_.compressionBufferSize)}); + if (compressedBufferLength.has_value()) { + ARROW_ASSIGN_OR_RAISE( + compressionBuffer_, arrow::AllocateBuffer(*compressedBufferLength, arrow::default_memory_pool())); + } return arrow::Status::OK(); } @@ -266,6 +273,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() { } arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) { + VELOX_DCHECK(begin < end); // Count copy row time into sortTime_. Timer sortTime{}; // Serialize [begin, end) @@ -278,33 +286,51 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]); addr = pageAddresses_[pageIndex.first] + pageIndex.second; size = *(RowSizeType*)addr; - if (offset + size > kSortedBufferSize) { + if (offset + size > options_.compressionBufferSize && offset > 0) { sortTime.stop(); - RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset)); + RETURN_NOT_OK(evictPartition0(partitionId, index - begin, rawBuffer_, offset)); sortTime.start(); begin = index; offset = 0; } - gluten::fastCopy(rawBuffer_ + offset, addr, size); - offset += size; + if (size > options_.compressionBufferSize) { + // Split large rows. + sortTime.stop(); + RowSizeType bytes = 0; + auto* buffer = reinterpret_cast(addr); + while (bytes < size) { + auto rawLength = std::min((uint32_t)options_.compressionBufferSize, size - bytes); + // Use numRows = 0 to represent a part of row. + RETURN_NOT_OK(evictPartition0(partitionId, 0, buffer + bytes, rawLength)); + bytes += rawLength; + } + begin++; + sortTime.start(); + } else { + // Copy small rows. + gluten::fastCopy(rawBuffer_ + offset, addr, size); + offset += size; + } index++; } sortTime.stop(); - RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset)); - + if (offset > 0) { + VELOX_CHECK(index > begin); + RETURN_NOT_OK(evictPartition0(partitionId, index - begin, rawBuffer_, offset)); + } sortTime_ += sortTime.realTimeUsed(); return arrow::Status::OK(); } -arrow::Status VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength) { +arrow::Status +VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength) { VELOX_CHECK(rawLength > 0); auto payload = std::make_unique( numRows, nullptr, - std::vector>{std::make_shared(rawBuffer_, rawLength)}); + std::vector>{std::make_shared(buffer, rawLength)}); updateSpillMetrics(payload); - RETURN_NOT_OK( - partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_)); + RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), compressionBuffer_, stopped_)); return arrow::Status::OK(); } diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 1626573a7dc6e..531ed1fe3e761 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -77,7 +77,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end); - arrow::Status evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength); + arrow::Status evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength); uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows); @@ -107,8 +107,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { // For debug. uint32_t currenPageSize_; - facebook::velox::BufferPtr sortedBuffer_; + std::unique_ptr sortedBuffer_; uint8_t* rawBuffer_; + std::shared_ptr compressionBuffer_{nullptr}; // Row ID -> Partition ID // subscript: The index of row in the current input RowVector diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 7cbfbcd79cc95..d8b2fb72fdd28 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -70,12 +70,17 @@ std::vector createShuffleTestParams() { std::vector mergeBufferSizes = {0, 3, 4, 10, 4096}; for (const auto& compression : compressions) { - for (auto useRadixSort : {true, false}) { - params.push_back(ShuffleTestParams{ - ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0, useRadixSort}); + for (const auto compressionBufferSize : {4, 56, 32 * 1024}) { + for (auto useRadixSort : {true, false}) { + params.push_back(ShuffleTestParams{ + .shuffleWriterType = ShuffleWriterType::kSortShuffle, + .partitionWriterType = PartitionWriterType::kLocal, + .compressionType = compression, + .compressionBufferSize = compressionBufferSize, + .useRadixSort = useRadixSort}); + } } - params.push_back( - ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression, 0, 0, false}); + params.push_back(ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression}); for (const auto compressionThreshold : compressionThresholds) { for (const auto mergeBufferSize : mergeBufferSizes) { params.push_back(ShuffleTestParams{ @@ -83,11 +88,10 @@ std::vector createShuffleTestParams() { PartitionWriterType::kLocal, compression, compressionThreshold, - mergeBufferSize, - false /* unused */}); + mergeBufferSize}); } params.push_back(ShuffleTestParams{ - ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold, 0}); + ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold}); } } @@ -588,8 +592,10 @@ TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) { TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) { for (const auto partitioning : {Partitioning::kSingle, Partitioning::kRoundRobin}) { ASSERT_NOT_OK(initShuffleWriterOptions()); - shuffleWriterOptions_.partitioning = partitioning; - shuffleWriterOptions_.bufferSize = 4; + shuffleWriterOptions_.bufferSize = 4096; + // Force compression. + partitionWriterOptions_.compressionThreshold = 0; + partitionWriterOptions_.mergeThreshold = 0; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); auto shuffleWriter = createShuffleWriter(&pool); @@ -597,19 +603,23 @@ TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) { for (int i = 0; i < 10; ++i) { ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); - ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_)); - ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); } + // Reclaim bytes to shrink partition buffer. + int64_t reclaimed = 0; + ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed)); + ASSERT(reclaimed >= 2000); // Trigger spill during stop. - // For single partitioning, spill is triggered by allocating buffered output stream. ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); })); } } TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) { ASSERT_NOT_OK(initShuffleWriterOptions()); - shuffleWriterOptions_.bufferSize = 4; + shuffleWriterOptions_.bufferSize = 4096; + // Force compression. + partitionWriterOptions_.compressionThreshold = 0; + partitionWriterOptions_.mergeThreshold = 0; auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false); auto shuffleWriter = createShuffleWriter(&pool); @@ -617,6 +627,10 @@ TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) { for (int i = 0; i < 3; ++i) { ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_)); } + // Reclaim bytes to shrink partition buffer. + int64_t reclaimed = 0; + ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed)); + ASSERT(reclaimed >= 2000); // Reclaim from PartitionWriter to free cached bytes. auto payloadSize = shuffleWriter->cachedPayloadSize(); diff --git a/cpp/velox/utils/tests/MemoryPoolUtils.cc b/cpp/velox/utils/tests/MemoryPoolUtils.cc index a595d011952ff..5a0ae03b14963 100644 --- a/cpp/velox/utils/tests/MemoryPoolUtils.cc +++ b/cpp/velox/utils/tests/MemoryPoolUtils.cc @@ -128,6 +128,7 @@ int64_t SelfEvictedMemoryPool::num_allocations() const { arrow::Status SelfEvictedMemoryPool::ensureCapacity(int64_t size) { VELOX_CHECK_NOT_NULL(evictable_); + DLOG(INFO) << "Size: " << size << ", capacity_: " << capacity_ << ", bytes allocated: " << pool_->bytes_allocated(); if (size > capacity_ - pool_->bytes_allocated()) { // Self evict. int64_t actual; diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index 102c73ca49fab..d9a2c1e2eaa57 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -66,15 +66,17 @@ struct ShuffleTestParams { ShuffleWriterType shuffleWriterType; PartitionWriterType partitionWriterType; arrow::Compression::type compressionType; - int32_t compressionThreshold; - int32_t mergeBufferSize; - bool useRadixSort; + int32_t compressionThreshold{0}; + int32_t mergeBufferSize{0}; + int32_t compressionBufferSize{0}; + bool useRadixSort{false}; std::string toString() const { std::ostringstream out; out << "shuffleWriterType = " << shuffleWriterType << ", partitionWriterType = " << partitionWriterType << ", compressionType = " << compressionType << ", compressionThreshold = " << compressionThreshold - << ", mergeBufferSize = " << mergeBufferSize << ", useRadixSort = " << (useRadixSort ? "true" : "false"); + << ", mergeBufferSize = " << mergeBufferSize << ", compressionBufferSize = " << compressionBufferSize + << ", useRadixSort = " << (useRadixSort ? "true" : "false"); return out.str(); } }; @@ -255,6 +257,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam= 4, got $bufferSize") + } + bufferSize + } + if ("lz4" == codec) { + checkAndGetBufferSize(IO_COMPRESSION_LZ4_BLOCKSIZE) + } else if ("zstd" == codec) { + checkAndGetBufferSize(IO_COMPRESSION_ZSTD_BUFFERSIZE) + } else { + GlutenConfig.GLUTEN_SHUFFLE_DEFUALT_COMPRESSION_BUFFER_SIZE + } + } + def getReaderParam[K, C]( handle: ShuffleHandle, startMapIndex: Int, diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 5a5bf5caa30cc..28261bda3f9dc 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -67,6 +67,7 @@ public class VeloxUniffleColumnarShuffleWriter extends RssShuffleWriter null)); + compressionBufferSize = + GlutenShuffleUtils.getCompressionBufferSize(sparkConf, compressionCodec); } } @@ -150,6 +153,7 @@ protected void writeImpl(Iterator> records) { // use field do this compressionCodec, compressionLevel, + compressionBufferSize, compressThreshold, GlutenConfig.getConf().columnarShuffleCompressionMode(), (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()), diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 27ce1ec363798..df4b64c74ea01 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -588,8 +588,8 @@ object GlutenConfig { // Shuffle Writer buffer size. val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE = "spark.gluten.shuffleWriter.bufferSize" - val GLUTEN_SHUFFLE_WRITER_MERGE_THRESHOLD = "spark.gluten.sql.columnar.shuffle.merge.threshold" + val GLUTEN_SHUFFLE_DEFUALT_COMPRESSION_BUFFER_SIZE = 32 * 1024 // Controls whether to load DLL from jars. User can get dependent native libs packed into a jar // by executing dev/package.sh. Then, with that jar configured, Gluten can load the native libs