diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 1786ad5dc2f7..e7363e762e6d 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -29,6 +29,19 @@ namespace { constexpr int8_t kCompressedBitMask = 1; constexpr int8_t kEncryptedBitMask = 2; constexpr int8_t kCheckSumBitMask = 4; +// uncompressed size comes after the number of rows and the codec +constexpr int32_t kSizeInBytesOffset{4 + 1}; +// The header for a page is: +// + number of rows (4 bytes) +// + codec (1 byte) +// + uncompressed size (4 bytes) +// + size (4 bytes) (this is the compressed size if the data is compressed, +// otherwise it's uncompressed size again) +// + checksum (8 bytes) +// +// See https://prestodb.io/docs/current/develop/serialized-page.html for a +// detailed specification of the format. +constexpr int32_t kHeaderSize{kSizeInBytesOffset + 4 + 4 + 8}; static inline const std::string_view kRLE{"RLE"}; static inline const std::string_view kDictionary{"DICTIONARY"}; @@ -3069,6 +3082,183 @@ void estimateSerializedSizeInt( } } +void flushUncompressed( + const std::vector>& streams, + int32_t numRows, + OutputStream* out, + PrestoOutputStreamListener* listener) { + int32_t offset = out->tellp(); + + char codecMask = 0; + if (listener) { + codecMask = getCodecMarker(); + } + // Pause CRC computation + if (listener) { + listener->pause(); + } + + writeInt32(out, numRows); + out->write(&codecMask, 1); + + // Make space for uncompressedSizeInBytes & sizeInBytes + writeInt32(out, 0); + writeInt32(out, 0); + // Write zero checksum. + writeInt64(out, 0); + + // Number of columns and stream content. Unpause CRC. 
+ if (listener) { + listener->resume(); + } + writeInt32(out, streams.size()); + + for (auto& stream : streams) { + stream->flush(out); + } + + // Pause CRC computation + if (listener) { + listener->pause(); + } + + // Fill in uncompressedSizeInBytes & sizeInBytes + int32_t size = (int32_t)out->tellp() - offset; + int32_t uncompressedSize = size - kHeaderSize; + int64_t crc = 0; + if (listener) { + crc = computeChecksum(listener, codecMask, numRows, uncompressedSize); + } + + out->seekp(offset + kSizeInBytesOffset); + writeInt32(out, uncompressedSize); + writeInt32(out, uncompressedSize); + writeInt64(out, crc); + out->seekp(offset + size); +} + +void flushCompressed( + const std::vector>& streams, + const StreamArena& arena, + folly::io::Codec& codec, + int32_t numRows, + OutputStream* output, + PrestoOutputStreamListener* listener) { + char codecMask = kCompressedBitMask; + if (listener) { + codecMask |= kCheckSumBitMask; + } + + // Pause CRC computation + if (listener) { + listener->pause(); + } + + writeInt32(output, numRows); + output->write(&codecMask, 1); + + IOBufOutputStream out(*(arena.pool()), nullptr, arena.size()); + writeInt32(&out, streams.size()); + + for (auto& stream : streams) { + stream->flush(&out); + } + + const int32_t uncompressedSize = out.tellp(); + VELOX_CHECK_LE( + uncompressedSize, + codec.maxUncompressedLength(), + "UncompressedSize exceeds limit"); + auto compressed = codec.compress(out.getIOBuf().get()); + const int32_t compressedSize = compressed->length(); + writeInt32(output, uncompressedSize); + writeInt32(output, compressedSize); + const int32_t crcOffset = output->tellp(); + writeInt64(output, 0); // Write zero checksum + // Number of columns and stream content. Unpause CRC. 
+ if (listener) { + listener->resume(); + } + output->write( + reinterpret_cast(compressed->writableData()), + compressed->length()); + // Pause CRC computation + if (listener) { + listener->pause(); + } + const int32_t endSize = output->tellp(); + // Fill in crc + int64_t crc = 0; + if (listener) { + crc = computeChecksum(listener, codecMask, numRows, compressedSize); + } + output->seekp(crcOffset); + writeInt64(output, crc); + output->seekp(endSize); +} + +// Writes the contents to 'out' in wire format +void flushStreams( + const std::vector>& streams, + int32_t numRows, + const StreamArena& arena, + folly::io::Codec& codec, + OutputStream* out) { + auto listener = dynamic_cast(out->listener()); + // Reset CRC computation + if (listener) { + listener->reset(); + } + + if (!needCompression(codec)) { + flushUncompressed(streams, numRows, out, listener); + } else { + flushCompressed(streams, arena, codec, numRows, out, listener); + } +} + +class PrestoBatchVectorSerializer : public BatchVectorSerializer { + public: + PrestoBatchVectorSerializer( + memory::MemoryPool* pool, + bool useLosslessTimestamp, + common::CompressionKind compressionKind) + : pool_(pool), + useLosslessTimestamp_(useLosslessTimestamp), + codec_(common::compressionKindToCodec(compressionKind)) {} + + void serialize( + const RowVectorPtr& vector, + const folly::Range& ranges, + Scratch& /* scratch */, + OutputStream* stream) override { + const auto numRows = rangesTotalSize(ranges); + const auto rowType = vector->type(); + const auto numChildren = vector->childrenSize(); + + StreamArena arena(pool_); + std::vector> streams(numChildren); + for (int i = 0; i < numChildren; i++) { + streams[i] = std::make_unique( + rowType->childAt(i), + std::nullopt, + vector->childAt(i), + &arena, + numRows, + useLosslessTimestamp_); + + serializeColumn(vector->childAt(i).get(), ranges, streams[i].get()); + } + + flushStreams(streams, numRows, arena, *codec_, stream); + } + + private: + memory::MemoryPool* pool_; + 
const bool useLosslessTimestamp_; + const std::unique_ptr codec_; +}; + class PrestoVectorSerializer : public VectorSerializer { public: PrestoVectorSerializer( @@ -3170,7 +3360,7 @@ class PrestoVectorSerializer : public VectorSerializer { // numRows(4) | codec(1) | uncompressedSize(4) | compressedSize(4) | // checksum(8) | data void flush(OutputStream* out) override { - flushInternal(numRows_, out); + flushStreams(streams_, numRows_, *streamArena_, *codec_, out); } void flushEncoded(const RowVectorPtr& vector, OutputStream* out) { @@ -3180,141 +3370,10 @@ class PrestoVectorSerializer : public VectorSerializer { Scratch scratch; append(vector, folly::Range(ranges.data(), ranges.size()), scratch); - flushInternal(vector->size(), out); + flushStreams(streams_, vector->size(), *streamArena_, *codec_, out); } private: - void flushUncompressed( - int32_t numRows, - OutputStream* out, - PrestoOutputStreamListener* listener) { - int32_t offset = out->tellp(); - - char codec = 0; - if (listener) { - codec = getCodecMarker(); - } - // Pause CRC computation - if (listener) { - listener->pause(); - } - - writeInt32(out, numRows); - out->write(&codec, 1); - - // Make space for uncompressedSizeInBytes & sizeInBytes - writeInt32(out, 0); - writeInt32(out, 0); - // Write zero checksum. - writeInt64(out, 0); - - // Number of columns and stream content. Unpause CRC. 
- if (listener) { - listener->resume(); - } - writeInt32(out, streams_.size()); - - for (auto& stream : streams_) { - stream->flush(out); - } - - // Pause CRC computation - if (listener) { - listener->pause(); - } - - // Fill in uncompressedSizeInBytes & sizeInBytes - int32_t size = (int32_t)out->tellp() - offset; - int32_t uncompressedSize = size - kHeaderSize; - int64_t crc = 0; - if (listener) { - crc = computeChecksum(listener, codec, numRows, uncompressedSize); - } - - out->seekp(offset + kSizeInBytesOffset); - writeInt32(out, uncompressedSize); - writeInt32(out, uncompressedSize); - writeInt64(out, crc); - out->seekp(offset + size); - } - - void flushCompressed( - int32_t numRows, - OutputStream* output, - PrestoOutputStreamListener* listener) { - const int32_t offset = output->tellp(); - char codec = kCompressedBitMask; - if (listener) { - codec |= kCheckSumBitMask; - } - - // Pause CRC computation - if (listener) { - listener->pause(); - } - - writeInt32(output, numRows); - output->write(&codec, 1); - - IOBufOutputStream out( - *(streamArena_->pool()), nullptr, streamArena_->size()); - writeInt32(&out, streams_.size()); - - for (auto& stream : streams_) { - stream->flush(&out); - } - - const int32_t uncompressedSize = out.tellp(); - VELOX_CHECK_LE( - uncompressedSize, - codec_->maxUncompressedLength(), - "UncompressedSize exceeds limit"); - auto compressed = codec_->compress(out.getIOBuf().get()); - const int32_t compressedSize = compressed->length(); - writeInt32(output, uncompressedSize); - writeInt32(output, compressedSize); - const int32_t crcOffset = output->tellp(); - writeInt64(output, 0); // Write zero checksum - // Number of columns and stream content. Unpause CRC. 
- if (listener) { - listener->resume(); - } - output->write( - reinterpret_cast(compressed->writableData()), - compressed->length()); - // Pause CRC computation - if (listener) { - listener->pause(); - } - const int32_t endSize = output->tellp(); - // Fill in crc - int64_t crc = 0; - if (listener) { - crc = computeChecksum(listener, codec, numRows, compressedSize); - } - output->seekp(crcOffset); - writeInt64(output, crc); - output->seekp(endSize); - } - - // Writes the contents to 'stream' in wire format - void flushInternal(int32_t numRows, OutputStream* out) { - auto listener = dynamic_cast(out->listener()); - // Reset CRC computation - if (listener) { - listener->reset(); - } - - if (!needCompression(*codec_)) { - flushUncompressed(numRows, out, listener); - } else { - flushCompressed(numRows, out, listener); - } - } - - static const int32_t kSizeInBytesOffset{4 + 1}; - static const int32_t kHeaderSize{kSizeInBytesOffset + 4 + 4 + 8}; - StreamArena* const streamArena_; const std::unique_ptr codec_; @@ -3354,7 +3413,15 @@ std::unique_ptr PrestoVectorSerde::createSerializer( prestoOptions.compressionKind); } -void PrestoVectorSerde::serializeEncoded( +std::unique_ptr PrestoVectorSerde::createBatchSerializer( + memory::MemoryPool* pool, + const Options* options) { + const auto prestoOptions = toPrestoOptions(options); + return std::make_unique( + pool, prestoOptions.useLosslessTimestamp, prestoOptions.compressionKind); +} + +void PrestoVectorSerde::deprecatedSerializeEncoded( const RowVectorPtr& vector, StreamArena* streamArena, const Options* options, diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index dfc647ac7c0e..7c3202bdd6b0 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -35,10 +35,9 @@ namespace facebook::velox::serializer::presto { /// efficient for a selective subset, e.g. when splitting a vector to a large /// number of output shuffle destinations. /// -/// 2. 
To serialize a single RowVector, one can use the serializeEncoded() -/// method. Since it serializes a single RowVector, it tries to preserve the -/// encodings of the input data. Check the method documentation below to learn -/// about the cases in which encodings are preserved. +/// 2. To serialize a single RowVector, one can use the BatchVectorSerializer +/// returned by createBatchSerializer(). Since it serializes a single RowVector, +/// it tries to preserve the encodings of the input data. class PrestoVectorSerde : public VectorSerde { public: // Input options that the serializer recognizes. @@ -83,6 +82,13 @@ class PrestoVectorSerde : public VectorSerde { StreamArena* streamArena, const Options* options) override; + /// Note that in addition to the differences highlighted in the VectorSerde + /// interface, BatchVectorSerializer returned by this function can maintain + /// the encodings of the input vectors recursively. + std::unique_ptr createBatchSerializer( + memory::MemoryPool* pool, + const Options* options) override; + /// Serializes a single RowVector with possibly encoded children, preserving /// their encodings. Encodings are preserved recursively for any RowVector /// children, but not for children of other nested vectors such as Array, Map, @@ -93,7 +99,10 @@ class PrestoVectorSerde : public VectorSerde { /// /// In order to override the encodings of top-level columns in the RowVector, /// you can specifiy the encodings using PrestoOptions.encodings - void serializeEncoded( + /// + /// DEPRECATED: Use createBatchSerializer and the BatchVectorSerializer's + /// serialize function instead. 
+ void deprecatedSerializeEncoded( const RowVectorPtr& vector, StreamArena* streamArena, const Options* options, diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index d6ef26871900..8922eb8c106b 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -265,7 +265,7 @@ class PrestoSerializerTest paramOptions.encodings.push_back(child->encoding()); } - serde_->serializeEncoded(rowVector, &arena, ¶mOptions, &out); + serde_->deprecatedSerializeEncoded(rowVector, &arena, ¶mOptions, &out); } void assertEqualEncoding( @@ -283,19 +283,16 @@ class PrestoSerializerTest } } - void testEncodedRoundTrip( - const RowVectorPtr& data, - const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = - nullptr) { - std::ostringstream out; - serializeEncoded(data, &out, serdeOptions); - const auto serialized = out.str(); - - auto rowType = asRowType(data->type()); + void verifySerializedEncodedData( + const RowVectorPtr& original, + const std::string& serialized, + const serializer::presto::PrestoVectorSerde::PrestoOptions* + serdeOptions) { + auto rowType = asRowType(original->type()); auto deserialized = deserialize(rowType, serialized, serdeOptions); - assertEqualVectors(data, deserialized); - assertEqualEncoding(data, deserialized); + assertEqualVectors(original, deserialized); + assertEqualEncoding(original, deserialized); // Deserialize 3 times while appending to a single vector. 
auto paramOptions = getParamSerdeOptions(serdeOptions); @@ -308,14 +305,84 @@ class PrestoSerializerTest offset = result->size(); } - auto expected = BaseVector::create(data->type(), data->size() * 3, pool()); + auto expected = + BaseVector::create(original->type(), original->size() * 3, pool()); for (auto i = 0; i < 3; ++i) { - expected->copy(data.get(), data->size() * i, 0, data->size()); + expected->copy(original.get(), original->size() * i, 0, original->size()); } assertEqualVectors(expected, result); } + void testEncodedRoundTrip( + const RowVectorPtr& data, + const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = + nullptr) { + std::ostringstream out; + serializeEncoded(data, &out, serdeOptions); + const auto serialized = out.str(); + + verifySerializedEncodedData(data, serialized, serdeOptions); + } + + void serializeBatch( + const RowVectorPtr& rowVector, + std::ostream* output, + const serializer::presto::PrestoVectorSerde::PrestoOptions* + serdeOptions) { + facebook::velox::serializer::presto::PrestoOutputStreamListener listener; + OStreamOutputStream out(output, &listener); + auto paramOptions = getParamSerdeOptions(serdeOptions); + + auto serializer = serde_->createBatchSerializer(pool_.get(), ¶mOptions); + serializer->serialize(rowVector, &out); + } + + void testBatchVectorSerializerRoundTrip( + const RowVectorPtr& data, + const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = + nullptr) { + std::ostringstream out; + serializeBatch(data, &out, serdeOptions); + const auto serialized = out.str(); + + verifySerializedEncodedData(data, serialized, serdeOptions); + } + + RowVectorPtr encodingsTestVector() { + auto baseNoNulls = makeFlatVector({1, 2, 3, 4}); + auto baseWithNulls = + makeNullableFlatVector({1, std::nullopt, 2, 3}); + auto baseArray = + makeArrayVector({{1, 2, 3}, {}, {4, 5}, {6, 7, 8, 9, 10}}); + auto indices = makeIndices(8, [](auto row) { return row / 2; }); + + return makeRowVector({ + 
BaseVector::wrapInDictionary(nullptr, indices, 8, baseNoNulls), + BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), + BaseVector::wrapInDictionary(nullptr, indices, 8, baseArray), + BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), + BaseVector::createNullConstant(VARCHAR(), 8, pool_.get()), + BaseVector::wrapInConstant(8, 1, baseArray), + BaseVector::wrapInConstant(8, 2, baseArray), + makeRowVector({ + BaseVector::wrapInDictionary(nullptr, indices, 8, baseNoNulls), + BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), + BaseVector::wrapInDictionary(nullptr, indices, 8, baseArray), + BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), + BaseVector::createNullConstant(VARCHAR(), 8, pool_.get()), + BaseVector::wrapInConstant(8, 1, baseArray), + BaseVector::wrapInConstant(8, 2, baseArray), + makeRowVector({ + BaseVector::wrapInDictionary( + nullptr, indices, 8, baseWithNulls), + BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), + BaseVector::wrapInConstant(8, 2, baseArray), + }), + }), + }); + } + std::unique_ptr serde_; }; @@ -515,37 +582,13 @@ TEST_P(PrestoSerializerTest, longDecimal) { // Test that hierarchically encoded columns (rows) have their encodings // preserved. 
TEST_P(PrestoSerializerTest, encodings) { - auto baseNoNulls = makeFlatVector({1, 2, 3, 4}); - auto baseWithNulls = makeNullableFlatVector({1, std::nullopt, 2, 3}); - auto baseArray = - makeArrayVector({{1, 2, 3}, {}, {4, 5}, {6, 7, 8, 9, 10}}); - auto indices = makeIndices(8, [](auto row) { return row / 2; }); - - auto data = makeRowVector({ - BaseVector::wrapInDictionary(nullptr, indices, 8, baseNoNulls), - BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), - BaseVector::wrapInDictionary(nullptr, indices, 8, baseArray), - BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), - BaseVector::createNullConstant(VARCHAR(), 8, pool_.get()), - BaseVector::wrapInConstant(8, 1, baseArray), - BaseVector::wrapInConstant(8, 2, baseArray), - makeRowVector({ - BaseVector::wrapInDictionary(nullptr, indices, 8, baseNoNulls), - BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), - BaseVector::wrapInDictionary(nullptr, indices, 8, baseArray), - BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), - BaseVector::createNullConstant(VARCHAR(), 8, pool_.get()), - BaseVector::wrapInConstant(8, 1, baseArray), - BaseVector::wrapInConstant(8, 2, baseArray), - makeRowVector({ - BaseVector::wrapInDictionary(nullptr, indices, 8, baseWithNulls), - BaseVector::createConstant(INTEGER(), 123, 8, pool_.get()), - BaseVector::wrapInConstant(8, 2, baseArray), - }), - }), - }); + testEncodedRoundTrip(encodingsTestVector()); +} - testEncodedRoundTrip(data); +// Test that hierarchically encoded columns (rows) have their encodings +// preserved by the PrestoBatchVectorSerializer. 
+TEST_P(PrestoSerializerTest, encodingsBatchVectorSerializer) { + testBatchVectorSerializerRoundTrip(encodingsTestVector()); } TEST_P(PrestoSerializerTest, scatterEncoded) { diff --git a/velox/vector/VectorStream.cpp b/velox/vector/VectorStream.cpp index fec8eb7a9fd1..758874c11239 100644 --- a/velox/vector/VectorStream.cpp +++ b/velox/vector/VectorStream.cpp @@ -18,14 +18,40 @@ #include "velox/common/base/RawVector.h" namespace facebook::velox { - -void VectorSerializer::append(const RowVectorPtr& vector) { - const IndexRange allRows{0, vector->size()}; - Scratch scratch; - append(vector, folly::Range(&allRows, 1), scratch); -} - namespace { +// An adapter class that can be used to convert a VectorSerializer into a +// BatchVectorSerializer for VectorSerdes that don't want to implement a +// separate serializer. +class DefaultBatchVectorSerializer : public BatchVectorSerializer { + public: + DefaultBatchVectorSerializer( + memory::MemoryPool* pool, + VectorSerde* serde, + const VectorSerde::Options* options) + : pool_(pool), serde_(serde), options_(options) {} + + void serialize( + const RowVectorPtr& vector, + const folly::Range& ranges, + Scratch& scratch, + OutputStream* stream) override { + size_t numRows = 0; + for (const auto& range : ranges) { + numRows += range.size; + } + + StreamArena arena(pool_); + auto serializer = serde_->createSerializer( + asRowType(vector->type()), numRows, &arena, options_); + serializer->append(vector, ranges, scratch); + serializer->flush(stream); + } + + private: + memory::MemoryPool* const pool_; + VectorSerde* const serde_; + const VectorSerde::Options* const options_; +}; std::unique_ptr& getVectorSerdeImpl() { static std::unique_ptr serde; @@ -41,6 +67,25 @@ getNamedVectorSerdeImpl() { } // namespace +void VectorSerializer::append(const RowVectorPtr& vector) { + const IndexRange allRows{0, vector->size()}; + Scratch scratch; + append(vector, folly::Range(&allRows, 1), scratch); +} + +void BatchVectorSerializer::serialize( + 
+ const RowVectorPtr& vector, + OutputStream* stream) { + const IndexRange allRows{0, vector->size()}; + serialize(vector, folly::Range(&allRows, 1), stream); +} + +std::unique_ptr VectorSerde::createBatchSerializer( + memory::MemoryPool* pool, + const Options* options) { + return std::make_unique(pool, this, options); +} + VectorSerde* getVectorSerde() { auto serde = getVectorSerdeImpl().get(); VELOX_CHECK_NOT_NULL(serde, "Vector serde is not registered."); diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index 5f1899195a73..ef9a38eb8094 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -33,6 +33,12 @@ struct IndexRange { vector_size_t size; }; +/// Serializer that can iteratively build up a buffer of serialized rows from +/// one or more RowVectors. +/// +/// Uses successive calls to `append` to add more rows to the serialization +/// buffer. Then call `flush` to write the aggregate serialized data to an +/// OutputStream. class VectorSerializer { public: virtual ~VectorSerializer() = default; @@ -82,6 +88,34 @@ class VectorSerializer { virtual void flush(OutputStream* stream) = 0; }; +/// Serializer that writes a subset of rows from a single RowVector to the +/// OutputStream. +/// +/// Each serialize() call serializes the specified range(s) of `vector` and +/// writes them to the output stream. +class BatchVectorSerializer { + public: + virtual ~BatchVectorSerializer() = default; + + /// Serializes a subset of rows in a vector. + virtual void serialize( + const RowVectorPtr& vector, + const folly::Range& ranges, + Scratch& scratch, + OutputStream* stream) = 0; + + virtual void serialize( + const RowVectorPtr& vector, + const folly::Range& ranges, + OutputStream* stream) { + Scratch scratch; + serialize(vector, ranges, scratch, stream); + } + + /// Serializes all rows in a vector. 
+ void serialize(const RowVectorPtr& vector, OutputStream* stream); +}; + class VectorSerde { public: virtual ~VectorSerde() = default; @@ -119,12 +153,27 @@ class VectorSerde { estimateSerializedSize(vector, ranges, sizes, scratch); } + /// Creates a Vector Serializer that iteratively builds up a buffer of + /// serialized rows from one or more RowVectors via append, and then writes to + /// an OutputStream via flush. + /// + /// This is more appropriate if the use case involves many small writes, e.g. + /// partitioning a RowVector across multiple destinations. virtual std::unique_ptr createSerializer( RowTypePtr type, int32_t numRows, StreamArena* streamArena, const Options* options = nullptr) = 0; + /// Creates a Vector Serializer that writes a subset of rows from a single + /// RowVector to the OutputStream via a single serialize API. + /// + /// This is more appropriate if the use case involves large writes, e.g. + /// sending an entire RowVector to a particular destination. + virtual std::unique_ptr createBatchSerializer( + memory::MemoryPool* pool, + const Options* options = nullptr); + virtual void deserialize( ByteInputStream* source, velox::memory::MemoryPool* pool,