Skip to content

Commit

Permalink
refactor: Break PrestoSerializer.cpp into components (#11922)
Browse files Browse the repository at this point in the history
Summary:

PrestoSerializer.cpp has grown into a roughly 5000 line behemoth. It makes navigating the file a challenge.

This change is a pure refactor and does not introduce any changes besides moving things between files and renaming functions. It breaks the file up the code into 8 new files based on function:
* PrestoIterativeVectorSerializer: contains the PrestoIterativeVectorSerializer class
* PrestoBatchVectorSerializer: contains the PrestoBatchVectorSerializer class
* PrestoHeader: contains the PrestoHeader class which is used for reading/parsing the header of a PrestoPage
during deserialization
* VectorStream: contains the VectorStream class which is an appendable container for serialized values used
during serialization
* PrestoVectorLexer: contains the PrestoVectorLexer class along with Token and TokenType, this is used to lex 
serialized PrestoPages for the benefit of compression libraries like Zstrong.
* PrestoSerializerDeserializationUtils: contains the logic needed to deserialize a PrestoPage, notably the 
function readTopColumns which serves as the entry to deserialization
* PrestoSerializerEstimationUtils: contains the logic needed to estimate the size of a serialized PrestoPage, this 
logic is shared by PrestoBatchVectorSerializer and PrestoVectorSerde's estimateSerializedSize function (used 
with PrestoIterativeVectorSerializer)
* PrestoSerializerSerializationUtils: contains the generic logic needed to serialize a PrestoPage, notably the
function serializeColumn which serves as the entry point to serialization and the function flushStreams used 
write the final output to an OutputStream.  This logic is shared by PrestoBatchVectorSerializer, 
PrestoIterativeVectorSerializer, and PrestoVectorSerde's deserializeSingleColumn function.

These are all internal details of the PrestoSerializer (which is why they were in the cpp file) so I've placed all the
moved logic in a "detail" namespace (with the exception of Token and TokenType in PrestoVectorLexer).

I've verified that all tests pass and run the benchmark to verify I didn't introduce any regressions.

Differential Revision: D67543330
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Jan 2, 2025
1 parent 98932e5 commit a771dfd
Show file tree
Hide file tree
Showing 20 changed files with 5,150 additions and 4,626 deletions.
15 changes: 13 additions & 2 deletions velox/serializers/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
velox_add_library(velox_presto_serializer CompactRowSerializer.cpp
PrestoSerializer.cpp UnsafeRowSerializer.cpp)
velox_add_library(
velox_presto_serializer
CompactRowSerializer.cpp
PrestoSerializer.cpp
UnsafeRowSerializer.cpp
PrestoBatchVectorSerializer.cpp
PrestoHeader.cpp
PrestoIterativeVectorSerializer.cpp
PrestoSerializerDeserializationUtils.cpp
PrestoSerializerEstimationUtils.cpp
PrestoSerializerSerializationUtils.cpp
PrestoVectorLexer.cpp
VectorStream.cpp)

velox_link_libraries(velox_presto_serializer velox_vector velox_row_fast)

Expand Down
180 changes: 180 additions & 0 deletions velox/serializers/PrestoBatchVectorSerializer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/serializers/PrestoBatchVectorSerializer.h"

#include "velox/serializers/PrestoSerializerEstimationUtils.h"
#include "velox/serializers/PrestoSerializerSerializationUtils.h"
#include "velox/serializers/VectorStream.h"

