Skip to content

Commit

Permalink
ByteStream refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
yanngyoung committed Sep 10, 2024
1 parent 4744994 commit fb28be5
Show file tree
Hide file tree
Showing 23 changed files with 586 additions and 453 deletions.
29 changes: 29 additions & 0 deletions velox/common/base/ByteInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/base/ByteInputStream.h"

namespace facebook::velox {

/// Returns the distance from 'position' to 'size', clamped to zero when
/// 'position' has already moved past 'size'.
uint32_t ByteRange::availableBytes() const {
  const int32_t remaining = size - position;
  return remaining > 0 ? remaining : 0;
}

/// Renders this range as "[<succinct size> starting at <position>]".
std::string ByteRange::toString() const {
  const auto sizeText = succinctBytes(size);
  return fmt::format("[{} starting at {}]", sizeText, position);
}

} // namespace facebook::velox
115 changes: 115 additions & 0 deletions velox/common/base/ByteInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cstring>

#include "velox/common/memory/Memory.h"
#include "velox/type/Type.h"

namespace facebook::velox {

/// A window over a buffer that this struct does not own, together with a
/// cursor ('position') into that window. The members carry no default
/// initializers, so every field must be set explicitly by whoever creates a
/// ByteRange.
struct ByteRange {
/// Start of buffer. Not owned.
uint8_t* buffer;

/// Number of bytes or bits starting at 'buffer'.
int32_t size;

/// Index of next byte/bit to be read/written in 'buffer'.
int32_t position;

/// Returns the available bytes left in this range.
uint32_t availableBytes() const;

/// Returns a printable description built from 'size' and 'position'.
std::string toString() const;
};

/// Read-only byte input stream interface.
///
/// The non-virtual read<T>() helper works directly on 'current_', so a
/// concrete stream must keep 'current_' pointing at a valid ByteRange before
/// read<T>() is called.
class ByteInputStream {
 public:
  virtual ~ByteInputStream() = default;

  /// Returns total number of bytes available in the stream.
  virtual size_t size() const = 0;

  /// Returns true if all input has been read.
  virtual bool atEnd() const = 0;

  /// Returns current position (number of bytes from the start) in the stream.
  virtual std::streampos tellp() const = 0;

  /// Moves current position to specified one.
  virtual void seekp(std::streampos pos) = 0;

  /// Returns the remaining size left from current reading position.
  virtual size_t remainingSize() const = 0;

  /// Returns the next byte of the stream. read<T>() relies on this to advance
  /// across byte range boundaries.
  virtual uint8_t readByte() = 0;

  /// Reads 'size' bytes from the stream into 'bytes'.
  virtual void readBytes(uint8_t* bytes, int32_t size) = 0;

  /// Reads the next sizeof(T) bytes and returns them as a T. The bytes are
  /// taken verbatim, i.e. the wire format is expected to use the machine byte
  /// order. T must be at most 8 bytes wide; wider types need an explicit
  /// specialization.
  ///
  /// Both paths copy through std::memcpy instead of dereferencing a
  /// reinterpret_cast'ed pointer: the read offset inside the buffer is
  /// arbitrary, so a direct typed load could be misaligned and would also
  /// break the aliasing rules.
  template <typename T>
  T read() {
    static_assert(sizeof(T) <= sizeof(uint64_t));
    constexpr auto kWidth = static_cast<int32_t>(sizeof(T));
    T value;

    // Fast path: the whole value lies inside the current range. The check is
    // done in signed 64-bit arithmetic so that it neither overflows nor
    // silently converts the signed range fields to unsigned.
    const int64_t end = static_cast<int64_t>(current_->position) + kWidth;
    if (end <= current_->size) {
      std::memcpy(&value, current_->buffer + current_->position, sizeof(T));
      current_->position += kWidth;
      return value;
    }

    // The value straddles two (or more) ranges. Collect it byte by byte in
    // wire order; readByte() takes care of moving to the next range.
    uint8_t bytes[sizeof(T)];
    for (size_t i = 0; i < sizeof(T); ++i) {
      bytes[i] = readByte();
    }
    std::memcpy(&value, bytes, sizeof(T));
    return value;
  }

  /// Convenience overload that accepts any character-like pointer and
  /// forwards to the uint8_t* version of readBytes().
  template <typename Char>
  void readBytes(Char* data, int32_t size) {
    readBytes(reinterpret_cast<uint8_t*>(data), size);
  }

  /// Returns a view over the read buffer for up to 'size' next bytes. The size
  /// of the value may be less if the current byte range ends within 'size'
  /// bytes from the current position. The size will be 0 if at end.
  virtual std::string_view nextView(int32_t size) = 0;

  /// Advances the read position by 'size' bytes without returning them.
  virtual void skip(int32_t size) = 0;

  /// Returns a printable description of the stream state.
  virtual std::string toString() const = 0;

