Skip to content

Commit

Permalink
optimize buffer compress
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Dec 12, 2024
1 parent e4e3b8f commit cdd52de
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 16 deletions.
27 changes: 19 additions & 8 deletions velox/serializers/CompactRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ VectorSerde::Options toValidOptions(const VectorSerde::Options* options) {
return *options;
}

// Builds one folly::IOBuf chain with one element per entry of 'buffers', in
// the same order. Each element is created with folly::IOBuf::wrapBuffer over
// the buffer's data pointer and size.
//
// Returns a null pointer when 'buffers' is empty.
//
// NOTE(review): wrapBuffer is expected to reference the memory without
// copying or taking ownership, so 'buffers' presumably must outlive the
// returned chain -- confirm against the folly documentation.
std::unique_ptr<folly::IOBuf> buffersToIOBuf(
    const std::vector<BufferPtr>& buffers) {
  std::unique_ptr<folly::IOBuf> head;
  for (const auto& buffer : buffers) {
    auto segment =
        folly::IOBuf::wrapBuffer(buffer->asMutable<char>(), buffer->size());
    if (!head) {
      // The first wrapped buffer becomes the head of the chain.
      head = std::move(segment);
      continue;
    }
    // Link the new segment right after the current last element so that the
    // chain keeps the order of 'buffers'.
    head->prev()->appendChain(std::move(segment));
  }
  return head;
}

struct CompactRowHeader {
int32_t uncompressedSize;
int32_t compressedSize;
Expand Down Expand Up @@ -181,8 +196,8 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
return CompactRowHeader::size() + codec_->maxCompressedLength(size);
}

// The serialization format is | uncompressedSize | compressedSize | data |
// when compressed.
// The serialization format is | uncompressedSize | compressedSize |
// compressed | data | when compressed.
void flush(OutputStream* stream) override {
constexpr int32_t kMaxCompressionAttemptsToSkip = 30;
const auto size = uncompressedSize();
Expand All @@ -195,12 +210,8 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
++stats_.numCompressionSkipped;
} else {
// Compress the buffer if the condition is satisfied.
IOBufOutputStream out(
*pool_, nullptr, buffers_.size() * sizeof(std::string_view));
for (const auto& buffer : buffers_) {
out.write(buffer->asMutable<char>(), buffer->size());
}
const auto compressedBuffer = codec_->compress(out.getIOBuf().get());
const auto toCompress = buffersToIOBuf(buffers_);
const auto compressedBuffer = codec_->compress(toCompress.get());
const auto compressedSize = compressedBuffer->length();
stats_.compressionInputBytes += size;
stats_.compressedBytes += compressedSize;
Expand Down
27 changes: 19 additions & 8 deletions velox/serializers/UnsafeRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ VectorSerde::Options toValidOptions(const VectorSerde::Options* options) {
return *options;
}

std::unique_ptr<folly::IOBuf> buffersToIOBuf(
const std::vector<BufferPtr>& buffers) {
std::unique_ptr<folly::IOBuf> iobuf;
for (const auto& buffer : buffers) {
auto newBuf =
folly::IOBuf::wrapBuffer(buffer->asMutable<char>(), buffer->size());
if (iobuf) {
iobuf->prev()->appendChain(std::move(newBuf));
} else {
iobuf = std::move(newBuf);
}
}
return iobuf;
}

// The compressedSize is equal to uncompressedSize when not compressed.
struct UnsafeRowHeader {
int32_t uncompressedSize;
Expand Down Expand Up @@ -160,8 +175,8 @@ class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
return UnsafeRowHeader::size() + codec_->maxCompressedLength(size);
}

// The serialization format is | uncompressedSize | compressedSize | data |
// when compressed.
// The serialization format is | uncompressedSize | compressedSize |
// compressed | data | when compressed.
void flush(OutputStream* stream) override {
constexpr int32_t kMaxCompressionAttemptsToSkip = 30;
const auto size = uncompressedSize();
Expand All @@ -174,12 +189,8 @@ class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
++stats_.numCompressionSkipped;
} else {
// Compress the buffer if the condition is satisfied.
IOBufOutputStream out(
*pool_, nullptr, buffers_.size() * sizeof(std::string_view));
for (const auto& buffer : buffers_) {
out.write(buffer->asMutable<char>(), buffer->size());
}
const auto compressedBuffer = codec_->compress(out.getIOBuf().get());
const auto toCompress = buffersToIOBuf(buffers_);
const auto compressedBuffer = codec_->compress(toCompress.get());
const auto compressedSize = compressedBuffer->length();
stats_.compressionInputBytes += size;
stats_.compressedBytes += compressedSize;
Expand Down

0 comments on commit cdd52de

Please sign in to comment.