Skip to content

Commit

Permalink
ORC-1730: [C++] Add finishEncode support for the encoder
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Add finishEncode() to the RLE encoder and implement finishStream()  in BufferedOutputStream / compressionStream.

### Why are the changes needed?
We expect to finish encoding when the compression block is aligned with the row group boundary.

### How was this patch tested?
Uts in testRleEncode() can cover this patch.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #1956 from luffy-zh/ORC-1264.

Authored-by: luffy-zh <[email protected]>
Signed-off-by: ffacs <[email protected]>
  • Loading branch information
luffy-zh authored and ffacs committed Jun 20, 2024
1 parent 7b6df29 commit 8003801
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 30 deletions.
9 changes: 9 additions & 0 deletions c++/src/ByteRLE.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ namespace orc {

virtual void suppress() override;

virtual void finishEncode() override;

/**
* Reset to initial state
*/
Expand Down Expand Up @@ -216,6 +218,13 @@ namespace orc {
reset();
}

void ByteRleEncoderImpl::finishEncode() {
writeValues();
outputStream->BackUp(bufferLength - bufferPosition);
outputStream->finishStream();
bufferLength = bufferPosition = 0;
}

std::unique_ptr<ByteRleEncoder> createByteRleEncoder(
std::unique_ptr<BufferedOutputStream> output) {
return std::make_unique<ByteRleEncoderImpl>(std::move(output));
Expand Down
7 changes: 7 additions & 0 deletions c++/src/ByteRLE.hh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ namespace orc {
* suppress the data and reset to initial state
*/
virtual void suppress() = 0;

/**
* Finalize the encoding process. This function should be called after all data required for
* encoding has been added. It ensures that any remaining data is processed and the final state
* of the encoder is set.
*/
virtual void finishEncode() = 0;
};

class ByteRleDecoder {
Expand Down
10 changes: 10 additions & 0 deletions c++/src/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ namespace orc {
}
virtual uint64_t getSize() const override;
virtual uint64_t getRawInputBufferSize() const override = 0;
virtual void finishStream() override = 0;

protected:
void writeData(const unsigned char* data, int size);
Expand Down Expand Up @@ -173,6 +174,9 @@ namespace orc {
uint64_t getRawInputBufferSize() const override {
return rawInputBuffer.size();
}
virtual void finishStream() override {
compressInternal();
}

protected:
// return total compressed size
Expand Down Expand Up @@ -953,6 +957,8 @@ namespace orc {
return rawInputBuffer.size();
}

virtual void finishStream() override;

protected:
// compresses a block and returns the compressed size
virtual uint64_t doBlockCompression() = 0;
Expand Down Expand Up @@ -1024,6 +1030,10 @@ namespace orc {
BufferedOutputStream::suppress();
}

void BlockCompressionStream::finishStream() {
doBlockCompression();
}

/**
* LZ4 block compression
*/
Expand Down
6 changes: 6 additions & 0 deletions c++/src/RLE.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,10 @@ namespace orc {
recorder->add(static_cast<uint64_t>(numLiterals));
}

void RleEncoder::finishEncode() {
outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
outputStream->finishStream();
bufferLength = bufferPosition = 0;
}

} // namespace orc
7 changes: 7 additions & 0 deletions c++/src/RLE.hh
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ namespace orc {

virtual void write(int64_t val) = 0;

/**
* Finalize the encoding process. This function should be called after all data required for
* encoding has been added. It ensures that any remaining data is processed and the final state
* of the encoder is set.
*/
virtual void finishEncode();

protected:
std::unique_ptr<BufferedOutputStream> outputStream;
size_t bufferPosition;
Expand Down
9 changes: 6 additions & 3 deletions c++/src/RLEv1.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,8 @@ namespace orc {
}

uint64_t RleEncoderV1::flush() {
writeValues();
outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
finishEncode();
uint64_t dataSize = outputStream->flush();
bufferLength = bufferPosition = 0;
return dataSize;
}

Expand Down Expand Up @@ -135,6 +133,11 @@ namespace orc {
}
}

void RleEncoderV1::finishEncode() {
writeValues();
RleEncoder::finishEncode();
}

signed char RleDecoderV1::readByte() {
SCOPED_MINUS_STOPWATCH(metrics, DecodingLatencyUs);
if (bufferStart_ == bufferEnd_) {
Expand Down
2 changes: 2 additions & 0 deletions c++/src/RLEv1.hh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ namespace orc {

void write(int64_t val) override;

void finishEncode() override;

private:
int64_t delta_;
bool repeat_;
Expand Down
2 changes: 2 additions & 0 deletions c++/src/RLEv2.hh
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ namespace orc {

void write(int64_t val) override;

void finishEncode() override;

private:
const bool alignedBitPacking_;
uint32_t fixedRunLength_;
Expand Down
51 changes: 27 additions & 24 deletions c++/src/RleEncoderV2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -440,31 +440,8 @@ namespace orc {
}

uint64_t RleEncoderV2::flush() {
if (numLiterals != 0) {
EncodingOption option = {};
if (variableRunLength_ != 0) {
determineEncoding(option);
writeValues(option);
} else if (fixedRunLength_ != 0) {
if (fixedRunLength_ < MIN_REPEAT) {
variableRunLength_ = fixedRunLength_;
fixedRunLength_ = 0;
determineEncoding(option);
writeValues(option);
} else if (fixedRunLength_ >= MIN_REPEAT && fixedRunLength_ <= MAX_SHORT_REPEAT_LENGTH) {
option.encoding = SHORT_REPEAT;
writeValues(option);
} else {
option.encoding = DELTA;
option.isFixedDelta = true;
writeValues(option);
}
}
}

outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
finishEncode();
uint64_t dataSize = outputStream->flush();
bufferLength = bufferPosition = 0;
return dataSize;
}

Expand Down Expand Up @@ -779,4 +756,30 @@ namespace orc {
fixedRunLength_ = 1;
variableRunLength_ = 1;
}

void RleEncoderV2::finishEncode() {
if (numLiterals != 0) {
EncodingOption option = {};
if (variableRunLength_ != 0) {
determineEncoding(option);
writeValues(option);
} else if (fixedRunLength_ != 0) {
if (fixedRunLength_ < MIN_REPEAT) {
variableRunLength_ = fixedRunLength_;
fixedRunLength_ = 0;
determineEncoding(option);
writeValues(option);
} else if (fixedRunLength_ >= MIN_REPEAT && fixedRunLength_ <= MAX_SHORT_REPEAT_LENGTH) {
option.encoding = SHORT_REPEAT;
writeValues(option);
} else {
option.encoding = DELTA;
option.isFixedDelta = true;
writeValues(option);
}
}
}

RleEncoder::finishEncode();
}
} // namespace orc
4 changes: 4 additions & 0 deletions c++/src/io/OutputStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ namespace orc {
}
}

void BufferedOutputStream::finishStream() {
// PASS
}

google::protobuf::int64 BufferedOutputStream::ByteCount() const {
return static_cast<google::protobuf::int64>(dataBuffer_->size());
}
Expand Down
1 change: 1 addition & 0 deletions c++/src/io/OutputStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ namespace orc {
virtual bool isCompressed() const {
return false;
}
virtual void finishStream();
};
DIAGNOSTIC_POP

Expand Down
17 changes: 14 additions & 3 deletions c++/test/TestRleEncoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,21 @@ namespace orc {
std::make_unique<SeekableArrayInputStream>(memStream.getData(), memStream.getLength()),
isSinged, version, *getDefaultPool(), getDefaultReaderMetrics());

int64_t* decodedData = new int64_t[numValues];
decoder->next(decodedData, numValues, notNull);
std::vector<int64_t> decodedData(numValues);
decoder->next(decodedData.data(), numValues, notNull);

for (uint64_t i = 0; i < numValues; ++i) {
if (!notNull || notNull[i]) {
EXPECT_EQ(data[i], decodedData[i]);
}
}

delete[] decodedData;
decoder->next(decodedData.data(), numValues, notNull);
for (uint64_t i = 0; i < numValues; ++i) {
if (!notNull || notNull[i]) {
EXPECT_EQ(data[i], decodedData[i]);
}
}
}

std::unique_ptr<RleEncoder> RleTest::getEncoder(RleVersion version, MemoryOutputStream& memStream,
Expand Down Expand Up @@ -128,6 +133,9 @@ namespace orc {
char* notNull = numNulls == 0 ? nullptr : new char[numValues];
int64_t* data = new int64_t[numValues];
generateData(numValues, start, delta, random, data, numNulls, notNull);
encoder->add(data, numValues, notNull);
encoder->finishEncode();

encoder->add(data, numValues, notNull);
encoder->flush();

Expand Down Expand Up @@ -243,6 +251,9 @@ namespace orc {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);

std::unique_ptr<RleEncoder> encoder = getEncoder(RleVersion_2, memStream, isSigned);
encoder->add(data, numValues, nullptr);
encoder->finishEncode();

encoder->add(data, numValues, nullptr);
encoder->flush();

Expand Down

0 comments on commit 8003801

Please sign in to comment.