From fb28be5b11ce4dcef0760106c743df87cbc3e399 Mon Sep 17 00:00:00 2001 From: yanngyoung Date: Wed, 21 Aug 2024 17:23:05 +0800 Subject: [PATCH] ByteStream refactor --- velox/common/base/ByteInputStream.cpp | 29 +++ velox/common/base/ByteInputStream.h | 115 +++++++++ .../ByteOutputStream.cpp} | 211 +--------------- .../ByteStream.h => base/ByteOutputStream.h} | 229 +----------------- velox/common/base/CMakeLists.txt | 2 + velox/common/file/FileInputStream.h | 2 +- .../common/file/tests/FileInputStreamTest.cpp | 2 - velox/common/memory/BufferInputStream.cpp | 167 +++++++++++++ velox/common/memory/BufferInputStream.h | 70 ++++++ velox/common/memory/CMakeLists.txt | 3 +- velox/common/memory/HashStringAllocator.h | 2 +- velox/common/memory/OutputStream.cpp | 76 ++++++ velox/common/memory/OutputStream.h | 110 +++++++++ velox/common/memory/StreamArena.cpp | 2 +- velox/common/memory/tests/ByteStreamTest.cpp | 4 +- velox/common/memory/tests/StreamArenaTest.cpp | 2 +- velox/exec/ContainerRowSerde.h | 2 +- velox/exec/ExchangeQueue.h | 3 +- velox/serializers/PrestoSerializer.cpp | 2 +- .../tests/PrestoSerializerTest.cpp | 1 - .../serializers/tests/SerializerBenchmark.cpp | 1 - velox/vector/VectorStream.h | 2 +- velox/vector/tests/VectorTest.cpp | 2 +- 23 files changed, 586 insertions(+), 453 deletions(-) create mode 100644 velox/common/base/ByteInputStream.cpp create mode 100644 velox/common/base/ByteInputStream.h rename velox/common/{memory/ByteStream.cpp => base/ByteOutputStream.cpp} (54%) rename velox/common/{memory/ByteStream.h => base/ByteOutputStream.h} (57%) create mode 100644 velox/common/memory/BufferInputStream.cpp create mode 100644 velox/common/memory/BufferInputStream.h create mode 100644 velox/common/memory/OutputStream.cpp create mode 100644 velox/common/memory/OutputStream.h diff --git a/velox/common/base/ByteInputStream.cpp b/velox/common/base/ByteInputStream.cpp new file mode 100644 index 000000000000..3d2589299191 --- /dev/null +++ b/velox/common/base/ByteInputStream.cpp @@ -0,0 +1,29 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/ByteInputStream.h" + +namespace facebook::velox { + +uint32_t ByteRange::availableBytes() const { + return std::max(0, size - position); +} + +std::string ByteRange::toString() const { + return fmt::format("[{} starting at {}]", succinctBytes(size), position); +} + +} // namespace facebook::velox diff --git a/velox/common/base/ByteInputStream.h b/velox/common/base/ByteInputStream.h new file mode 100644 index 000000000000..dece6724c377 --- /dev/null +++ b/velox/common/base/ByteInputStream.h @@ -0,0 +1,115 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/memory/Memory.h" +#include "velox/type/Type.h" + +namespace facebook::velox { + +struct ByteRange { + /// Start of buffer. Not owned. + uint8_t* buffer; + + /// Number of bytes or bits starting at 'buffer'. + int32_t size; + + /// Index of next byte/bit to be read/written in 'buffer'. + int32_t position; + + /// Returns the available bytes left in this range. + uint32_t availableBytes() const; + + std::string toString() const; +}; + +/// Read-only byte input stream interface. +class ByteInputStream { + public: + virtual ~ByteInputStream() = default; + + /// Returns total number of bytes available in the stream. + virtual size_t size() const = 0; + + /// Returns true if all input has been read. + virtual bool atEnd() const = 0; + + /// Returns current position (number of bytes from the start) in the stream. + virtual std::streampos tellp() const = 0; + + /// Moves current position to specified one. + virtual void seekp(std::streampos pos) = 0; + + /// Returns the remaining size left from current reading position. + virtual size_t remainingSize() const = 0; + + virtual uint8_t readByte() = 0; + + virtual void readBytes(uint8_t* bytes, int32_t size) = 0; + + template + T read() { + if (current_->position + sizeof(T) <= current_->size) { + current_->position += sizeof(T); + return *reinterpret_cast( + current_->buffer + current_->position - sizeof(T)); + } + // The number straddles two buffers. We read byte by byte and make a + // little-endian uint64_t. The bytes can be cast to any integer or floating + // point type since the wire format has the machine byte order. + static_assert(sizeof(T) <= sizeof(uint64_t)); + uint64_t value = 0; + for (int32_t i = 0; i < sizeof(T); ++i) { + value |= static_cast(readByte()) << (i * 8); + } + return *reinterpret_cast(&value); + } + + template + void readBytes(Char* data, int32_t size) { + readBytes(reinterpret_cast(data), size); + } + + /// Returns a view over the read buffer for up to 'size' next bytes. The size + /// of the value may be less if the current byte range ends within 'size' + /// bytes from the current position. The size will be 0 if at end. + virtual std::string_view nextView(int32_t size) = 0; + + virtual void skip(int32_t size) = 0; + + virtual std::string toString() const = 0; + + protected: + // Points to the current buffered byte range. + ByteRange* current_{nullptr}; + std::vector ranges_; +}; + +template <> +inline Timestamp ByteInputStream::read() { + Timestamp value; + readBytes(reinterpret_cast(&value), sizeof(value)); + return value; +} + +template <> +inline int128_t ByteInputStream::read() { + int128_t value; + readBytes(reinterpret_cast(&value), sizeof(value)); + return value; +} + +} // namespace facebook::velox diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/base/ByteOutputStream.cpp similarity index 54% rename from velox/common/memory/ByteStream.cpp rename to velox/common/base/ByteOutputStream.cpp index e7802f477823..153c746c8e12 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/base/ByteOutputStream.cpp @@ -14,164 +14,10 @@ * limitations under the License. */ -#include "velox/common/memory/ByteStream.h" +#include "velox/common/memory/OutputStream.h" namespace facebook::velox { -uint32_t ByteRange::availableBytes() const { - return std::max(0, size - position); -} - -std::string ByteRange::toString() const { - return fmt::format("[{} starting at {}]", succinctBytes(size), position); -} - -std::string BufferInputStream::toString() const { - std::stringstream oss; - oss << ranges_.size() << " ranges (position/size) ["; - for (const auto& range : ranges_) { - oss << "(" << range.position << "/" << range.size - << (&range == current_ ? " current" : "") << ")"; - if (&range != &ranges_.back()) { - oss << ","; - } - } - oss << "]"; - return oss.str(); -} - -bool BufferInputStream::atEnd() const { - if (current_ == nullptr) { - return false; - } - if (current_->position < current_->size) { - return false; - } - - VELOX_CHECK(current_ >= ranges_.data() && current_ <= &ranges_.back()); - return current_ == &ranges_.back(); -} - -size_t BufferInputStream::size() const { - size_t total = 0; - for (const auto& range : ranges_) { - total += range.size; - } - return total; -} - -size_t BufferInputStream::remainingSize() const { - if (ranges_.empty()) { - return 0; - } - const auto* lastRange = &ranges_.back(); - auto* cur = current_; - size_t remainingBytes = cur->availableBytes(); - while (++cur <= lastRange) { - remainingBytes += cur->size; - } - return remainingBytes; -} - -std::streampos BufferInputStream::tellp() const { - if (ranges_.empty()) { - return 0; - } - assert(current_); - int64_t size = 0; - for (auto& range : ranges_) { - if (&range == current_) { - return current_->position + size; - } - size += range.size; - } - VELOX_FAIL("BufferInputStream 'current_' is not in 'ranges_'."); -} - -void BufferInputStream::seekp(std::streampos position) { - if (ranges_.empty() && position == 0) { - return; - } - int64_t toSkip = position; - for (auto& range : ranges_) { - if (toSkip <= range.size) { - current_ = ⦥ - current_->position = toSkip; - return; - } - toSkip -= range.size; - } - static_assert(sizeof(std::streamsize) <= sizeof(long long)); - VELOX_FAIL( - "Seeking past end of BufferInputStream: {}", - static_cast(position)); -} - -void BufferInputStream::nextRange() { - VELOX_CHECK(current_ >= &ranges_[0]); - const size_t rangeIndex = current_ - &ranges_[0]; - VELOX_CHECK_LT( - rangeIndex + 1, ranges_.size(), "Reading past end of BufferInputStream"); - ++current_; - current_->position = 0; -} - -uint8_t BufferInputStream::readByte() { - if (current_->position < current_->size) { - return current_->buffer[current_->position++]; - } - nextRange(); - return readByte(); -} - -void BufferInputStream::readBytes(uint8_t* bytes, int32_t size) { - VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes"); - int32_t offset = 0; - for (;;) { - const int32_t availableBytes = current_->size - current_->position; - const int32_t readBytes = std::min(availableBytes, size); - simd::memcpy( - bytes + offset, current_->buffer + current_->position, readBytes); - offset += readBytes; - size -= readBytes; - current_->position += readBytes; - if (size == 0) { - return; - } - nextRange(); - } -} - -std::string_view BufferInputStream::nextView(int32_t size) { - VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes"); - if (current_->position == current_->size) { - if (current_ == &ranges_.back()) { - return std::string_view(nullptr, 0); - } - nextRange(); - } - VELOX_CHECK_GT(current_->size, 0); - const auto position = current_->position; - const auto viewSize = std::min(current_->size - current_->position, size); - current_->position += viewSize; - return std::string_view( - reinterpret_cast(current_->buffer) + position, viewSize); -} - -void BufferInputStream::skip(int32_t size) { - VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes"); - for (;;) { - const int32_t numSkipped = - std::min(current_->availableBytes(), size); - size -= numSkipped; - current_->position += numSkipped; - if (size == 0) { - return; - } - nextRange(); - } -} - size_t ByteOutputStream::size() const { if (ranges_.empty()) { return 0; @@ -400,59 +246,4 @@ std::string ByteOutputStream::toString() const { return oss.str(); } -namespace { -// The user data structure passed to folly iobuf for buffer ownership handling. -struct FreeData { - std::shared_ptr arena; - std::function releaseFn; -}; - -FreeData* newFreeData( - const std::shared_ptr& arena, - const std::function& releaseFn) { - auto freeData = new FreeData(); - freeData->arena = arena; - freeData->releaseFn = releaseFn; - return freeData; -} - -void freeFunc(void* /*data*/, void* userData) { - auto* freeData = reinterpret_cast(userData); - freeData->arena.reset(); - if (freeData->releaseFn != nullptr) { - freeData->releaseFn(); - } - delete freeData; -} -} // namespace - -std::unique_ptr IOBufOutputStream::getIOBuf( - const std::function& releaseFn) { - // Make an IOBuf for each range. The IOBufs keep shared ownership of - // 'arena_'. - std::unique_ptr iobuf; - auto& ranges = out_->ranges(); - for (auto& range : ranges) { - auto numValues = - &range == &ranges.back() ? out_->lastRangeEnd() : range.size; - auto userData = newFreeData(arena_, releaseFn); - auto newBuf = folly::IOBuf::takeOwnership( - reinterpret_cast(range.buffer), numValues, freeFunc, userData); - if (iobuf) { - iobuf->prev()->appendChain(std::move(newBuf)); - } else { - iobuf = std::move(newBuf); - } - } - return iobuf; -} - -std::streampos IOBufOutputStream::tellp() const { - return out_->tellp(); -} - -void IOBufOutputStream::seekp(std::streampos pos) { - out_->seekp(pos); -} - } // namespace facebook::velox diff --git a/velox/common/memory/ByteStream.h b/velox/common/base/ByteOutputStream.h similarity index 57% rename from velox/common/memory/ByteStream.h rename to velox/common/base/ByteOutputStream.h index 040aa23bccb0..c364e8f4d0de 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/base/ByteOutputStream.h @@ -15,205 +15,12 @@ */ #pragma once -#include -#include "velox/common/base/Scratch.h" +#include "velox/common/memory/BufferInputStream.h" #include "velox/common/memory/StreamArena.h" -#include "velox/type/Type.h" - -#include -#include namespace facebook::velox { -struct ByteRange { - /// Start of buffer. Not owned. - uint8_t* buffer; - - /// Number of bytes or bits starting at 'buffer'. - int32_t size; - - /// Index of next byte/bit to be read/written in 'buffer'. - int32_t position; - - /// Returns the available bytes left in this range. - uint32_t availableBytes() const; - - std::string toString() const; -}; - -class OutputStreamListener { - public: - virtual void onWrite(const char* /* s */, std::streamsize /* count */) {} - virtual ~OutputStreamListener() = default; -}; - -class OutputStream { - public: - explicit OutputStream(OutputStreamListener* listener = nullptr) - : listener_(listener) {} - - virtual ~OutputStream() = default; - - virtual void write(const char* s, std::streamsize count) = 0; - - virtual std::streampos tellp() const = 0; - - virtual void seekp(std::streampos pos) = 0; - - OutputStreamListener* listener() const { - return listener_; - } - - protected: - OutputStreamListener* listener_; -}; - -class OStreamOutputStream : public OutputStream { - public: - explicit OStreamOutputStream( - std::ostream* out, - OutputStreamListener* listener = nullptr) - : OutputStream(listener), out_(out) {} - - void write(const char* s, std::streamsize count) override { - out_->write(s, count); - if (listener_) { - listener_->onWrite(s, count); - } - } - - std::streampos tellp() const override { - return out_->tellp(); - } - - void seekp(std::streampos pos) override { - out_->seekp(pos); - } - - private: - std::ostream* out_; -}; - -/// Read-only byte input stream interface. -class ByteInputStream { - public: - virtual ~ByteInputStream() = default; - - /// Returns total number of bytes available in the stream. - virtual size_t size() const = 0; - - /// Returns true if all input has been read. - virtual bool atEnd() const = 0; - - /// Returns current position (number of bytes from the start) in the stream. - virtual std::streampos tellp() const = 0; - - /// Moves current position to specified one. - virtual void seekp(std::streampos pos) = 0; - - /// Returns the remaining size left from current reading position. - virtual size_t remainingSize() const = 0; - - virtual uint8_t readByte() = 0; - - virtual void readBytes(uint8_t* bytes, int32_t size) = 0; - - template - T read() { - if (current_->position + sizeof(T) <= current_->size) { - current_->position += sizeof(T); - return *reinterpret_cast( - current_->buffer + current_->position - sizeof(T)); - } - // The number straddles two buffers. We read byte by byte and make a - // little-endian uint64_t. The bytes can be cast to any integer or floating - // point type since the wire format has the machine byte order. - static_assert(sizeof(T) <= sizeof(uint64_t)); - uint64_t value = 0; - for (int32_t i = 0; i < sizeof(T); ++i) { - value |= static_cast(readByte()) << (i * 8); - } - return *reinterpret_cast(&value); - } - - template - void readBytes(Char* data, int32_t size) { - readBytes(reinterpret_cast(data), size); - } - - /// Returns a view over the read buffer for up to 'size' next bytes. The size - /// of the value may be less if the current byte range ends within 'size' - /// bytes from the current position. The size will be 0 if at end. - virtual std::string_view nextView(int32_t size) = 0; - - virtual void skip(int32_t size) = 0; - - virtual std::string toString() const = 0; - - protected: - // Points to the current buffered byte range. - ByteRange* current_{nullptr}; - std::vector ranges_; -}; - -/// Read-only input stream backed by a set of buffers. -class BufferInputStream : public ByteInputStream { - public: - explicit BufferInputStream(std::vector ranges) { - VELOX_CHECK(!ranges.empty(), "Empty BufferInputStream"); - ranges_ = std::move(ranges); - current_ = &ranges_[0]; - } - - BufferInputStream(const BufferInputStream&) = delete; - BufferInputStream& operator=(const BufferInputStream& other) = delete; - BufferInputStream(BufferInputStream&& other) noexcept = delete; - BufferInputStream& operator=(BufferInputStream&& other) noexcept = delete; - - size_t size() const override; - - bool atEnd() const override; - - std::streampos tellp() const override; - - void seekp(std::streampos pos) override; - - size_t remainingSize() const override; - - uint8_t readByte() override; - - void readBytes(uint8_t* bytes, int32_t size) override; - - std::string_view nextView(int32_t size) override; - - void skip(int32_t size) override; - - std::string toString() const override; - - private: - // Sets 'current_' to the next range of input. The input is consecutive - // ByteRanges in 'ranges_' for the base class but any view over external - // buffers can be made by specialization. - void nextRange(); - - const std::vector& ranges() const { - return ranges_; - } -}; - -template <> -inline Timestamp ByteInputStream::read() { - Timestamp value; - readBytes(reinterpret_cast(&value), sizeof(value)); - return value; -} - -template <> -inline int128_t ByteInputStream::read() { - int128_t value; - readBytes(reinterpret_cast(&value), sizeof(value)); - return value; -} +class OutputStream; /// Stream over a chain of ByteRanges. Provides read, write and /// comparison for equality between stream contents and memory. Used @@ -455,36 +262,4 @@ class AppendWindow { ScratchPtr scratchPtr_; }; -class IOBufOutputStream : public OutputStream { - public: - explicit IOBufOutputStream( - memory::MemoryPool& pool, - OutputStreamListener* listener = nullptr, - int32_t initialSize = memory::AllocationTraits::kPageSize) - : OutputStream(listener), - arena_(std::make_shared(&pool)), - out_(std::make_unique(arena_.get())) { - out_->startWrite(initialSize); - } - - void write(const char* s, std::streamsize count) override { - out_->appendStringView(std::string_view(s, count)); - if (listener_) { - listener_->onWrite(s, count); - } - } - - std::streampos tellp() const override; - - void seekp(std::streampos pos) override; - - /// 'releaseFn' is executed on iobuf destruction if not null. - std::unique_ptr getIOBuf( - const std::function& releaseFn = nullptr); - - private: - std::shared_ptr arena_; - std::unique_ptr out_; -}; - } // namespace facebook::velox diff --git a/velox/common/base/CMakeLists.txt b/velox/common/base/CMakeLists.txt index 6c26222ef56c..2ac0927e450b 100644 --- a/velox/common/base/CMakeLists.txt +++ b/velox/common/base/CMakeLists.txt @@ -26,6 +26,8 @@ velox_link_libraries( velox_add_library( velox_common_base BitUtil.cpp + ByteInputStream.cpp + ByteOutputStream.cpp Counters.cpp Fs.cpp PeriodicStatsReporter.cpp diff --git a/velox/common/file/FileInputStream.h b/velox/common/file/FileInputStream.h index 6daf9f84e109..693ebd0ba7fe 100644 --- a/velox/common/file/FileInputStream.h +++ b/velox/common/file/FileInputStream.h @@ -19,8 +19,8 @@ #include #include "velox/buffer/Buffer.h" +#include "velox/common/base/ByteInputStream.h" #include "velox/common/file/File.h" -#include "velox/common/memory/ByteStream.h" namespace facebook::velox::common { diff --git a/velox/common/file/tests/FileInputStreamTest.cpp b/velox/common/file/tests/FileInputStreamTest.cpp index 18ab5733900e..4b62a2de25db 100644 --- a/velox/common/file/tests/FileInputStreamTest.cpp +++ b/velox/common/file/tests/FileInputStreamTest.cpp @@ -13,8 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "velox/common/memory/ByteStream.h" - #include "velox/common/base/BitUtil.h" #include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" diff --git a/velox/common/memory/BufferInputStream.cpp b/velox/common/memory/BufferInputStream.cpp new file mode 100644 index 000000000000..211f6bc7f0b3 --- /dev/null +++ b/velox/common/memory/BufferInputStream.cpp @@ -0,0 +1,167 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/memory/BufferInputStream.h" + +namespace facebook::velox { + +std::string BufferInputStream::toString() const { + std::stringstream oss; + oss << ranges_.size() << " ranges (position/size) ["; + for (const auto& range : ranges_) { + oss << "(" << range.position << "/" << range.size + << (&range == current_ ? " current" : "") << ")"; + if (&range != &ranges_.back()) { + oss << ","; + } + } + oss << "]"; + return oss.str(); +} + +bool BufferInputStream::atEnd() const { + if (current_ == nullptr) { + return false; + } + if (current_->position < current_->size) { + return false; + } + + VELOX_CHECK(current_ >= ranges_.data() && current_ <= &ranges_.back()); + return current_ == &ranges_.back(); +} + +size_t BufferInputStream::size() const { + size_t total = 0; + for (const auto& range : ranges_) { + total += range.size; + } + return total; +} + +size_t BufferInputStream::remainingSize() const { + if (ranges_.empty()) { + return 0; + } + const auto* lastRange = &ranges_.back(); + auto* cur = current_; + size_t remainingBytes = cur->availableBytes(); + while (++cur <= lastRange) { + remainingBytes += cur->size; + } + return remainingBytes; +} + +std::streampos BufferInputStream::tellp() const { + if (ranges_.empty()) { + return 0; + } + assert(current_); + int64_t size = 0; + for (auto& range : ranges_) { + if (&range == current_) { + return current_->position + size; + } + size += range.size; + } + VELOX_FAIL("BufferInputStream 'current_' is not in 'ranges_'."); +} + +void BufferInputStream::seekp(std::streampos position) { + if (ranges_.empty() && position == 0) { + return; + } + int64_t toSkip = position; + for (auto& range : ranges_) { + if (toSkip <= range.size) { + current_ = ⦥ + current_->position = toSkip; + return; + } + toSkip -= range.size; + } + static_assert(sizeof(std::streamsize) <= sizeof(long long)); + VELOX_FAIL( + "Seeking past end of BufferInputStream: {}", + static_cast(position)); +} + +void BufferInputStream::nextRange() { + VELOX_CHECK(current_ >= &ranges_[0]); + const size_t rangeIndex = current_ - &ranges_[0]; + VELOX_CHECK_LT( + rangeIndex + 1, ranges_.size(), "Reading past end of BufferInputStream"); + ++current_; + current_->position = 0; +} + +uint8_t BufferInputStream::readByte() { + if (current_->position < current_->size) { + return current_->buffer[current_->position++]; + } + nextRange(); + return readByte(); +} + +void BufferInputStream::readBytes(uint8_t* bytes, int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes"); + int32_t offset = 0; + for (;;) { + const int32_t availableBytes = current_->size - current_->position; + const int32_t readBytes = std::min(availableBytes, size); + simd::memcpy( + bytes + offset, current_->buffer + current_->position, readBytes); + offset += readBytes; + size -= readBytes; + current_->position += readBytes; + if (size == 0) { + return; + } + nextRange(); + } +} + +std::string_view BufferInputStream::nextView(int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes"); + if (current_->position == current_->size) { + if (current_ == &ranges_.back()) { + return std::string_view(nullptr, 0); + } + nextRange(); + } + VELOX_CHECK_GT(current_->size, 0); + const auto position = current_->position; + const auto viewSize = std::min(current_->size - current_->position, size); + current_->position += viewSize; + return std::string_view( + reinterpret_cast(current_->buffer) + position, viewSize); +} + +void BufferInputStream::skip(int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes"); + for (;;) { + const int32_t numSkipped = + std::min(current_->availableBytes(), size); + size -= numSkipped; + current_->position += numSkipped; + if (size == 0) { + return; + } + nextRange(); + } +} + +} // namespace facebook::velox diff --git a/velox/common/memory/BufferInputStream.h b/velox/common/memory/BufferInputStream.h new file mode 100644 index 000000000000..e2b8547126f4 --- /dev/null +++ b/velox/common/memory/BufferInputStream.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/base/ByteInputStream.h" +#include "velox/common/base/Scratch.h" + +#include + +namespace facebook::velox { + +/// Read-only input stream backed by a set of buffers. +class BufferInputStream : public ByteInputStream { + public: + explicit BufferInputStream(std::vector ranges) { + VELOX_CHECK(!ranges.empty(), "Empty BufferInputStream"); + ranges_ = std::move(ranges); + current_ = &ranges_[0]; + } + + BufferInputStream(const BufferInputStream&) = delete; + BufferInputStream& operator=(const BufferInputStream& other) = delete; + BufferInputStream(BufferInputStream&& other) noexcept = delete; + BufferInputStream& operator=(BufferInputStream&& other) noexcept = delete; + + size_t size() const override; + + bool atEnd() const override; + + std::streampos tellp() const override; + + void seekp(std::streampos pos) override; + + size_t remainingSize() const override; + + uint8_t readByte() override; + + void readBytes(uint8_t* bytes, int32_t size) override; + + std::string_view nextView(int32_t size) override; + + void skip(int32_t size) override; + + std::string toString() const override; + + private: + // Sets 'current_' to the next range of input. The input is consecutive + // ByteRanges in 'ranges_' for the base class but any view over external + // buffers can be made by specialization. + void nextRange(); + + const std::vector& ranges() const { + return ranges_; + } +}; + +} // namespace facebook::velox diff --git a/velox/common/memory/CMakeLists.txt b/velox/common/memory/CMakeLists.txt index 2bb6e2d2d154..dfd2e2c46831 100644 --- a/velox/common/memory/CMakeLists.txt +++ b/velox/common/memory/CMakeLists.txt @@ -19,7 +19,7 @@ velox_add_library( velox_memory Allocation.cpp AllocationPool.cpp - ByteStream.cpp + BufferInputStream.cpp HashStringAllocator.cpp MallocAllocator.cpp Memory.cpp @@ -28,6 +28,7 @@ velox_add_library( MemoryPool.cpp MmapAllocator.cpp MmapArena.cpp + OutputStream.cpp SharedArbitrator.cpp StreamArena.cpp) diff --git a/velox/common/memory/HashStringAllocator.h b/velox/common/memory/HashStringAllocator.h index 4604b7565c4f..d46ade72b54c 100644 --- a/velox/common/memory/HashStringAllocator.h +++ b/velox/common/memory/HashStringAllocator.h @@ -15,9 +15,9 @@ */ #pragma once +#include "velox/common/base/ByteOutputStream.h" #include "velox/common/base/CheckedArithmetic.h" #include "velox/common/memory/AllocationPool.h" -#include "velox/common/memory/ByteStream.h" #include "velox/common/memory/CompactDoubleList.h" #include "velox/common/memory/Memory.h" #include "velox/common/memory/StreamArena.h" diff --git a/velox/common/memory/OutputStream.cpp b/velox/common/memory/OutputStream.cpp new file mode 100644 index 000000000000..556b60cb9491 --- /dev/null +++ b/velox/common/memory/OutputStream.cpp @@ -0,0 +1,76 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/memory/OutputStream.h" + +namespace facebook::velox { + +namespace { +// The user data structure passed to folly iobuf for buffer ownership handling. +struct FreeData { + std::shared_ptr arena; + std::function releaseFn; +}; + +FreeData* newFreeData( + const std::shared_ptr& arena, + const std::function& releaseFn) { + auto freeData = new FreeData(); + freeData->arena = arena; + freeData->releaseFn = releaseFn; + return freeData; +} + +void freeFunc(void* /*data*/, void* userData) { + auto* freeData = reinterpret_cast(userData); + freeData->arena.reset(); + if (freeData->releaseFn != nullptr) { + freeData->releaseFn(); + } + delete freeData; +} +} // namespace + +std::unique_ptr IOBufOutputStream::getIOBuf( + const std::function& releaseFn) { + // Make an IOBuf for each range. The IOBufs keep shared ownership of + // 'arena_'. + std::unique_ptr iobuf; + auto& ranges = out_->ranges(); + for (auto& range : ranges) { + auto numValues = + &range == &ranges.back() ? out_->lastRangeEnd() : range.size; + auto userData = newFreeData(arena_, releaseFn); + auto newBuf = folly::IOBuf::takeOwnership( + reinterpret_cast(range.buffer), numValues, freeFunc, userData); + if (iobuf) { + iobuf->prev()->appendChain(std::move(newBuf)); + } else { + iobuf = std::move(newBuf); + } + } + return iobuf; +} + +std::streampos IOBufOutputStream::tellp() const { + return out_->tellp(); +} + +void IOBufOutputStream::seekp(std::streampos pos) { + out_->seekp(pos); +} + +} // namespace facebook::velox diff --git a/velox/common/memory/OutputStream.h b/velox/common/memory/OutputStream.h new file mode 100644 index 000000000000..fd3200ad0c2d --- /dev/null +++ b/velox/common/memory/OutputStream.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/base/ByteOutputStream.h" + +#include +#include + +namespace facebook::velox { + +class OutputStreamListener { + public: + virtual void onWrite(const char* /* s */, std::streamsize /* count */) {} + virtual ~OutputStreamListener() = default; +}; + +class OutputStream { + public: + explicit OutputStream(OutputStreamListener* listener = nullptr) + : listener_(listener) {} + + virtual ~OutputStream() = default; + + virtual void write(const char* s, std::streamsize count) = 0; + + virtual std::streampos tellp() const = 0; + + virtual void seekp(std::streampos pos) = 0; + + OutputStreamListener* listener() const { + return listener_; + } + + protected: + OutputStreamListener* listener_; +}; + +class OStreamOutputStream : public OutputStream { + public: + explicit OStreamOutputStream( + std::ostream* out, + OutputStreamListener* listener = nullptr) + : OutputStream(listener), out_(out) {} + + void write(const char* s, std::streamsize count) override { + out_->write(s, count); + if (listener_) { + listener_->onWrite(s, count); + } + } + + std::streampos tellp() const override { + return out_->tellp(); + } + + void seekp(std::streampos pos) override { + out_->seekp(pos); + } + + private: + std::ostream* out_; +}; + +class IOBufOutputStream : public OutputStream { + public: + explicit IOBufOutputStream( + memory::MemoryPool& pool, + OutputStreamListener* listener = nullptr, + int32_t initialSize = memory::AllocationTraits::kPageSize) + : OutputStream(listener), + arena_(std::make_shared(&pool)), + out_(std::make_unique(arena_.get())) { + out_->startWrite(initialSize); + } + + void write(const char* s, std::streamsize count) override { + out_->appendStringView(std::string_view(s, count)); + if (listener_) { + listener_->onWrite(s, count); + } + } + + std::streampos tellp() const override; + + void seekp(std::streampos pos) override; + + /// 'releaseFn' is executed on iobuf destruction if not null. + std::unique_ptr getIOBuf( + const std::function& releaseFn = nullptr); + + private: + std::shared_ptr arena_; + std::unique_ptr out_; +}; + +} // namespace facebook::velox diff --git a/velox/common/memory/StreamArena.cpp b/velox/common/memory/StreamArena.cpp index 1153afbb8cf6..d92e08a09198 100644 --- a/velox/common/memory/StreamArena.cpp +++ b/velox/common/memory/StreamArena.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ #include "velox/common/memory/StreamArena.h" -#include "velox/common/memory/ByteStream.h" +#include "velox/common/memory/OutputStream.h" namespace facebook::velox { diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index ec9e43941b32..e40bd4e7f4bd 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -13,12 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "velox/common/memory/ByteStream.h" - #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" +#include "velox/common/memory/BufferInputStream.h" #include "velox/common/memory/MmapAllocator.h" +#include "velox/common/memory/OutputStream.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include diff --git a/velox/common/memory/tests/StreamArenaTest.cpp b/velox/common/memory/tests/StreamArenaTest.cpp index 3cafe734a320..971c7d6090b8 100644 --- a/velox/common/memory/tests/StreamArenaTest.cpp +++ b/velox/common/memory/tests/StreamArenaTest.cpp @@ -14,9 +14,9 @@ * limitations under the License. */ #include "velox/common/base/tests/GTestUtils.h" -#include "velox/common/memory/ByteStream.h" #include "velox/common/memory/MemoryAllocator.h" #include "velox/common/memory/MmapAllocator.h" +#include "velox/common/memory/OutputStream.h" #include #include diff --git a/velox/exec/ContainerRowSerde.h b/velox/exec/ContainerRowSerde.h index c3b139b426d4..b6e5d060046b 100644 --- a/velox/exec/ContainerRowSerde.h +++ b/velox/exec/ContainerRowSerde.h @@ -15,7 +15,7 @@ */ #pragma once -#include "velox/common/memory/ByteStream.h" +#include "velox/common/memory/OutputStream.h" #include "velox/vector/BaseVector.h" #include "velox/vector/DecodedVector.h" diff --git a/velox/exec/ExchangeQueue.h b/velox/exec/ExchangeQueue.h index 91e3a663aa06..6ac635a25722 100644 --- a/velox/exec/ExchangeQueue.h +++ b/velox/exec/ExchangeQueue.h @@ -15,7 +15,8 @@ */ #pragma once -#include "velox/common/memory/ByteStream.h" +#include "velox/common/memory/BufferInputStream.h" +#include "velox/common/memory/OutputStream.h" namespace facebook::velox::exec { diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 4036e4791266..1c7717851a28 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -21,7 +21,7 @@ #include "velox/common/base/Crc.h" #include "velox/common/base/RawVector.h" -#include "velox/common/memory/ByteStream.h" +#include "velox/common/memory/OutputStream.h" #include "velox/vector/BiasVector.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/DictionaryVector.h" diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 9f4817a00c76..5b7b7de4a10c 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -18,7 +18,6 @@ #include #include #include "velox/common/base/tests/GTestUtils.h" -#include "velox/common/memory/ByteStream.h" #include "velox/common/time/Timer.h" #include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h" #include "velox/vector/fuzzer/VectorFuzzer.h" diff --git a/velox/serializers/tests/SerializerBenchmark.cpp b/velox/serializers/tests/SerializerBenchmark.cpp index d6aeb1f3add7..5dce2d35998c 100644 --- a/velox/serializers/tests/SerializerBenchmark.cpp +++ b/velox/serializers/tests/SerializerBenchmark.cpp @@ -17,7 +17,6 @@ #include #include "velox/common/base/tests/GTestUtils.h" -#include "velox/common/memory/ByteStream.h" #include "velox/common/time/Timer.h" #include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h" #include "velox/serializers/PrestoSerializer.h" diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index b8304bdc2e28..90d068fa2418 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -20,9 +20,9 @@ #include "velox/common/base/RuntimeMetrics.h" #include "velox/common/base/Scratch.h" #include "velox/common/compression/Compression.h" -#include "velox/common/memory/ByteStream.h" #include "velox/common/memory/Memory.h" #include "velox/common/memory/MemoryAllocator.h" +#include "velox/common/memory/OutputStream.h" #include "velox/common/memory/StreamArena.h" #include "velox/vector/ComplexVector.h" diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp index 88cbe5b842c0..b18ee90c02bb 100644 --- a/velox/vector/tests/VectorTest.cpp +++ b/velox/vector/tests/VectorTest.cpp @@ -22,7 +22,7 @@ #include #include "velox/common/base/tests/GTestUtils.h" -#include "velox/common/memory/ByteStream.h" +#include "velox/common/memory/OutputStream.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/vector/BaseVector.h" #include "velox/vector/ComplexVector.h"