diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala index eca07afb311ed..6d9d560893672 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala @@ -263,6 +263,7 @@ class MetricsApiImpl extends MetricsApi with Logging { sparkContext: SparkContext): Map[String, SQLMetric] = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), + "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions"), "bytesSpilled" -> SQLMetrics.createSizeMetric(sparkContext, "shuffle bytes spilled"), "splitBufferSize" -> SQLMetrics.createSizeMetric(sparkContext, "split buffer size total"), "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime to split"), diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index fb7c714a3ef89..24b5b88a0eab2 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -213,7 +213,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS utils/StringUtil.cc utils/ObjectStore.cc jni/JniError.cc - jni/JniCommon.cc) + jni/JniCommon.cc shuffle/PartitionWriter.cc) file(MAKE_DIRECTORY ${root_directory}/releases) add_library(gluten SHARED ${SPARK_COLUMNAR_PLUGIN_SRCS}) diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 313be10653b8c..2e21a3a33255d 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -25,19 +25,16 @@ namespace gluten { -class LocalPartitionWriter::LocalEvictHandle : public EvictHandle { +class LocalPartitionWriter::LocalEvictor : public Evictor { public: - LocalEvictHandle( - uint32_t numPartitions, - const arrow::ipc::IpcWriteOptions& options, - const std::shared_ptr& spillInfo) - : numPartitions_(numPartitions), options_(options), spillInfo_(spillInfo) {} + LocalEvictor(uint32_t numPartitions, ShuffleWriterOptions* options, const std::shared_ptr& spillInfo) + : Evictor(options), numPartitions_(numPartitions), spillInfo_(spillInfo) {} - static std::shared_ptr create( + static arrow::Result> create( uint32_t numPartitions, - const arrow::ipc::IpcWriteOptions& options, + ShuffleWriterOptions* options, const std::shared_ptr& spillInfo, - bool flush); + Evictor::Type evictType); bool finished() { return finished_; @@ -45,22 +42,20 @@ class LocalPartitionWriter::LocalEvictHandle : public EvictHandle { virtual arrow::Status flushCachedPayloads(uint32_t partitionId, arrow::io::OutputStream* os) = 0; + virtual Type evictType() = 0; + protected: uint32_t numPartitions_; - arrow::ipc::IpcWriteOptions options_; std::shared_ptr spillInfo_; std::shared_ptr os_; bool finished_{false}; }; -class CacheEvictHandle final : public LocalPartitionWriter::LocalEvictHandle { +class CacheEvictor final : public LocalPartitionWriter::LocalEvictor { public: - CacheEvictHandle( - uint32_t numPartitions, - const arrow::ipc::IpcWriteOptions& options, - const std::shared_ptr& spillInfo) - : LocalPartitionWriter::LocalEvictHandle(numPartitions, options, spillInfo) { + CacheEvictor(uint32_t numPartitions, ShuffleWriterOptions* options, const std::shared_ptr& spillInfo) + : LocalPartitionWriter::LocalEvictor(numPartitions, options, spillInfo) { partitionCachedPayload_.resize(numPartitions); } @@ -70,6 +65,7 @@ class CacheEvictHandle final : public LocalPartitionWriter::LocalEvictHandle { } arrow::Status finish() override { + ScopedTimer timer(evictTime_); if (!finished_) { ARROW_ASSIGN_OR_RAISE(os_, arrow::io::FileOutputStream::Open(spillInfo_->spilledFile, true)); int64_t start = 0; @@ -98,36 +94,42 @@ class CacheEvictHandle final : public LocalPartitionWriter::LocalEvictHandle { return arrow::Status::OK(); } + ScopedTimer timer(evictTime_); int32_t metadataLength = 0; // unused auto payloads = std::move(partitionCachedPayload_[partitionId]); // Clear cached batches before creating the payloads, to avoid spilling this partition. partitionCachedPayload_[partitionId].clear(); for (auto& payload : payloads) { - RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_, os, &metadataLength)); + RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_->ipc_write_options, os, &metadataLength)); } return arrow::Status::OK(); } + Type evictType() override { + return Type::kCache; + } + private: std::vector>> partitionCachedPayload_; }; -class FlushOnSpillEvictHandle final : public LocalPartitionWriter::LocalEvictHandle { +class FlushOnSpillEvictor final : public LocalPartitionWriter::LocalEvictor { public: - FlushOnSpillEvictHandle( + FlushOnSpillEvictor( uint32_t numPartitions, - const arrow::ipc::IpcWriteOptions& options, + ShuffleWriterOptions* options, const std::shared_ptr& spillInfo) - : LocalPartitionWriter::LocalEvictHandle(numPartitions, options, spillInfo) {} + : LocalPartitionWriter::LocalEvictor(numPartitions, options, spillInfo) {} arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) override { + ScopedTimer timer(evictTime_); if (!os_) { ARROW_ASSIGN_OR_RAISE(os_, arrow::io::FileOutputStream::Open(spillInfo_->spilledFile, true)); } int32_t metadataLength = 0; // unused. ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell()); - RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_, os_.get(), &metadataLength)); + RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_->ipc_write_options, os_.get(), &metadataLength)); ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell()); DEBUG_OUT << "Spilled partition " << partitionId << " file start: " << start << ", file end: " << end << std::endl; spillInfo_->partitionSpillInfos.push_back({partitionId, end - start}); @@ -148,29 +150,36 @@ class FlushOnSpillEvictHandle final : public LocalPartitionWriter::LocalEvictHan arrow::Status flushCachedPayloads(uint32_t partitionId, arrow::io::OutputStream* os) override { return arrow::Status::OK(); } + + Type evictType() override { + return Type::kFlush; + } }; -std::shared_ptr LocalPartitionWriter::LocalEvictHandle::create( +arrow::Result> LocalPartitionWriter::LocalEvictor::create( uint32_t numPartitions, - const arrow::ipc::IpcWriteOptions& options, + ShuffleWriterOptions* options, const std::shared_ptr& spillInfo, - bool flush) { - if (flush) { - return std::make_shared(numPartitions, options, spillInfo); - } else { - return std::make_shared(numPartitions, options, spillInfo); + Evictor::Type evictType) { + switch (evictType) { + case Evictor::Type::kFlush: + return std::make_unique(numPartitions, options, spillInfo); + case Type::kCache: + return std::make_unique(numPartitions, options, spillInfo); + default: + return arrow::Status::Invalid("Cannot create Evictor from type Evictor::Type::kStop."); } } std::string LocalPartitionWriter::nextSpilledFileDir() { auto spilledFileDir = getSpilledShuffleFileDir(configuredDirs_[dirSelection_], subDirSelection_[dirSelection_]); - subDirSelection_[dirSelection_] = (subDirSelection_[dirSelection_] + 1) % shuffleWriter_->options().num_sub_dirs; + subDirSelection_[dirSelection_] = (subDirSelection_[dirSelection_] + 1) % options_->num_sub_dirs; dirSelection_ = (dirSelection_ + 1) % configuredDirs_.size(); return spilledFileDir; } arrow::Status LocalPartitionWriter::setLocalDirs() { - configuredDirs_ = splitPaths(shuffleWriter_->options().local_dirs); + configuredDirs_ = splitPaths(options_->local_dirs); // Shuffle the configured local directories. This prevents each task from using the same directory for spilled files. std::random_device rd; std::default_random_engine engine(rd()); @@ -182,11 +191,10 @@ arrow::Status LocalPartitionWriter::setLocalDirs() { arrow::Status LocalPartitionWriter::openDataFile() { // open data file output stream std::shared_ptr fout; - ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(shuffleWriter_->options().data_file)); - if (shuffleWriter_->options().buffered_write) { + ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(options_->data_file)); + if (options_->buffered_write) { // Output stream buffer is neither partition buffer memory nor ipc memory. - ARROW_ASSIGN_OR_RAISE( - dataFileOs_, arrow::io::BufferedOutputStream::Create(16384, shuffleWriter_->options().memory_pool, fout)); + ARROW_ASSIGN_OR_RAISE(dataFileOs_, arrow::io::BufferedOutputStream::Create(16384, options_->memory_pool, fout)); } else { dataFileOs_ = fout; } @@ -202,16 +210,38 @@ arrow::Status LocalPartitionWriter::clearResource() { } arrow::Status LocalPartitionWriter::init() { + partitionLengths_.resize(numPartitions_, 0); + rawPartitionLengths_.resize(numPartitions_, 0); fs_ = std::make_shared(); RETURN_NOT_OK(setLocalDirs()); return arrow::Status::OK(); } -arrow::Status LocalPartitionWriter::stop() { - int64_t totalBytesEvicted = 0; - int64_t totalBytesWritten = 0; - auto numPartitions = shuffleWriter_->numPartitions(); +arrow::Status LocalPartitionWriter::mergeSpills(uint32_t partitionId) { + for (auto spill : spills_) { + // Read if partition exists in the spilled file and write to the final file. + if (spill->mergePos < spill->partitionSpillInfos.size() && + spill->partitionSpillInfos[spill->mergePos].partitionId == partitionId) { // A hit. + if (!spill->inputStream) { + // Open spilled file. + ARROW_ASSIGN_OR_RAISE( + spill->inputStream, arrow::io::MemoryMappedFile::Open(spill->spilledFile, arrow::io::FileMode::READ)); + // Add evict metrics. + ARROW_ASSIGN_OR_RAISE(auto spilledSize, spill->inputStream->GetSize()); + totalBytesEvicted_ += spilledSize; + } + auto spillInfo = spill->partitionSpillInfos[spill->mergePos]; + ARROW_ASSIGN_OR_RAISE(auto raw, spill->inputStream->Read(spillInfo.length)); + RETURN_NOT_OK(dataFileOs_->Write(raw)); + // Goto next partition in this spillInfo. + spill->mergePos++; + } + } + return arrow::Status::OK(); +} + +arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) { // Open final file. // If options_.buffered_write is set, it will acquire 16KB memory that might trigger spill. RETURN_NOT_OK(openDataFile()); @@ -220,54 +250,38 @@ arrow::Status LocalPartitionWriter::stop() { writeTimer.start(); int64_t endInFinalFile = 0; + int32_t metadataLength = 0; + auto cachedBuffersIter = cachedPartitionBuffers_.begin(); // Iterator over pid. - for (auto pid = 0; pid < numPartitions; ++pid) { + for (auto pid = 0; pid < numPartitions_; ++pid) { // Record start offset. auto startInFinalFile = endInFinalFile; // Iterator over all spilled files. - for (auto spill : spills_) { - // Read if partition exists in the spilled file and write to the final file. - if (spill->mergePos < spill->partitionSpillInfos.size() && - spill->partitionSpillInfos[spill->mergePos].partitionId == pid) { // A hit. - if (!spill->inputStream) { - // Open spilled file. - ARROW_ASSIGN_OR_RAISE( - spill->inputStream, arrow::io::MemoryMappedFile::Open(spill->spilledFile, arrow::io::FileMode::READ)); - // Add evict metrics. - ARROW_ASSIGN_OR_RAISE(auto spilledSize, spill->inputStream->GetSize()); - totalBytesEvicted += spilledSize; - } - - auto spillInfo = spill->partitionSpillInfos[spill->mergePos]; - ARROW_ASSIGN_OR_RAISE(auto raw, spill->inputStream->Read(spillInfo.length)); - RETURN_NOT_OK(dataFileOs_->Write(raw)); - // Goto next partition in this spillInfo. - spill->mergePos++; - } - } + RETURN_NOT_OK(mergeSpills(pid)); // Write cached batches. - if (evictHandle_ && !evictHandle_->finished()) { - RETURN_NOT_OK(evictHandle_->flushCachedPayloads(pid, dataFileOs_.get())); + if (evictor_ && !evictor_->finished()) { + RETURN_NOT_OK(evictor_->flushCachedPayloads(pid, dataFileOs_.get())); } // Compress and write the last payload. // Stop the timer to prevent counting the compression time into write time. - writeTimer.stop(); - ARROW_ASSIGN_OR_RAISE(auto lastPayload, shuffleWriter_->createPayloadFromBuffer(pid, false)); - writeTimer.start(); - if (lastPayload) { - int32_t metadataLength = 0; // unused - RETURN_NOT_OK(arrow::ipc::WriteIpcPayload( - *lastPayload, shuffleWriter_->options().ipc_write_options, dataFileOs_.get(), &metadataLength)); + if (cachedBuffersIter != cachedPartitionBuffers_.end() && std::get<0>(*cachedBuffersIter) == pid) { + writeTimer.stop(); + ARROW_ASSIGN_OR_RAISE( + auto payload, + createPayloadFromBuffers(std::get<1>(*cachedBuffersIter), std::move(std::get<2>(*cachedBuffersIter)))); + writeTimer.start(); + RETURN_NOT_OK( + arrow::ipc::WriteIpcPayload(*payload, options_->ipc_write_options, dataFileOs_.get(), &metadataLength)); + cachedBuffersIter++; } ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell()); - if (endInFinalFile != startInFinalFile && shuffleWriter_->options().write_eos) { + if (endInFinalFile != startInFinalFile && options_->write_eos) { // Write EOS if any payload written. int64_t bytes; RETURN_NOT_OK(writeEos(dataFileOs_.get(), &bytes)); endInFinalFile += bytes; } - - shuffleWriter_->setPartitionLengths(pid, endInFinalFile - startInFinalFile); + partitionLengths_[pid] = endInFinalFile - startInFinalFile; } for (auto spill : spills_) { @@ -281,34 +295,36 @@ arrow::Status LocalPartitionWriter::stop() { } RETURN_NOT_OK(fs_->DeleteFile(spill->spilledFile)); } - - ARROW_ASSIGN_OR_RAISE(totalBytesWritten, dataFileOs_->Tell()); + ARROW_ASSIGN_OR_RAISE(totalBytesWritten_, dataFileOs_->Tell()); writeTimer.stop(); - - shuffleWriter_->setTotalWriteTime(writeTimer.realTimeUsed()); - shuffleWriter_->setTotalBytesEvicted(totalBytesEvicted); - shuffleWriter_->setTotalBytesWritten(totalBytesWritten); + writeTime_ = writeTimer.realTimeUsed(); // Close Final file, Clear buffered resources. RETURN_NOT_OK(clearResource()); - + // Populate metrics. + RETURN_NOT_OK(populateMetrics(metrics)); return arrow::Status::OK(); } -arrow::Status LocalPartitionWriter::requestNextEvict(bool flush) { +arrow::Status LocalPartitionWriter::requestEvict(Evictor::Type evictType) { + if (auto handle = getEvictHandle()) { + if (handle->evictType() == evictType) { + return arrow::Status::OK(); + } + } RETURN_NOT_OK(finishEvict()); + ARROW_ASSIGN_OR_RAISE(auto spilledFile, createTempShuffleFile(nextSpilledFileDir())); auto spillInfo = std::make_shared(spilledFile); spills_.push_back(spillInfo); - evictHandle_ = LocalEvictHandle::create( - shuffleWriter_->numPartitions(), shuffleWriter_->options().ipc_write_options, spillInfo, flush); + ARROW_ASSIGN_OR_RAISE(evictor_, LocalEvictor::create(numPartitions_, options_, spillInfo, evictType)); return arrow::Status::OK(); } -EvictHandle* LocalPartitionWriter::getEvictHandle() { - if (evictHandle_ && !evictHandle_->finished()) { - return evictHandle_.get(); +LocalPartitionWriter::LocalEvictor* LocalPartitionWriter::getEvictHandle() { + if (evictor_ && !evictor_->finished()) { + return evictor_.get(); } return nullptr; } @@ -316,6 +332,7 @@ EvictHandle* LocalPartitionWriter::getEvictHandle() { arrow::Status LocalPartitionWriter::finishEvict() { if (auto handle = getEvictHandle()) { RETURN_NOT_OK(handle->finish()); + evictTime_ += handle->getEvictTime(); // The spilled file should not be empty. However, defensively // discard the last SpillInfo to avoid iterating over invalid ones. auto lastSpillInfo = spills_.back(); @@ -327,11 +344,56 @@ arrow::Status LocalPartitionWriter::finishEvict() { return arrow::Status::OK(); } +arrow::Status LocalPartitionWriter::evict( + uint32_t partitionId, + uint32_t numRows, + std::vector> buffers, + Evictor::Type evictType) { + rawPartitionLengths_[partitionId] += getBufferSize(buffers); + if (evictType == Evictor::Type::kStop) { + cachedPartitionBuffers_.push_back({partitionId, numRows, std::move(buffers)}); + } else { + ARROW_ASSIGN_OR_RAISE(auto payload, createPayloadFromBuffers(numRows, std::move(buffers))); + RETURN_NOT_OK(requestEvict(evictType)); + RETURN_NOT_OK(getEvictHandle()->evict(partitionId, std::move(payload))); + } + return arrow::Status::OK(); +} + +arrow::Status LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metrics) { + metrics->totalCompressTime += compressTime_; + metrics->totalEvictTime += evictTime_; + metrics->totalWriteTime += writeTime_; + metrics->totalBytesEvicted += totalBytesEvicted_; + metrics->totalBytesWritten += totalBytesWritten_; + metrics->partitionLengths = std::move(partitionLengths_); + metrics->rawPartitionLengths = std::move(rawPartitionLengths_); + return arrow::Status::OK(); +} + +arrow::Status LocalPartitionWriter::evictFixedSize(int64_t size, int64_t* actual) { + auto beforeShrink = options_->memory_pool->bytes_allocated(); + for (auto& item : cachedPartitionBuffers_) { + auto& buffers = std::get<2>(item); + for (auto& buffer : buffers) { + if (!buffer) { + continue; + } + if (auto parent = std::dynamic_pointer_cast(buffer->parent())) { + RETURN_NOT_OK(parent->Resize(buffer->size())); + } + } + } + *actual = beforeShrink - options_->memory_pool->bytes_allocated(); + return arrow::Status::OK(); +} + LocalPartitionWriterCreator::LocalPartitionWriterCreator() : PartitionWriterCreator() {} arrow::Result> LocalPartitionWriterCreator::make( - ShuffleWriter* shuffleWriter) { - auto partitionWriter = std::make_shared(shuffleWriter); + uint32_t numPartitions, + ShuffleWriterOptions* options) { + auto partitionWriter = std::make_shared(numPartitions, options); RETURN_NOT_OK(partitionWriter->init()); return partitionWriter; } diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h index 27fa2c0c2c6f9..96db7c8244764 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -46,13 +46,16 @@ struct SpillInfo { class LocalPartitionWriter : public ShuffleWriter::PartitionWriter { public: - explicit LocalPartitionWriter(ShuffleWriter* shuffleWriter) : PartitionWriter(shuffleWriter) {} + explicit LocalPartitionWriter(uint32_t numPartitions, ShuffleWriterOptions* options) + : PartitionWriter(numPartitions, options) {} arrow::Status init() override; - arrow::Status requestNextEvict(bool flush) override; - - EvictHandle* getEvictHandle() override; + arrow::Status evict( + uint32_t partitionId, + uint32_t numRows, + std::vector> buffers, + Evictor::Type evictType) override; arrow::Status finishEvict() override; @@ -75,22 +78,32 @@ class LocalPartitionWriter : public ShuffleWriter::PartitionWriter { /// If spill is triggered by 2.c, cached payloads of the remaining unmerged partitions will be spilled. /// In both cases, if the cached payload size doesn't free enough memory, /// it will shrink partition buffers to free more memory. - arrow::Status stop() override; + arrow::Status stop(ShuffleWriterMetrics* metrics) override; + + arrow::Status evictFixedSize(int64_t size, int64_t* actual) override; - class LocalEvictHandle; + class LocalEvictor; private: + arrow::Status requestEvict(Evictor::Type evictType); + + LocalEvictor* getEvictHandle(); + arrow::Status setLocalDirs(); std::string nextSpilledFileDir(); arrow::Status openDataFile(); + arrow::Status mergeSpills(uint32_t partitionId); + arrow::Status clearResource(); - std::shared_ptr fs_{}; - std::shared_ptr evictHandle_; - std::vector> spills_; + arrow::Status populateMetrics(ShuffleWriterMetrics* metrics); + + std::shared_ptr fs_{nullptr}; + std::shared_ptr evictor_{nullptr}; + std::vector> spills_{}; // configured local dirs for spilled file int32_t dirSelection_ = 0; @@ -98,12 +111,19 @@ class LocalPartitionWriter : public ShuffleWriter::PartitionWriter { std::vector configuredDirs_; std::shared_ptr dataFileOs_; + int64_t totalBytesEvicted_{0}; + int64_t totalBytesWritten_{0}; + std::vector partitionLengths_; + std::vector rawPartitionLengths_; + std::vector>>> cachedPartitionBuffers_; }; class LocalPartitionWriterCreator : public ShuffleWriter::PartitionWriterCreator { public: LocalPartitionWriterCreator(); - arrow::Result> make(ShuffleWriter* shuffleWriter) override; + arrow::Result> make( + uint32_t numPartitions, + ShuffleWriterOptions* options) override; }; } // namespace gluten diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 8c7a395c33f2b..20f72f9f06ed2 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -62,6 +62,9 @@ struct ShuffleWriterOptions { arrow::ipc::IpcWriteOptions ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults(); + std::shared_ptr write_schema{nullptr}; + std::shared_ptr codec{nullptr}; + std::string data_file{}; std::string local_dirs{}; arrow::MemoryPool* memory_pool{}; diff --git a/cpp/core/shuffle/PartitionWriter.cc b/cpp/core/shuffle/PartitionWriter.cc new file mode 100644 index 0000000000000..4b12511b86332 --- /dev/null +++ b/cpp/core/shuffle/PartitionWriter.cc @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "shuffle/PartitionWriter.h" +#include "shuffle/Utils.h" + +namespace gluten { + +arrow::Result> gluten::ShuffleWriter::PartitionWriter::createPayloadFromBuffers( + uint32_t numRows, + std::vector> buffers) { + std::shared_ptr recordBatch; + if (options_->compression_type != arrow::Compression::UNCOMPRESSED) { + ARROW_ASSIGN_OR_RAISE( + recordBatch, + makeCompressedRecordBatch( + numRows, + std::move(buffers), + options_->write_schema, + options_->ipc_write_options.memory_pool, + options_->codec.get(), + options_->compression_threshold, + options_->compression_mode, + compressTime_)); + } else { + ARROW_ASSIGN_OR_RAISE( + recordBatch, + makeUncompressedRecordBatch( + numRows, std::move(buffers), options_->write_schema, options_->ipc_write_options.memory_pool)); + } + + auto payload = std::make_unique(); + RETURN_NOT_OK(arrow::ipc::GetRecordBatchPayload(*recordBatch, options_->ipc_write_options, payload.get())); + return payload; +} + +} // namespace gluten \ No newline at end of file diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h index d87c20955c3c8..9090df3d73fa0 100644 --- a/cpp/core/shuffle/PartitionWriter.h +++ b/cpp/core/shuffle/PartitionWriter.h @@ -22,40 +22,61 @@ namespace gluten { -class EvictHandle { +class Evictor { public: - virtual ~EvictHandle() = default; + enum Type { kCache, kFlush, kStop }; + + Evictor(ShuffleWriterOptions* options) : options_(options) {} + + virtual ~Evictor() = default; virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) = 0; virtual arrow::Status finish() = 0; + + int64_t getEvictTime() { + return evictTime_; + } + + protected: + ShuffleWriterOptions* options_; + + int64_t evictTime_{0}; }; -class ShuffleWriter::PartitionWriter { +class ShuffleWriter::PartitionWriter : public Evictable { public: - PartitionWriter(ShuffleWriter* shuffleWriter) : shuffleWriter_(shuffleWriter) {} + PartitionWriter(uint32_t numPartitions, ShuffleWriterOptions* options) + : numPartitions_(numPartitions), options_(options) {} + virtual ~PartitionWriter() = default; virtual arrow::Status init() = 0; - virtual arrow::Status stop() = 0; + virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0; - /// Request next evict. The caller can use `requestNextEvict` to start a evict, and choose to call - /// `getEvictHandle()->evict()` immediately, or to call it latter somewhere else. - /// The caller can start new evict multiple times. Once it's called, the last `EvictHandle` - /// will be finished automatically. + /// Evict buffers for `partitionId` partition. /// \param flush Whether to flush the evicted data immediately. If it's false, /// the data can be cached first. - virtual arrow::Status requestNextEvict(bool flush) = 0; - - /// Get the current managed EvictHandle. Returns nullptr if the current EvictHandle was finished, - /// or requestNextEvict has not been called. - /// \return - virtual EvictHandle* getEvictHandle() = 0; + virtual arrow::Status evict( + uint32_t partitionId, + uint32_t numRows, + std::vector> buffers, + Evictor::Type evictType) = 0; virtual arrow::Status finishEvict() = 0; - ShuffleWriter* shuffleWriter_; + protected: + arrow::Result> createPayloadFromBuffers( + uint32_t numRows, + std::vector> buffers); + + uint32_t numPartitions_; + + ShuffleWriterOptions* options_; + int64_t compressTime_{0}; + int64_t evictTime_{0}; + int64_t writeTime_{0}; }; } // namespace gluten diff --git a/cpp/core/shuffle/PartitionWriterCreator.h b/cpp/core/shuffle/PartitionWriterCreator.h index f1cb5873fd74f..49ce7fc33ac36 100644 --- a/cpp/core/shuffle/PartitionWriterCreator.h +++ b/cpp/core/shuffle/PartitionWriterCreator.h @@ -27,7 +27,9 @@ class ShuffleWriter::PartitionWriterCreator { PartitionWriterCreator() = default; virtual ~PartitionWriterCreator() = default; - virtual arrow::Result> make(ShuffleWriter* shuffleWriter) = 0; + virtual arrow::Result> make( + uint32_t numPartitions, + ShuffleWriterOptions* options) = 0; }; } // namespace gluten diff --git a/cpp/core/shuffle/ShuffleReader.cc b/cpp/core/shuffle/ShuffleReader.cc index b1b278f581f3f..6b05dd64e5b89 100644 --- a/cpp/core/shuffle/ShuffleReader.cc +++ b/cpp/core/shuffle/ShuffleReader.cc @@ -36,7 +36,7 @@ class ShuffleReaderOutStream : public ColumnarBatchIterator { const std::function ipcTimeAccumulator) : options_(options), in_(in), ipcTimeAccumulator_(ipcTimeAccumulator) { if (options.compression_type != arrow::Compression::UNCOMPRESSED) { - writeSchema_ = toCompressWriteSchema(*schema); + writeSchema_ = toCompressWriteSchema(); } else { writeSchema_ = toWriteSchema(*schema); } diff --git a/cpp/core/shuffle/ShuffleSchema.h b/cpp/core/shuffle/ShuffleSchema.h index dd906fdc6cced..dbcd45154c92a 100644 --- a/cpp/core/shuffle/ShuffleSchema.h +++ b/cpp/core/shuffle/ShuffleSchema.h @@ -49,7 +49,7 @@ inline std::shared_ptr toWriteSchema(arrow::Schema& schema) { return std::make_shared(fields); } -inline std::shared_ptr toCompressWriteSchema(arrow::Schema& schema) { +inline std::shared_ptr toCompressWriteSchema() { std::vector> fields; fields.emplace_back(std::make_shared("header", arrow::large_utf8())); fields.emplace_back(std::make_shared("lengthBuffer", arrow::large_utf8())); diff --git a/cpp/core/shuffle/ShuffleWriter.cc b/cpp/core/shuffle/ShuffleWriter.cc index e58869fdf548b..5c42b94a4c856 100644 --- a/cpp/core/shuffle/ShuffleWriter.cc +++ b/cpp/core/shuffle/ShuffleWriter.cc @@ -30,23 +30,4 @@ namespace gluten { // by default, allocate 8M block, 2M page size #define SPLIT_BUFFER_SIZE 16 * 1024 * 1024 #endif - -std::shared_ptr ShuffleWriter::writeSchema() { - if (writeSchema_ != nullptr) { - return writeSchema_; - } - - writeSchema_ = toWriteSchema(*schema_); - return writeSchema_; -} - -std::shared_ptr ShuffleWriter::compressWriteSchema() { - if (compressWriteSchema_ != nullptr) { - return compressWriteSchema_; - } - - compressWriteSchema_ = toCompressWriteSchema(*schema_); - return compressWriteSchema_; -} - } // namespace gluten diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h index df075a9984fe8..91d6861086d78 100644 --- a/cpp/core/shuffle/ShuffleWriter.h +++ b/cpp/core/shuffle/ShuffleWriter.h @@ -31,6 +31,16 @@ namespace gluten { +struct ShuffleWriterMetrics { + int64_t totalBytesWritten{0}; + int64_t totalBytesEvicted{0}; + int64_t totalWriteTime{0}; + int64_t totalEvictTime{0}; + int64_t totalCompressTime{0}; + std::vector partitionLengths{}; + std::vector rawPartitionLengths{}; // Uncompressed size. +}; + class ShuffleMemoryPool : public arrow::MemoryPool { public: ShuffleMemoryPool(arrow::MemoryPool* pool) : pool_(pool) {} @@ -91,11 +101,7 @@ class ShuffleWriter : public Evictable { virtual arrow::Status split(std::shared_ptr cb, int64_t memLimit) = 0; - virtual arrow::Result> createPayloadFromBuffer( - uint32_t partitionId, - bool reuseBuffers) = 0; - - virtual arrow::Status evictPayload(uint32_t partitionId, std::unique_ptr payload) = 0; + virtual arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) = 0; virtual arrow::Status stop() = 0; @@ -107,66 +113,42 @@ class ShuffleWriter : public Evictable { return numPartitions_; } - int64_t totalBytesWritten() const { - return totalBytesWritten_; + int64_t partitionBufferSize() const { + return partitionBufferPool_->bytes_allocated(); } - int64_t totalBytesEvicted() const { - return totalBytesEvicted_; + int64_t totalBytesWritten() const { + return metrics_.totalBytesWritten; } - int64_t partitionBufferSize() const { - return partitionBufferPool_->bytes_allocated(); + int64_t totalBytesEvicted() const { + return metrics_.totalBytesEvicted; } int64_t totalWriteTime() const { - return totalWriteTime_; + return metrics_.totalWriteTime; } int64_t totalEvictTime() const { - return totalEvictTime_; + return metrics_.totalEvictTime; } int64_t totalCompressTime() const { - return totalCompressTime_; + return metrics_.totalCompressTime; } const std::vector& partitionLengths() const { - return partitionLengths_; + return metrics_.partitionLengths; } const std::vector& rawPartitionLengths() const { - return rawPartitionLengths_; + return metrics_.rawPartitionLengths; } ShuffleWriterOptions& options() { return options_; } - void setPartitionLengths(int32_t index, int64_t length) { - partitionLengths_[index] = length; - } - - void setRawPartitionLength(int32_t index, int64_t length) { - rawPartitionLengths_[index] = length; - } - - void setTotalWriteTime(int64_t totalWriteTime) { - totalWriteTime_ = totalWriteTime; - } - - void setTotalBytesWritten(int64_t totalBytesWritten) { - totalBytesWritten_ = totalBytesWritten; - } - - void setTotalEvictTime(int64_t totalEvictTime) { - totalEvictTime_ = totalEvictTime; - } - - void setTotalBytesEvicted(int64_t totalBytesEvicted) { - totalBytesEvicted_ = totalBytesEvicted; - } - virtual const uint64_t cachedPayloadSize() const = 0; class PartitionWriter; @@ -181,15 +163,12 @@ class ShuffleWriter : public Evictable { : numPartitions_(numPartitions), partitionWriterCreator_(std::move(partitionWriterCreator)), options_(std::move(options)), - partitionBufferPool_(std::make_shared(options_.memory_pool)), - codec_(createArrowIpcCodec(options_.compression_type, options_.codec_backend)) {} + partitionBufferPool_(std::make_shared(options_.memory_pool)) { + options_.codec = createArrowIpcCodec(options_.compression_type, options_.codec_backend); + } virtual ~ShuffleWriter() = default; - std::shared_ptr writeSchema(); - - std::shared_ptr compressWriteSchema(); - int32_t numPartitions_; std::shared_ptr partitionWriterCreator_; @@ -199,25 +178,14 @@ class ShuffleWriter : public Evictable { // The actual allocation is delegated to options_.memory_pool. std::shared_ptr partitionBufferPool_; - int64_t totalBytesWritten_ = 0; - int64_t totalBytesEvicted_ = 0; - int64_t totalWriteTime_ = 0; - int64_t totalEvictTime_ = 0; - int64_t totalCompressTime_ = 0; - - std::vector partitionLengths_; - std::vector rawPartitionLengths_; // Uncompressed size. - - std::unique_ptr codec_; - std::shared_ptr schema_; - std::shared_ptr writeSchema_; - std::shared_ptr compressWriteSchema_; // col partid std::vector>>> partitionBuffers_; std::shared_ptr partitioner_; + + ShuffleWriterMetrics metrics_{}; }; } // namespace gluten diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc index 9255d679801c1..f959e2a5cdfe7 100644 --- a/cpp/core/shuffle/Utils.cc +++ b/cpp/core/shuffle/Utils.cc @@ -16,9 +16,7 @@ */ #include "shuffle/Utils.h" -#include "Options.h" -#include "utils/StringUtil.h" - +#include #include #include #include @@ -27,6 +25,194 @@ #include #include #include +#include "shuffle/Options.h" +#include "utils/Timer.h" + +namespace gluten { +namespace { +arrow::Result> makeNullBinaryArray( + std::shared_ptr type, + arrow::MemoryPool* pool) { + ARROW_ASSIGN_OR_RAISE(auto offsetBuffer, arrow::AllocateResizableBuffer(kSizeOfIpcOffsetBuffer << 1, pool)); + // set the first offset to 0, and set the value offset + uint8_t* offsetaddr = offsetBuffer->mutable_data(); + memset(offsetaddr, 0, kSizeOfIpcOffsetBuffer); + // second value offset 0 + memset(offsetaddr + kSizeOfIpcOffsetBuffer, 0, kSizeOfIpcOffsetBuffer); + // If it is not compressed array, null valueBuffer + // worked, but if compress, will core dump at buffer::size(), so replace by kNullBuffer + static std::shared_ptr kNullBuffer = std::make_shared(nullptr, 0); + return arrow::MakeArray(arrow::ArrayData::Make(type, 1, {nullptr, std::move(offsetBuffer), kNullBuffer})); +} + +arrow::Result> makeBinaryArray( + std::shared_ptr type, + std::shared_ptr valueBuffer, + arrow::MemoryPool* pool) { + if (valueBuffer == nullptr) { + return makeNullBinaryArray(type, pool); + } + + ARROW_ASSIGN_OR_RAISE(auto offsetBuffer, arrow::AllocateResizableBuffer(kSizeOfIpcOffsetBuffer << 1, pool)); + // set the first offset to 0, and set the value offset + uint8_t* offsetaddr = offsetBuffer->mutable_data(); + memset(offsetaddr, 0, kSizeOfIpcOffsetBuffer); + int64_t length = valueBuffer->size(); + memcpy(offsetaddr + kSizeOfIpcOffsetBuffer, reinterpret_cast(&length), kSizeOfIpcOffsetBuffer); + return arrow::MakeArray(arrow::ArrayData::Make(type, 1, {nullptr, std::move(offsetBuffer), valueBuffer})); +} + +// Length buffer layout |compressionMode|buffers.size()|buffer1 unCompressedLength|buffer1 compressedLength| buffer2... +arrow::Status getLengthBufferAndValueBufferOneByOne( + const std::vector>& buffers, + arrow::MemoryPool* pool, + arrow::util::Codec* codec, + std::shared_ptr& lengthBuffer, + std::shared_ptr& valueBuffer) { + ARROW_ASSIGN_OR_RAISE( + lengthBuffer, arrow::AllocateResizableBuffer((1 + 1 + buffers.size() * 2) * sizeof(int64_t), pool)); + auto lengthBufferPtr = (int64_t*)(lengthBuffer->mutable_data()); + // Write compression mode. + *lengthBufferPtr++ = CompressionMode::BUFFER; + // Write number of buffers. + *lengthBufferPtr++ = buffers.size(); + + int64_t compressedBufferMaxSize = getMaxCompressedBufferSize(buffers, codec); + ARROW_ASSIGN_OR_RAISE(valueBuffer, arrow::AllocateResizableBuffer(compressedBufferMaxSize, pool)); + int64_t compressValueOffset = 0; + for (auto& buffer : buffers) { + if (buffer != nullptr && buffer->size() != 0) { + int64_t actualLength; + int64_t maxLength = codec->MaxCompressedLen(buffer->size(), nullptr); + ARROW_ASSIGN_OR_RAISE( + actualLength, + codec->Compress( + buffer->size(), buffer->data(), maxLength, valueBuffer->mutable_data() + compressValueOffset)); + compressValueOffset += actualLength; + *lengthBufferPtr++ = buffer->size(); + *lengthBufferPtr++ = actualLength; + } else { + *lengthBufferPtr++ = 0; + *lengthBufferPtr++ = 0; + } + } + RETURN_NOT_OK(valueBuffer->Resize(compressValueOffset, /*shrink*/ true)); + return arrow::Status::OK(); +} + +// Length buffer layout |compressionMode|buffer unCompressedLength|buffer compressedLength|buffers.size()| buffer1 size +// | buffer2 size +arrow::Status getLengthBufferAndValueBufferStream( + const std::vector>& buffers, + arrow::MemoryPool* pool, + arrow::util::Codec* codec, + std::shared_ptr& lengthBuffer, + std::shared_ptr& compressedBuffer) { + ARROW_ASSIGN_OR_RAISE(lengthBuffer, arrow::AllocateResizableBuffer((1 + 3 + buffers.size()) * sizeof(int64_t), pool)); + auto originalBufferSize = getBufferSize(buffers); + + // because 64B align, uncompressedBuffer size maybe bigger than unCompressedBufferSize which is + // getBuffersSize(buffers), then cannot use this size + ARROW_ASSIGN_OR_RAISE(auto uncompressedBuffer, arrow::AllocateResizableBuffer(originalBufferSize, pool)); + int64_t uncompressedSize = uncompressedBuffer->size(); + + auto lengthBufferPtr = (int64_t*)(lengthBuffer->mutable_data()); + // First write metadata. + // Write compression mode. + *lengthBufferPtr++ = CompressionMode::ROWVECTOR; + // Store uncompressed size. + *lengthBufferPtr++ = uncompressedSize; // uncompressedLength + // Skip compressed size and update later. + auto compressedLengthPtr = lengthBufferPtr++; + // Store number of buffers. + *lengthBufferPtr++ = buffers.size(); + + int64_t compressValueOffset = 0; + for (auto& buffer : buffers) { + // Copy all buffers into one big buffer. + if (buffer != nullptr && buffer->size() != 0) { + *lengthBufferPtr++ = buffer->size(); + memcpy(uncompressedBuffer->mutable_data() + compressValueOffset, buffer->data(), buffer->size()); + compressValueOffset += buffer->size(); + } else { + *lengthBufferPtr++ = 0; + } + } + + // Compress the big buffer. + int64_t maxLength = codec->MaxCompressedLen(uncompressedSize, nullptr); + ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool)); + ARROW_ASSIGN_OR_RAISE( + int64_t actualLength, + codec->Compress(uncompressedSize, uncompressedBuffer->data(), maxLength, compressedBuffer->mutable_data())); + RETURN_NOT_OK(compressedBuffer->Resize(actualLength, /*shrink*/ true)); + + // Update compressed size. + *compressedLengthPtr = actualLength; + return arrow::Status::OK(); +} +} // namespace + +arrow::Result> makeCompressedRecordBatch( + uint32_t numRows, + const std::vector>& buffers, + const std::shared_ptr compressWriteSchema, + arrow::MemoryPool* pool, + arrow::util::Codec* codec, + int32_t bufferCompressThreshold, + CompressionMode compressionMode, + int64_t& compressionTime) { + ScopedTimer{compressionTime}; + std::vector> arrays; + // header col, numRows, compressionType + { + ARROW_ASSIGN_OR_RAISE(auto headerBuffer, arrow::AllocateResizableBuffer(sizeof(uint32_t) + sizeof(int32_t), pool)); + memcpy(headerBuffer->mutable_data(), &numRows, sizeof(uint32_t)); + int32_t compressType = static_cast(codec->compression_type()); + memcpy(headerBuffer->mutable_data() + sizeof(uint32_t), &compressType, sizeof(int32_t)); + arrays.emplace_back(); + ARROW_ASSIGN_OR_RAISE( + arrays.back(), makeBinaryArray(compressWriteSchema->field(0)->type(), std::move(headerBuffer), pool)); + } + std::shared_ptr lengthBuffer; + std::shared_ptr valueBuffer; + if (compressionMode == CompressionMode::BUFFER && numRows > bufferCompressThreshold) { + RETURN_NOT_OK(getLengthBufferAndValueBufferOneByOne(buffers, pool, codec, lengthBuffer, valueBuffer)); + } else { + RETURN_NOT_OK(getLengthBufferAndValueBufferStream(buffers, pool, codec, lengthBuffer, valueBuffer)); + } + + arrays.emplace_back(); + ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(compressWriteSchema->field(1)->type(), lengthBuffer, pool)); + arrays.emplace_back(); + ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(compressWriteSchema->field(2)->type(), valueBuffer, pool)); + return arrow::RecordBatch::Make(compressWriteSchema, 1, {arrays}); +} + +arrow::Result> makeUncompressedRecordBatch( + uint32_t numRows, + const std::vector>& buffers, + const std::shared_ptr writeSchema, + arrow::MemoryPool* pool) { + std::vector> arrays; + // header col, numRows, compressionType + { + ARROW_ASSIGN_OR_RAISE(auto headerBuffer, arrow::AllocateResizableBuffer(sizeof(uint32_t) + sizeof(int32_t), pool)); + memcpy(headerBuffer->mutable_data(), &numRows, sizeof(uint32_t)); + int32_t compressType = static_cast(arrow::Compression::type::UNCOMPRESSED); + memcpy(headerBuffer->mutable_data() + sizeof(uint32_t), &compressType, sizeof(int32_t)); + arrays.emplace_back(); + ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(writeSchema->field(0)->type(), std::move(headerBuffer), pool)); + } + + int32_t bufferNum = writeSchema->num_fields() - 1; + for (int32_t i = 0; i < bufferNum; i++) { + arrays.emplace_back(); + ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(writeSchema->field(i + 1)->type(), buffers[i], pool)); + } + return arrow::RecordBatch::Make(writeSchema, 1, {arrays}); +} +} // namespace gluten std::string gluten::generateUuid() { boost::uuids::random_generator generator; @@ -115,17 +301,29 @@ arrow::Result>> gluten::toShuffleWr return shuffleWriterTypeId; } -int64_t gluten::getBufferSizes(const std::shared_ptr& array) { - return gluten::getBufferSizes(array->data()->buffers); +int64_t gluten::getBufferSize(const std::shared_ptr& array) { + return gluten::getBufferSize(array->data()->buffers); } -int64_t gluten::getBufferSizes(const std::vector>& buffers) { +int64_t gluten::getBufferSize(const std::vector>& buffers) { return std::accumulate( std::cbegin(buffers), std::cend(buffers), 0LL, [](int64_t sum, const std::shared_ptr& buf) { return buf == nullptr ? sum : sum + buf->size(); }); } +int64_t gluten::getMaxCompressedBufferSize( + const std::vector>& buffers, + arrow::util::Codec* codec) { + int64_t totalSize = 0; + for (auto& buffer : buffers) { + if (buffer != nullptr && buffer->size() != 0) { + totalSize += codec->MaxCompressedLen(buffer->size(), nullptr); + } + } + return totalSize; +} + arrow::Status gluten::writeEos(arrow::io::OutputStream* os, int64_t* bytes) { // write EOS static constexpr int32_t kIpcContinuationToken = -1; diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h index 9c104d759828d..fde8a7e0cc7cf 100644 --- a/cpp/core/shuffle/Utils.h +++ b/cpp/core/shuffle/Utils.h @@ -25,10 +25,16 @@ #include #include #include +#include "utils/Compression.h" namespace gluten { -const std::string kGlutenSparkLocalDirs = "GLUTEN_SPARK_LOCAL_DIRS"; +using BinaryArrayLengthBufferType = uint32_t; +using IpcOffsetBufferType = arrow::LargeStringType::offset_type; + +static const size_t kSizeOfBinaryArrayLengthBuffer = sizeof(BinaryArrayLengthBufferType); +static const size_t kSizeOfIpcOffsetBuffer = sizeof(IpcOffsetBufferType); +static const std::string kGlutenSparkLocalDirs = "GLUTEN_SPARK_LOCAL_DIRS"; std::string generateUuid(); @@ -39,10 +45,31 @@ arrow::Result createTempShuffleFile(const std::string& dir); arrow::Result>> toShuffleWriterTypeId( const std::vector>& fields); -int64_t getBufferSizes(const std::shared_ptr& array); +int64_t getBufferSize(const std::shared_ptr& array); + +int64_t getBufferSize(const std::vector>& buffers); -int64_t getBufferSizes(const std::vector>& buffers); +int64_t getMaxCompressedBufferSize( + const std::vector>& buffers, + arrow::util::Codec* codec); arrow::Status writeEos(arrow::io::OutputStream* os, int64_t* bytes); +arrow::Result> makeCompressedRecordBatch( + uint32_t numRows, + const std::vector>& buffers, + const std::shared_ptr compressWriteSchema, + arrow::MemoryPool* pool, + arrow::util::Codec* codec, + int32_t bufferCompressThreshold, + CompressionMode compressionMode, + int64_t& compressionTime); + +// generate the new big one row several columns binary recordbatch +arrow::Result> makeUncompressedRecordBatch( + uint32_t numRows, + const std::vector>& buffers, + const std::shared_ptr writeSchema, + arrow::MemoryPool* pool); + } // namespace gluten diff --git a/cpp/core/shuffle/rss/CelebornPartitionWriter.cc b/cpp/core/shuffle/rss/CelebornPartitionWriter.cc index e3ccbeec3161d..d6d7cb28c6445 100644 --- a/cpp/core/shuffle/rss/CelebornPartitionWriter.cc +++ b/cpp/core/shuffle/rss/CelebornPartitionWriter.cc @@ -16,24 +16,23 @@ */ #include "CelebornPartitionWriter.h" +#include "shuffle/Utils.h" +#include "utils/Timer.h" namespace gluten { -class CelebornEvictHandle final : public EvictHandle { +class CelebornEvictHandle final : public Evictor { public: - CelebornEvictHandle( - int64_t bufferSize, - const arrow::ipc::IpcWriteOptions& options, - arrow::MemoryPool* pool, - RssClient* client, - std::vector& bytesEvicted) - : bufferSize_(bufferSize), options_(options), pool_(pool), client_(client), bytesEvicted_(bytesEvicted) {} + CelebornEvictHandle(ShuffleWriterOptions* options, RssClient* client, std::vector& bytesEvicted) + : Evictor(options), client_(client), bytesEvicted_(bytesEvicted) {} arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) override { // Copy payload to arrow buffered os. - ARROW_ASSIGN_OR_RAISE(auto celebornBufferOs, arrow::io::BufferOutputStream::Create(bufferSize_, pool_)); + ARROW_ASSIGN_OR_RAISE( + auto celebornBufferOs, arrow::io::BufferOutputStream::Create(options_->buffer_size, options_->memory_pool)); int32_t metadataLength = 0; // unused - RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_, celebornBufferOs.get(), &metadataLength)); + RETURN_NOT_OK( + arrow::ipc::WriteIpcPayload(*payload, options_->ipc_write_options, celebornBufferOs.get(), &metadataLength)); payload = nullptr; // Invalidate payload immediately. // Push. @@ -48,56 +47,63 @@ class CelebornEvictHandle final : public EvictHandle { } private: - int64_t bufferSize_; - arrow::ipc::IpcWriteOptions options_; - arrow::MemoryPool* pool_; RssClient* client_; - std::vector& bytesEvicted_; + std::vector& bytesEvicted_; }; arrow::Status CelebornPartitionWriter::init() { - const auto& options = shuffleWriter_->options(); - bytesEvicted_.resize(shuffleWriter_->numPartitions(), 0); - evictHandle_ = std::make_shared( - options.buffer_size, options.ipc_write_options, options.memory_pool, celebornClient_.get(), bytesEvicted_); + bytesEvicted_.resize(numPartitions_, 0); + rawPartitionLengths_.resize(numPartitions_, 0); + evictHandle_ = std::make_shared(options_, celebornClient_.get(), bytesEvicted_); return arrow::Status::OK(); } -arrow::Status CelebornPartitionWriter::stop() { - // push data and collect metrics - for (auto pid = 0; pid < shuffleWriter_->numPartitions(); ++pid) { - ARROW_ASSIGN_OR_RAISE(auto payload, shuffleWriter_->createPayloadFromBuffer(pid, false)); - if (payload) { - RETURN_NOT_OK(evictHandle_->evict(pid, std::move(payload))); - } - shuffleWriter_->setPartitionLengths(pid, bytesEvicted_[pid]); - shuffleWriter_->setTotalBytesWritten(shuffleWriter_->totalBytesWritten() + bytesEvicted_[pid]); - } +arrow::Status CelebornPartitionWriter::stop(ShuffleWriterMetrics* metrics) { + // Push data and collect metrics. + auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), bytesEvicted_.end(), 0); celebornClient_->stop(); + // Populate metrics. + metrics->totalCompressTime += compressTime_; + metrics->totalEvictTime += evictHandle_->getEvictTime(); + metrics->totalWriteTime += writeTime_; + metrics->totalBytesEvicted += totalBytesEvicted; + metrics->totalBytesWritten += totalBytesEvicted; + metrics->partitionLengths = std::move(bytesEvicted_); + metrics->rawPartitionLengths = std::move(rawPartitionLengths_); return arrow::Status::OK(); } -arrow::Status CelebornPartitionWriter::requestNextEvict(bool flush) { - return arrow::Status::OK(); +arrow::Status CelebornPartitionWriter::finishEvict() { + return evictHandle_->finish(); } -EvictHandle* CelebornPartitionWriter::getEvictHandle() { - return evictHandle_.get(); +arrow::Status CelebornPartitionWriter::evict( + uint32_t partitionId, + uint32_t numRows, + std::vector> buffers, + Evictor::Type evictType) { + rawPartitionLengths_[partitionId] += getBufferSize(buffers); + ScopedTimer timer(evictTime_); + ARROW_ASSIGN_OR_RAISE(auto payload, createPayloadFromBuffers(numRows, std::move(buffers))); + RETURN_NOT_OK(evictHandle_->evict(partitionId, std::move(payload))); + return arrow::Status::OK(); } -arrow::Status CelebornPartitionWriter::finishEvict() { - return evictHandle_->finish(); +arrow::Status CelebornPartitionWriter::evictFixedSize(int64_t size, int64_t* actual) { + *actual = 0; + return arrow::Status::OK(); } CelebornPartitionWriterCreator::CelebornPartitionWriterCreator(std::shared_ptr client) : PartitionWriterCreator(), client_(client) {} arrow::Result> CelebornPartitionWriterCreator::make( - ShuffleWriter* shuffleWriter) { - std::shared_ptr res(new CelebornPartitionWriter(shuffleWriter, client_)); - RETURN_NOT_OK(res->init()); - return res; + uint32_t numPartitions, + ShuffleWriterOptions* options) { + auto partitionWriter = std::make_shared(numPartitions, options, client_); + RETURN_NOT_OK(partitionWriter->init()); + return partitionWriter; } } // namespace gluten diff --git a/cpp/core/shuffle/rss/CelebornPartitionWriter.h b/cpp/core/shuffle/rss/CelebornPartitionWriter.h index 3ca73d6d7af4e..a3be4f46a17e3 100644 --- a/cpp/core/shuffle/rss/CelebornPartitionWriter.h +++ b/cpp/core/shuffle/rss/CelebornPartitionWriter.h @@ -29,34 +29,44 @@ namespace gluten { class CelebornPartitionWriter final : public RemotePartitionWriter { public: - CelebornPartitionWriter(ShuffleWriter* shuffleWriter, std::shared_ptr celebornClient) - : RemotePartitionWriter(shuffleWriter) { + CelebornPartitionWriter( + uint32_t numPartitions, + ShuffleWriterOptions* options, + std::shared_ptr celebornClient) + : RemotePartitionWriter(numPartitions, options) { celebornClient_ = celebornClient; } - arrow::Status requestNextEvict(bool flush /*unused*/) override; - - EvictHandle* getEvictHandle() override; + arrow::Status evict( + uint32_t partitionId, + uint32_t numRows, + std::vector> buffers, + Evictor::Type evictType /* unused */) override; arrow::Status finishEvict() override; arrow::Status init() override; - arrow::Status stop() override; + arrow::Status stop(ShuffleWriterMetrics* metrics) override; + + arrow::Status evictFixedSize(int64_t size, int64_t* actual) override; private: std::shared_ptr celebornClient_; - std::shared_ptr evictHandle_; + std::shared_ptr evictHandle_; - std::vector bytesEvicted_; + std::vector bytesEvicted_; + std::vector rawPartitionLengths_; }; class CelebornPartitionWriterCreator : public ShuffleWriter::PartitionWriterCreator { public: explicit CelebornPartitionWriterCreator(std::shared_ptr client); - arrow::Result> make(ShuffleWriter* shuffleWriter) override; + arrow::Result> make( + uint32_t numPartitions, + ShuffleWriterOptions* options) override; private: std::shared_ptr client_; diff --git a/cpp/core/shuffle/rss/RemotePartitionWriter.h b/cpp/core/shuffle/rss/RemotePartitionWriter.h index 8ca4cf1a87f71..76b1a0bc5ca75 100644 --- a/cpp/core/shuffle/rss/RemotePartitionWriter.h +++ b/cpp/core/shuffle/rss/RemotePartitionWriter.h @@ -23,7 +23,8 @@ namespace gluten { class RemotePartitionWriter : public ShuffleWriter::PartitionWriter { public: - explicit RemotePartitionWriter(ShuffleWriter* shuffleWriter) : PartitionWriter(shuffleWriter) {} + explicit RemotePartitionWriter(uint32_t numPartitions, ShuffleWriterOptions* options) + : PartitionWriter(numPartitions, options) {} }; } // namespace gluten diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 0d4a99dfd658b..d776cc36c3e84 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -287,8 +287,7 @@ set(VELOX_SRCS operators/serializer/VeloxRowToColumnarConverter.cc operators/writer/VeloxParquetDatasource.cc shuffle/VeloxShuffleReader.cc - shuffle/VeloxShuffleUtils.cc - shuffle/VeloxShuffleWriter.cc + shuffle/VeloxShuffleWriter.cc substrait/SubstraitParser.cc substrait/SubstraitToVeloxExpr.cc substrait/SubstraitToVeloxPlan.cc @@ -303,7 +302,7 @@ set(VELOX_SRCS utils/VeloxArrowUtils.cc utils/ConfigExtractor.cc utils/Common.cc - ) + ) if(BUILD_TESTS OR BUILD_BENCHMARKS) list(APPEND VELOX_SRCS utils/tests/MemoryPoolUtils.cc) diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index dfff284ad67ce..8aa8df34c6639 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -19,8 +19,8 @@ #include -#include "VeloxShuffleUtils.h" #include "memory/VeloxColumnarBatch.h" +#include "shuffle/Utils.h" #include "utils/Common.h" #include "utils/Compression.h" #include "utils/VeloxArrowUtils.h" diff --git a/cpp/velox/shuffle/VeloxShuffleUtils.cc b/cpp/velox/shuffle/VeloxShuffleUtils.cc deleted file mode 100644 index d4a7fd317c380..0000000000000 --- a/cpp/velox/shuffle/VeloxShuffleUtils.cc +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "shuffle/VeloxShuffleUtils.h" -#include -#include - -int64_t gluten::getMaxCompressedBufferSize( - const std::vector>& buffers, - arrow::util::Codec* codec) { - int64_t totalSize = 0; - for (auto& buffer : buffers) { - if (buffer != nullptr && buffer->size() != 0) { - totalSize += codec->MaxCompressedLen(buffer->size(), nullptr); - } - } - return totalSize; -} - -int64_t gluten::getBuffersSize(const std::vector>& buffers) { - int64_t totalSize = 0; - for (auto& buffer : buffers) { - if (buffer != nullptr) { - totalSize += buffer->size(); - } - } - return totalSize; -} diff --git a/cpp/velox/shuffle/VeloxShuffleUtils.h b/cpp/velox/shuffle/VeloxShuffleUtils.h deleted file mode 100644 index d679933ec34b4..0000000000000 --- a/cpp/velox/shuffle/VeloxShuffleUtils.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace gluten { - -using BinaryArrayLengthBufferType = uint32_t; -using IpcOffsetBufferType = arrow::LargeStringType::offset_type; - -static const size_t kSizeOfBinaryArrayLengthBuffer = sizeof(BinaryArrayLengthBufferType); -static const size_t kSizeOfIpcOffsetBuffer = sizeof(IpcOffsetBufferType); - -int64_t getBuffersSize(const std::vector>& buffers); - -int64_t getMaxCompressedBufferSize( - const std::vector>& buffers, - arrow::util::Codec* codec); - -} // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxShuffleWriter.cc index 7955d24b1f40d..dd99ec8eeac97 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc @@ -16,11 +16,12 @@ */ #include "VeloxShuffleWriter.h" -#include "VeloxShuffleUtils.h" #include "memory/ArrowMemory.h" #include "memory/VeloxColumnarBatch.h" #include "memory/VeloxMemoryManager.h" #include "shuffle/Partitioner.h" +#include "shuffle/ShuffleSchema.h" +#include "shuffle/Utils.h" #include "utils/Common.h" #include "utils/Compression.h" #include "utils/Timer.h" @@ -99,189 +100,6 @@ const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) { return firstChild->asFlatVector()->rawValues(); } -arrow::Result> makeNullBinaryArray( - std::shared_ptr type, - arrow::MemoryPool* pool) { - ARROW_ASSIGN_OR_RAISE(auto offsetBuffer, arrow::AllocateResizableBuffer(kSizeOfIpcOffsetBuffer << 1, pool)); - // set the first offset to 0, and set the value offset - uint8_t* offsetaddr = offsetBuffer->mutable_data(); - memset(offsetaddr, 0, kSizeOfIpcOffsetBuffer); - // second value offset 0 - memset(offsetaddr + kSizeOfIpcOffsetBuffer, 0, kSizeOfIpcOffsetBuffer); - // If it is not compressed array, null valueBuffer - // worked, but if compress, will core dump at buffer::size(), so replace by kNullBuffer - static std::shared_ptr kNullBuffer = std::make_shared(nullptr, 0); - return arrow::MakeArray(arrow::ArrayData::Make(type, 1, {nullptr, std::move(offsetBuffer), kNullBuffer})); -} - -arrow::Result> makeBinaryArray( - std::shared_ptr type, - std::shared_ptr valueBuffer, - arrow::MemoryPool* pool) { - if (valueBuffer == nullptr) { - return makeNullBinaryArray(type, pool); - } - - ARROW_ASSIGN_OR_RAISE(auto offsetBuffer, arrow::AllocateResizableBuffer(kSizeOfIpcOffsetBuffer << 1, pool)); - // set the first offset to 0, and set the value offset - uint8_t* offsetaddr = offsetBuffer->mutable_data(); - memset(offsetaddr, 0, kSizeOfIpcOffsetBuffer); - int64_t length = valueBuffer->size(); - memcpy(offsetaddr + kSizeOfIpcOffsetBuffer, reinterpret_cast(&length), kSizeOfIpcOffsetBuffer); - return arrow::MakeArray(arrow::ArrayData::Make(type, 1, {nullptr, std::move(offsetBuffer), valueBuffer})); -} - -// Length buffer layout |compressionMode|buffers.size()|buffer1 unCompressedLength|buffer1 compressedLength| buffer2... -arrow::Status getLengthBufferAndValueBufferOneByOne( - const std::vector>& buffers, - arrow::MemoryPool* pool, - arrow::util::Codec* codec, - std::shared_ptr& lengthBuffer, - std::shared_ptr& valueBuffer) { - ARROW_ASSIGN_OR_RAISE( - lengthBuffer, arrow::AllocateResizableBuffer((1 + 1 + buffers.size() * 2) * sizeof(int64_t), pool)); - auto lengthBufferPtr = (int64_t*)(lengthBuffer->mutable_data()); - // Write compression mode. - *lengthBufferPtr++ = CompressionMode::BUFFER; - // Write number of buffers. - *lengthBufferPtr++ = buffers.size(); - - int64_t compressedBufferMaxSize = getMaxCompressedBufferSize(buffers, codec); - ARROW_ASSIGN_OR_RAISE(valueBuffer, arrow::AllocateResizableBuffer(compressedBufferMaxSize, pool)); - int64_t compressValueOffset = 0; - for (auto& buffer : buffers) { - if (buffer != nullptr && buffer->size() != 0) { - int64_t actualLength; - int64_t maxLength = codec->MaxCompressedLen(buffer->size(), nullptr); - ARROW_ASSIGN_OR_RAISE( - actualLength, - codec->Compress( - buffer->size(), buffer->data(), maxLength, valueBuffer->mutable_data() + compressValueOffset)); - compressValueOffset += actualLength; - *lengthBufferPtr++ = buffer->size(); - *lengthBufferPtr++ = actualLength; - } else { - *lengthBufferPtr++ = 0; - *lengthBufferPtr++ = 0; - } - } - RETURN_NOT_OK(valueBuffer->Resize(compressValueOffset, /*shrink*/ true)); - return arrow::Status::OK(); -} - -// Length buffer layout |compressionMode|buffer unCompressedLength|buffer compressedLength|buffers.size()| buffer1 size -// | buffer2 size -arrow::Status getLengthBufferAndValueBufferStream( - const std::vector>& buffers, - arrow::MemoryPool* pool, - arrow::util::Codec* codec, - std::shared_ptr& lengthBuffer, - std::shared_ptr& compressedBuffer) { - ARROW_ASSIGN_OR_RAISE(lengthBuffer, arrow::AllocateResizableBuffer((1 + 3 + buffers.size()) * sizeof(int64_t), pool)); - auto originalBufferSize = getBuffersSize(buffers); - - // because 64B align, uncompressedBuffer size maybe bigger than unCompressedBufferSize which is - // getBuffersSize(buffers), then cannot use this size - ARROW_ASSIGN_OR_RAISE(auto uncompressedBuffer, arrow::AllocateResizableBuffer(originalBufferSize, pool)); - int64_t uncompressedSize = uncompressedBuffer->size(); - - auto lengthBufferPtr = (int64_t*)(lengthBuffer->mutable_data()); - // First write metadata. - // Write compression mode. - *lengthBufferPtr++ = CompressionMode::ROWVECTOR; - // Store uncompressed size. - *lengthBufferPtr++ = uncompressedSize; // uncompressedLength - // Skip compressed size and update later. - auto compressedLengthPtr = lengthBufferPtr++; - // Store number of buffers. - *lengthBufferPtr++ = buffers.size(); - - int64_t compressValueOffset = 0; - for (auto& buffer : buffers) { - // Copy all buffers into one big buffer. - if (buffer != nullptr && buffer->size() != 0) { - *lengthBufferPtr++ = buffer->size(); - gluten::fastCopy(uncompressedBuffer->mutable_data() + compressValueOffset, buffer->data(), buffer->size()); - compressValueOffset += buffer->size(); - } else { - *lengthBufferPtr++ = 0; - } - } - - // Compress the big buffer. - int64_t maxLength = codec->MaxCompressedLen(uncompressedSize, nullptr); - ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool)); - ARROW_ASSIGN_OR_RAISE( - int64_t actualLength, - codec->Compress(uncompressedSize, uncompressedBuffer->data(), maxLength, compressedBuffer->mutable_data())); - RETURN_NOT_OK(compressedBuffer->Resize(actualLength, /*shrink*/ true)); - - // Update compressed size. - *compressedLengthPtr = actualLength; - return arrow::Status::OK(); -} - -arrow::Result> makeCompressedRecordBatch( - uint32_t numRows, - const std::vector>& buffers, - const std::shared_ptr compressWriteSchema, - arrow::MemoryPool* pool, - arrow::util::Codec* codec, - int32_t bufferCompressThreshold, - CompressionMode compressionMode, - int64_t& compressionTime) { - ScopedTimer{compressionTime}; - std::vector> arrays; - // header col, numRows, compressionType - { - ARROW_ASSIGN_OR_RAISE(auto headerBuffer, arrow::AllocateResizableBuffer(sizeof(uint32_t) + sizeof(int32_t), pool)); - memcpy(headerBuffer->mutable_data(), &numRows, sizeof(uint32_t)); - int32_t compressType = static_cast(codec->compression_type()); - memcpy(headerBuffer->mutable_data() + sizeof(uint32_t), &compressType, sizeof(int32_t)); - arrays.emplace_back(); - ARROW_ASSIGN_OR_RAISE( - arrays.back(), makeBinaryArray(compressWriteSchema->field(0)->type(), std::move(headerBuffer), pool)); - } - std::shared_ptr lengthBuffer; - std::shared_ptr valueBuffer; - if (compressionMode == CompressionMode::BUFFER && numRows > bufferCompressThreshold) { - RETURN_NOT_OK(getLengthBufferAndValueBufferOneByOne(buffers, pool, codec, lengthBuffer, valueBuffer)); - } else { - RETURN_NOT_OK(getLengthBufferAndValueBufferStream(buffers, pool, codec, lengthBuffer, valueBuffer)); - } - - arrays.emplace_back(); - ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(compressWriteSchema->field(1)->type(), lengthBuffer, pool)); - arrays.emplace_back(); - ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(compressWriteSchema->field(2)->type(), valueBuffer, pool)); - return arrow::RecordBatch::Make(compressWriteSchema, 1, {arrays}); -} - -// generate the new big one row several columns binary recordbatch -arrow::Result> makeUncompressedRecordBatch( - uint32_t numRows, - const std::vector>& buffers, - const std::shared_ptr writeSchema, - arrow::MemoryPool* pool) { - std::vector> arrays; - // header col, numRows, compressionType - { - ARROW_ASSIGN_OR_RAISE(auto headerBuffer, arrow::AllocateResizableBuffer(sizeof(uint32_t) + sizeof(int32_t), pool)); - memcpy(headerBuffer->mutable_data(), &numRows, sizeof(uint32_t)); - int32_t compressType = static_cast(arrow::Compression::type::UNCOMPRESSED); - memcpy(headerBuffer->mutable_data() + sizeof(uint32_t), &compressType, sizeof(int32_t)); - arrays.emplace_back(); - ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(writeSchema->field(0)->type(), std::move(headerBuffer), pool)); - } - - int32_t bufferNum = writeSchema->num_fields() - 1; - for (int32_t i = 0; i < bufferNum; i++) { - arrays.emplace_back(); - ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(writeSchema->field(i + 1)->type(), buffers[i], pool)); - } - return arrow::RecordBatch::Make(writeSchema, 1, {arrays}); -} - class EvictGuard { public: explicit EvictGuard(EvictState& evictState) : evictState_(evictState) { @@ -424,7 +242,7 @@ arrow::Status VeloxShuffleWriter::init() { // memory_pool should be assigned. VELOX_CHECK_NOT_NULL(options_.memory_pool); - ARROW_ASSIGN_OR_RAISE(partitionWriter_, partitionWriterCreator_->make(this)); + ARROW_ASSIGN_OR_RAISE(partitionWriter_, partitionWriterCreator_->make(numPartitions_, &options_)); ARROW_ASSIGN_OR_RAISE( partitioner_, Partitioner::make(options_.partitioning, numPartitions_, options_.start_partition_id)); @@ -438,9 +256,6 @@ arrow::Status VeloxShuffleWriter::init() { partitionBufferIdxBase_.resize(numPartitions_); - partitionLengths_.resize(numPartitions_); - rawPartitionLengths_.resize(numPartitions_); - return arrow::Status::OK(); } @@ -525,11 +340,7 @@ arrow::Status VeloxShuffleWriter::split(std::shared_ptr cb, int64 buffers.emplace_back(); ARROW_ASSIGN_OR_RAISE(buffers.back(), generateComplexTypeBuffers(rowVector)); } - - rawPartitionLengths_[0] += getBuffersSize(buffers); - ARROW_ASSIGN_OR_RAISE(auto rb, makeRecordBatch(rv.size(), buffers)); - ARROW_ASSIGN_OR_RAISE(auto payload, createPayload(*rb, false)); - RETURN_NOT_OK(evictPayload(0, std::move(payload))); + RETURN_NOT_OK(evictBuffers(0, rv.size(), std::move(buffers))); } else if (options_.partitioning == Partitioning::kRange) { auto compositeBatch = std::dynamic_pointer_cast(cb); VELOX_CHECK_NOT_NULL(compositeBatch); @@ -571,10 +382,17 @@ arrow::Status VeloxShuffleWriter::split(std::shared_ptr cb, int64 } arrow::Status VeloxShuffleWriter::stop() { + if (options_.partitioning != Partitioning::kSingle) { + for (auto pid = 0; pid < numPartitions_; ++pid) { + auto numRows = partitionBufferIdxBase_[pid]; + ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); + RETURN_NOT_OK(partitionWriter_->evict(pid, numRows, std::move(buffers), Evictor::Type::kStop)); + } + } { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]); setSplitState(SplitState::kStop); - RETURN_NOT_OK(partitionWriter_->stop()); + RETURN_NOT_OK(partitionWriter_->stop(&metrics_)); partitionBuffers_.clear(); } @@ -971,7 +789,6 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel binaryBuf.valueOffset = valueOffset; } - return arrow::Status::OK(); } @@ -1029,6 +846,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel arrow::Status VeloxShuffleWriter::initColumnTypes(const facebook::velox::RowVector& rv) { schema_ = toArrowSchema(rv.type(), veloxPool_.get()); + options_.write_schema = options_.codec ? toCompressWriteSchema() : toWriteSchema(*schema_); for (size_t i = 0; i < rv.childrenSize(); ++i) { veloxColumnTypes_.push_back(rv.childAt(i)->type()); @@ -1221,58 +1039,39 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel return arrow::Status::OK(); } - arrow::Result> VeloxShuffleWriter::createPayloadFromBuffer( - uint32_t partitionId, bool reuseBuffers) { - ARROW_ASSIGN_OR_RAISE(auto rb, createArrowRecordBatchFromBuffer(partitionId, reuseBuffers)); - if (rb) { - ARROW_ASSIGN_OR_RAISE(auto payload, createPayload(*rb, reuseBuffers)); - return payload; + arrow::Status VeloxShuffleWriter::evictBuffers( + uint32_t partitionId, uint32_t numRows, std::vector> buffers) { + if (!buffers.empty()) { + RETURN_NOT_OK(partitionWriter_->evict(partitionId, numRows, std::move(buffers), Evictor::Type::kCache)); } - return nullptr; + return arrow::Status::OK(); } - arrow::Result> VeloxShuffleWriter::createPayload( - const arrow::RecordBatch& rb, bool reuseBuffers) { - auto payload = std::make_unique(); - // Extract numRows from header column - RETURN_NOT_OK(arrow::ipc::GetRecordBatchPayload(rb, options_.ipc_write_options, payload.get())); - if (codec_ == nullptr) { - // Without compression, we need to perform a manual copy of the original buffers - // so that we can reuse them for next split. - if (reuseBuffers) { - for (auto i = 0; i < payload->body_buffers.size(); ++i) { - auto& buffer = payload->body_buffers[i]; - if (buffer) { - ARROW_ASSIGN_OR_RAISE(auto copy, ::arrow::AllocateResizableBuffer(buffer->size(), payloadPool_.get())); - if (buffer->size() > 0) { - gluten::fastCopy(copy->mutable_data(), buffer->data(), static_cast(buffer->size())); - } - buffer = std::move(copy); - } - } - } + arrow::Status VeloxShuffleWriter::evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) { + auto numRows = partitionBufferIdxBase_[partitionId]; + if (numRows > 0) { + ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(partitionId, reuseBuffers)); + RETURN_NOT_OK(evictBuffers(partitionId, numRows, buffers)); } - return payload; + return arrow::Status::OK(); } - arrow::Result> VeloxShuffleWriter::createArrowRecordBatchFromBuffer( + arrow::Result>> VeloxShuffleWriter::assembleBuffers( uint32_t partitionId, bool reuseBuffers) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]); if (partitionBufferIdxBase_[partitionId] == 0) { - return nullptr; + return std::vector>{}; } auto numRows = partitionBufferIdxBase_[partitionId]; - - // already filled auto fixedWidthIdx = 0; auto binaryIdx = 0; auto numFields = schema_->num_fields(); std::vector> arrays(numFields); std::vector> allBuffers; - // one column should have 2 buffers at least, string column has 3 column buffers + // One column should have 2 buffers at least, string column has 3 column buffers. allBuffers.reserve(fixedWidthColumnCount_ * 2 + binaryColumnIndices_.size() * 3); bool hasComplexType = false; for (int i = 0; i < numFields; ++i) { @@ -1301,11 +1100,6 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel if (reuseBuffers) { // Set the first value offset to 0. binaryBuf.valueOffset = 0; - } else { - // Reset all buffers. - partitionValidityAddrs_[fixedWidthColumnCount_ + binaryIdx][partitionId] = nullptr; - binaryBuf = BinaryBuf(); - partitionBuffers_[fixedWidthColumnCount_ + binaryIdx][partitionId].clear(); } binaryIdx++; break; @@ -1341,11 +1135,6 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel arrow::SliceBuffer(valueBuffer, 0, numRows * (arrow::bit_width(arrowColumnTypes_[i]->id()) >> 3)); } allBuffers.push_back(std::move(slicedValueBuffer)); - if (!reuseBuffers) { - partitionValidityAddrs_[fixedWidthIdx][partitionId] = nullptr; - partitionFixedWidthValueAddrs_[fixedWidthIdx][partitionId] = nullptr; - partitionBuffers_[fixedWidthIdx][partitionId].clear(); - } fixedWidthIdx++; break; } @@ -1369,31 +1158,25 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel arenas_[partitionId] = nullptr; } + partitionBufferIdxBase_[partitionId] = 0; if (!reuseBuffers) { - partition2BufferSize_[partitionId] = 0; + RETURN_NOT_OK(resetPartitionBuffer(partitionId)); } - partitionBufferIdxBase_[partitionId] = 0; - - rawPartitionLengths_[partitionId] += getBuffersSize(allBuffers); - return makeRecordBatch(numRows, allBuffers); - } - - arrow::Result> VeloxShuffleWriter::makeRecordBatch( - uint32_t numRows, const std::vector>& buffers) { - SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingMakeRB]); - if (codec_ == nullptr) { - return makeUncompressedRecordBatch(numRows, buffers, writeSchema(), payloadPool_.get()); - } else { - return makeCompressedRecordBatch( - numRows, - buffers, - compressWriteSchema(), - payloadPool_.get(), - codec_.get(), - options_.compression_threshold, - options_.compression_mode, - totalCompressTime_); + if (options_.ipc_write_options.codec == nullptr && reuseBuffers) { + // Without compression, we need to perform a manual copy of the original buffers + // so that we can reuse them for next split. + for (auto i = 0; i < allBuffers.size(); ++i) { + auto& buffer = allBuffers[i]; + if (buffer) { + ARROW_ASSIGN_OR_RAISE(auto copy, arrow::AllocateResizableBuffer(buffer->size(), payloadPool_.get())); + if (buffer->size() > 0) { + gluten::fastCopy(copy->mutable_data(), buffer->data(), static_cast(buffer->size())); + } + buffer = std::move(copy); + } + } } + return allBuffers; } arrow::Status VeloxShuffleWriter::evictFixedSize(int64_t size, int64_t * actual) { @@ -1405,7 +1188,8 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel int64_t reclaimed = 0; if (reclaimed < size && shrinkPartitionBuffersBeforeSpill()) { - ARROW_ASSIGN_OR_RAISE(auto shrunken, shrinkPartitionBuffers()); + int64_t shrunken = 0; + RETURN_NOT_OK(partitionWriter_->evictFixedSize(size - reclaimed, &shrunken)); reclaimed += shrunken; } if (reclaimed < size) { @@ -1431,12 +1215,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel return 0; } auto evicted = beforeEvict; - - { - ScopedTimer evictTime(totalEvictTime_); - RETURN_NOT_OK(partitionWriter_->finishEvict()); - } - + RETURN_NOT_OK(partitionWriter_->finishEvict()); if (auto afterEvict = cachedPayloadSize()) { // Evict can be triggered by compressing buffers. The cachedPayloadSize is not empty. evicted -= afterEvict; @@ -1558,19 +1337,6 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel return resizePartitionBuffer(partitionId, newSize, /*preserveData=*/true); } - arrow::Result VeloxShuffleWriter::shrinkPartitionBuffers() { - auto beforeShrink = partitionBufferPool_->bytes_allocated(); - if (beforeShrink == 0) { - return 0; - } - for (auto pid = 0; pid < numPartitions_; ++pid) { - RETURN_NOT_OK(shrinkPartitionBuffer(pid)); - } - auto shrunken = beforeShrink - partitionBufferPool_->bytes_allocated(); - VLOG(2) << shrunken << " bytes released from shrinking."; - return shrunken; - } - uint64_t VeloxShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx, int64_t newSize) { return (binaryArrayTotalSizeBytes_[binaryIdx] + totalInputNumRows_ - 1) / totalInputNumRows_ * newSize + 1024; } @@ -1630,24 +1396,6 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel return payloadPool_->bytes_allocated(); } - arrow::Status VeloxShuffleWriter::evictPartitionBuffer(uint32_t partitionId, uint32_t newSize, bool reuseBuffers) { - ARROW_ASSIGN_OR_RAISE(auto payload, createPayloadFromBuffer(partitionId, reuseBuffers)); - if (payload) { - RETURN_NOT_OK(evictPayload(partitionId, std::move(payload))); - } - return arrow::Status::OK(); - } - - arrow::Status VeloxShuffleWriter::evictPayload( - uint32_t partitionId, std::unique_ptr payload) { - ScopedTimer spillTime(totalEvictTime_); - if (!partitionWriter_->getEvictHandle()) { - RETURN_NOT_OK(partitionWriter_->requestNextEvict(false)); - } - RETURN_NOT_OK(partitionWriter_->getEvictHandle()->evict(partitionId, std::move(payload))); - return arrow::Status::OK(); - } - arrow::Result VeloxShuffleWriter::shrinkPartitionBuffersMinSize(int64_t size) { // Sort partition buffers by (partition2BufferSize_ - partitionBufferIdxBase_) std::vector> pidToSize; @@ -1689,29 +1437,16 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel pidToSize.emplace_back(pid, partition2BufferSize_[pid]); } if (!pidToSize.empty()) { - Timer spillTime; - spillTime.start(); - RETURN_NOT_OK(partitionWriter_->requestNextEvict(true)); - auto evictHandle = partitionWriter_->getEvictHandle(); - spillTime.stop(); - for (auto& item : pidToSize) { auto pid = item.first; - ARROW_ASSIGN_OR_RAISE(auto payload, createPayloadFromBuffer(pid, false)); - - spillTime.start(); - RETURN_NOT_OK(evictHandle->evict(pid, std::move(payload))); - spillTime.stop(); - + ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); + RETURN_NOT_OK(partitionWriter_->evict(pid, item.second, std::move(buffers), Evictor::Type::kFlush)); evicted = beforeEvict - partitionBufferPool_->bytes_allocated(); if (evicted >= size) { break; } } - spillTime.start(); RETURN_NOT_OK(partitionWriter_->finishEvict()); - spillTime.stop(); - totalEvictTime_ += spillTime.realTimeUsed(); } return evicted; } @@ -1758,7 +1493,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel } else if (beyondThreshold(pid, newSize)) { if (newSize <= partitionBufferIdxBase_[pid]) { // If the newSize is smaller, cache the buffered data and reuse and shrink the buffer. - RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, true)); + RETURN_NOT_OK(evictPartitionBuffers(pid, true)); RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/false)); } else { // If the newSize is larger, check if alreadyFilled + toBeFilled <= newSize @@ -1773,7 +1508,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel // If newSize <= allocated buffer size, reuse and shrink the buffer. // Else free and allocate new buffers. bool reuseBuffers = newSize <= partition2BufferSize_[pid]; - RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, reuseBuffers)); + RETURN_NOT_OK(evictPartitionBuffers(pid, reuseBuffers)); if (reuseBuffers) { RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/false)); } else { @@ -1786,11 +1521,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel // buffer. if (newSize > partition2BufferSize_[pid]) { // If the partition size after split is already larger than allocated buffer size, need reallocate. - RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, false)); + RETURN_NOT_OK(evictPartitionBuffers(pid, false)); RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize)); } else { // Partition size after split is smaller than buffer size. Reuse the buffers. - RETURN_NOT_OK(evictPartitionBuffer(pid, newSize, true)); + RETURN_NOT_OK(evictPartitionBuffers(pid, true)); // Reset validity buffer for reallocate. RETURN_NOT_OK(resetValidityBuffer(pid)); } diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index f257589fcd2d9..13872fb4f4080 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -133,18 +133,14 @@ class VeloxShuffleWriter final : public ShuffleWriter { arrow::Status stop() override; - arrow::Status evictFixedSize(int64_t size, int64_t* actual) override; - - arrow::Result> createPayloadFromBuffer( - uint32_t partitionId, - bool reuseBuffers) override; + arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) override; - arrow::Status evictPayload(uint32_t partitionId, std::unique_ptr payload) override; + arrow::Status evictFixedSize(int64_t size, int64_t* actual) override; const uint64_t cachedPayloadSize() const override; int64_t rawPartitionBytes() const { - return std::accumulate(rawPartitionLengths_.begin(), rawPartitionLengths_.end(), 0LL); + return std::accumulate(metrics_.rawPartitionLengths.begin(), metrics_.rawPartitionLengths.end(), 0LL); } // for testing @@ -254,13 +250,10 @@ class VeloxShuffleWriter final : public ShuffleWriter { arrow::Status splitComplexType(const facebook::velox::RowVector& rv); - arrow::Status evictPartitionBuffer(uint32_t partitionId, uint32_t newSize, bool reuseBuffers); + arrow::Status + evictBuffers(uint32_t partitionId, uint32_t numRows, std::vector> buffers); - arrow::Result> createArrowRecordBatchFromBuffer( - uint32_t partitionId, - bool reuseBuffers); - - arrow::Result> createPayload(const arrow::RecordBatch& rb, bool reuseBuffers); + arrow::Result>> assembleBuffers(uint32_t partitionId, bool reuseBuffers); template arrow::Status splitFixedType(const uint8_t* srcAddr, const std::vector& dstAddrs) { @@ -293,8 +286,6 @@ class VeloxShuffleWriter final : public ShuffleWriter { arrow::Result shrinkPartitionBuffersMinSize(int64_t size); - arrow::Result shrinkPartitionBuffers(); - arrow::Result evictPartitionBuffersMinSize(int64_t size); arrow::Status shrinkPartitionBuffer(uint32_t partitionId); diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index b8853ed93552f..2c475efa202be 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -82,13 +82,6 @@ TEST_P(SinglePartitioningShuffleWriter, single) { auto shuffleWriter = createShuffleWriter(); testShuffleWrite(*shuffleWriter, {inputVector1_, inputVector2_, inputVector1_}); } - // Test not compress small buffer. - { - shuffleWriterOptions_.compression_type = arrow::Compression::LZ4_FRAME; - shuffleWriterOptions_.compression_threshold = 1024; - auto shuffleWriter = createShuffleWriter(); - testShuffleWrite(*shuffleWriter, {inputVector1_, inputVector1_}); - } // Split null RowVector. { auto shuffleWriter = createShuffleWriter(); @@ -373,10 +366,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) { // Clear buffers and evict payloads and cache. for (auto pid : {0, 1}) { - GLUTEN_ASSIGN_OR_THROW(auto payload, shuffleWriter->createPayloadFromBuffer(pid, true)); - if (payload) { - ASSERT_NOT_OK(shuffleWriter->evictPayload(pid, std::move(payload))); - } + ASSERT_NOT_OK(shuffleWriter->evictPartitionBuffers(pid, true)); } ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); @@ -484,10 +474,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInit) { // Clear buffers then the size after shrink will be 0. for (auto pid = 0; pid < kDefaultShufflePartitions; ++pid) { - GLUTEN_ASSIGN_OR_THROW(auto payload, shuffleWriter->createPayloadFromBuffer(pid, true)); - if (payload) { - ASSERT_NOT_OK(shuffleWriter->evictPayload(pid, std::move(payload))); - } + ASSERT_NOT_OK(shuffleWriter->evictPartitionBuffers(pid, true)); } auto bufferSize = shuffleWriter->partitionBufferSize(); diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala index 5f3db8bf8a279..0d0eae30e9aaf 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala @@ -80,6 +80,7 @@ object ExecUtil { serializer: Serializer, writeMetrics: Map[String, SQLMetric], metrics: Map[String, SQLMetric]): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = { + metrics("numPartitions").set(newPartitioning.numPartitions) // scalastyle:on argcount // only used for fallback range partitioning val rangePartitioner: Option[Partitioner] = newPartitioning match {