diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index 0a57d6a997d8..6b5606dd228e 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -63,11 +63,18 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { ListenableArbitrator(const Config& config, AllocationListener* listener) : MemoryArbitrator(config), listener_(listener), - memoryPoolInitialCapacity_( - getConfig(config.extraConfigs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity)), - memoryPoolTransferCapacity_( - getConfig(config.extraConfigs, kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity)) { - } + memoryPoolInitialCapacity_(velox::config::toCapacity( + getConfig( + config.extraConfigs, + kMemoryPoolInitialCapacity, + std::to_string(kDefaultMemoryPoolInitialCapacity)), + velox::config::CapacityUnit::BYTE)), + memoryPoolTransferCapacity_(velox::config::toCapacity( + getConfig( + config.extraConfigs, + kMemoryPoolTransferCapacity, + std::to_string(kDefaultMemoryPoolTransferCapacity)), + velox::config::CapacityUnit::BYTE)) {} std::string kind() const override { return kind_; } diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc index 8b21e7bdbbeb..acb14cf4de39 100644 --- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc +++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc @@ -34,7 +34,7 @@ namespace { std::unique_ptr toByteStream(uint8_t* data, int32_t size) { std::vector byteRanges; byteRanges.push_back(ByteRange{data, size, 0}); - auto byteStream = std::make_unique(byteRanges); + auto byteStream = std::make_unique(byteRanges); return byteStream; } } // namespace diff --git a/cpp/velox/shuffle/GlutenByteStream.h b/cpp/velox/shuffle/GlutenByteStream.h new file mode 100644 index 000000000000..78ea7b905adc --- /dev/null +++ b/cpp/velox/shuffle/GlutenByteStream.h @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TODO: wait to delete after rss sort reader refactored. +#include "velox/common/memory/ByteStream.h" + +namespace facebook::velox { +class GlutenByteInputStream : public ByteInputStream { + protected: + /// TODO Remove after refactoring SpillInput. + GlutenByteInputStream() {} + + public: + explicit GlutenByteInputStream(std::vector ranges) { + ranges_ = std::move(ranges); + VELOX_CHECK(!ranges_.empty()); + current_ = &ranges_[0]; + } + + /// Disable copy constructor. + GlutenByteInputStream(const GlutenByteInputStream&) = delete; + + /// Disable copy assignment operator. + GlutenByteInputStream& operator=(const GlutenByteInputStream& other) = delete; + + /// Enable move constructor. + GlutenByteInputStream(GlutenByteInputStream&& other) noexcept = delete; + + /// Enable move assignment operator. + GlutenByteInputStream& operator=(GlutenByteInputStream&& other) noexcept { + if (this != &other) { + ranges_ = std::move(other.ranges_); + current_ = other.current_; + other.current_ = nullptr; + } + return *this; + } + + /// TODO Remove after refactoring SpillInput. + virtual ~GlutenByteInputStream() = default; + + /// Returns total number of bytes available in the stream. + size_t size() const { + size_t total = 0; + for (const auto& range : ranges_) { + total += range.size; + } + return total; + } + + /// Returns true if all input has been read. + /// + /// TODO: Remove 'virtual' after refactoring SpillInput. + virtual bool atEnd() const { + if (!current_) { + return false; + } + if (current_->position < current_->size) { + return false; + } + + VELOX_CHECK(current_ >= ranges_.data() && current_ <= &ranges_.back()); + return current_ == &ranges_.back(); + } + + /// Returns current position (number of bytes from the start) in the stream. + std::streampos tellp() const { + if (ranges_.empty()) { + return 0; + } + VELOX_DCHECK_NOT_NULL(current_); + int64_t size = 0; + for (auto& range : ranges_) { + if (&range == current_) { + return current_->position + size; + } + size += range.size; + } + VELOX_FAIL("GlutenByteInputStream 'current_' is not in 'ranges_'."); + } + + /// Moves current position to specified one. + void seekp(std::streampos position) { + if (ranges_.empty() && position == 0) { + return; + } + int64_t toSkip = position; + for (auto& range : ranges_) { + if (toSkip <= range.size) { + current_ = ⦥ + current_->position = toSkip; + return; + } + toSkip -= range.size; + } + static_assert(sizeof(std::streamsize) <= sizeof(long long)); + VELOX_FAIL("Seeking past end of GlutenByteInputStream: {}", static_cast(position)); + } + + /// Returns the remaining size left from current reading position. + size_t remainingSize() const { + if (ranges_.empty()) { + return 0; + } + const auto* lastRange = &ranges_[ranges_.size() - 1]; + auto cur = current_; + size_t total = cur->size - cur->position; + while (++cur <= lastRange) { + total += cur->size; + } + return total; + } + + std::string toString() const { + std::stringstream oss; + oss << ranges_.size() << " ranges (position/size) ["; + for (const auto& range : ranges_) { + oss << "(" << range.position << "/" << range.size << (&range == current_ ? " current" : "") << ")"; + if (&range != &ranges_.back()) { + oss << ","; + } + } + oss << "]"; + return oss.str(); + } + + uint8_t readByte() { + if (current_->position < current_->size) { + return current_->buffer[current_->position++]; + } + next(); + return readByte(); + } + + void readBytes(uint8_t* bytes, int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes"); + int32_t offset = 0; + for (;;) { + int32_t available = current_->size - current_->position; + int32_t numUsed = std::min(available, size); + simd::memcpy(bytes + offset, current_->buffer + current_->position, numUsed); + offset += numUsed; + size -= numUsed; + current_->position += numUsed; + if (!size) { + return; + } + next(); + } + } + + template + T read() { + if (current_->position + sizeof(T) <= current_->size) { + current_->position += sizeof(T); + return *reinterpret_cast(current_->buffer + current_->position - sizeof(T)); + } + // The number straddles two buffers. We read byte by byte and make + // a little-endian uint64_t. The bytes can be cast to any integer + // or floating point type since the wire format has the machine byte order. + static_assert(sizeof(T) <= sizeof(uint64_t)); + uint64_t value = 0; + for (int32_t i = 0; i < sizeof(T); ++i) { + value |= static_cast(readByte()) << (i * 8); + } + return *reinterpret_cast(&value); + } + + template + void readBytes(Char* data, int32_t size) { + readBytes(reinterpret_cast(data), size); + } + + /// Returns a view over the read buffer for up to 'size' next + /// bytes. The size of the value may be less if the current byte + /// range ends within 'size' bytes from the current position. The + /// size will be 0 if at end. + std::string_view nextView(int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes"); + if (current_->position == current_->size) { + if (current_ == &ranges_.back()) { + return std::string_view(nullptr, 0); + } + next(); + } + VELOX_CHECK(current_->size); + auto position = current_->position; + auto viewSize = std::min(current_->size - current_->position, size); + current_->position += viewSize; + return std::string_view(reinterpret_cast(current_->buffer) + position, viewSize); + } + + void skip(int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes"); + for (;;) { + int32_t available = current_->size - current_->position; + int32_t numUsed = std::min(available, size); + size -= numUsed; + current_->position += numUsed; + if (!size) { + return; + } + next(); + } + } + + protected: + /// Sets 'current_' to point to the next range of input. // The + /// input is consecutive ByteRanges in 'ranges_' for the base class + /// but any view over external buffers can be made by specialization. + /// + /// TODO: Remove 'virtual' after refactoring SpillInput. + virtual void next(bool throwIfPastEnd = true) { + VELOX_CHECK(current_ >= &ranges_[0]); + size_t position = current_ - &ranges_[0]; + VELOX_CHECK_LT(position, ranges_.size()); + if (position == ranges_.size() - 1) { + if (throwIfPastEnd) { + VELOX_FAIL("Reading past end of GlutenByteInputStream"); + } + return; + } + ++current_; + current_->position = 0; + } + + // TODO: Remove after refactoring SpillInput. + const std::vector& ranges() const { + return ranges_; + } + + // TODO: Remove after refactoring SpillInput. + void setRange(ByteRange range) { + ranges_.resize(1); + ranges_[0] = range; + current_ = ranges_.data(); + } +}; + +template <> +inline Timestamp GlutenByteInputStream::read() { + Timestamp value; + readBytes(reinterpret_cast(&value), sizeof(value)); + return value; +} + +template <> +inline int128_t GlutenByteInputStream::read() { + int128_t value; + readBytes(reinterpret_cast(&value), sizeof(value)); + return value; +} +} // namespace facebook::velox diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 4d002499c9af..3966857b9e9d 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -16,6 +16,7 @@ */ #include "VeloxShuffleReader.h" +#include "GlutenByteStream.h" #include #include @@ -177,7 +178,7 @@ VectorPtr readFlatVector( std::unique_ptr toByteStream(uint8_t* data, int32_t size) { std::vector byteRanges; byteRanges.push_back(ByteRange{data, size, 0}); - auto byteStream = std::make_unique(byteRanges); + auto byteStream = std::make_unique(byteRanges); return byteStream; } @@ -450,7 +451,7 @@ std::shared_ptr VeloxSortShuffleReaderDeserializer::deserializeTo return std::make_shared(std::move(rowVector)); } -class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public facebook::velox::ByteInputStream { +class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public facebook::velox::GlutenByteInputStream { public: VeloxInputStream(std::shared_ptr input, facebook::velox::BufferPtr buffer); diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index b51233983480..f857ceada653 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -17,7 +17,7 @@ set -exu VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2024_08_15 +VELOX_BRANCH=2024_08_16 VELOX_HOME="" OS=`uname -s`