Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1730: [C++] Add finishEncode support for the encoder #1956

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions c++/src/ByteRLE.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ namespace orc {

virtual void suppress() override;

/**
* Finish encoding the stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The statement is a little bit confusing.

Copy link
Contributor Author

@luffy-zh luffy-zh Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More comments are added to finishEncode function.

*/
virtual void finishEncode() override;

/**
* Reset to initial state
*/
Expand Down Expand Up @@ -216,6 +221,13 @@ namespace orc {
reset();
}

void ByteRleEncoderImpl::finishEncode() {
writeValues();
outputStream->BackUp(bufferLength - bufferPosition);
outputStream->finishStream();
bufferLength = bufferPosition = 0;
}

std::unique_ptr<ByteRleEncoder> createByteRleEncoder(
std::unique_ptr<BufferedOutputStream> output) {
return std::make_unique<ByteRleEncoderImpl>(std::move(output));
Expand Down
5 changes: 5 additions & 0 deletions c++/src/ByteRLE.hh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ namespace orc {
* suppress the data and reset to initial state
*/
virtual void suppress() = 0;

/**
* finish current encoding
*/
virtual void finishEncode() = 0;
};

class ByteRleDecoder {
Expand Down
10 changes: 10 additions & 0 deletions c++/src/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ namespace orc {
}
virtual uint64_t getSize() const override;
virtual uint64_t getRawInputBufferSize() const override = 0;
virtual void finishStream() override = 0;

protected:
void writeData(const unsigned char* data, int size);
Expand Down Expand Up @@ -173,6 +174,9 @@ namespace orc {
uint64_t getRawInputBufferSize() const override {
return rawInputBuffer.size();
}
virtual void finishStream() override {
compressInternal();
}

protected:
// return total compressed size
Expand Down Expand Up @@ -953,6 +957,8 @@ namespace orc {
return rawInputBuffer.size();
}

virtual void finishStream() override;

protected:
// compresses a block and returns the compressed size
virtual uint64_t doBlockCompression() = 0;
Expand Down Expand Up @@ -1024,6 +1030,10 @@ namespace orc {
BufferedOutputStream::suppress();
}

void BlockCompressionStream::finishStream() {
doBlockCompression();
}

/**
* LZ4 block compression
*/
Expand Down
6 changes: 6 additions & 0 deletions c++/src/RLE.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,10 @@ namespace orc {
recorder->add(static_cast<uint64_t>(numLiterals));
}

void RleEncoder::finishEncode() {
outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
outputStream->finishStream();
bufferLength = bufferPosition = 0;
}

} // namespace orc
2 changes: 2 additions & 0 deletions c++/src/RLE.hh
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ namespace orc {

virtual void write(int64_t val) = 0;

virtual void finishEncode();

protected:
std::unique_ptr<BufferedOutputStream> outputStream;
size_t bufferPosition;
Expand Down
5 changes: 5 additions & 0 deletions c++/src/RLEv1.cc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can rewrite RleEncoderV1::flush() with finishEncode() to make it clearer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completed the refactoring of RleEncoderV1::flush().

Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ namespace orc {
}
}

void RleEncoderV1::finishEncode() {
writeValues();
RleEncoder::finishEncode();
}

signed char RleDecoderV1::readByte() {
SCOPED_MINUS_STOPWATCH(metrics, DecodingLatencyUs);
if (bufferStart_ == bufferEnd_) {
Expand Down
2 changes: 2 additions & 0 deletions c++/src/RLEv1.hh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ namespace orc {

void write(int64_t val) override;

void finishEncode() override;

private:
int64_t delta_;
bool repeat_;
Expand Down
2 changes: 2 additions & 0 deletions c++/src/RLEv2.hh
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ namespace orc {

void write(int64_t val) override;

void finishEncode() override;

private:
const bool alignedBitPacking_;
uint32_t fixedRunLength_;
Expand Down
51 changes: 27 additions & 24 deletions c++/src/RleEncoderV2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -440,31 +440,8 @@ namespace orc {
}

uint64_t RleEncoderV2::flush() {
if (numLiterals != 0) {
EncodingOption option = {};
if (variableRunLength_ != 0) {
determineEncoding(option);
writeValues(option);
} else if (fixedRunLength_ != 0) {
if (fixedRunLength_ < MIN_REPEAT) {
variableRunLength_ = fixedRunLength_;
fixedRunLength_ = 0;
determineEncoding(option);
writeValues(option);
} else if (fixedRunLength_ >= MIN_REPEAT && fixedRunLength_ <= MAX_SHORT_REPEAT_LENGTH) {
option.encoding = SHORT_REPEAT;
writeValues(option);
} else {
option.encoding = DELTA;
option.isFixedDelta = true;
writeValues(option);
}
}
}

outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
finishEncode();
uint64_t dataSize = outputStream->flush();
bufferLength = bufferPosition = 0;
return dataSize;
}

Expand Down Expand Up @@ -779,4 +756,30 @@ namespace orc {
fixedRunLength_ = 1;
variableRunLength_ = 1;
}

void RleEncoderV2::finishEncode() {
if (numLiterals != 0) {
EncodingOption option = {};
if (variableRunLength_ != 0) {
determineEncoding(option);
writeValues(option);
} else if (fixedRunLength_ != 0) {
if (fixedRunLength_ < MIN_REPEAT) {
variableRunLength_ = fixedRunLength_;
fixedRunLength_ = 0;
determineEncoding(option);
writeValues(option);
} else if (fixedRunLength_ >= MIN_REPEAT && fixedRunLength_ <= MAX_SHORT_REPEAT_LENGTH) {
option.encoding = SHORT_REPEAT;
writeValues(option);
} else {
option.encoding = DELTA;
option.isFixedDelta = true;
writeValues(option);
}
}
}

RleEncoder::finishEncode();
}
} // namespace orc
4 changes: 4 additions & 0 deletions c++/src/io/OutputStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ namespace orc {
}
}

void BufferedOutputStream::finishStream() {
// PASS
}

google::protobuf::int64 BufferedOutputStream::ByteCount() const {
return static_cast<google::protobuf::int64>(dataBuffer_->size());
}
Expand Down
1 change: 1 addition & 0 deletions c++/src/io/OutputStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ namespace orc {
virtual bool isCompressed() const {
return false;
}
virtual void finishStream();
};
DIAGNOSTIC_POP

Expand Down
17 changes: 14 additions & 3 deletions c++/test/TestRleEncoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,21 @@ namespace orc {
std::make_unique<SeekableArrayInputStream>(memStream.getData(), memStream.getLength()),
isSinged, version, *getDefaultPool(), getDefaultReaderMetrics());

int64_t* decodedData = new int64_t[numValues];
decoder->next(decodedData, numValues, notNull);
std::vector<int64_t> decodedData(numValues);
decoder->next(decodedData.data(), numValues, notNull);

for (uint64_t i = 0; i < numValues; ++i) {
if (!notNull || notNull[i]) {
EXPECT_EQ(data[i], decodedData[i]);
}
}

delete[] decodedData;
decoder->next(decodedData.data(), numValues, notNull);
for (uint64_t i = 0; i < numValues; ++i) {
if (!notNull || notNull[i]) {
EXPECT_EQ(data[i], decodedData[i]);
}
}
}

std::unique_ptr<RleEncoder> RleTest::getEncoder(RleVersion version, MemoryOutputStream& memStream,
Expand Down Expand Up @@ -128,6 +133,9 @@ namespace orc {
char* notNull = numNulls == 0 ? nullptr : new char[numValues];
int64_t* data = new int64_t[numValues];
generateData(numValues, start, delta, random, data, numNulls, notNull);
encoder->add(data, numValues, notNull);
encoder->finishEncode();

encoder->add(data, numValues, notNull);
encoder->flush();

Expand Down Expand Up @@ -243,6 +251,9 @@ namespace orc {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);

std::unique_ptr<RleEncoder> encoder = getEncoder(RleVersion_2, memStream, isSigned);
encoder->add(data, numValues, nullptr);
encoder->finishEncode();

encoder->add(data, numValues, nullptr);
encoder->flush();

Expand Down
Loading