Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optimize PrestoSerializer decompress buffer #11836

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions velox/common/file/FileInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,26 @@ void FileInputStream::readBytes(uint8_t* bytes, int32_t size) {
}
}

std::unique_ptr<folly::IOBuf> FileInputStream::readBytes(int32_t size) {
VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes");
if (size == 0) {
return nullptr;
}

VELOX_CHECK_LE(
size, remainingSize(), "Read past the end of input file {}", fileSize_);
if (current_->availableBytes() >= size) {
auto iobuf =
folly::IOBuf::wrapBuffer(current_->buffer + current_->position, size);
current_->position += size;
return iobuf;
}
auto iobuf = folly::IOBuf::create(size);
iobuf->append(size);
readBytes(iobuf->writableData(), size);
return iobuf;
}

std::string_view FileInputStream::nextView(int32_t size) {
VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes");
if (remainingSize() == 0) {
Expand Down
6 changes: 6 additions & 0 deletions velox/common/file/FileInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ class FileInputStream : public ByteInputStream {

void readBytes(uint8_t* bytes, int32_t size) override;

/// Returns the view if required bytes keeps in the current file buffer,
/// otherwise, copy the bytes to IOBuf. Note: Not to readBytes until the
/// returned IOBuf is useless. You may overwrite the data in IOBuf if you read
/// more bytes.
std::unique_ptr<folly::IOBuf> readBytes(int32_t size) override;

std::string_view nextView(int32_t size) override;

std::string toString() const override;
Expand Down
44 changes: 44 additions & 0 deletions velox/common/file/tests/FileInputStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,47 @@ TEST_F(FileInputStreamTest, stats) {
ASSERT_GT(byteStream->stats().readTimeNs, 0);
}
}

TEST_F(FileInputStreamTest, iobuf) {
struct {
size_t streamSize;
size_t bufferSize;

std::string debugString() const {
return fmt::format(
"streamSize {}, bufferSize {}", streamSize, bufferSize);
}
} testSettings[] = {
{4096, 1024}, {4096, 4096}, {4096, 8192}, {4096, 4096 + 1024}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());

auto byteStream = createStream(testData.streamSize, testData.bufferSize);
// Read iobuf from buffer.
int offset = 0;
auto iobuf = byteStream->readBytes(3);
ASSERT_EQ(iobuf->length(), 3);
for (; offset < 3; ++offset) {
ASSERT_EQ(iobuf->data()[offset], offset % 256);
}

// Read iobuf from buffer from position 3.
iobuf = byteStream->readBytes(3);
ASSERT_EQ(iobuf->length(), 3);
for (; offset < 6; ++offset) {
ASSERT_EQ(iobuf->data()[offset - 3], offset % 256);
}

// Read iobuf from next buffer.
while (offset < testData.streamSize) {
const auto readBytes =
std::min(testData.streamSize / 8, testData.streamSize - offset);
auto iobuf = byteStream->readBytes(readBytes);
for (int i = 0; i < readBytes; ++i, ++offset) {
ASSERT_EQ(iobuf->data()[i], offset % 256);
}
}
ASSERT_TRUE(byteStream->atEnd());
}
}
25 changes: 25 additions & 0 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,31 @@ std::string_view BufferInputStream::nextView(int32_t size) {
reinterpret_cast<char*>(current_->buffer) + position, viewSize);
}

std::unique_ptr<folly::IOBuf> BufferInputStream::readBytes(int32_t size) {
VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes");
if (size == 0) {
return nullptr;
}
std::unique_ptr<folly::IOBuf> result;
for (;;) {
const int32_t availableBytes = current_->size - current_->position;
const int32_t readBytes = std::min(availableBytes, size);
auto newBuf = folly::IOBuf::wrapBuffer(
current_->buffer + current_->position, readBytes);
if (result) {
result->appendToChain(std::move(newBuf));
} else {
result = std::move(newBuf);
}
size -= readBytes;
current_->position += readBytes;
if (size == 0) {
return result;
}
nextRange();
}
}