namespace facebook::velox::serializer::presto::detail {
void PrestoBatchVectorSerializer::serialize(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Scratch& scratch,
OutputStream* stream) {
const auto numRows = rangesTotalSize(ranges);
const auto rowType = vector->type();
const auto numChildren = vector->childrenSize();

StreamArena arena(pool_);
std::vector<std::unique_ptr<VectorStream>> streams(numChildren);
for (int i = 0; i < numChildren; i++) {
streams[i] = std::make_unique<VectorStream>(
rowType->childAt(i),
std::nullopt,
vector->childAt(i),
&arena,
numRows,
opts_);

if (numRows > 0) {
serializeColumn(vector->childAt(i), ranges, streams[i].get(), scratch);
}
}

flushStreams(
streams, numRows, arena, *codec_, opts_.minCompressionRatio, stream);
}

void PrestoBatchVectorSerializer::estimateSerializedSizeImpl(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
vector_size_t** sizes,
Scratch& scratch) {
switch (vector->encoding()) {
case VectorEncoding::Simple::FLAT:
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
estimateFlatSerializedSize,
vector->typeKind(),
vector.get(),
ranges,
sizes);
break;
case VectorEncoding::Simple::CONSTANT:
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
estimateConstantSerializedSize,
vector->typeKind(),
vector,
ranges,
sizes,
scratch);
break;
case VectorEncoding::Simple::DICTIONARY:
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
estimateDictionarySerializedSize,
vector->typeKind(),
vector,
ranges,
sizes,
scratch);
break;
case VectorEncoding::Simple::ROW: {
if (!vector->mayHaveNulls()) {
// Add the size of the offsets in the Row encoding.
for (int32_t i = 0; i < ranges.size(); ++i) {
*sizes[i] += ranges[i].size * sizeof(int32_t);
}

auto rowVector = vector->as<RowVector>();
auto& children = rowVector->children();
for (auto& child : children) {
if (child) {
estimateSerializedSizeImpl(child, ranges, sizes, scratch);
}
}

break;
}

std::vector<IndexRange> childRanges;
std::vector<vector_size_t*> childSizes;
for (int32_t i = 0; i < ranges.size(); ++i) {
// Add the size of the nulls bit mask.
*sizes[i] += bits::nbytes(ranges[i].size);

auto begin = ranges[i].begin;
auto end = begin + ranges[i].size;
for (auto offset = begin; offset < end; ++offset) {
// Add the size of the offset.
*sizes[i] += sizeof(int32_t);
if (!vector->isNullAt(offset)) {
childRanges.push_back(IndexRange{offset, 1});
childSizes.push_back(sizes[i]);
}
}
}

auto rowVector = vector->as<RowVector>();
auto& children = rowVector->children();
for (auto& child : children) {
if (child) {
estimateSerializedSizeImpl(
child,
folly::Range(childRanges.data(), childRanges.size()),
childSizes.data(),
scratch);
}
}

break;
}
case VectorEncoding::Simple::MAP: {
auto mapVector = vector->as<MapVector>();
std::vector<IndexRange> childRanges;
std::vector<vector_size_t*> childSizes;
expandRepeatedRanges(
mapVector,
mapVector->rawOffsets(),
mapVector->rawSizes(),
ranges,
sizes,
&childRanges,
&childSizes);
estimateSerializedSizeImpl(
mapVector->mapKeys(), childRanges, childSizes.data(), scratch);
estimateSerializedSizeImpl(
mapVector->mapValues(), childRanges, childSizes.data(), scratch);
break;
}
case VectorEncoding::Simple::ARRAY: {
auto arrayVector = vector->as<ArrayVector>();
std::vector<IndexRange> childRanges;
std::vector<vector_size_t*> childSizes;
expandRepeatedRanges(
arrayVector,
arrayVector->rawOffsets(),
arrayVector->rawSizes(),
ranges,
sizes,
&childRanges,
&childSizes);
estimateSerializedSizeImpl(
arrayVector->elements(), childRanges, childSizes.data(), scratch);
break;
}
case VectorEncoding::Simple::LAZY:
estimateSerializedSizeImpl(
vector->as<LazyVector>()->loadedVectorShared(),
ranges,
sizes,
scratch);
break;
default:
VELOX_UNSUPPORTED("Unsupported vector encoding {}", vector->encoding());
}
}
} // namespace facebook::velox::serializer::presto::detail
56 changes: 56 additions & 0 deletions velox/serializers/PrestoBatchVectorSerializer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::serializer::presto::detail {
class PrestoBatchVectorSerializer : public BatchVectorSerializer {
public:
PrestoBatchVectorSerializer(
memory::MemoryPool* pool,
const PrestoVectorSerde::PrestoOptions& opts)
: pool_(pool),
codec_(common::compressionKindToCodec(opts.compressionKind)),
opts_(opts) {}

void serialize(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Scratch& scratch,
OutputStream* stream) override;

void estimateSerializedSize(
VectorPtr vector,
const folly::Range<const IndexRange*>& ranges,
vector_size_t** sizes,
Scratch& scratch) override {
estimateSerializedSizeImpl(vector, ranges, sizes, scratch);
}

private:
void estimateSerializedSizeImpl(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
vector_size_t** sizes,
Scratch& scratch);

memory::MemoryPool* pool_;
const std::unique_ptr<folly::io::Codec> codec_;
PrestoVectorSerde::PrestoOptions opts_;
};
} // namespace facebook::velox::serializer::presto::detail
75 changes: 75 additions & 0 deletions velox/serializers/PrestoHeader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/serializers/PrestoHeader.h"

