From 87ad96895ec1095205280d2dc1223b9d2670cb45 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 20 Jan 2025 15:48:11 +0000 Subject: [PATCH] feat: Optimize PrestoSerializer compress buffer --- velox/common/memory/ByteStream.cpp | 25 +++++++++++++ velox/common/memory/ByteStream.h | 2 ++ velox/common/memory/tests/ByteStreamTest.cpp | 37 +++++++++++++++++--- velox/serializers/PrestoSerializer.cpp | 26 ++++++++++---- velox/serializers/RowSerializer.h | 28 ++++++++++----- 5 files changed, 99 insertions(+), 19 deletions(-) diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp index 18168c881b3c..13abfe6977e2 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/memory/ByteStream.cpp @@ -172,6 +172,31 @@ std::string_view BufferInputStream::nextView(int32_t size) { reinterpret_cast(current_->buffer) + position, viewSize); } +/* Returns the next size bytes of the stream as an IOBuf chain. Each loop pass wraps the unread part of the current range (at most the bytes still requested), links it into the chain, advances the position of that range, and calls nextRange() while bytes are still missing. A negative size fails the VELOX_CHECK_GE; a size of 0 returns nullptr rather than an empty IOBuf. NOTE(review): the buffers are wrapped, presumably without taking ownership, so the chain should not outlive the buffers behind the stream; confirm against the folly IOBuf docs. */ std::unique_ptr BufferInputStream::readBytes(int32_t size) { + VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes"); + if (size == 0) { + return nullptr; /* callers receive a null pointer for an empty read and have to cope with it */ + } + std::unique_ptr result; + for (;;) { + const int32_t availableBytes = current_->size - current_->position; + const int32_t readBytes = std::min(availableBytes, size); + auto newBuf = folly::IOBuf::wrapBuffer( + current_->buffer + current_->position, readBytes); /* one IOBuf per range touched; if the current range is already exhausted this wraps zero bytes */ + if (result) { + result->appendToChain(std::move(newBuf)); + } else { + result = std::move(newBuf); + } + size -= readBytes; + current_->position += readBytes; + if (size == 0) { + return result; + } + nextRange(); /* size is still non-zero here, so continue with the following range */ + } +} + void BufferInputStream::skip(int32_t size) { VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes"); for (;;) { diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 370737288366..432e07a81e8d 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -253,6 +253,8 @@ class BufferInputStream : public ByteInputStream { void readBytes(uint8_t* bytes, int32_t size) override; + 
std::unique_ptr readBytes(int32_t size); /* Overload that hands the bytes back as an IOBuf chain instead of copying into a caller buffer; the definition added to ByteStream.cpp by this patch returns nullptr when size is 0. */ + std::string_view nextView(int32_t size) override; void skip(int32_t size) override; diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index 72b7d095f55f..24856ee1ed75 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -497,12 +497,39 @@ TEST_P(InputByteStreamTest, inputStream) { ASSERT_TRUE(byteStream->atEnd()); } -TEST_P(InputByteStreamTest, emptyInputStreamError) { - if (GetParam()) { - VELOX_ASSERT_THROW(createStream({}), "Empty BufferInputStream"); - } else { - VELOX_ASSERT_THROW(createStream({}), "(0 vs. 0) Empty FileInputStream"); +/* Exercises the IOBuf-returning readBytes over a stream made of two 4096-byte ranges, the first filled with i % 256 and the second with i % 13, reading streamSize / 11 bytes at a time until both ranges are consumed. NOTE(review): this hunk removes the emptyInputStreamError test instead of adding the new test next to it; confirm the removal is intended. */ TEST_P(InputByteStreamTest, iobuf) { + if (!GetParam()) { + return; /* only runs when GetParam() is true; the removed test above suggests that value selects BufferInputStream, which the dynamic_cast below relies on */ + } + const auto streamSize = 4096; + std::vector byteRanges; + std::uint8_t buffer[streamSize]; + for (auto i = 0; i < streamSize; ++i) { + buffer[i] = i % 256; } + byteRanges.push_back(ByteRange{buffer, streamSize, 0}); + + std::uint8_t buffer2[streamSize]; + for (auto i = 0; i < streamSize; ++i) { + buffer2[i] = i % 13; + } + byteRanges.push_back(ByteRange{buffer2, streamSize, 0}); + + auto byteStream = createStream(byteRanges); + auto bufferStream = dynamic_cast(byteStream.get()); + for (int offset = 0; offset < streamSize * 2;) { + const int readBytes = std::min(streamSize / 11, streamSize * 2 - offset); + auto iobuf = bufferStream->readBytes(readBytes); + ASSERT_EQ(iobuf->computeChainDataLength(), readBytes); /* NOTE(review): 4096 / 11 is 372 and 11 * 372 is 4092, so the 12th read appears to straddle both ranges and come back as a two-element chain. data() presumably exposes only the head buffer of a chain, so indexing it up to readBytes below may run past that buffer; confirm, and consider coalescing the chain before comparing bytes. */ + for (int i = 0; i < readBytes; ++i, ++offset) { + if (offset < streamSize) { + ASSERT_EQ(iobuf->data()[i], offset % 256); + } else { + ASSERT_EQ(iobuf->data()[i], (offset - streamSize) % 13); + } + } + } + ASSERT_TRUE(byteStream->atEnd()); } TEST_P(InputByteStreamTest, remainingSize) { diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 2217c2eed226..691532c60608 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ 
b/velox/serializers/PrestoSerializer.cpp @@ -90,6 +90,25 @@ PrestoVectorSerde::PrestoOptions toPrestoOptions( prestoOptions, "Serde options are not Presto-compatible"); return *prestoOptions; } + +/* Reads header.compressedSize bytes from source and returns what the codec chosen for compressionKind produces when uncompressing them, with header.uncompressedSize passed along to the codec. When source is a BufferInputStream the bytes are taken through its IOBuf-returning readBytes; any other source is first copied into a newly created IOBuf of compressedSize bytes. */ std::unique_ptr uncompressStream( + ByteInputStream* source, + const detail::PrestoHeader& header, + common::CompressionKind compressionKind) { + const auto codec = common::compressionKindToCodec(compressionKind); + if (auto* bufferSource = dynamic_cast(source)) { + // If the source is a BufferInputStream, we can avoid copying the data + // by creating an IOBuf from the underlying buffer. + const auto iobuf = bufferSource->readBytes(header.compressedSize); /* NOTE(review): that readBytes overload returns nullptr for a size of 0, so iobuf.get() is null when header.compressedSize is 0; confirm the codec accepts a null input or that the size is never 0 here. */ + // Process chained uncompressed results IOBufs. + return codec->uncompress(iobuf.get(), header.uncompressedSize); + } + auto compressBuf = folly::IOBuf::create(header.compressedSize); + source->readBytes(compressBuf->writableData(), header.compressedSize); + compressBuf->append(header.compressedSize); /* marks the bytes just copied in as valid data of the IOBuf */ + // Process chained uncompressed results IOBufs. + return codec->uncompress(compressBuf.get(), header.uncompressedSize); +} } // namespace void PrestoVectorSerde::estimateSerializedSize( @@ -182,13 +201,8 @@ void PrestoVectorSerde::deserialize( detail::readTopColumns( *source, type, pool, *result, resultOffset, prestoOptions); } else { - auto compressBuf = folly::IOBuf::create(header.compressedSize); - source->readBytes(compressBuf->writableData(), header.compressedSize); - compressBuf->append(header.compressedSize); - - // Process chained uncompressed results IOBufs. 
auto uncompress = - codec->uncompress(compressBuf.get(), header.uncompressedSize); + uncompressStream(source, header, prestoOptions.compressionKind); /* NOTE(review): the removed line used a codec variable defined outside this hunk; check whether that variable is still needed after this change. */ auto uncompressedSource = std::make_unique( byteRangesFromIOBuf(uncompress.get())); detail::readTopColumns( diff --git a/velox/serializers/RowSerializer.h b/velox/serializers/RowSerializer.h index 9c4f1a78abbb..fcab8bccde2e 100644 --- a/velox/serializers/RowSerializer.h +++ b/velox/serializers/RowSerializer.h @@ -301,14 +301,7 @@ class RowDeserializer { if (header.compressed) { VELOX_DCHECK_NE( compressionKind, common::CompressionKind::CompressionKind_NONE); - auto compressBuf = folly::IOBuf::create(header.compressedSize); - source->readBytes(compressBuf->writableData(), header.compressedSize); - compressBuf->append(header.compressedSize); - - // Process chained uncompressed results IOBufs. - const auto codec = common::compressionKindToCodec(compressionKind); - uncompressedBuf = - codec->uncompress(compressBuf.get(), header.uncompressedSize); + uncompressedBuf = uncompressStream(source, header, compressionKind); } std::unique_ptr uncompressedStream; @@ -365,6 +358,25 @@ class RowDeserializer { rowBuffer.append(rowFragment.data(), rowFragment.size()); } } + + /* Reads header.compressedSize bytes from source and returns what the codec chosen for compressionKind produces when uncompressing them, with header.uncompressedSize passed along to the codec. A BufferInputStream source is read through its IOBuf-returning readBytes; any other source is first copied into a newly created IOBuf. NOTE(review): apart from the header type, this body matches the uncompressStream helper this patch adds to PrestoSerializer.cpp; consider sharing one implementation. */ static std::unique_ptr uncompressStream( + ByteInputStream* source, + const detail::RowGroupHeader& header, + common::CompressionKind compressionKind) { + const auto codec = common::compressionKindToCodec(compressionKind); + if (auto* bufferSource = dynamic_cast(source)) { + // If the source is a BufferInputStream, we can avoid copying the data + // by creating an IOBuf from the underlying buffer. + const auto iobuf = bufferSource->readBytes(header.compressedSize); /* NOTE(review): that readBytes overload returns nullptr for a size of 0, so iobuf.get() is null when header.compressedSize is 0; confirm the codec accepts a null input or that the size is never 0 here. */ + // Process chained uncompressed results IOBufs. 
+ return codec->uncompress(iobuf.get(), header.uncompressedSize); + } + auto compressBuf = folly::IOBuf::create(header.compressedSize); + source->readBytes(compressBuf->writableData(), header.compressedSize); + compressBuf->append(header.compressedSize); /* marks the bytes just copied in as valid data of the IOBuf */ + // Process chained uncompressed results IOBufs. + return codec->uncompress(compressBuf.get(), header.uncompressedSize); + } }; } // namespace facebook::velox::serializer