From 41e1c68dcd393d42d8140e0618c200cbc1d38fd7 Mon Sep 17 00:00:00 2001 From: "Ma, Rong" Date: Fri, 8 Dec 2023 10:01:38 +0800 Subject: [PATCH] remove arrow ipc payload --- cpp/core/CMakeLists.txt | 2 +- cpp/core/jni/JniWrapper.cc | 2 +- cpp/core/shuffle/BlockPayload.cc | 20 ++ cpp/core/shuffle/BlockPayload.h | 304 ++++++++++++++++++ cpp/core/shuffle/LocalPartitionWriter.cc | 36 ++- cpp/core/shuffle/LocalPartitionWriter.h | 3 +- cpp/core/shuffle/PartitionWriter.cc | 51 --- cpp/core/shuffle/PartitionWriter.h | 14 +- cpp/core/shuffle/ShuffleMemoryPool.cc | 66 ++++ cpp/core/shuffle/ShuffleMemoryPool.h | 47 +++ cpp/core/shuffle/ShuffleReader.cc | 23 +- cpp/core/shuffle/ShuffleReader.h | 28 +- cpp/core/shuffle/ShuffleWriter.h | 55 +--- cpp/core/shuffle/Utils.cc | 2 - cpp/core/shuffle/Utils.h | 3 + .../shuffle/rss/CelebornPartitionWriter.cc | 10 +- .../shuffle/rss/CelebornPartitionWriter.h | 1 + cpp/core/shuffle/rss/RemotePartitionWriter.cc | 2 +- cpp/core/utils/Timer.h | 20 +- cpp/velox/compute/VeloxBackend.cc | 3 - cpp/velox/compute/VeloxRuntime.cc | 7 +- cpp/velox/shuffle/VeloxShuffleReader.cc | 134 ++++---- cpp/velox/shuffle/VeloxShuffleReader.h | 58 +++- cpp/velox/shuffle/VeloxShuffleWriter.cc | 27 +- cpp/velox/shuffle/VeloxShuffleWriter.h | 7 +- cpp/velox/tests/VeloxShuffleWriterTest.cc | 15 +- .../utils/tests/VeloxShuffleWriterTestBase.h | 6 +- 27 files changed, 674 insertions(+), 272 deletions(-) create mode 100644 cpp/core/shuffle/BlockPayload.cc create mode 100644 cpp/core/shuffle/BlockPayload.h delete mode 100644 cpp/core/shuffle/PartitionWriter.cc create mode 100644 cpp/core/shuffle/ShuffleMemoryPool.cc create mode 100644 cpp/core/shuffle/ShuffleMemoryPool.h diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 60e63dd4846d1..aa2e6d3df7c1e 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -205,7 +205,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/Partitioning.cc shuffle/PartitionWriterCreator.cc shuffle/LocalPartitionWriter.cc - 
shuffle/PartitionWriter.cc + shuffle/ShuffleMemoryPool.cc shuffle/rss/RemotePartitionWriter.cc shuffle/rss/CelebornPartitionWriter.cc shuffle/Utils.cc diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index e19d4a683ee40..8a34b7476ab48 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -1049,7 +1049,7 @@ JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper_ auto reader = ctx->objectStore()->retrieve(shuffleReaderHandle); env->CallVoidMethod(metrics, shuffleReaderMetricsSetDecompressTime, reader->getDecompressTime()); env->CallVoidMethod(metrics, shuffleReaderMetricsSetIpcTime, reader->getIpcTime()); - env->CallVoidMethod(metrics, shuffleReaderMetricsSetDeserializeTime, reader->getDeserializeTime()); + env->CallVoidMethod(metrics, shuffleReaderMetricsSetDeserializeTime, reader->getArrowToVeloxTime()); checkException(env); JNI_METHOD_END() diff --git a/cpp/core/shuffle/BlockPayload.cc b/cpp/core/shuffle/BlockPayload.cc new file mode 100644 index 0000000000000..0472db417be2c --- /dev/null +++ b/cpp/core/shuffle/BlockPayload.cc @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "shuffle/BlockPayload.h" + +namespace gluten {} \ No newline at end of file diff --git a/cpp/core/shuffle/BlockPayload.h b/cpp/core/shuffle/BlockPayload.h new file mode 100644 index 0000000000000..8e9418da03f01 --- /dev/null +++ b/cpp/core/shuffle/BlockPayload.h @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "shuffle/Options.h" +#include "shuffle/PartitionWriter.h" +#include "shuffle/Utils.h" + +namespace gluten { +// A block represents data to be cached in-memory or spilled. +// Can be compressed or uncompressed. 
+ +namespace { + +static constexpr int64_t kZeroBufferLength = 0; +static constexpr int64_t kNullBuffer = -1; +static constexpr int64_t kUncompressedBuffer = -2; + +template +void write(uint8_t** dst, T data) { + auto ptr = reinterpret_cast(*dst); + *ptr = data; + *dst += sizeof(T); +} + +template +T* advance(uint8_t** dst) { + auto ptr = reinterpret_cast(*dst); + *dst += sizeof(T); + return ptr; +} + +arrow::Status compressBuffer( + std::shared_ptr& buffer, + uint8_t*& output, + int64_t outputLength, + ShuffleWriterOptions* options) { + if (!buffer) { + write(&output, kNullBuffer); + write(&output, kZeroBufferLength); + return arrow::Status::OK(); + } + auto* compressedLengthPtr = advance(&output); + write(&output, static_cast(buffer->size())); + ARROW_ASSIGN_OR_RAISE( + auto compressedLength, options->codec->Compress(buffer->size(), buffer->data(), outputLength, output)); + if (compressedLength > buffer->size()) { + // Write uncompressed buffer. + memcpy(output, buffer->data(), buffer->size()); + output += buffer->size(); + *compressedLengthPtr = kUncompressedBuffer; + } else { + output += compressedLength; + *compressedLengthPtr = static_cast(compressedLength); + } + // Release buffer after compression. + buffer = nullptr; + return arrow::Status::OK(); +} + +} // namespace + +class BlockPayload : public Payload { + public: + enum Type : int32_t { kCompressed, kUncompressed }; + + BlockPayload(BlockPayload::Type type, uint32_t numRows, std::vector> buffers) + : type_(type), numRows_(numRows), buffers_(std::move(buffers)) {} + + static arrow::Result> fromBuffers( + uint32_t numRows, + std::vector> buffers, + ShuffleWriterOptions* options, + bool reuseBuffers, + bool shouldCompressBuffers) { + if (options->codec && numRows >= options->compression_threshold && shouldCompressBuffers) { + // Compress. + // Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ... 
+ auto metadataLength = sizeof(int64_t) * 2 * buffers.size(); + int64_t totalCompressedLength = + std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) { + if (!buffer) { + return sum; + } + return sum + options->codec->MaxCompressedLen(buffer->size(), buffer->data()); + }); + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr compressed, + arrow::AllocateResizableBuffer( + metadataLength + totalCompressedLength, options->ipc_write_options.memory_pool)); + auto output = compressed->mutable_data(); + + // Compress buffers one by one. + for (auto& buffer : buffers) { + auto availableLength = compressed->size() - (output - compressed->data()); + RETURN_NOT_OK(compressBuffer(buffer, output, availableLength, options)); + } + + int64_t actualLength = output - compressed->data(); + ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing compressed buffer out of bound.")); + RETURN_NOT_OK(compressed->Resize(actualLength)); + return std::make_unique( + Type::kCompressed, numRows, std::vector>{compressed}); + } + if (reuseBuffers) { + // Copy. 
+ std::vector> copies; + for (auto& buffer : buffers) { + if (!buffer) { + copies.push_back(nullptr); + continue; + } + ARROW_ASSIGN_OR_RAISE( + auto copy, arrow::AllocateResizableBuffer(buffer->size(), options->ipc_write_options.memory_pool)); + memcpy(copy->mutable_data(), buffer->data(), buffer->size()); + copies.push_back(std::move(copy)); + } + return std::make_unique(Type::kUncompressed, numRows, std::move(copies)); + } + return std::make_unique(Type::kUncompressed, numRows, std::move(buffers)); + } + + arrow::Status serialize(arrow::io::OutputStream* outputStream) override { + RETURN_NOT_OK(outputStream->Write(&type_, sizeof(Type))); + RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t))); + if (type_ == Type::kUncompressed) { + for (auto& buffer : buffers_) { + if (!buffer) { + RETURN_NOT_OK(outputStream->Write(&kNullBuffer, sizeof(int64_t))); + continue; + } + int64_t bufferSize = buffer->size(); + RETURN_NOT_OK(outputStream->Write(&bufferSize, sizeof(int64_t))); + RETURN_NOT_OK(outputStream->Write(std::move(buffer))); + } + } else { + RETURN_NOT_OK(outputStream->Write(std::move(buffers_[0]))); + } + buffers_.clear(); + return arrow::Status::OK(); + } + + static arrow::Result>> deserialize( + arrow::io::InputStream* inputStream, + const std::shared_ptr& schema, + const std::shared_ptr& codec, + arrow::MemoryPool* pool, + uint32_t& numRows) { + static const std::vector> kEmptyBuffers{}; + ARROW_ASSIGN_OR_RAISE(auto typeAndRows, readTypeAndRows(inputStream)); + if (typeAndRows.first == kIpcContinuationToken && typeAndRows.second == kZeroLength) { + numRows = 0; + return kEmptyBuffers; + } + numRows = typeAndRows.second; + auto fields = schema->fields(); + + auto isCompressionEnabled = typeAndRows.first == Type::kUncompressed || codec == nullptr; + auto readBuffer = [&]() { + if (isCompressionEnabled) { + return readUncompressedBuffer(inputStream); + } else { + return readCompressedBuffer(inputStream, codec, pool); + } + }; + + bool 
hasComplexDataType = false; + std::vector> buffers; + for (const auto& field : fields) { + auto fieldType = field->type()->id(); + switch (fieldType) { + case arrow::BinaryType::type_id: + case arrow::StringType::type_id: { + buffers.emplace_back(); + ARROW_ASSIGN_OR_RAISE(buffers.back(), readBuffer()); + buffers.emplace_back(); + ARROW_ASSIGN_OR_RAISE(buffers.back(), readBuffer()); + buffers.emplace_back(); + ARROW_ASSIGN_OR_RAISE(buffers.back(), readBuffer()); + break; + } + case arrow::StructType::type_id: + case arrow::MapType::type_id: + case arrow::ListType::type_id: { + hasComplexDataType = true; + } break; + default: { + buffers.emplace_back(); + ARROW_ASSIGN_OR_RAISE(buffers.back(), readBuffer()); + buffers.emplace_back(); + ARROW_ASSIGN_OR_RAISE(buffers.back(), readBuffer()); + break; + } + } + } + if (hasComplexDataType) { + buffers.emplace_back(); + ARROW_ASSIGN_OR_RAISE(buffers.back(), readBuffer()); + } + return buffers; + } + + static arrow::Result> readTypeAndRows(arrow::io::InputStream* inputStream) { + int32_t type; + uint32_t numRows; + RETURN_NOT_OK(inputStream->Read(sizeof(Type), &type)); + RETURN_NOT_OK(inputStream->Read(sizeof(uint32_t), &numRows)); + return std::make_pair(type, numRows); + } + + static arrow::Status mergeCompressed( + arrow::io::InputStream* inputStream, + arrow::io::OutputStream* outputStream, + uint32_t numRows, + int64_t totalLength) { + static const Type kType = Type::kUncompressed; + RETURN_NOT_OK(outputStream->Write(&kType, sizeof(Type))); + RETURN_NOT_OK(outputStream->Write(&numRows, sizeof(uint32_t))); + RETURN_NOT_OK(outputStream->Write(&totalLength, sizeof(int64_t))); + ARROW_ASSIGN_OR_RAISE(auto buffer, inputStream->Read(totalLength)); + RETURN_NOT_OK(outputStream->Write(buffer)); + return arrow::Status::OK(); + } + + static arrow::Result> readUncompressedBuffer(arrow::io::InputStream* inputStream) { + int64_t bufferLength; + RETURN_NOT_OK(inputStream->Read(sizeof(int64_t), &bufferLength)); + if (bufferLength == 
kNullBuffer) { + return nullptr; + } + ARROW_ASSIGN_OR_RAISE(auto buffer, inputStream->Read(bufferLength)); + return buffer; + } + + static arrow::Result> readCompressedBuffer( + arrow::io::InputStream* inputStream, + const std::shared_ptr& codec, + arrow::MemoryPool* pool) { + int64_t compressedLength; + int64_t uncompressedLength; + RETURN_NOT_OK(inputStream->Read(sizeof(int64_t), &compressedLength)); + RETURN_NOT_OK(inputStream->Read(sizeof(int64_t), &uncompressedLength)); + if (compressedLength == kNullBuffer) { + return nullptr; + } + if (compressedLength == kUncompressedBuffer) { + ARROW_ASSIGN_OR_RAISE(auto uncompressed, arrow::AllocateBuffer(uncompressedLength, pool)); + RETURN_NOT_OK(inputStream->Read(uncompressedLength, const_cast(uncompressed->data()))); + return uncompressed; + } + ARROW_ASSIGN_OR_RAISE(auto compressed, arrow::AllocateBuffer(compressedLength, pool)); + RETURN_NOT_OK(inputStream->Read(compressedLength, const_cast(compressed->data()))); + ARROW_ASSIGN_OR_RAISE(auto output, arrow::AllocateBuffer(uncompressedLength, pool)); + RETURN_NOT_OK(codec->Decompress( + compressedLength, compressed->data(), uncompressedLength, const_cast(output->data()))); + return output; + } + + static arrow::Status mergeUncompressed(arrow::io::InputStream* inputStream, arrow::ResizableBuffer* output) { + ARROW_ASSIGN_OR_RAISE(auto input, readUncompressedBuffer(inputStream)); + auto oldSize = output->size(); + auto newSize = oldSize + input->size(); + RETURN_NOT_OK(output->Resize(newSize)); // May reallocate; compute the write pointer only afterwards. + memcpy(output->mutable_data() + oldSize, input->data(), input->size()); + return arrow::Status::OK(); + } + + static arrow::Status compressAndWrite( + std::shared_ptr buffer, + arrow::io::OutputStream* outputStream, + ShuffleWriterOptions* options) { + auto maxCompressedLength = options->codec->MaxCompressedLen(buffer->size(), buffer->data()); + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr compressed, + arrow::AllocateResizableBuffer( + sizeof(int64_t) * 2 + maxCompressedLength, 
options->ipc_write_options.memory_pool)); + auto output = compressed->mutable_data(); + RETURN_NOT_OK(compressBuffer(buffer, output, maxCompressedLength, options)); + RETURN_NOT_OK(outputStream->Write(compressed->data(), output - compressed->data())); + return arrow::Status::OK(); + } + + private: + Type type_; + uint32_t numRows_; + std::vector> buffers_; +}; + +} // namespace gluten \ No newline at end of file diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 8fc24bf8a0214..9979beca205b1 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -15,9 +15,11 @@ * limitations under the License. */ -#include "shuffle/LocalPartitionWriter.h" #include #include + +#include "shuffle/BlockPayload.h" +#include "shuffle/LocalPartitionWriter.h" #include "shuffle/Utils.h" #include "utils/DebugOut.h" #include "utils/StringUtil.h" @@ -51,9 +53,9 @@ class CacheEvictor final : public LocalPartitionWriter::LocalEvictor { CacheEvictor(uint32_t numPartitions, ShuffleWriterOptions* options, const std::shared_ptr& spillInfo) : LocalPartitionWriter::LocalEvictor(numPartitions, options, spillInfo) {} - arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) override { + arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) override { if (partitionCachedPayload_.find(partitionId) == partitionCachedPayload_.end()) { - partitionCachedPayload_.emplace(partitionId, std::vector>{}); + partitionCachedPayload_.emplace(partitionId, std::vector>{}); } partitionCachedPayload_[partitionId].push_back(std::move(payload)); return arrow::Status::OK(); @@ -96,15 +98,14 @@ class CacheEvictor final : public LocalPartitionWriter::LocalEvictor { } private: - std::unordered_map>> partitionCachedPayload_; + std::unordered_map>> partitionCachedPayload_; arrow::Status flushInternal(uint32_t partitionId, arrow::io::OutputStream* os) { ScopedTimer timer(evictTime_); - int32_t metadataLength 
= 0; // unused auto payloads = std::move(partitionCachedPayload_[partitionId]); partitionCachedPayload_.erase(partitionId); for (auto& payload : payloads) { - RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_->ipc_write_options, os, &metadataLength)); + RETURN_NOT_OK(payload->serialize(os)); } return arrow::Status::OK(); } @@ -118,15 +119,14 @@ class FlushOnSpillEvictor final : public LocalPartitionWriter::LocalEvictor { const std::shared_ptr& spillInfo) : LocalPartitionWriter::LocalEvictor(numPartitions, options, spillInfo) {} - arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) override { + arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) override { ScopedTimer timer(evictTime_); if (!os_) { ARROW_ASSIGN_OR_RAISE(os_, arrow::io::FileOutputStream::Open(spillInfo_->spilledFile, true)); } - int32_t metadataLength = 0; // unused. ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell()); - RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_->ipc_write_options, os_.get(), &metadataLength)); + RETURN_NOT_OK(payload->serialize(os_.get())); ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell()); DEBUG_OUT << "Spilled partition " << partitionId << " file start: " << start << ", file end: " << end << std::endl; spillInfo_->partitionSpillInfos.push_back({partitionId, end - start}); @@ -250,7 +250,6 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) { writeTimer.start(); int64_t endInFinalFile = 0; - int32_t metadataLength = 0; // Unused. auto cachedPartitionBuffersIter = cachedPartitionBuffers_.begin(); // Iterator over pid. 
for (auto pid = 0; pid < numPartitions_; ++pid) { @@ -269,11 +268,14 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) { writeTimer.stop(); ARROW_ASSIGN_OR_RAISE( auto payload, - createPayloadFromBuffers( - std::get<1>(*cachedPartitionBuffersIter), std::move(std::get<2>(*cachedPartitionBuffersIter)))); + BlockPayload::fromBuffers( + std::get<1>(*cachedPartitionBuffersIter), + std::move(std::get<2>(*cachedPartitionBuffersIter)), + options_, + false, + true)); writeTimer.start(); - RETURN_NOT_OK( - arrow::ipc::WriteIpcPayload(*payload, options_->ipc_write_options, dataFileOs_.get(), &metadataLength)); + RETURN_NOT_OK(payload->serialize(dataFileOs_.get())); cachedPartitionBuffersIter++; } ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell()); @@ -345,12 +347,16 @@ arrow::Status LocalPartitionWriter::evict( uint32_t partitionId, uint32_t numRows, std::vector> buffers, + bool reuseBuffers, Evictor::Type evictType) { rawPartitionLengths_[partitionId] += getBufferSize(buffers); if (evictType == Evictor::Type::kStop) { cachedPartitionBuffers_.emplace_back(partitionId, numRows, std::move(buffers)); } else { - ARROW_ASSIGN_OR_RAISE(auto payload, createPayloadFromBuffers(numRows, std::move(buffers))); + ARROW_ASSIGN_OR_RAISE( + auto payload, + BlockPayload::fromBuffers( + numRows, std::move(buffers), options_, reuseBuffers, evictType == Evictor::Type::kCache)); RETURN_NOT_OK(requestEvict(evictType)); RETURN_NOT_OK(evictor_->evict(partitionId, std::move(payload))); } diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h index d89f8d20cb7dc..b2bebab1c545f 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -23,7 +23,7 @@ #include "shuffle/PartitionWriter.h" #include "shuffle/ShuffleWriter.h" -#include "PartitionWriterCreator.h" +#include "shuffle/PartitionWriterCreator.h" #include "utils/macros.h" namespace gluten { @@ -55,6 +55,7 @@ class 
LocalPartitionWriter : public ShuffleWriter::PartitionWriter { uint32_t partitionId, uint32_t numRows, std::vector> buffers, + bool reuseBuffers, Evictor::Type evictType) override; arrow::Status finishEvict() override; diff --git a/cpp/core/shuffle/PartitionWriter.cc b/cpp/core/shuffle/PartitionWriter.cc deleted file mode 100644 index 4b12511b86332..0000000000000 --- a/cpp/core/shuffle/PartitionWriter.cc +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "shuffle/PartitionWriter.h" -#include "shuffle/Utils.h" - -namespace gluten { - -arrow::Result> gluten::ShuffleWriter::PartitionWriter::createPayloadFromBuffers( - uint32_t numRows, - std::vector> buffers) { - std::shared_ptr recordBatch; - if (options_->compression_type != arrow::Compression::UNCOMPRESSED) { - ARROW_ASSIGN_OR_RAISE( - recordBatch, - makeCompressedRecordBatch( - numRows, - std::move(buffers), - options_->write_schema, - options_->ipc_write_options.memory_pool, - options_->codec.get(), - options_->compression_threshold, - options_->compression_mode, - compressTime_)); - } else { - ARROW_ASSIGN_OR_RAISE( - recordBatch, - makeUncompressedRecordBatch( - numRows, std::move(buffers), options_->write_schema, options_->ipc_write_options.memory_pool)); - } - - auto payload = std::make_unique(); - RETURN_NOT_OK(arrow::ipc::GetRecordBatchPayload(*recordBatch, options_->ipc_write_options, payload.get())); - return payload; -} - -} // namespace gluten \ No newline at end of file diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h index bc655d03ff842..de851ee2a8381 100644 --- a/cpp/core/shuffle/PartitionWriter.h +++ b/cpp/core/shuffle/PartitionWriter.h @@ -22,6 +22,13 @@ namespace gluten { +class Payload { + public: + virtual ~Payload() = default; + + virtual arrow::Status serialize(arrow::io::OutputStream* outputStream) = 0; +}; + class Evictor { public: enum Type { kCache, kFlush, kStop }; @@ -30,7 +37,7 @@ class Evictor { virtual ~Evictor() = default; - virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) = 0; + virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) = 0; virtual arrow::Status finish() = 0; @@ -62,15 +69,12 @@ class ShuffleWriter::PartitionWriter : public Evictable { uint32_t partitionId, uint32_t numRows, std::vector> buffers, + bool reuseBuffers, Evictor::Type evictType) = 0; virtual arrow::Status finishEvict() = 0; protected: - arrow::Result> 
createPayloadFromBuffers( - uint32_t numRows, - std::vector> buffers); - uint32_t numPartitions_; ShuffleWriterOptions* options_; diff --git a/cpp/core/shuffle/ShuffleMemoryPool.cc b/cpp/core/shuffle/ShuffleMemoryPool.cc new file mode 100644 index 0000000000000..25bbaae789694 --- /dev/null +++ b/cpp/core/shuffle/ShuffleMemoryPool.cc @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "shuffle/ShuffleMemoryPool.h" + +namespace gluten { +gluten::ShuffleMemoryPool::ShuffleMemoryPool(arrow::MemoryPool* pool) : pool_(pool) {} + +arrow::Status ShuffleMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) { + auto before = pool_->bytes_allocated(); + auto status = pool_->Allocate(size, alignment, out); + if (status.ok()) { + bytesAllocated_ += (pool_->bytes_allocated() - before); + } + return status; +} + +arrow::Status ShuffleMemoryPool::Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { + auto before = pool_->bytes_allocated(); + auto status = pool_->Reallocate(old_size, new_size, alignment, ptr); + if (status.ok()) { + bytesAllocated_ += (pool_->bytes_allocated() - before); + } + return status; +} + +void ShuffleMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) { + auto before = pool_->bytes_allocated(); + pool_->Free(buffer, size, alignment); + bytesAllocated_ += (pool_->bytes_allocated() - before); +} + +int64_t ShuffleMemoryPool::bytes_allocated() const { + return bytesAllocated_; +} + +int64_t ShuffleMemoryPool::max_memory() const { + return pool_->max_memory(); +} + +std::string ShuffleMemoryPool::backend_name() const { + return pool_->backend_name(); +} + +int64_t ShuffleMemoryPool::total_bytes_allocated() const { + return pool_->total_bytes_allocated(); +} + +int64_t ShuffleMemoryPool::num_allocations() const { + return pool_->num_allocations(); // Forward the wrapped pool's count (previously thrown by mistake). +} +} // namespace gluten diff --git a/cpp/core/shuffle/ShuffleMemoryPool.h b/cpp/core/shuffle/ShuffleMemoryPool.h new file mode 100644 index 0000000000000..64755e689dbe9 --- /dev/null +++ b/cpp/core/shuffle/ShuffleMemoryPool.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#pragma once + +namespace gluten { +class ShuffleMemoryPool : public arrow::MemoryPool { + public: + ShuffleMemoryPool(arrow::MemoryPool* pool); + + arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override; + + arrow::Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) override; + + void Free(uint8_t* buffer, int64_t size, int64_t alignment) override; + + int64_t bytes_allocated() const override; + + int64_t max_memory() const override; + + std::string backend_name() const override; + + int64_t total_bytes_allocated() const override; + + int64_t num_allocations() const override; + + private: + arrow::MemoryPool* pool_; + uint64_t bytesAllocated_ = 0; +}; +} // namespace gluten \ No newline at end of file diff --git a/cpp/core/shuffle/ShuffleReader.cc b/cpp/core/shuffle/ShuffleReader.cc index 6b05dd64e5b89..43b25d2db2fc0 100644 --- a/cpp/core/shuffle/ShuffleReader.cc +++ b/cpp/core/shuffle/ShuffleReader.cc @@ -74,15 +74,10 @@ class ShuffleReaderOutStream : public ColumnarBatchIterator { namespace gluten { -ShuffleReader::ShuffleReader( - std::shared_ptr schema, - ShuffleReaderOptions options, - arrow::MemoryPool* pool) - : pool_(pool), options_(std::move(options)), schema_(schema) {} +ShuffleReader::ShuffleReader(std::unique_ptr factory) : factory_(std::move(factory)) {} std::shared_ptr 
ShuffleReader::readStream(std::shared_ptr in) { - return std::make_shared(std::make_unique( - options_, schema_, in, [this](int64_t ipcTime) { this->ipcTime_ += ipcTime; })); + return std::make_shared(factory_->createDeserializer(in)); } arrow::Status ShuffleReader::close() { @@ -90,7 +85,19 @@ arrow::Status ShuffleReader::close() { } arrow::MemoryPool* ShuffleReader::getPool() const { - return pool_; + return factory_->getPool(); +} + +int64_t ShuffleReader::getDecompressTime() const { + return factory_->getDecompressTime(); +} + +int64_t ShuffleReader::getIpcTime() const { + return ipcTime_; +} + +int64_t ShuffleReader::getArrowToVeloxTime() const { + return factory_->getArrowToVeloxTime(); } } // namespace gluten diff --git a/cpp/core/shuffle/ShuffleReader.h b/cpp/core/shuffle/ShuffleReader.h index 676211afb4ff5..818ca6a5bd5ed 100644 --- a/cpp/core/shuffle/ShuffleReader.h +++ b/cpp/core/shuffle/ShuffleReader.h @@ -28,9 +28,22 @@ namespace gluten { +class DeserializerFactory { + public: + virtual ~DeserializerFactory() = default; + + virtual std::unique_ptr createDeserializer(std::shared_ptr in) = 0; + + virtual arrow::MemoryPool* getPool() = 0; + + virtual int64_t getDecompressTime() = 0; + + virtual int64_t getArrowToVeloxTime() = 0; +}; + class ShuffleReader { public: - explicit ShuffleReader(std::shared_ptr schema, ShuffleReaderOptions options, arrow::MemoryPool* pool); + explicit ShuffleReader(std::unique_ptr factory); virtual ~ShuffleReader() = default; @@ -39,17 +52,11 @@ class ShuffleReader { arrow::Status close(); - int64_t getDecompressTime() const { - return decompressTime_; - } + int64_t getDecompressTime() const; - int64_t getIpcTime() const { - return ipcTime_; - } + int64_t getIpcTime() const; - int64_t getDeserializeTime() const { - return deserializeTime_; - } + int64_t getArrowToVeloxTime() const; arrow::MemoryPool* getPool() const; @@ -63,6 +70,7 @@ class ShuffleReader { private: std::shared_ptr schema_; + std::unique_ptr factory_; }; } // 
namespace gluten diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h index 91d6861086d78..17790c256b991 100644 --- a/cpp/core/shuffle/ShuffleWriter.h +++ b/cpp/core/shuffle/ShuffleWriter.h @@ -27,6 +27,7 @@ #include "shuffle/Options.h" #include "shuffle/Partitioner.h" #include "shuffle/Partitioning.h" +#include "shuffle/ShuffleMemoryPool.h" #include "utils/Compression.h" namespace gluten { @@ -41,60 +42,6 @@ struct ShuffleWriterMetrics { std::vector rawPartitionLengths{}; // Uncompressed size. }; -class ShuffleMemoryPool : public arrow::MemoryPool { - public: - ShuffleMemoryPool(arrow::MemoryPool* pool) : pool_(pool) {} - - arrow::MemoryPool* delegated() { - return pool_; - } - - arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override { - auto status = pool_->Allocate(size, alignment, out); - if (status.ok()) { - bytesAllocated_ += size; - } - return status; - } - - arrow::Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) override { - auto status = pool_->Reallocate(old_size, new_size, alignment, ptr); - if (status.ok()) { - bytesAllocated_ += (new_size - old_size); - } - return status; - } - - void Free(uint8_t* buffer, int64_t size, int64_t alignment) override { - pool_->Free(buffer, size, alignment); - bytesAllocated_ -= size; - } - - int64_t bytes_allocated() const override { - return bytesAllocated_; - } - - int64_t max_memory() const override { - return pool_->max_memory(); - } - - std::string backend_name() const override { - return pool_->backend_name(); - } - - int64_t total_bytes_allocated() const override { - return pool_->total_bytes_allocated(); - } - - int64_t num_allocations() const override { - throw pool_->num_allocations(); - } - - private: - arrow::MemoryPool* pool_; - uint64_t bytesAllocated_ = 0; -}; - class ShuffleWriter : public Evictable { public: static constexpr int64_t kMinMemLimit = 128LL * 1024 * 1024; diff --git a/cpp/core/shuffle/Utils.cc 
b/cpp/core/shuffle/Utils.cc index f959e2a5cdfe7..a225a3655ef63 100644 --- a/cpp/core/shuffle/Utils.cc +++ b/cpp/core/shuffle/Utils.cc @@ -326,8 +326,6 @@ int64_t gluten::getMaxCompressedBufferSize( arrow::Status gluten::writeEos(arrow::io::OutputStream* os, int64_t* bytes) { // write EOS - static constexpr int32_t kIpcContinuationToken = -1; - static constexpr int32_t kZeroLength = 0; static const int64_t kSizeOfEos = sizeof(kIpcContinuationToken) + sizeof(kZeroLength); RETURN_NOT_OK(os->Write(&kIpcContinuationToken, sizeof(kIpcContinuationToken))); RETURN_NOT_OK(os->Write(&kZeroLength, sizeof(kZeroLength))); diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h index fde8a7e0cc7cf..588f1d7015aaf 100644 --- a/cpp/core/shuffle/Utils.h +++ b/cpp/core/shuffle/Utils.h @@ -32,6 +32,9 @@ namespace gluten { using BinaryArrayLengthBufferType = uint32_t; using IpcOffsetBufferType = arrow::LargeStringType::offset_type; +static constexpr int32_t kIpcContinuationToken = -1; +static constexpr uint32_t kZeroLength = 0; + static const size_t kSizeOfBinaryArrayLengthBuffer = sizeof(BinaryArrayLengthBufferType); static const size_t kSizeOfIpcOffsetBuffer = sizeof(IpcOffsetBufferType); static const std::string kGlutenSparkLocalDirs = "GLUTEN_SPARK_LOCAL_DIRS"; diff --git a/cpp/core/shuffle/rss/CelebornPartitionWriter.cc b/cpp/core/shuffle/rss/CelebornPartitionWriter.cc index 8444c2e0f8173..7ffbfa1c110df 100644 --- a/cpp/core/shuffle/rss/CelebornPartitionWriter.cc +++ b/cpp/core/shuffle/rss/CelebornPartitionWriter.cc @@ -16,6 +16,7 @@ */ #include "CelebornPartitionWriter.h" +#include "shuffle/BlockPayload.h" #include "shuffle/Utils.h" #include "utils/Timer.h" @@ -26,13 +27,11 @@ class CelebornEvictHandle final : public Evictor { CelebornEvictHandle(ShuffleWriterOptions* options, RssClient* client, std::vector& bytesEvicted) : Evictor(options), client_(client), bytesEvicted_(bytesEvicted) {} - arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) override { + 
arrow::Status evict(uint32_t partitionId, std::unique_ptr payload) override { // Copy payload to arrow buffered os. ARROW_ASSIGN_OR_RAISE( auto celebornBufferOs, arrow::io::BufferOutputStream::Create(options_->buffer_size, options_->memory_pool)); - int32_t metadataLength = 0; // unused - RETURN_NOT_OK( - arrow::ipc::WriteIpcPayload(*payload, options_->ipc_write_options, celebornBufferOs.get(), &metadataLength)); + RETURN_NOT_OK(payload->serialize(celebornBufferOs.get())); payload = nullptr; // Invalidate payload immediately. // Push. @@ -82,10 +81,11 @@ arrow::Status CelebornPartitionWriter::evict( uint32_t partitionId, uint32_t numRows, std::vector> buffers, + bool reuseBuffers, Evictor::Type evictType) { rawPartitionLengths_[partitionId] += getBufferSize(buffers); ScopedTimer timer(evictTime_); - ARROW_ASSIGN_OR_RAISE(auto payload, createPayloadFromBuffers(numRows, std::move(buffers))); + ARROW_ASSIGN_OR_RAISE(auto payload, BlockPayload::fromBuffers(numRows, std::move(buffers), options_, false, true)); RETURN_NOT_OK(evictor_->evict(partitionId, std::move(payload))); return arrow::Status::OK(); } diff --git a/cpp/core/shuffle/rss/CelebornPartitionWriter.h b/cpp/core/shuffle/rss/CelebornPartitionWriter.h index 10bfc21c19ee3..e7b15fbaabd8a 100644 --- a/cpp/core/shuffle/rss/CelebornPartitionWriter.h +++ b/cpp/core/shuffle/rss/CelebornPartitionWriter.h @@ -41,6 +41,7 @@ class CelebornPartitionWriter final : public RemotePartitionWriter { uint32_t partitionId, uint32_t numRows, std::vector> buffers, + bool reuseBuffers, Evictor::Type evictType /* unused */) override; arrow::Status finishEvict() override; diff --git a/cpp/core/shuffle/rss/RemotePartitionWriter.cc b/cpp/core/shuffle/rss/RemotePartitionWriter.cc index 6fa6feddd5821..9993956b6472a 100644 --- a/cpp/core/shuffle/rss/RemotePartitionWriter.cc +++ b/cpp/core/shuffle/rss/RemotePartitionWriter.cc @@ -15,6 +15,6 @@ * limitations under the License. 
*/ -#include "shuffle/rss/RemotePartitionWriter.h" +#include "RemotePartitionWriter.h" namespace gluten {} // namespace gluten diff --git a/cpp/core/utils/Timer.h b/cpp/core/utils/Timer.h index 69e1babe00ef1..6a8763fd97e5f 100644 --- a/cpp/core/utils/Timer.h +++ b/cpp/core/utils/Timer.h @@ -60,15 +60,31 @@ class Timer { class ScopedTimer : public Timer { public: - explicit ScopedTimer(int64_t& toAdd) : Timer(), toAdd_(toAdd) { - Timer::start(); + explicit ScopedTimer(int64_t& toAdd) : Timer(), toAdd_(&toAdd) { + startInternal(); } ~ScopedTimer() { - Timer::stop(); - toAdd_ += realTimeUsed(); + stopInternal(); + } + + // Adds the time measured so far to the current accumulator, then retargets the timer to toAdd. + void switchTo(int64_t& toAdd) { + stopInternal(); + toAdd_ = &toAdd; + startInternal(); } private: - int64_t& toAdd_; + // Stored as a pointer: assigning through a reference member would overwrite the old accumulator instead of retargeting. + int64_t* toAdd_; + + void stopInternal() { + Timer::stop(); + *toAdd_ += realTimeUsed(); + } + + void startInternal() { + Timer::start(); + } }; } // namespace gluten diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 9dcbd9b49416b..82abd7acb2ed4 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -153,9 +153,6 @@ void VeloxBackend::init(const std::unordered_map& conf // Set backtrace_allocation gluten::backtrace_allocation = veloxcfg->get(kBacktraceAllocation, false); - // Set veloxShuffleReaderPrintFlag - gluten::veloxShuffleReaderPrintFlag = veloxcfg->get(kVeloxShuffleReaderPrintFlag, false); - // Setup and register.
velox::filesystems::registerLocalFileSystem(); initJolFilesystem(veloxcfg); diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index a25a74cb32682..03d45fde5e059 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -28,6 +28,7 @@ #include "shuffle/VeloxShuffleReader.h" #include "shuffle/VeloxShuffleWriter.h" #include "utils/ConfigExtractor.h" +#include "utils/VeloxArrowUtils.h" using namespace facebook; @@ -174,8 +175,12 @@ std::shared_ptr VeloxRuntime::createShuffleReader( ShuffleReaderOptions options, arrow::MemoryPool* pool, MemoryManager* memoryManager) { + auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema)); + auto codec = gluten::createArrowIpcCodec(options.compression_type, options.codec_backend); auto ctxVeloxPool = getLeafVeloxPool(memoryManager); - return std::make_shared(schema, options, pool, ctxVeloxPool); + auto deserializerFactory = std::make_unique( + schema, std::move(codec), rowType, pool, ctxVeloxPool); + return std::make_shared(std::move(deserializerFactory)); } std::unique_ptr VeloxRuntime::createColumnarBatchSerializer( diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 8aa8df34c6639..a8059cd8731cd 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -20,9 +20,11 @@ #include #include "memory/VeloxColumnarBatch.h" +#include "shuffle/BlockPayload.h" #include "shuffle/Utils.h" #include "utils/Common.h" #include "utils/Compression.h" +#include "utils/Timer.h" #include "utils/VeloxArrowUtils.h" #include "utils/macros.h" #include "velox/serializers/PrestoSerializer.h" @@ -37,8 +39,6 @@ using namespace facebook::velox; namespace gluten { -bool veloxShuffleReaderPrintFlag = false; - namespace { struct BufferViewReleaser { @@ -366,54 +366,6 @@ RowVectorPtr readRowVector( return rv; } -class VeloxShuffleReaderOutStream : public ColumnarBatchIterator { - public: - 
VeloxShuffleReaderOutStream( - arrow::MemoryPool* pool, - const std::shared_ptr& veloxPool, - const ShuffleReaderOptions& options, - const RowTypePtr& rowType, - const std::function decompressionTimeAccumulator, - const std::function deserializeTimeAccumulator, - ResultIterator& in) - : pool_(pool), - veloxPool_(veloxPool), - options_(options), - rowType_(rowType), - decompressionTimeAccumulator_(decompressionTimeAccumulator), - deserializeTimeAccumulator_(deserializeTimeAccumulator), - in_(std::move(in)) {} - - std::shared_ptr next() override { - if (!in_.hasNext()) { - return nullptr; - } - auto batch = in_.next(); - auto rb = std::dynamic_pointer_cast(batch)->getRecordBatch(); - - int64_t decompressTime = 0LL; - int64_t deserializeTime = 0LL; - - auto vp = - readRowVector(*rb, rowType_, options_.codec_backend, decompressTime, deserializeTime, pool_, veloxPool_.get()); - - decompressionTimeAccumulator_(decompressTime); - deserializeTimeAccumulator_(deserializeTime); - return std::make_shared(vp); - } - - private: - arrow::MemoryPool* pool_; - std::shared_ptr veloxPool_; - ShuffleReaderOptions options_; - facebook::velox::RowTypePtr rowType_; - - std::function decompressionTimeAccumulator_; - std::function deserializeTimeAccumulator_; - - ResultIterator in_; -}; - std::string getCodecBackend(CodecBackend type) { if (type == CodecBackend::QAT) { return "QAT"; @@ -440,31 +392,69 @@ std::string getCompressionType(arrow::Compression::type type) { } // namespace -VeloxShuffleReader::VeloxShuffleReader( - std::shared_ptr schema, - ShuffleReaderOptions options, - arrow::MemoryPool* pool, - std::shared_ptr veloxPool) - : ShuffleReader(schema, options, pool), veloxPool_(std::move(veloxPool)) { - rowType_ = asRowType(gluten::fromArrowSchema(schema)); - if (gluten::veloxShuffleReaderPrintFlag) { - std::ostringstream oss; - oss << "VeloxShuffleReader create, compression_type:" << getCompressionType(options.compression_type); - oss << " codec_backend:" << 
getCodecBackend(options.codec_backend); - LOG(INFO) << oss.str(); +VeloxShuffleReader::VeloxShuffleReader(std::unique_ptr factory) + : ShuffleReader(std::move(factory)) {} + +VeloxColumnarBatchDeserializer::VeloxColumnarBatchDeserializer( + const std::shared_ptr& in, + const std::shared_ptr& schema, + const std::shared_ptr& codec, + const facebook::velox::RowTypePtr& rowType, + arrow::MemoryPool* memoryPool, + facebook::velox::memory::MemoryPool* veloxPool, + int64_t& arrowToVeloxTime, + int64_t& decompressTime) + : in_(in), + schema_(schema), + codec_(codec), + rowType_(rowType), + memoryPool_(memoryPool), + veloxPool_(veloxPool), + arrowToVeloxTime_(arrowToVeloxTime), + decompressTime_(decompressTime) {} + +std::shared_ptr VeloxColumnarBatchDeserializer::next() { + ScopedTimer timer(decompressTime_); + uint32_t numRows; + GLUTEN_ASSIGN_OR_THROW( + auto arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, numRows)); + if (numRows == 0) { + // Reach EOS. + return nullptr; } + std::vector veloxBuffers; + veloxBuffers.reserve(arrowBuffers.size()); + for (auto& buffer : arrowBuffers) { + veloxBuffers.push_back(convertToVeloxBuffer(buffer)); + } + timer.switchTo(arrowToVeloxTime_); + auto rowVector = deserialize(rowType_, numRows, veloxBuffers, veloxPool_); + return std::make_shared(rowVector); } -std::shared_ptr VeloxShuffleReader::readStream(std::shared_ptr in) { - auto wrappedIn = ShuffleReader::readStream(in); - return std::make_shared(std::make_unique( - pool_, - veloxPool_, - options_, - rowType_, - [this](int64_t decompressionTime) { this->decompressTime_ += decompressionTime; }, - [this](int64_t deserializeTime) { this->deserializeTime_ += deserializeTime; }, - *wrappedIn)); +VeloxColumnarBatchDeserializerFactory::VeloxColumnarBatchDeserializerFactory( + const std::shared_ptr& schema, + const std::shared_ptr& codec, + const RowTypePtr& rowType, + arrow::MemoryPool* memoryPool, + std::shared_ptr veloxPool) + : schema_(schema), 
codec_(codec), rowType_(rowType), memoryPool_(memoryPool), veloxPool_(veloxPool) {} + +std::unique_ptr VeloxColumnarBatchDeserializerFactory::createDeserializer( + std::shared_ptr in) { + return std::make_unique( + std::move(in), schema_, codec_, rowType_, memoryPool_, veloxPool_.get(), arrowToVeloxTime_, decompressTime_); } +arrow::MemoryPool* VeloxColumnarBatchDeserializerFactory::getPool() { + return memoryPool_; +} + +int64_t VeloxColumnarBatchDeserializerFactory::getDecompressTime() { + return decompressTime_; +} + +int64_t VeloxColumnarBatchDeserializerFactory::getArrowToVeloxTime() { + return arrowToVeloxTime_; +} } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index df1236970db63..315bf73c43eea 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -17,27 +17,69 @@ #pragma once +#include "shuffle/BlockPayload.h" #include "shuffle/ShuffleReader.h" #include "velox/type/Type.h" #include "velox/vector/ComplexVector.h" namespace gluten { -class VeloxShuffleReader final : public ShuffleReader { +class VeloxColumnarBatchDeserializer final : public ColumnarBatchIterator { + public: + VeloxColumnarBatchDeserializer( + const std::shared_ptr& in, + const std::shared_ptr& schema, + const std::shared_ptr& codec, + const facebook::velox::RowTypePtr& rowType, + arrow::MemoryPool* memoryPool, + facebook::velox::memory::MemoryPool* veloxPool, + int64_t& arrowToVeloxTime, + int64_t& decompressTime); + + std::shared_ptr next() override; + + private: + std::shared_ptr in_; + std::shared_ptr schema_; + std::shared_ptr codec_; + facebook::velox::RowTypePtr rowType_; + arrow::MemoryPool* memoryPool_; + facebook::velox::memory::MemoryPool* veloxPool_; + + int64_t& arrowToVeloxTime_; + int64_t& decompressTime_; +}; + +class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { public: - VeloxShuffleReader( - std::shared_ptr schema, - ShuffleReaderOptions options, -
arrow::MemoryPool* pool, + VeloxColumnarBatchDeserializerFactory( + const std::shared_ptr& schema, + const std::shared_ptr& codec, + const facebook::velox::RowTypePtr& rowType, + arrow::MemoryPool* memoryPool, std::shared_ptr veloxPool); - std::shared_ptr readStream(std::shared_ptr in) override; + std::unique_ptr createDeserializer(std::shared_ptr in) override; + + arrow::MemoryPool* getPool() override; + + int64_t getDecompressTime() override; + + int64_t getArrowToVeloxTime() override; private: + std::shared_ptr schema_; + std::shared_ptr codec_; facebook::velox::RowTypePtr rowType_; + arrow::MemoryPool* memoryPool_; std::shared_ptr veloxPool_; -}; -extern bool veloxShuffleReaderPrintFlag; + int64_t arrowToVeloxTime_{0}; // Accumulated through the references handed to each created deserializer. + int64_t decompressTime_{0}; // Zero-initialized so the getters never return an indeterminate value. +}; +class VeloxShuffleReader final : public ShuffleReader { + public: + VeloxShuffleReader(std::unique_ptr factory); +}; } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxShuffleWriter.cc index 8e4acc8b8c587..2204947b79cd5 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc @@ -340,7 +340,7 @@ arrow::Status VeloxShuffleWriter::split(std::shared_ptr cb, int64 buffers.emplace_back(); ARROW_ASSIGN_OR_RAISE(buffers.back(), generateComplexTypeBuffers(rowVector)); } - RETURN_NOT_OK(evictBuffers(0, rv.size(), std::move(buffers))); + RETURN_NOT_OK(evictBuffers(0, rv.size(), std::move(buffers), false)); } else if (options_.partitioning == Partitioning::kRange) { auto compositeBatch = std::dynamic_pointer_cast(cb); VELOX_CHECK_NOT_NULL(compositeBatch); @@ -387,7 +387,7 @@ arrow::Status VeloxShuffleWriter::stop() { auto numRows = partitionBufferIdxBase_[pid]; if (numRows > 0) { ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); - RETURN_NOT_OK(partitionWriter_->evict(pid, numRows, std::move(buffers), Evictor::Type::kStop)); + RETURN_NOT_OK(partitionWriter_->evict(pid, numRows, std::move(buffers), false,
Evictor::Type::kStop)); } } } @@ -1042,9 +1042,10 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel } arrow::Status VeloxShuffleWriter::evictBuffers( - uint32_t partitionId, uint32_t numRows, std::vector> buffers) { + uint32_t partitionId, uint32_t numRows, std::vector> buffers, bool reuseBuffers) { if (!buffers.empty()) { - RETURN_NOT_OK(partitionWriter_->evict(partitionId, numRows, std::move(buffers), Evictor::Type::kCache)); + RETURN_NOT_OK( + partitionWriter_->evict(partitionId, numRows, std::move(buffers), reuseBuffers, Evictor::Type::kCache)); } return arrow::Status::OK(); } @@ -1053,7 +1054,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel auto numRows = partitionBufferIdxBase_[partitionId]; if (numRows > 0) { ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(partitionId, reuseBuffers)); - RETURN_NOT_OK(evictBuffers(partitionId, numRows, buffers)); + RETURN_NOT_OK(evictBuffers(partitionId, numRows, buffers, reuseBuffers)); } return arrow::Status::OK(); } @@ -1164,20 +1165,6 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel if (!reuseBuffers) { RETURN_NOT_OK(resetPartitionBuffer(partitionId)); } - if (options_.ipc_write_options.codec == nullptr && reuseBuffers) { - // Without compression, we need to perform a manual copy of the original buffers - // so that we can reuse them for next split. 
- for (auto i = 0; i < allBuffers.size(); ++i) { - auto& buffer = allBuffers[i]; - if (buffer) { - ARROW_ASSIGN_OR_RAISE(auto copy, arrow::AllocateResizableBuffer(buffer->size(), payloadPool_.get())); - if (buffer->size() > 0) { - gluten::fastCopy(copy->mutable_data(), buffer->data(), static_cast(buffer->size())); - } - buffer = std::move(copy); - } - } - } return allBuffers; } @@ -1442,7 +1429,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel for (auto& item : pidToSize) { auto pid = item.first; ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false)); - RETURN_NOT_OK(partitionWriter_->evict(pid, item.second, std::move(buffers), Evictor::Type::kFlush)); + RETURN_NOT_OK(partitionWriter_->evict(pid, item.second, std::move(buffers), false, Evictor::Type::kFlush)); evicted = beforeEvict - partitionBufferPool_->bytes_allocated(); if (evicted >= size) { break; diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index 13872fb4f4080..af74df1484cdc 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -250,8 +250,11 @@ class VeloxShuffleWriter final : public ShuffleWriter { arrow::Status splitComplexType(const facebook::velox::RowVector& rv); - arrow::Status - evictBuffers(uint32_t partitionId, uint32_t numRows, std::vector> buffers); + arrow::Status evictBuffers( + uint32_t partitionId, + uint32_t numRows, + std::vector> buffers, + bool reuseBuffers); arrow::Result>> assembleBuffers(uint32_t partitionId, bool reuseBuffers); diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 2c475efa202be..7d1bc1e960c3f 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -541,7 +541,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplit) { TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) { shuffleWriterOptions_.partitioning = Partitioning::kSingle; - auto 
pool = SelfEvictedMemoryPool(shuffleWriterOptions_.memory_pool); + auto pool = SelfEvictedMemoryPool(shuffleWriterOptions_.memory_pool, false); shuffleWriterOptions_.memory_pool = &pool; auto shuffleWriter = createShuffleWriter(); @@ -551,7 +551,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) { // Trigger spill for the next split. ASSERT_TRUE(pool.checkEvict( - shuffleWriter->cachedPayloadSize() * 2, [&] { ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); })); + shuffleWriter->cachedPayloadSize(), [&] { ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); })); ASSERT_NOT_OK(shuffleWriter->stop()); } @@ -561,7 +561,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) { for (const auto partitioning : {Partitioning::kSingle, Partitioning::kRoundRobin}) { shuffleWriterOptions_.partitioning = partitioning; shuffleWriterOptions_.buffer_size = 4; - auto pool = SelfEvictedMemoryPool(delegated); + auto pool = SelfEvictedMemoryPool(delegated, false); shuffleWriterOptions_.memory_pool = &pool; auto shuffleWriter = createShuffleWriter(); @@ -579,7 +579,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) { } } -TEST_F(VeloxShuffleWriterMemoryTest, kUnevictable) { +TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) { auto delegated = shuffleWriterOptions_.memory_pool; shuffleWriterOptions_.buffer_size = 4; auto pool = SelfEvictedMemoryPool(delegated); @@ -597,10 +597,9 @@ TEST_F(VeloxShuffleWriterMemoryTest, kUnevictable) { ASSERT_GT(shuffleWriter->partitionBufferSize(), 0); // Set limited capacity. pool.setCapacity(0); - // Evict again. Because no cached payload to evict, it will try to compress and evict partition buffers. - // Throws OOM during allocating compression buffers. - auto status = shuffleWriter->evictFixedSize(shuffleWriter->partitionBufferSize(), &evicted); - ASSERT_TRUE(status.IsOutOfMemory()); + // Evict again. Because no cached payload to evict, it will try to evict all partition buffers. 
+ ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->partitionBufferSize(), &evicted)); + ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0); } TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) { diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index 812eb319c7609..e4b472bb8b82b 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -250,7 +250,11 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam schema, std::vector& vectors) { ShuffleReaderOptions options; options.compression_type = shuffleWriterOptions_.compression_type; - auto reader = std::make_shared(schema, options, defaultArrowMemoryPool().get(), pool_); + auto codec = createArrowIpcCodec(options.compression_type, CodecBackend::NONE); + auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema)); + auto deserializerFactory = std::make_unique( + schema, std::move(codec), rowType, defaultArrowMemoryPool().get(), pool_); + auto reader = std::make_shared(std::move(deserializerFactory)); auto iter = reader->readStream(file_); while (iter->hasNext()) { auto vector = std::dynamic_pointer_cast(iter->next())->getRowVector();