 protected:
  // Points to the current buffered byte range.
  ByteRange* current_{nullptr};
  // The byte ranges backing this stream.
  std::vector<ByteRange> ranges_;
};

/// Timestamp is read as raw bytes via readBytes() rather than through the
/// generic read<T>() body.
template <>
inline Timestamp ByteInputStream::read<Timestamp>() {
  Timestamp result;
  auto* const destination = reinterpret_cast<uint8_t*>(&result);
  readBytes(destination, sizeof(result));
  return result;
}

/// int128_t is read as raw bytes via readBytes() rather than through the
/// generic read<T>() body.
template <>
inline int128_t ByteInputStream::read<int128_t>() {
  int128_t result;
  auto* const destination = reinterpret_cast<uint8_t*>(&result);
  readBytes(destination, sizeof(result));
  return result;
}

} // namespace facebook::velox
Original file line number Diff line number Diff line change
Expand Up @@ -14,164 +14,10 @@
* limitations under the License.
*/

#include "velox/common/memory/ByteStream.h"
#include "velox/common/memory/OutputStream.h"

namespace facebook::velox {

/// Number of bytes between 'position' and 'size'; never negative.
uint32_t ByteRange::availableBytes() const {
  const int32_t unread = size - position;
  if (unread < 0) {
    return 0;
  }
  return unread;
}

/// Formats the range as "[<succinct size> starting at <position>]".
std::string ByteRange::toString() const {
  const auto readableSize = succinctBytes(size);
  return fmt::format("[{} starting at {}]", readableSize, position);
}

/// Lists every range as "(position/size)", comma separated, and tags the
/// range that 'current_' points to with " current".
std::string BufferInputStream::toString() const {
  std::stringstream out;
  const size_t numRanges = ranges_.size();
  out << numRanges << " ranges (position/size) [";
  for (size_t i = 0; i < numRanges; ++i) {
    const auto& range = ranges_[i];
    if (i > 0) {
      out << ",";
    }
    out << "(" << range.position << "/" << range.size;
    if (&range == current_) {
      out << " current";
    }
    out << ")";
  }
  out << "]";
  return out.str();
}

/// Returns true only when 'current_' is the last range and has been fully
/// consumed. A stream whose 'current_' is null reports false.
bool BufferInputStream::atEnd() const {
  // Either there is no current range, or it still has unread bytes.
  if (current_ == nullptr || current_->position < current_->size) {
    return false;
  }

  VELOX_CHECK(current_ >= ranges_.data() && current_ <= &ranges_.back());
  const auto* const lastRange = &ranges_.back();
  return current_ == lastRange;
}

size_t BufferInputStream::size() const {
size_t total = 0;
for (const auto& range : ranges_) {
total += range.size;
}
return total;
}

/// Returns the unread bytes of the current range plus the full size of every
/// range after it. An empty stream has nothing remaining.
size_t BufferInputStream::remainingSize() const {
  if (ranges_.empty()) {
    return 0;
  }
  size_t bytesLeft = current_->availableBytes();
  const auto* const lastRange = &ranges_.back();
  for (const auto* range = current_ + 1; range <= lastRange; ++range) {
    bytesLeft += range->size;
  }
  return bytesLeft;
}

/// Returns the absolute read offset: the sizes of all ranges preceding
/// 'current_' plus the position inside 'current_'. Fails if 'current_' does
/// not point into 'ranges_'.
std::streampos BufferInputStream::tellp() const {
  if (ranges_.empty()) {
    return 0;
  }
  assert(current_);
  int64_t bytesBefore = 0;
  for (size_t i = 0; i < ranges_.size(); ++i) {
    const auto& range = ranges_[i];
    if (&range != current_) {
      bytesBefore += range.size;
      continue;
    }
    return range.position + bytesBefore;
  }
  VELOX_FAIL("BufferInputStream 'current_' is not in 'ranges_'.");
}

/// Repositions the stream at absolute offset 'position'. The first range
/// whose size covers the remaining offset becomes 'current_'; an offset equal
/// to a range's size lands at the end of that range rather than at the start
/// of the next one. Fails when 'position' lies beyond the last range.
void BufferInputStream::seekp(std::streampos position) {
  const bool emptyStreamAtStart = ranges_.empty() && position == 0;
  if (emptyStreamAtStart) {
    return;
  }
  int64_t remaining = position;
  for (auto it = ranges_.begin(); it != ranges_.end(); ++it) {
    if (remaining > it->size) {
      // Target is beyond this range; consume it and keep looking.
      remaining -= it->size;
      continue;
    }
    current_ = &*it;
    current_->position = remaining;
    return;
  }
  static_assert(sizeof(std::streamsize) <= sizeof(long long));
  VELOX_FAIL(
      "Seeking past end of BufferInputStream: {}",
      static_cast<long long>(position));
}

/// Makes the range following 'current_' the current one and rewinds it to
/// position 0. Fails when 'current_' is already the last range.
void BufferInputStream::nextRange() {
  VELOX_CHECK(current_ >= &ranges_[0]);
  const size_t rangeIndex = current_ - &ranges_[0];
  VELOX_CHECK_LT(
      rangeIndex + 1, ranges_.size(), "Reading past end of BufferInputStream");
  auto* const following = current_ + 1;
  following->position = 0;
  current_ = following;
}

/// Returns the next unread byte, stepping over exhausted ranges first.
/// nextRange() fails if there is no further range to step into.
uint8_t BufferInputStream::readByte() {
  while (current_->position >= current_->size) {
    nextRange();
  }
  return current_->buffer[current_->position++];
}

/// Copies 'size' bytes into 'bytes', continuing into following ranges
/// whenever the current one runs out. A negative 'size' is rejected.
void BufferInputStream::readBytes(uint8_t* bytes, int32_t size) {
  VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes");
  uint8_t* destination = bytes;
  while (true) {
    // Copy as much as the current range can supply, capped at what is still
    // requested.
    const int32_t chunk = std::min(current_->size - current_->position, size);
    simd::memcpy(destination, current_->buffer + current_->position, chunk);
    destination += chunk;
    current_->position += chunk;
    size -= chunk;
    if (size == 0) {
      return;
    }
    nextRange();
  }
}

/// Returns a view of up to 'size' bytes from the current range and advances
/// past them. If the current range is exhausted the next range is used; if
/// that was the last range, an empty view with a null data pointer is
/// returned. The view never spans more than one range.
std::string_view BufferInputStream::nextView(int32_t size) {
  VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes");
  const bool rangeExhausted = current_->position == current_->size;
  if (rangeExhausted) {
    if (current_ == &ranges_.back()) {
      return std::string_view{};
    }
    nextRange();
  }
  VELOX_CHECK_GT(current_->size, 0);
  const char* const start =
      reinterpret_cast<char*>(current_->buffer) + current_->position;
  const auto length = std::min(current_->size - current_->position, size);
  current_->position += length;
  return std::string_view(start, length);
}

/// Advances the read position by 'size' bytes, moving through following
/// ranges as needed. A negative 'size' is rejected.
void BufferInputStream::skip(int32_t size) {
  VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes");
  while (true) {
    const int32_t step = std::min<int32_t>(current_->availableBytes(), size);
    current_->position += step;
    size -= step;
    if (size == 0) {
      break;
    }
    nextRange();
  }
}

size_t ByteOutputStream::size() const {
if (ranges_.empty()) {
return 0;
Expand Down Expand Up @@ -400,59 +246,4 @@ std::string ByteOutputStream::toString() const {
return oss.str();
}

namespace {
// The user data structure passed to folly iobuf for buffer ownership handling.
struct FreeData {
std::shared_ptr<StreamArena> arena;
std::function<void()> releaseFn;
};

FreeData* newFreeData(
const std::shared_ptr<StreamArena>& arena,
const std::function<void()>& releaseFn) {
auto freeData = new FreeData();
freeData->arena = arena;
freeData->releaseFn = releaseFn;
return freeData;
}

void freeFunc(void* /*data*/, void* userData) {
auto* freeData = reinterpret_cast<FreeData*>(userData);
freeData->arena.reset();
if (freeData->releaseFn != nullptr) {
freeData->releaseFn();
}
delete freeData;
}
} // namespace

/// Builds an IOBuf chain with one IOBuf per range of 'out_'. Each IOBuf gets
/// its own FreeData, so every link keeps shared ownership of 'arena_' and
/// carries a copy of 'releaseFn'. Returns null when there are no ranges.
std::unique_ptr<folly::IOBuf> IOBufOutputStream::getIOBuf(
    const std::function<void()>& releaseFn) {
  std::unique_ptr<folly::IOBuf> head;
  auto& ranges = out_->ranges();
  for (auto& range : ranges) {
    // The last range is only filled up to lastRangeEnd(); earlier ranges are
    // used in full.
    const bool isLastRange = &range == &ranges.back();
    auto length = isLastRange ? out_->lastRangeEnd() : range.size;
    auto link = folly::IOBuf::takeOwnership(
        reinterpret_cast<char*>(range.buffer),
        length,
        freeFunc,
        newFreeData(arena_, releaseFn));
    if (!head) {
      head = std::move(link);
    } else {
      // prev() of the head is the tail of the chain; append after it.
      head->prev()->appendChain(std::move(link));
    }
  }
  return head;
}

/// Reports the position of the wrapped output stream 'out_'.
std::streampos IOBufOutputStream::tellp() const {
  const std::streampos current = out_->tellp();
  return current;
}

/// Forwards the seek to the wrapped output stream 'out_'.
void IOBufOutputStream::seekp(std::streampos pos) {
  auto& target = *out_;
  target.seekp(pos);
}

} // namespace facebook::velox
Loading

0 comments on commit fb28be5

Please sign in to comment.