Skip to content

Commit

Permalink
feat: Optimize PrestoSerializer compress buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Dec 12, 2024
1 parent b44ffc9 commit 0a803d5
Showing 1 changed file with 26 additions and 8 deletions.
34 changes: 26 additions & 8 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4359,6 +4359,31 @@ void readTopColumns(
readColumns(
&source, childTypes, resultOffset, nullptr, 0, pool, opts, children);
}

std::unique_ptr<folly::IOBuf> uncompressBuffer(
ByteInputStream* source,
const PrestoHeader& header,
common::CompressionKind compressionKind) {
const auto codec = common::compressionKindToCodec(compressionKind);
// auto compressBuf = folly::IOBuf::create(header.compressedSize);
std::unique_ptr<folly::IOBuf> iobuf;
int32_t readCount = 0;
while (readCount < header.compressedSize) {
const auto remaining = header.compressedSize - readCount;
auto view = source->nextView(remaining);
readCount += view.size();
auto newBuf = folly::IOBuf::wrapBuffer(view.data(), view.size());
if (iobuf) {
iobuf->prev()->appendChain(std::move(newBuf));
} else {
iobuf = std::move(newBuf);
}
}

VELOX_DCHECK_EQ(readCount, header.compressedSize);
// Process chained uncompressed results IOBufs.
return codec->uncompress(iobuf.get(), header.uncompressedSize);
}
} // namespace

void PrestoVectorSerde::deserialize(
Expand All @@ -4369,8 +4394,6 @@ void PrestoVectorSerde::deserialize(
vector_size_t resultOffset,
const Options* options) {
const auto prestoOptions = toPrestoOptions(options);
const auto codec =
common::compressionKindToCodec(prestoOptions.compressionKind);
auto maybeHeader = PrestoHeader::read(source);
VELOX_CHECK(
maybeHeader.hasValue(),
Expand Down Expand Up @@ -4413,13 +4436,8 @@ void PrestoVectorSerde::deserialize(
if (!isCompressedBitSet(header.pageCodecMarker)) {
readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions);
} else {
auto compressBuf = folly::IOBuf::create(header.compressedSize);
source->readBytes(compressBuf->writableData(), header.compressedSize);
compressBuf->append(header.compressedSize);

// Process chained uncompressed results IOBufs.
auto uncompress =
codec->uncompress(compressBuf.get(), header.uncompressedSize);
uncompressBuffer(source, header, prestoOptions.compressionKind);
auto uncompressedSource = std::make_unique<BufferInputStream>(
byteRangesFromIOBuf(uncompress.get()));
readTopColumns(
Expand Down

0 comments on commit 0a803d5

Please sign in to comment.