#include "velox/serializers/PrestoSerializerSerializationUtils.h"

namespace facebook::velox::serializer::presto::detail {
/* static */ Expected<PrestoHeader> PrestoHeader::read(
ByteInputStream* source) {
if (source->remainingSize() < kHeaderSize) {
return folly::makeUnexpected(Status::Invalid(
fmt::format("{} bytes for header", source->remainingSize())));
}
PrestoHeader header;
header.numRows = source->read<int32_t>();
header.pageCodecMarker = source->read<int8_t>();
header.uncompressedSize = source->read<int32_t>();
header.compressedSize = source->read<int32_t>();
header.checksum = source->read<int64_t>();

if (header.numRows < 0) {
return folly::makeUnexpected(
Status::Invalid(fmt::format("negative numRows: {}", header.numRows)));
}
if (header.uncompressedSize < 0) {
return folly::makeUnexpected(Status::Invalid(
fmt::format("negative uncompressedSize: {}", header.uncompressedSize)));
}
if (header.compressedSize < 0) {
return folly::makeUnexpected(Status::Invalid(
fmt::format("negative compressedSize: {}", header.compressedSize)));
}

return header;
}

/* static */ std::optional<PrestoHeader> PrestoHeader::read(
std::string_view* source) {
if (source->size() < kHeaderSize) {
return std::nullopt;
}

PrestoHeader header;
header.numRows = readInt<int32_t>(source);
header.pageCodecMarker = readInt<int8_t>(source);
header.uncompressedSize = readInt<int32_t>(source);
header.compressedSize = readInt<int32_t>(source);
header.checksum = readInt<int64_t>(source);

if (header.numRows < 0) {
return std::nullopt;
}
if (header.uncompressedSize < 0) {
return std::nullopt;
}
if (header.compressedSize < 0) {
return std::nullopt;
}

return header;
}
} // namespace facebook::velox::serializer::presto::detail
42 changes: 42 additions & 0 deletions velox/serializers/PrestoHeader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/common/base/Status.h"
#include "velox/common/memory/ByteStream.h"

namespace facebook::velox::serializer::presto::detail {

struct PrestoHeader {
int32_t numRows;
int8_t pageCodecMarker;
int32_t uncompressedSize;
int32_t compressedSize;
int64_t checksum;

static Expected<PrestoHeader> read(ByteInputStream* source);

static std::optional<PrestoHeader> read(std::string_view* source);

template <typename T>
static T readInt(std::string_view* source) {
assert(source->size() >= sizeof(T));
auto value = folly::loadUnaligned<T>(source->data());
source->remove_prefix(sizeof(T));
return value;
}
};
} // namespace facebook::velox::serializer::presto::detail
Loading

0 comments on commit a771dfd

Please sign in to comment.