Skip to content

Commit

Permalink
add more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
luffy-zh committed May 20, 2024
1 parent ead502a commit 4eeba00
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 14 deletions.
6 changes: 4 additions & 2 deletions c++/include/orc/Writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,14 @@ namespace orc {
uint64_t getOutputBufferCapacity() const;

/**
* Set the initial block size of input buffer in the class CompressionStream.
* Set the initial block size of original input buffer in the class CompressionStream.
* The input buffer is used to store raw data before compression, while the output buffer is
* dedicated to holding compressed data.
*/
WriterOptions& setMemoryBlockSize(uint64_t capacity);

/**
* Get the initial block size of input buffer in the class CompressionStream.
* Get the initial block size of original input buffer in the class CompressionStream.
* @return if not set, return default value which is 64 KB.
*/
uint64_t getMemoryBlockSize() const;
Expand Down
15 changes: 6 additions & 9 deletions c++/src/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ namespace orc {
// Buffer to hold uncompressed data until user calls Next()
BlockBuffer rawInputBuffer;

// compress with raw fallback
void compressWithRawFallback();
void compressInternal();
};

void CompressionStream::BackUp(int count) {
Expand All @@ -195,7 +194,7 @@ namespace orc {
}

uint64_t CompressionStream::flush() {
compressWithRawFallback();
compressInternal();
BufferedOutputStream::BackUp(outputSize - outputPosition);
rawInputBuffer.resize(0);
outputSize = outputPosition = 0;
Expand All @@ -219,7 +218,7 @@ namespace orc {
// PASS
}

void CompressionStream::compressWithRawFallback() {
void CompressionStream::compressInternal() {
if (rawInputBuffer.size() != 0) {
ensureHeader();

Expand Down Expand Up @@ -250,15 +249,13 @@ namespace orc {
if (rawInputBuffer.size() > compressionBlockSize) {
std::stringstream ss;
ss << "uncompressed data size " << rawInputBuffer.size() << " is larger than block size "
<< compressionBlockSize
<< ". compressionBlockSize should be set equal to multiply of "
"memoryBlockSize";
<< compressionBlockSize;
throw std::logic_error(ss.str());
}

// triggle compress when rawInputBuffer is reach the capacity
// compress data in the rawInputBuffer when it is full
if (rawInputBuffer.size() == compressionBlockSize) {
compressWithRawFallback();
compressInternal();
}

auto block = rawInputBuffer.getNextBlock();
Expand Down
5 changes: 4 additions & 1 deletion c++/src/Compression.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ namespace orc {
* @param outStream the output stream that is the underlying target
* @param strategy compression strategy
* @param bufferCapacity compression stream buffer total capacity
* @param compressionBlockSize compression buffer block size
* @param compressionBlockSize compression is triggered when the original input buffer size
* reaches this size
* @param memoryBlockSize the block size for original input buffer
* @param pool the memory pool
* @param metrics the writer metrics
*/
std::unique_ptr<BufferedOutputStream> createCompressor(
CompressionKind kind, OutputStream* outStream, CompressionStrategy strategy,
Expand Down
4 changes: 3 additions & 1 deletion c++/src/RLE.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ namespace orc {
uint64_t unusedBufferSize = static_cast<uint64_t>(bufferLength - bufferPosition);
if (outputStream->isCompressed()) {
recorder->add(flushedSize);
// number of decompressed bytes that need to be consumed
// There are multiple blocks in the input buffer, but bufferPosition only records the
// effective length of the last block. We need rawInputBufferSize to record the total length
// of all variable blocks.
recorder->add(outputStream->getRawInputBufferSize() - unusedBufferSize);
} else {
recorder->add(flushedSize - unusedBufferSize);
Expand Down
4 changes: 3 additions & 1 deletion c++/src/io/OutputStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ namespace orc {
if (outStream_->isCompressed()) {
// start of the compression chunk in the stream
recorder->add(flushedSize);
// number of decompressed bytes that need to be consumed
// There are multiple blocks in the input buffer, but bufferPosition only records the
// effective length of the last block. We need rawInputBufferSize to record the total length
// of all variable blocks.
recorder->add(outStream_->getRawInputBufferSize() - unusedBufferSize);
} else {
// byte offset of the start location
Expand Down

0 comments on commit 4eeba00

Please sign in to comment.