Skip to content

Commit

Permalink
Introduce BatchVectorSerializer to better support preserving encoding…
Browse files Browse the repository at this point in the history
…s when serializing Vectors (facebookincubator#8567)

Summary:
Pull Request resolved: facebookincubator#8567

High Level Goal:
Support preserving encodings in serialized Vectors from PartitionedOutput when the "arbitrary" partitioning scheme is
enabled.  This is specifically for Presto, in order to match the Presto Java behavior, and because we've seen in practice this
can dramatically reduce the amount of shuffled data improving performance.

This change:
Looking at the way PrestoVectorSerializer handles encodings, it's currently not generic, and it looks very fragile.  E.g. we need
to make sure that flush is called after every append, which is currently impossible at compile time, and I'm not 100% confident
is always guaranteed at run time.

Since we now need to expose this through the generic VectorSerde interface, I figured it was a good time to clean it up a little
before it's further cemented.  To do this, I've added a new interface BatchVectorSerializer which only supports a single API
serialize which effectively combines append and flush so we guarantee that only rows from a single Vector are written for
every flush.  Thanks to this it also doesn't have to maintain the streams across calls, so it doesn't need the
VectorStreamGroup.

In the PrestoBatchVectorSerializer implementation, we can take advantage of this to set the encodings based on the single
RowVector we're serializing on each call.

My hope is with this, we can remove PrestoVectorSerde::serializeEncoded replacing it with BatchVectorSerializer, and also
remove the notion of encodings from PrestoVectorSerializer so we don't need to worry about calling append twice without a
flush on a PrestoVectorSerializer initialized with encodings.  If there's general agreement on this approach I'll do that in a
follow up shortly after this.

This will also give me a bit more isolation to implement the rest of the features that are needed for the high level goal
(calculating the serialized size of Vector slices with encodings, splitting very large RowVectors while maintaining encodings,
etc.) in order to match the Presto Java behavior, without breaking the existing serialization logic.

Reviewed By: xiaoxmeng

Differential Revision: D53144805

fbshipit-source-id: efbe557936f06c92bac9e3e926add1943712cda3
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Feb 2, 2024
1 parent e8da627 commit b8871b1
Show file tree
Hide file tree
Showing 5 changed files with 403 additions and 190 deletions.
335 changes: 201 additions & 134 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ namespace {
constexpr int8_t kCompressedBitMask = 1;
constexpr int8_t kEncryptedBitMask = 2;
constexpr int8_t kCheckSumBitMask = 4;
// uncompressed size comes after the number of rows and the codec
constexpr int32_t kSizeInBytesOffset{4 + 1};
// There header for a page is:
// + number of rows (4 bytes)
// + codec (1 byte)
// + uncompressed size (4 bytes)
// + size (4 bytes) (this is the compressed size if the data is compressed,
// otherwise it's uncompressed size again)
// + checksum (8 bytes)
//
// See https://prestodb.io/docs/current/develop/serialized-page.html for a
// detailed specification of the format.
constexpr int32_t kHeaderSize{kSizeInBytesOffset + 4 + 4 + 8};
static inline const std::string_view kRLE{"RLE"};
static inline const std::string_view kDictionary{"DICTIONARY"};

Expand Down Expand Up @@ -3069,6 +3082,183 @@ void estimateSerializedSizeInt(
}
}

void flushUncompressed(
const std::vector<std::unique_ptr<VectorStream>>& streams,
int32_t numRows,
OutputStream* out,
PrestoOutputStreamListener* listener) {
int32_t offset = out->tellp();

char codecMask = 0;
if (listener) {
codecMask = getCodecMarker();
}
// Pause CRC computation
if (listener) {
listener->pause();
}

writeInt32(out, numRows);
out->write(&codecMask, 1);

// Make space for uncompressedSizeInBytes & sizeInBytes
writeInt32(out, 0);
writeInt32(out, 0);
// Write zero checksum.
writeInt64(out, 0);

// Number of columns and stream content. Unpause CRC.
if (listener) {
listener->resume();
}
writeInt32(out, streams.size());

for (auto& stream : streams) {
stream->flush(out);
}

// Pause CRC computation
if (listener) {
listener->pause();
}

// Fill in uncompressedSizeInBytes & sizeInBytes
int32_t size = (int32_t)out->tellp() - offset;
int32_t uncompressedSize = size - kHeaderSize;
int64_t crc = 0;
if (listener) {
crc = computeChecksum(listener, codecMask, numRows, uncompressedSize);
}

out->seekp(offset + kSizeInBytesOffset);
writeInt32(out, uncompressedSize);
writeInt32(out, uncompressedSize);
writeInt64(out, crc);
out->seekp(offset + size);
}

void flushCompressed(
const std::vector<std::unique_ptr<VectorStream>>& streams,
const StreamArena& arena,
folly::io::Codec& codec,
int32_t numRows,
OutputStream* output,
PrestoOutputStreamListener* listener) {
char codecMask = kCompressedBitMask;
if (listener) {
codecMask |= kCheckSumBitMask;
}

// Pause CRC computation
if (listener) {
listener->pause();
}

writeInt32(output, numRows);
output->write(&codecMask, 1);

IOBufOutputStream out(*(arena.pool()), nullptr, arena.size());
writeInt32(&out, streams.size());

for (auto& stream : streams) {
stream->flush(&out);
}

const int32_t uncompressedSize = out.tellp();
VELOX_CHECK_LE(
uncompressedSize,
codec.maxUncompressedLength(),
"UncompressedSize exceeds limit");
auto compressed = codec.compress(out.getIOBuf().get());
const int32_t compressedSize = compressed->length();
writeInt32(output, uncompressedSize);
writeInt32(output, compressedSize);
const int32_t crcOffset = output->tellp();
writeInt64(output, 0); // Write zero checksum
// Number of columns and stream content. Unpause CRC.
if (listener) {
listener->resume();
}
output->write(
reinterpret_cast<const char*>(compressed->writableData()),
compressed->length());
// Pause CRC computation
if (listener) {
listener->pause();
}
const int32_t endSize = output->tellp();
// Fill in crc
int64_t crc = 0;
if (listener) {
crc = computeChecksum(listener, codecMask, numRows, compressedSize);
}
output->seekp(crcOffset);
writeInt64(output, crc);
output->seekp(endSize);
}

// Writes the contents to 'out' in wire format
void flushStreams(
const std::vector<std::unique_ptr<VectorStream>>& streams,
int32_t numRows,
const StreamArena& arena,
folly::io::Codec& codec,
OutputStream* out) {
auto listener = dynamic_cast<PrestoOutputStreamListener*>(out->listener());
// Reset CRC computation
if (listener) {
listener->reset();
}

if (!needCompression(codec)) {
flushUncompressed(streams, numRows, out, listener);
} else {
flushCompressed(streams, arena, codec, numRows, out, listener);
}
}

class PrestoBatchVectorSerializer : public BatchVectorSerializer {
public:
PrestoBatchVectorSerializer(
memory::MemoryPool* pool,
bool useLosslessTimestamp,
common::CompressionKind compressionKind)
: pool_(pool),
useLosslessTimestamp_(useLosslessTimestamp),
codec_(common::compressionKindToCodec(compressionKind)) {}

void serialize(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Scratch& /* scratch */,
OutputStream* stream) override {
const auto numRows = rangesTotalSize(ranges);
const auto rowType = vector->type();
const auto numChildren = vector->childrenSize();

StreamArena arena(pool_);
std::vector<std::unique_ptr<VectorStream>> streams(numChildren);
for (int i = 0; i < numChildren; i++) {
streams[i] = std::make_unique<VectorStream>(
rowType->childAt(i),
std::nullopt,
vector->childAt(i),
&arena,
numRows,
useLosslessTimestamp_);

serializeColumn(vector->childAt(i).get(), ranges, streams[i].get());
}

flushStreams(streams, numRows, arena, *codec_, stream);
}

private:
memory::MemoryPool* pool_;
const bool useLosslessTimestamp_;
const std::unique_ptr<folly::io::Codec> codec_;
};

class PrestoVectorSerializer : public VectorSerializer {
public:
PrestoVectorSerializer(
Expand Down Expand Up @@ -3170,7 +3360,7 @@ class PrestoVectorSerializer : public VectorSerializer {
// numRows(4) | codec(1) | uncompressedSize(4) | compressedSize(4) |
// checksum(8) | data
void flush(OutputStream* out) override {
flushInternal(numRows_, out);
flushStreams(streams_, numRows_, *streamArena_, *codec_, out);
}

void flushEncoded(const RowVectorPtr& vector, OutputStream* out) {
Expand All @@ -3180,141 +3370,10 @@ class PrestoVectorSerializer : public VectorSerializer {
Scratch scratch;
append(vector, folly::Range(ranges.data(), ranges.size()), scratch);

flushInternal(vector->size(), out);
flushStreams(streams_, vector->size(), *streamArena_, *codec_, out);
}

private:
void flushUncompressed(
int32_t numRows,
OutputStream* out,
PrestoOutputStreamListener* listener) {
int32_t offset = out->tellp();

char codec = 0;
if (listener) {
codec = getCodecMarker();
}
// Pause CRC computation
if (listener) {
listener->pause();
}

writeInt32(out, numRows);
out->write(&codec, 1);

// Make space for uncompressedSizeInBytes & sizeInBytes
writeInt32(out, 0);
writeInt32(out, 0);
// Write zero checksum.
writeInt64(out, 0);

// Number of columns and stream content. Unpause CRC.
if (listener) {
listener->resume();
}
writeInt32(out, streams_.size());

for (auto& stream : streams_) {
stream->flush(out);
}

// Pause CRC computation
if (listener) {
listener->pause();
}

// Fill in uncompressedSizeInBytes & sizeInBytes
int32_t size = (int32_t)out->tellp() - offset;
int32_t uncompressedSize = size - kHeaderSize;
int64_t crc = 0;
if (listener) {
crc = computeChecksum(listener, codec, numRows, uncompressedSize);
}

out->seekp(offset + kSizeInBytesOffset);
writeInt32(out, uncompressedSize);
writeInt32(out, uncompressedSize);
writeInt64(out, crc);
out->seekp(offset + size);
}

void flushCompressed(
int32_t numRows,
OutputStream* output,
PrestoOutputStreamListener* listener) {
const int32_t offset = output->tellp();
char codec = kCompressedBitMask;
if (listener) {
codec |= kCheckSumBitMask;
}

// Pause CRC computation
if (listener) {
listener->pause();
}

writeInt32(output, numRows);
output->write(&codec, 1);

IOBufOutputStream out(
*(streamArena_->pool()), nullptr, streamArena_->size());
writeInt32(&out, streams_.size());

for (auto& stream : streams_) {
stream->flush(&out);
}

const int32_t uncompressedSize = out.tellp();
VELOX_CHECK_LE(
uncompressedSize,
codec_->maxUncompressedLength(),
"UncompressedSize exceeds limit");
auto compressed = codec_->compress(out.getIOBuf().get());
const int32_t compressedSize = compressed->length();
writeInt32(output, uncompressedSize);
writeInt32(output, compressedSize);
const int32_t crcOffset = output->tellp();
writeInt64(output, 0); // Write zero checksum
// Number of columns and stream content. Unpause CRC.
if (listener) {
listener->resume();
}
output->write(
reinterpret_cast<const char*>(compressed->writableData()),
compressed->length());
// Pause CRC computation
if (listener) {
listener->pause();
}
const int32_t endSize = output->tellp();
// Fill in crc
int64_t crc = 0;
if (listener) {
crc = computeChecksum(listener, codec, numRows, compressedSize);
}
output->seekp(crcOffset);
writeInt64(output, crc);
output->seekp(endSize);
}

// Writes the contents to 'stream' in wire format
void flushInternal(int32_t numRows, OutputStream* out) {
auto listener = dynamic_cast<PrestoOutputStreamListener*>(out->listener());
// Reset CRC computation
if (listener) {
listener->reset();
}

if (!needCompression(*codec_)) {
flushUncompressed(numRows, out, listener);
} else {
flushCompressed(numRows, out, listener);
}
}

static const int32_t kSizeInBytesOffset{4 + 1};
static const int32_t kHeaderSize{kSizeInBytesOffset + 4 + 4 + 8};

StreamArena* const streamArena_;
const std::unique_ptr<folly::io::Codec> codec_;

Expand Down Expand Up @@ -3354,7 +3413,15 @@ std::unique_ptr<VectorSerializer> PrestoVectorSerde::createSerializer(
prestoOptions.compressionKind);
}

void PrestoVectorSerde::serializeEncoded(
std::unique_ptr<BatchVectorSerializer> PrestoVectorSerde::createBatchSerializer(
memory::MemoryPool* pool,
const Options* options) {
const auto prestoOptions = toPrestoOptions(options);
return std::make_unique<PrestoBatchVectorSerializer>(
pool, prestoOptions.useLosslessTimestamp, prestoOptions.compressionKind);
}

void PrestoVectorSerde::deprecatedSerializeEncoded(
const RowVectorPtr& vector,
StreamArena* streamArena,
const Options* options,
Expand Down
Loading

0 comments on commit b8871b1

Please sign in to comment.