void BufferInputStream::skip(int32_t size) {
VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes");
for (;;) {
Expand Down
6 changes: 6 additions & 0 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ class ByteInputStream {
readBytes(reinterpret_cast<uint8_t*>(data), size);
}

virtual std::unique_ptr<folly::IOBuf> readBytes(int32_t size) = 0;

/// Returns a view over the read buffer for up to 'size' next bytes. The size
/// of the value may be less if the current byte range ends within 'size'
/// bytes from the current position. The size will be 0 if at end.
Expand Down Expand Up @@ -191,6 +193,10 @@ class BufferInputStream : public ByteInputStream {

void readBytes(uint8_t* bytes, int32_t size) override;

// We can avoid copying the data by creating an IOBuf from the underlying
// buffer.
std::unique_ptr<folly::IOBuf> readBytes(int32_t size) override;

std::string_view nextView(int32_t size) override;

void skip(int32_t size) override;
Expand Down
31 changes: 31 additions & 0 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,37 @@ TEST_P(InputByteStreamTest, inputStream) {
ASSERT_TRUE(byteStream->atEnd());
}

TEST_P(InputByteStreamTest, iobuf) {
const auto streamSize = 4096;
std::vector<ByteRange> byteRanges;
std::uint8_t buffer[streamSize];
for (auto i = 0; i < streamSize; ++i) {
buffer[i] = i % 256;
}
byteRanges.push_back(ByteRange{buffer, streamSize, 0});

std::uint8_t buffer2[streamSize];
for (auto i = 0; i < streamSize; ++i) {
buffer2[i] = i % 13;
}
byteRanges.push_back(ByteRange{buffer2, streamSize, 0});

auto byteStream = createStream(byteRanges);
for (int offset = 0; offset < streamSize * 2;) {
const int readBytes = std::min(streamSize / 11, streamSize * 2 - offset);
auto iobuf = byteStream->readBytes(readBytes);
ASSERT_EQ(iobuf->computeChainDataLength(), readBytes);
for (int i = 0; i < readBytes; ++i, ++offset) {
if (offset < streamSize) {
ASSERT_EQ(iobuf->data()[i], offset % 256);
} else {
ASSERT_EQ(iobuf->data()[i], (offset - streamSize) % 13);
}
}
}
ASSERT_TRUE(byteStream->atEnd());
}

TEST_P(InputByteStreamTest, emptyInputStreamError) {
if (GetParam()) {
VELOX_ASSERT_THROW(createStream({}), "Empty BufferInputStream");
Expand Down
20 changes: 12 additions & 8 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4359,6 +4359,17 @@ void readTopColumns(
readColumns(
&source, childTypes, resultOffset, nullptr, 0, pool, opts, children);
}

std::unique_ptr<folly::IOBuf> uncompressStream(
ByteInputStream* source,
const PrestoHeader& header,
common::CompressionKind compressionKind,
memory::MemoryPool& pool) {
const auto codec = common::compressionKindToCodec(compressionKind);
const auto iobuf = source->readBytes(header.compressedSize);
// Process chained uncompressed results IOBufs.
return codec->uncompress(iobuf.get(), header.uncompressedSize);
}
} // namespace

void PrestoVectorSerde::deserialize(
Expand All @@ -4369,8 +4380,6 @@ void PrestoVectorSerde::deserialize(
vector_size_t resultOffset,
const Options* options) {
const auto prestoOptions = toPrestoOptions(options);
const auto codec =
common::compressionKindToCodec(prestoOptions.compressionKind);
auto maybeHeader = PrestoHeader::read(source);
VELOX_CHECK(
maybeHeader.hasValue(),
Expand Down Expand Up @@ -4413,13 +4422,8 @@ void PrestoVectorSerde::deserialize(
if (!isCompressedBitSet(header.pageCodecMarker)) {
readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions);
} else {
auto compressBuf = folly::IOBuf::create(header.compressedSize);
source->readBytes(compressBuf->writableData(), header.compressedSize);
compressBuf->append(header.compressedSize);

// Process chained uncompressed results IOBufs.
auto uncompress =
codec->uncompress(compressBuf.get(), header.uncompressedSize);
uncompressStream(source, header, prestoOptions.compressionKind, *pool);
auto uncompressedSource = std::make_unique<BufferInputStream>(
byteRangesFromIOBuf(uncompress.get()));
readTopColumns(
Expand Down
Loading