From b8871b1f2d2a175e5bc63664de1014c226e7ef65 Mon Sep 17 00:00:00 2001 From: Kevin Wilfong Date: Fri, 2 Feb 2024 15:47:47 -0800 Subject: [PATCH] Introduce BatchVectorSerializer to better support preserving encodings when serializing Vectors (#8567) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/8567 High Level Goal: Support preserving encodings in serialized Vectors from PartitionedOutput when the "arbitrary" partitioning scheme is enabled. This is specifically for Presto, in order to match the Presto Java behavior, and because we've seen in practice this can dramatically reduce the amount of shuffled data improving performance. This change: Looking at the way PrestoVectorSerializer handles encodings, it's currently not generic, and it looks very fragile. E.g. we need to make sure that flush is called after every append, which is currently impossible at compile time, and I'm not 100% confident is always guaranteed at run time. Since we now need to expose this through the generic VectorSerde interface, I figured it was a good time to clean it up a little before it's further cemented. To do this, I've added a new interface BatchVectorSerializer which only supports a single API serialize which effectively combines append and flush so we guarantee that only rows from a single Vector are written for every flush. Thanks to this it also doesn't have to maintain the streams across calls, so it doesn't need the VectorStreamGroup. In the PrestoBatchVectorSerializer implementation, we can take advantage of this to set the encodings based on the single RowVector we're serializing on each call. My hope is with this, we can remove PrestoVectorSerde::serializeEncoded replacing it with BatchVectorSerializer, and also remove the notion of encodings from PrestoVectorSerializer so we don't need to worry about calling append twice without a flush on a PrestoVectorSerializer initialized with encodings. If there's general agreement on this approach I'll do that in a follow up shortly after this. This will also give me a bit more isolation to implement the rest of the features that are needed for the high level goal (calculating the serialized size of Vector slices with encodings, splitting very large RowVectors while maintaining encodings, etc.) in order to match the Presto Java behavior, without breaking the existing serialization logic. Reviewed By: xiaoxmeng Differential Revision: D53144805 fbshipit-source-id: efbe557936f06c92bac9e3e926add1943712cda3 --- velox/serializers/PrestoSerializer.cpp | 335 +++++++++++------- velox/serializers/PrestoSerializer.h | 19 +- .../tests/PrestoSerializerTest.cpp | 131 ++++--- velox/vector/VectorStream.cpp | 59 ++- velox/vector/VectorStream.h | 49 +++ 5 files changed, 403 insertions(+), 190 deletions(-) diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 1786ad5dc2f7..e7363e762e6d 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -29,6 +29,19 @@ namespace { constexpr int8_t kCompressedBitMask = 1; constexpr int8_t kEncryptedBitMask = 2; constexpr int8_t kCheckSumBitMask = 4; +// uncompressed size comes after the number of rows and the codec +constexpr int32_t kSizeInBytesOffset{4 + 1}; +// There header for a page is: +// + number of rows (4 bytes) +// + codec (1 byte) +// + uncompressed size (4 bytes) +// + size (4 bytes) (this is the compressed size if the data is compressed, +// otherwise it's uncompressed size again) +// + checksum (8 bytes) +// +// See https://prestodb.io/docs/current/develop/serialized-page.html for a +// detailed specification of the format. +constexpr int32_t kHeaderSize{kSizeInBytesOffset + 4 + 4 + 8}; static inline const std::string_view kRLE{"RLE"}; static inline const std::string_view kDictionary{"DICTIONARY"}; @@ -3069,6 +3082,183 @@ void estimateSerializedSizeInt( } } +void flushUncompressed( + const std::vector>& streams, + int32_t numRows, + OutputStream* out, + PrestoOutputStreamListener* listener) { + int32_t offset = out->tellp(); + + char codecMask = 0; + if (listener) { + codecMask = getCodecMarker(); + } + // Pause CRC computation + if (listener) { + listener->pause(); + } + + writeInt32(out, numRows); + out->write(&codecMask, 1); + + // Make space for uncompressedSizeInBytes & sizeInBytes + writeInt32(out, 0); + writeInt32(out, 0); + // Write zero checksum. + writeInt64(out, 0); + + // Number of columns and stream content. Unpause CRC. + if (listener) { + listener->resume(); + } + writeInt32(out, streams.size()); + + for (auto& stream : streams) { + stream->flush(out); + } + + // Pause CRC computation + if (listener) { + listener->pause(); + } + + // Fill in uncompressedSizeInBytes & sizeInBytes + int32_t size = (int32_t)out->tellp() - offset; + int32_t uncompressedSize = size - kHeaderSize; + int64_t crc = 0; + if (listener) { + crc = computeChecksum(listener, codecMask, numRows, uncompressedSize); + } + + out->seekp(offset + kSizeInBytesOffset); + writeInt32(out, uncompressedSize); + writeInt32(out, uncompressedSize); + writeInt64(out, crc); + out->seekp(offset + size); +} + +void flushCompressed( + const std::vector>& streams, + const StreamArena& arena, + folly::io::Codec& codec, + int32_t numRows, + OutputStream* output, + PrestoOutputStreamListener* listener) { + char codecMask = kCompressedBitMask; + if (listener) { + codecMask |= kCheckSumBitMask; + } + + // Pause CRC computation + if (listener) { + listener->pause(); + } + + writeInt32(output, numRows); + output->write(&codecMask, 1); + + IOBufOutputStream out(*(arena.pool()), nullptr, arena.size()); + writeInt32(&out, streams.size()); + + for (auto& stream : streams) { + stream->flush(&out); + } + + const int32_t uncompressedSize = out.tellp(); + VELOX_CHECK_LE( + uncompressedSize, + codec.maxUncompressedLength(), + "UncompressedSize exceeds limit"); + auto compressed = codec.compress(out.getIOBuf().get()); + const int32_t compressedSize = compressed->length(); + writeInt32(output, uncompressedSize); + writeInt32(output, compressedSize); + const int32_t crcOffset = output->tellp(); + writeInt64(output, 0); // Write zero checksum + // Number of columns and stream content. Unpause CRC. + if (listener) { + listener->resume(); + } + output->write( + reinterpret_cast(compressed->writableData()), + compressed->length()); + // Pause CRC computation + if (listener) { + listener->pause(); + } + const int32_t endSize = output->tellp(); + // Fill in crc + int64_t crc = 0; + if (listener) { + crc = computeChecksum(listener, codecMask, numRows, compressedSize); + } + output->seekp(crcOffset); + writeInt64(output, crc); + output->seekp(endSize); +} + +// Writes the contents to 'out' in wire format +void flushStreams( + const std::vector>& streams, + int32_t numRows, + const StreamArena& arena, + folly::io::Codec& codec, + OutputStream* out) { + auto listener = dynamic_cast(out->listener()); + // Reset CRC computation + if (listener) { + listener->reset(); + } + + if (!needCompression(codec)) { + flushUncompressed(streams, numRows, out, listener); + } else { + flushCompressed(streams, arena, codec, numRows, out, listener); + } +} + +class PrestoBatchVectorSerializer : public BatchVectorSerializer { + public: + PrestoBatchVectorSerializer( + memory::MemoryPool* pool, + bool useLosslessTimestamp, + common::CompressionKind compressionKind) + : pool_(pool), + useLosslessTimestamp_(useLosslessTimestamp), + codec_(common::compressionKindToCodec(compressionKind)) {} + + void serialize( + const RowVectorPtr& vector, + const folly::Range& ranges, + Scratch& /* scratch */, + OutputStream* stream) override { + const auto numRows = rangesTotalSize(ranges); + const auto rowType = vector->type(); + const auto numChildren = vector->childrenSize(); + + StreamArena arena(pool_); + std::vector> streams(numChildren); + for (int i = 0; i < numChildren; i++) { + streams[i] = std::make_unique( + rowType->childAt(i), + std::nullopt, + vector->childAt(i), + &arena, + numRows, + useLosslessTimestamp_); + + serializeColumn(vector->childAt(i).get(), ranges, streams[i].get()); + } + + flushStreams(streams, numRows, arena, *codec_, stream); + } + + private: + memory::MemoryPool* pool_; + const bool useLosslessTimestamp_; + const std::unique_ptr codec_; +}; + class PrestoVectorSerializer : public VectorSerializer { public: PrestoVectorSerializer( @@ -3170,7 +3360,7 @@ class PrestoVectorSerializer : public VectorSerializer { // numRows(4) | codec(1) | uncompressedSize(4) | compressedSize(4) | // checksum(8) | data void flush(OutputStream* out) override { - flushInternal(numRows_, out); + flushStreams(streams_, numRows_, *streamArena_, *codec_, out); } void flushEncoded(const RowVectorPtr& vector, OutputStream* out) { @@ -3180,141 +3370,10 @@ class PrestoVectorSerializer : public VectorSerializer { Scratch scratch; append(vector, folly::Range(ranges.data(), ranges.size()), scratch); - flushInternal(vector->size(), out); + flushStreams(streams_, vector->size(), *streamArena_, *codec_, out); } private: - void flushUncompressed( - int32_t numRows, - OutputStream* out, - PrestoOutputStreamListener* listener) { - int32_t offset = out->tellp(); - - char codec = 0; - if (listener) { - codec = getCodecMarker(); - } - // Pause CRC computation - if (listener) { - listener->pause(); - } - - writeInt32(out, numRows); - out->write(&codec, 1); - - // Make space for uncompressedSizeInBytes & sizeInBytes - writeInt32(out, 0); - writeInt32(out, 0); - // Write zero checksum. - writeInt64(out, 0); - - // Number of columns and stream content. Unpause CRC. - if (listener) { - listener->resume(); - } - writeInt32(out, streams_.size()); - - for (auto& stream : streams_) { - stream->flush(out); - } - - // Pause CRC computation - if (listener) { - listener->pause(); - } - - // Fill in uncompressedSizeInBytes & sizeInBytes - int32_t size = (int32_t)out->tellp() - offset; - int32_t uncompressedSize = size - kHeaderSize; - int64_t crc = 0; - if (listener) { - crc = computeChecksum(listener, codec, numRows, uncompressedSize); - } - - out->seekp(offset + kSizeInBytesOffset); - writeInt32(out, uncompressedSize); - writeInt32(out, uncompressedSize); - writeInt64(out, crc); - out->seekp(offset + size); - } - - void flushCompressed( - int32_t numRows, - OutputStream* output, - PrestoOutputStreamListener* listener) { - const int32_t offset = output->tellp(); - char codec = kCompressedBitMask; - if (listener) { - codec |= kCheckSumBitMask; - } - - // Pause CRC computation - if (listener) { - listener->pause(); - } - - writeInt32(output, numRows); - output->write(&codec, 1); - - IOBufOutputStream out( - *(streamArena_->pool()), nullptr, streamArena_->size()); - writeInt32(&out, streams_.size()); - - for (auto& stream : streams_) { - stream->flush(&out); - } - - const int32_t uncompressedSize = out.tellp(); - VELOX_CHECK_LE( - uncompressedSize, - codec_->maxUncompressedLength(), - "UncompressedSize exceeds limit"); - auto compressed = codec_->compress(out.getIOBuf().get()); - const int32_t compressedSize = compressed->length(); - writeInt32(output, uncompressedSize); - writeInt32(output, compressedSize); - const int32_t crcOffset = output->tellp(); - writeInt64(output, 0); // Write zero checksum - // Number of columns and stream content. Unpause CRC. - if (listener) { - listener->resume(); - } - output->write( - reinterpret_cast(compressed->writableData()), - compressed->length()); - // Pause CRC computation - if (listener) { - listener->pause(); - } - const int32_t endSize = output->tellp(); - // Fill in crc - int64_t crc = 0; - if (listener) { - crc = computeChecksum(listener, codec, numRows, compressedSize); - } - output->seekp(crcOffset); - writeInt64(output, crc); - output->seekp(endSize); - } - - // Writes the contents to 'stream' in wire format - void flushInternal(int32_t numRows, OutputStream* out) { - auto listener = dynamic_cast(out->listener()); - // Reset CRC computation - if (listener) { - listener->reset(); - } - - if (!needCompression(*codec_)) { - flushUncompressed(numRows, out, listener); - } else { - flushCompressed(numRows, out, listener); - } - } - - static const int32_t kSizeInBytesOffset{4 + 1}; - static const int32_t kHeaderSize{kSizeInBytesOffset + 4 + 4 + 8}; - StreamArena* const streamArena_; const std::unique_ptr codec_; @@ -3354,7 +3413,15 @@ std::unique_ptr PrestoVectorSerde::createSerializer( prestoOptions.compressionKind); } -void PrestoVectorSerde::serializeEncoded( +std::unique_ptr PrestoVectorSerde::createBatchSerializer( + memory::MemoryPool* pool, + const Options* options) { + const auto prestoOptions = toPrestoOptions(options); + return std::make_unique( + pool, prestoOptions.useLosslessTimestamp, prestoOptions.compressionKind); +} + +void PrestoVectorSerde::deprecatedSerializeEncoded( const RowVectorPtr& vector, StreamArena* streamArena, const Options* options, diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index dfc647ac7c0e..7c3202bdd6b0 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -35,10 +35,9 @@ namespace facebook::velox::serializer::presto { /// efficient for a selective subset, e.g. when splitting a vector to a large /// number of output shuffle destinations. /// -/// 2. To serialize a single RowVector, one can use the serializeEncoded() -/// method. Since it serializes a single RowVector, it tries to preserve the -/// encodings of the input data. Check the method documentation below to learn -/// about the cases in which encodings are preserved. +/// 2. To serialize a single RowVector, one can use the BatchVectorSerializer +/// returned by createBatchSerializer(). Since it serializes a single RowVector, +/// it tries to preserve the encodings of the input data. class PrestoVectorSerde : public VectorSerde { public: // Input options that the serializer recognizes. @@ -83,6 +82,13 @@ class PrestoVectorSerde : public VectorSerde { StreamArena* streamArena, const Options* options) override; + /// Note that in addition to the differences highlighted in the VectorSerde + /// interface, BatchVectorSerializer returned by this function can maintain + /// the encodings of the input vectors recursively. + std::unique_ptr createBatchSerializer( + memory::MemoryPool* pool, + const Options* options) override; + /// Serializes a single RowVector with possibly encoded children, preserving /// their encodings. Encodings are preserved recursively for any RowVector /// children, but not for children of other nested vectors such as Array, Map, @@ -93,7 +99,10 @@ class PrestoVectorSerde : public VectorSerde { /// /// In order to override the encodings of top-level columns in the RowVector, /// you can specifiy the encodings using PrestoOptions.encodings - void serializeEncoded( + /// + /// DEPRECATED: Use createBatchSerializer and the BatchVectorSerializer's + /// serialize function instead. + void deprecatedSerializeEncoded( const RowVectorPtr& vector, StreamArena* streamArena, const Options* options, diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index d6ef26871900..8922eb8c106b 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -265,7 +265,7 @@ class PrestoSerializerTest paramOptions.encodings.push_back(child->encoding()); } - serde_->serializeEncoded(rowVector, &arena, ¶mOptions, &out); + serde_->deprecatedSerializeEncoded(rowVector, &arena, ¶mOptions, &out); } void assertEqualEncoding( @@ -283,19 +283,16 @@ class PrestoSerializerTest } } - void testEncodedRoundTrip( - const RowVectorPtr& data, - const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = - nullptr) { - std::ostringstream out; - serializeEncoded(data, &out, serdeOptions); - const auto serialized = out.str(); - - auto rowType = asRowType(data->type()); + void verifySerializedEncodedData( + const RowVectorPtr& original, + const std::string& serialized, + const serializer::presto::PrestoVectorSerde::PrestoOptions* + serdeOptions) { + auto rowType = asRowType(original->type()); auto deserialized = deserialize(rowType, serialized, serdeOptions); - assertEqualVectors(data, deserialized); - assertEqualEncoding(data, deserialized); + assertEqualVectors(original, deserialized); + assertEqualEncoding(original, deserialized); // Deserialize 3 times while appending to a single vector. auto paramOptions = getParamSerdeOptions(serdeOptions); @@ -308,14 +305,84 @@ class PrestoSerializerTest offset = result->size(); } - auto expected = BaseVector::create(data->type(), data->size() * 3, pool()); + auto expected = + BaseVector::create(original->type(), original->size() * 3, pool()); for (auto i = 0; i < 3; ++i) { - expected->copy(data.get(), data->size() * i, 0, data->size()); + expected->copy(original.get(), original->size() * i, 0, original->size()); } assertEqualVectors(expected, result); } + void testEncodedRoundTrip( + const RowVectorPtr& data, + const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = + nullptr) { + std::ostringstream out; + serializeEncoded(data, &out, serdeOptions); + const auto serialized = out.str(); + + verifySerializedEncodedData(data, serialized, serdeOptions); + } + + void serializeBatch( + const RowVectorPtr& rowVector, + std::ostream* output, + const serializer::presto::PrestoVectorSerde::PrestoOptions* + serdeOptions) { + facebook::velox::serializer::presto::PrestoOutputStreamListener listener; + OStreamOutputStream out(output, &listener); + auto paramOptions = getParamSerdeOptions(serdeOptions); + + auto serializer = serde_->createBatchSerializer(pool_.get(), ¶mOptions); + serializer->serialize(rowVector, &out); + } + + void testBatchVectorSerializerRoundTrip( + const RowVectorPtr& data, + const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = + nullptr) { + std::ostringstream out; + serializeBatch(data, &out, serdeOptions); + const auto serialized = out.str(); + + verifySerializedEncodedData(data, serialized, serdeOptions); + } + + RowVectorPtr encodingsTestVector() { + auto baseNoNulls = makeFlatVector({1, 2, 3, 4}); + auto baseWithNulls = + makeNullableFlatVector({1, std::nullopt, 2, 3}); + auto baseArray = + makeArrayVector({{1, 2, 3}, {}, {4, 5}, {6, 7, 8, 9, 10}}); + auto indices = makeIndices(8, [](auto row) { return row / 2; }); + + return makeRowVector({ + BaseVector::wrapInDictionary(nullptr, indices, 8, baseNoNulls), + BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), + BaseVector::wrapInDictionary(nullptr, indices, 8, baseArray), + BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), + BaseVector::createNullConstant(VARCHAR(), 8, pool_.get()), + BaseVector::wrapInConstant(8, 1, baseArray), + BaseVector::wrapInConstant(8, 2, baseArray), + makeRowVector({ + BaseVector::wrapInDictionary(nullptr, indices, 8, baseNoNulls), + BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), + BaseVector::wrapInDictionary(nullptr, indices, 8, baseArray), + BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), + BaseVector::createNullConstant(VARCHAR(), 8, pool_.get()), + BaseVector::wrapInConstant(8, 1, baseArray), + BaseVector::wrapInConstant(8, 2, baseArray), + makeRowVector({ + BaseVector::wrapInDictionary( + nullptr, indices, 8, baseWithNulls), + BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), + BaseVector::wrapInConstant(8, 2, baseArray), + }), + }), + }); + } + std::unique_ptr serde_; }; @@ -515,37 +582,13 @@ TEST_P(PrestoSerializerTest, longDecimal) { // Test that hierarchically encoded columns (rows) have their encodings // preserved. TEST_P(PrestoSerializerTest, encodings) { - auto baseNoNulls = makeFlatVector({1, 2, 3, 4}); - auto baseWithNulls = makeNullableFlatVector({1, std::nullopt, 2, 3}); - auto baseArray = - makeArrayVector({{1, 2, 3}, {}, {4, 5}, {6, 7, 8, 9, 10}}); - auto indices = makeIndices(8, [](auto row) { return row / 2; }); - - auto data = makeRowVector({ - BaseVector::wrapInDictionary(nullptr, indices, 8, baseNoNulls), - BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), - BaseVector::wrapInDictionary(nullptr, indices, 8, baseArray), - BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), - BaseVector::createNullConstant(VARCHAR(), 8, pool_.get()), - BaseVector::wrapInConstant(8, 1, baseArray), - BaseVector::wrapInConstant(8, 2, baseArray), - makeRowVector({ - BaseVector::wrapInDictionary(nullptr, indices, 8, baseNoNulls), - BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), - BaseVector::wrapInDictionary(nullptr, indices, 8, baseArray), - BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), - BaseVector::createNullConstant(VARCHAR(), 8, pool_.get()), - BaseVector::wrapInConstant(8, 1, baseArray), - BaseVector::wrapInConstant(8, 2, baseArray), - makeRowVector({ - BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), - BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), - BaseVector::wrapInConstant(8, 2, baseArray), - }), - }), - }); + testEncodedRoundTrip(encodingsTestVector()); +} - testEncodedRoundTrip(data); +// Test that hierarchically encoded columns (rows) have their encodings +// preserved by the PrestoBatchVectorSerializer. +TEST_P(PrestoSerializerTest, encodingsBatchVectorSerializer) { + testBatchVectorSerializerRoundTrip(encodingsTestVector()); } TEST_P(PrestoSerializerTest, scatterEncoded) { diff --git a/velox/vector/VectorStream.cpp b/velox/vector/VectorStream.cpp index fec8eb7a9fd1..758874c11239 100644 --- a/velox/vector/VectorStream.cpp +++ b/velox/vector/VectorStream.cpp @@ -18,14 +18,40 @@ #include "velox/common/base/RawVector.h" namespace facebook::velox { - -void VectorSerializer::append(const RowVectorPtr& vector) { - const IndexRange allRows{0, vector->size()}; - Scratch scratch; - append(vector, folly::Range(&allRows, 1), scratch); -} - namespace { +// An adapter class that can be used to convert a VectorSerializer into a +// BatchVectorSerializer for VectorSerdes that don't want to implement a +// separate serializer. +class DefaultBatchVectorSerializer : public BatchVectorSerializer { + public: + DefaultBatchVectorSerializer( + memory::MemoryPool* pool, + VectorSerde* serde, + const VectorSerde::Options* options) + : pool_(pool), serde_(serde), options_(options) {} + + void serialize( + const RowVectorPtr& vector, + const folly::Range& ranges, + Scratch& scratch, + OutputStream* stream) override { + size_t numRows = 0; + for (const auto& range : ranges) { + numRows += range.size; + } + + StreamArena arena(pool_); + auto serializer = serde_->createSerializer( + asRowType(vector->type()), numRows, &arena, options_); + serializer->append(vector, ranges, scratch); + serializer->flush(stream); + } + + private: + memory::MemoryPool* const pool_; + VectorSerde* const serde_; + const VectorSerde::Options* const options_; +}; std::unique_ptr& getVectorSerdeImpl() { static std::unique_ptr serde; @@ -41,6 +67,25 @@ getNamedVectorSerdeImpl() { } // namespace +void VectorSerializer::append(const RowVectorPtr& vector) { + const IndexRange allRows{0, vector->size()}; + Scratch scratch; + append(vector, folly::Range(&allRows, 1), scratch); +} + +void BatchVectorSerializer::serialize( + const RowVectorPtr& vector, + OutputStream* stream) { + const IndexRange allRows{0, vector->size()}; + serialize(vector, folly::Range(&allRows, 1), stream); +} + +std::unique_ptr VectorSerde::createBatchSerializer( + memory::MemoryPool* pool, + const Options* options) { + return std::make_unique(pool, this, options); +} + VectorSerde* getVectorSerde() { auto serde = getVectorSerdeImpl().get(); VELOX_CHECK_NOT_NULL(serde, "Vector serde is not registered."); diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index 5f1899195a73..ef9a38eb8094 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -33,6 +33,12 @@ struct IndexRange { vector_size_t size; }; +/// Serializer that can iteratively build up a buffer of serialized rows from +/// one or more RowVectors. +/// +/// Uses successive calls to `append` to add more rows to the serialization +/// buffer. Then call `flush` to write the aggregate serialized data to an +/// OutputStream. class VectorSerializer { public: virtual ~VectorSerializer() = default; @@ -82,6 +88,34 @@ class VectorSerializer { virtual void flush(OutputStream* stream) = 0; }; +/// Serializer that writes a subset of rows from a single RowVector to the +/// OutputStream. +/// +/// Each serialize() call serializes the specified range(s) of `vector` and +/// write them to the output stream. +class BatchVectorSerializer { + public: + virtual ~BatchVectorSerializer() = default; + + /// Serializes a subset of rows in a vector. + virtual void serialize( + const RowVectorPtr& vector, + const folly::Range& ranges, + Scratch& scratch, + OutputStream* stream) = 0; + + virtual void serialize( + const RowVectorPtr& vector, + const folly::Range& ranges, + OutputStream* stream) { + Scratch scratch; + serialize(vector, ranges, scratch, stream); + } + + /// Serializes all rows in a vector. + void serialize(const RowVectorPtr& vector, OutputStream* stream); +}; + class VectorSerde { public: virtual ~VectorSerde() = default; @@ -119,12 +153,27 @@ class VectorSerde { estimateSerializedSize(vector, ranges, sizes, scratch); } + /// Creates a Vector Serializer that iteratively builds up a buffer of + /// serialized rows from one or more RowVectors via append, and then writes to + /// an OutputSteam via flush. + /// + /// This is more appropriate if the use case involves many small writes, e.g. + /// partitioning a RowVector across multiple destinations. virtual std::unique_ptr createSerializer( RowTypePtr type, int32_t numRows, StreamArena* streamArena, const Options* options = nullptr) = 0; + /// Creates a Vector Serializer that writes a subset of rows from a single + /// RowVector to the OutputStream via a single serialize API. + /// + /// This is more appropriate if the use case involves large writes, e.g. + /// sending an entire RowVector to a particular destination. + virtual std::unique_ptr createBatchSerializer( + memory::MemoryPool* pool, + const Options* options = nullptr); + virtual void deserialize( ByteInputStream* source, velox::memory::MemoryPool* pool,