From 1c0cd32feb19a367f02e6fcb4308fc1bd560b533 Mon Sep 17 00:00:00 2001 From: glutenperfbot Date: Thu, 15 Aug 2024 21:00:53 +0000 Subject: [PATCH 1/7] [VL] Daily Update Velox Version (2024_08_16) Signed-off-by: glutenperfbot --- ep/build-velox/src/get_velox.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 9af55aae2758..f857ceada653 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -17,7 +17,7 @@ set -exu VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2024_08_14 +VELOX_BRANCH=2024_08_16 VELOX_HOME="" OS=`uname -s` From c8f781367d37fe0e5ec3dab9461b71eb6d2585d2 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 16 Aug 2024 09:03:41 +0000 Subject: [PATCH 2/7] Fix compile --- cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc index 8b21e7bdbbeb..acb14cf4de39 100644 --- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc +++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc @@ -34,7 +34,7 @@ namespace { std::unique_ptr toByteStream(uint8_t* data, int32_t size) { std::vector byteRanges; byteRanges.push_back(ByteRange{data, size, 0}); - auto byteStream = std::make_unique(byteRanges); + auto byteStream = std::make_unique(byteRanges); return byteStream; } } // namespace From 7f57354f25cf1dc11e606d1e0ab4c7816dccae2a Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 16 Aug 2024 10:48:10 +0000 Subject: [PATCH 3/7] Fix ByteInputStream interface --- cpp/velox/shuffle/VeloxShuffleReader.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 4d002499c9af..3966857b9e9d 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -16,6 +16,7 @@ */ #include "VeloxShuffleReader.h" +#include "GlutenByteStream.h" #include #include @@ -177,7 +178,7 @@ VectorPtr readFlatVector( std::unique_ptr toByteStream(uint8_t* data, int32_t size) { std::vector byteRanges; byteRanges.push_back(ByteRange{data, size, 0}); - auto byteStream = std::make_unique(byteRanges); + auto byteStream = std::make_unique(byteRanges); return byteStream; } @@ -450,7 +451,7 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo return std::make_shared(std::move(rowVector)); } -class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public facebook::velox::ByteInputStream { +class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public facebook::velox::GlutenByteInputStream { public: VeloxInputStream(std::shared_ptr input, facebook::velox::BufferPtr buffer); From e91d5bbbaa559e23b5f47fefc0c7a0f730c0bfb6 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 16 Aug 2024 10:50:07 +0000 Subject: [PATCH 4/7] add file --- cpp/velox/shuffle/GlutenByteStream.h | 273 +++++++++++++++++++++++++++ 1 file changed, 273 insertions(+) create mode 100644 cpp/velox/shuffle/GlutenByteStream.h diff --git a/cpp/velox/shuffle/GlutenByteStream.h b/cpp/velox/shuffle/GlutenByteStream.h new file mode 100644 index 000000000000..711b7fb40e69 --- /dev/null +++ b/cpp/velox/shuffle/GlutenByteStream.h @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TODO: wait to delete after rss sort reader refactored. +#include "velox/common/memory/ByteStream.h" + +namespace facebook::velox { +class GlutenByteInputStream : public ByteInputStream { + protected: + /// TODO Remove after refactoring SpillInput. + GlutenByteInputStream() {} + + public: + explicit GlutenByteInputStream(std::vector ranges) : ranges_{std::move(ranges)} { + VELOX_CHECK(!ranges_.empty()); + current_ = &ranges_[0]; + } + + /// Disable copy constructor. + GlutenByteInputStream(const GlutenByteInputStream&) = delete; + + /// Disable copy assignment operator. + GlutenByteInputStream& operator=(const GlutenByteInputStream& other) = delete; + + /// Enable move constructor. + GlutenByteInputStream(GlutenByteInputStream&& other) noexcept + : ranges_{std::move(other.ranges_)}, current_{other.current_} {} + + /// Enable move assignment operator. + GlutenByteInputStream& operator=(GlutenByteInputStream&& other) noexcept { + if (this != &other) { + ranges_ = std::move(other.ranges_); + current_ = other.current_; + other.current_ = nullptr; + } + return *this; + } + + /// TODO Remove after refactoring SpillInput. + virtual ~GlutenByteInputStream() = default; + + /// Returns total number of bytes available in the stream. + size_t size() const { + size_t total = 0; + for (const auto& range : ranges_) { + total += range.size; + } + return total; + } + + /// Returns true if all input has been read. + /// + /// TODO: Remove 'virtual' after refactoring SpillInput. + virtual bool atEnd() const { + if (!current_) { + return false; + } + if (current_->position < current_->size) { + return false; + } + + VELOX_CHECK(current_ >= ranges_.data() && current_ <= &ranges_.back()); + return current_ == &ranges_.back(); + } + + /// Returns current position (number of bytes from the start) in the stream. + std::streampos tellp() const { + if (ranges_.empty()) { + return 0; + } + VELOX_DCHECK_NOT_NULL(current_); + int64_t size = 0; + for (auto& range : ranges_) { + if (&range == current_) { + return current_->position + size; + } + size += range.size; + } + VELOX_FAIL("GlutenByteInputStream 'current_' is not in 'ranges_'."); + } + + /// Moves current position to specified one. + void seekp(std::streampos position) { + if (ranges_.empty() && position == 0) { + return; + } + int64_t toSkip = position; + for (auto& range : ranges_) { + if (toSkip <= range.size) { + current_ = ⦥ + current_->position = toSkip; + return; + } + toSkip -= range.size; + } + static_assert(sizeof(std::streamsize) <= sizeof(long long)); + VELOX_FAIL("Seeking past end of GlutenByteInputStream: {}", static_cast(position)); + } + + /// Returns the remaining size left from current reading position. + size_t remainingSize() const { + if (ranges_.empty()) { + return 0; + } + const auto* lastRange = &ranges_[ranges_.size() - 1]; + auto cur = current_; + size_t total = cur->size - cur->position; + while (++cur <= lastRange) { + total += cur->size; + } + return total; + } + + std::string toString() const { + std::stringstream oss; + oss << ranges_.size() << " ranges (position/size) ["; + for (const auto& range : ranges_) { + oss << "(" << range.position << "/" << range.size << (&range == current_ ? " current" : "") << ")"; + if (&range != &ranges_.back()) { + oss << ","; + } + } + oss << "]"; + return oss.str(); + } + + uint8_t readByte() { + if (current_->position < current_->size) { + return current_->buffer[current_->position++]; + } + next(); + return readByte(); + } + + void readBytes(uint8_t* bytes, int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes"); + int32_t offset = 0; + for (;;) { + int32_t available = current_->size - current_->position; + int32_t numUsed = std::min(available, size); + simd::memcpy(bytes + offset, current_->buffer + current_->position, numUsed); + offset += numUsed; + size -= numUsed; + current_->position += numUsed; + if (!size) { + return; + } + next(); + } + } + + template + T read() { + if (current_->position + sizeof(T) <= current_->size) { + current_->position += sizeof(T); + return *reinterpret_cast(current_->buffer + current_->position - sizeof(T)); + } + // The number straddles two buffers. We read byte by byte and make + // a little-endian uint64_t. The bytes can be cast to any integer + // or floating point type since the wire format has the machine byte order. + static_assert(sizeof(T) <= sizeof(uint64_t)); + uint64_t value = 0; + for (int32_t i = 0; i < sizeof(T); ++i) { + value |= static_cast(readByte()) << (i * 8); + } + return *reinterpret_cast(&value); + } + + template + void readBytes(Char* data, int32_t size) { + readBytes(reinterpret_cast(data), size); + } + + /// Returns a view over the read buffer for up to 'size' next + /// bytes. The size of the value may be less if the current byte + /// range ends within 'size' bytes from the current position. The + /// size will be 0 if at end. + std::string_view nextView(int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes"); + if (current_->position == current_->size) { + if (current_ == &ranges_.back()) { + return std::string_view(nullptr, 0); + } + next(); + } + VELOX_CHECK(current_->size); + auto position = current_->position; + auto viewSize = std::min(current_->size - current_->position, size); + current_->position += viewSize; + return std::string_view(reinterpret_cast(current_->buffer) + position, viewSize); + } + + void skip(int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes"); + for (;;) { + int32_t available = current_->size - current_->position; + int32_t numUsed = std::min(available, size); + size -= numUsed; + current_->position += numUsed; + if (!size) { + return; + } + next(); + } + } + + protected: + /// Sets 'current_' to point to the next range of input. // The + /// input is consecutive ByteRanges in 'ranges_' for the base class + /// but any view over external buffers can be made by specialization. + /// + /// TODO: Remove 'virtual' after refactoring SpillInput. + virtual void next(bool throwIfPastEnd = true) { + VELOX_CHECK(current_ >= &ranges_[0]); + size_t position = current_ - &ranges_[0]; + VELOX_CHECK_LT(position, ranges_.size()); + if (position == ranges_.size() - 1) { + if (throwIfPastEnd) { + VELOX_FAIL("Reading past end of GlutenByteInputStream"); + } + return; + } + ++current_; + current_->position = 0; + } + + // TODO: Remove after refactoring SpillInput. + const std::vector& ranges() const { + return ranges_; + } + + // TODO: Remove after refactoring SpillInput. + void setRange(ByteRange range) { + ranges_.resize(1); + ranges_[0] = range; + current_ = ranges_.data(); + } + + private: + std::vector ranges_; + + // Pointer to the current element of 'ranges_'. + ByteRange* current_{nullptr}; +}; + +template <> +inline Timestamp GlutenByteInputStream::read() { + Timestamp value; + readBytes(reinterpret_cast(&value), sizeof(value)); + return value; +} + +template <> +inline int128_t GlutenByteInputStream::read() { + int128_t value; + readBytes(reinterpret_cast(&value), sizeof(value)); + return value; +} +} // namespace facebook::velox \ No newline at end of file From ab755e5d842a1443733c49a95e96705519ccb87a Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 16 Aug 2024 10:50:45 +0000 Subject: [PATCH 5/7] fix code style --- cpp/velox/shuffle/GlutenByteStream.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/velox/shuffle/GlutenByteStream.h b/cpp/velox/shuffle/GlutenByteStream.h index 711b7fb40e69..2458581d56d7 100644 --- a/cpp/velox/shuffle/GlutenByteStream.h +++ b/cpp/velox/shuffle/GlutenByteStream.h @@ -270,4 +270,4 @@ inline int128_t GlutenByteInputStream::read() { readBytes(reinterpret_cast(&value), sizeof(value)); return value; } -} // namespace facebook::velox \ No newline at end of file +} // namespace facebook::velox From babe70bb4f00cdc3edf6c2f4a4130611b8136dd4 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 16 Aug 2024 14:47:13 +0000 Subject: [PATCH 6/7] fix sort and memory manager fail --- cpp/velox/memory/VeloxMemoryManager.cc | 17 ++++++++++++----- cpp/velox/shuffle/GlutenByteStream.h | 12 +++--------- cpp/velox/tests/MemoryManagerTest.cc | 2 ++ 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index 0a57d6a997d8..6b5606dd228e 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -63,11 +63,18 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { ListenableArbitrator(const Config& config, AllocationListener* listener) : MemoryArbitrator(config), listener_(listener), - memoryPoolInitialCapacity_( - getConfig(config.extraConfigs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity)), - memoryPoolTransferCapacity_( - getConfig(config.extraConfigs, kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity)) { - } + memoryPoolInitialCapacity_(velox::config::toCapacity( + getConfig( + config.extraConfigs, + kMemoryPoolInitialCapacity, + std::to_string(kDefaultMemoryPoolInitialCapacity)), + velox::config::CapacityUnit::BYTE)), + memoryPoolTransferCapacity_(velox::config::toCapacity( + getConfig( + config.extraConfigs, + kMemoryPoolTransferCapacity, + std::to_string(kDefaultMemoryPoolTransferCapacity)), + velox::config::CapacityUnit::BYTE)) {} std::string kind() const override { return kind_; } diff --git a/cpp/velox/shuffle/GlutenByteStream.h b/cpp/velox/shuffle/GlutenByteStream.h index 2458581d56d7..78ea7b905adc 100644 --- a/cpp/velox/shuffle/GlutenByteStream.h +++ b/cpp/velox/shuffle/GlutenByteStream.h @@ -25,7 +25,8 @@ class GlutenByteInputStream : public ByteInputStream { GlutenByteInputStream() {} public: - explicit GlutenByteInputStream(std::vector ranges) : ranges_{std::move(ranges)} { + explicit GlutenByteInputStream(std::vector ranges) { + ranges_ = std::move(ranges); VELOX_CHECK(!ranges_.empty()); current_ = &ranges_[0]; } @@ -37,8 +38,7 @@ class GlutenByteInputStream : public ByteInputStream { GlutenByteInputStream& operator=(const GlutenByteInputStream& other) = delete; /// Enable move constructor. - GlutenByteInputStream(GlutenByteInputStream&& other) noexcept - : ranges_{std::move(other.ranges_)}, current_{other.current_} {} + GlutenByteInputStream(GlutenByteInputStream&& other) noexcept = delete; /// Enable move assignment operator. GlutenByteInputStream& operator=(GlutenByteInputStream&& other) noexcept { @@ -249,12 +249,6 @@ class GlutenByteInputStream : public ByteInputStream { ranges_[0] = range; current_ = ranges_.data(); } - - private: - std::vector ranges_; - - // Pointer to the current element of 'ranges_'. - ByteRange* current_{nullptr}; }; template <> diff --git a/cpp/velox/tests/MemoryManagerTest.cc b/cpp/velox/tests/MemoryManagerTest.cc index bb102dc2d8c3..d768c9690aae 100644 --- a/cpp/velox/tests/MemoryManagerTest.cc +++ b/cpp/velox/tests/MemoryManagerTest.cc @@ -331,7 +331,9 @@ class MultiMemoryManagerTest : public ::testing::Test { std::unordered_map conf = { {kMemoryReservationBlockSize, std::to_string(kMemoryReservationBlockSizeDefault)}, {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}}; + std::cout << "create the velox backend" << std::endl; gluten::VeloxBackend::create(conf); + std::cout << "created the velox backend" << std::endl; } std::unique_ptr newVeloxMemoryManager(std::unique_ptr listener) { From 1bb31693d55d2c05fa6e625d6f5a701ddd5d612c Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 16 Aug 2024 14:52:58 +0000 Subject: [PATCH 7/7] remove minor --- cpp/velox/tests/MemoryManagerTest.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/velox/tests/MemoryManagerTest.cc b/cpp/velox/tests/MemoryManagerTest.cc index d768c9690aae..bb102dc2d8c3 100644 --- a/cpp/velox/tests/MemoryManagerTest.cc +++ b/cpp/velox/tests/MemoryManagerTest.cc @@ -331,9 +331,7 @@ class MultiMemoryManagerTest : public ::testing::Test { std::unordered_map conf = { {kMemoryReservationBlockSize, std::to_string(kMemoryReservationBlockSizeDefault)}, {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}}; - std::cout << "create the velox backend" << std::endl; gluten::VeloxBackend::create(conf); - std::cout << "created the velox backend" << std::endl; } std::unique_ptr newVeloxMemoryManager(std::unique_ptr listener) {