Skip to content

Commit

Permalink
ORC-1720: [C++] Unified compressor/decompressor exception types
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Add a new exception type, `CompressionError`, and throw it from the compressor/decompressor in place of the standard exception types used before.

### Why are the changes needed?
The current implementation of the compressor/decompressor throws a mix of standard exception types (`std::runtime_error` and `std::logic_error`), which can confuse users and makes failures harder to catch consistently. Adopting a single, unified exception class makes debugging more efficient.

### How was this patch tested?
The existing tests in TestWriter.cc cover this patch.

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes #1944 from luffy-zh/ORC-1720.

Authored-by: luffy-zh <[email protected]>
Signed-off-by: ffacs <[email protected]>
  • Loading branch information
luffy-zh authored and ffacs committed Jun 5, 2024
1 parent dcc7c7d commit 833817b
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 29 deletions.
12 changes: 12 additions & 0 deletions c++/include/orc/Exceptions.hh
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ namespace orc {
SchemaEvolutionError(const SchemaEvolutionError&);
SchemaEvolutionError& operator=(const SchemaEvolutionError&) = delete;
};

/**
 * Unified exception type thrown by the compressor and decompressor.
 *
 * Derives from std::runtime_error, so existing handlers for
 * std::runtime_error or std::exception also catch it.
 * Instances can be copy-constructed but not copy-assigned.
 */
class CompressionError : public std::runtime_error {
 public:
  /// Creates the error with the given message.
  explicit CompressionError(const std::string& whatArg);
  /// Creates the error with the given null-terminated message.
  explicit CompressionError(const char* whatArg);
  ~CompressionError() noexcept override;
  CompressionError(const CompressionError&);
  // Copy-assignment is deleted explicitly (instead of being declared private
  // and left undefined) so misuse is diagnosed at compile time, in the same
  // way SchemaEvolutionError declares it.
  CompressionError& operator=(const CompressionError&) = delete;
};

} // namespace orc

#endif
58 changes: 31 additions & 27 deletions c++/src/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ namespace orc {
while (offset < size) {
if (outputPosition == outputSize) {
if (!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), &outputSize)) {
throw std::runtime_error("Failed to get next output buffer from output stream.");
throw CompressionError("Failed to get next output buffer from output stream.");
}
outputPosition = 0;
} else if (outputPosition > outputSize) {
// for safety this will unlikely happen
throw std::logic_error("Write to an out-of-bound place during compression!");
throw CompressionError("Write to an out-of-bound place during compression!");
}
int currentSize = std::min(outputSize - outputPosition, size - offset);
memcpy(outputBuffer + outputPosition, data + offset, static_cast<size_t>(currentSize));
Expand All @@ -147,7 +147,7 @@ namespace orc {
for (uint32_t i = 0; i < HEADER_SIZE; ++i) {
if (outputPosition >= outputSize) {
if (!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), &outputSize)) {
throw std::runtime_error("Failed to get next output buffer from output stream.");
throw CompressionError("Failed to get next output buffer from output stream.");
}
outputPosition = 0;
}
Expand Down Expand Up @@ -188,7 +188,7 @@ namespace orc {
uint64_t backup = static_cast<uint64_t>(count);
uint64_t currSize = rawInputBuffer.size();
if (backup > currSize) {
throw std::logic_error("Can't backup that much!");
throw CompressionError("Can't backup that much!");
}
rawInputBuffer.resize(currSize - backup);
}
Expand Down Expand Up @@ -250,7 +250,7 @@ namespace orc {
std::stringstream ss;
ss << "uncompressed data size " << rawInputBuffer.size()
<< " is larger than compression block size " << compressionBlockSize;
throw std::logic_error(ss.str());
throw CompressionError(ss.str());
}

// compress data in the rawInputBuffer when it is full
Expand Down Expand Up @@ -297,7 +297,7 @@ namespace orc {

uint64_t ZlibCompressionStream::doStreamingCompression() {
if (deflateReset(&strm_) != Z_OK) {
throw std::runtime_error("Failed to reset inflate.");
throw CompressionError("Failed to reset inflate.");
}

// iterate through all blocks
Expand All @@ -318,7 +318,7 @@ namespace orc {
do {
if (outputPosition >= outputSize) {
if (!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer), &outputSize)) {
throw std::runtime_error("Failed to get next output buffer from output stream.");
throw CompressionError("Failed to get next output buffer from output stream.");
}
outputPosition = 0;
}
Expand All @@ -333,7 +333,7 @@ namespace orc {
} else if (ret == Z_OK) {
// needs more buffer so will continue the loop
} else {
throw std::runtime_error("Failed to deflate input data.");
throw CompressionError("Failed to deflate input data.");
}
} while (strm_.avail_out == 0);
} while (!finish);
Expand All @@ -357,7 +357,7 @@ namespace orc {
strm_.next_in = nullptr;

if (deflateInit2(&strm_, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
throw std::runtime_error("Error while calling deflateInit2() for zlib.");
throw CompressionError("Error while calling deflateInit2() for zlib.");
}
}

Expand Down Expand Up @@ -557,7 +557,7 @@ namespace orc {
} else if (state == DECOMPRESS_START) {
NextDecompress(data, size, availableSize);
} else {
throw std::logic_error(
throw CompressionError(
"Unknown compression state in "
"DecompressionStream::Next");
}
Expand All @@ -571,7 +571,7 @@ namespace orc {

void DecompressionStream::BackUp(int count) {
if (outputBuffer == nullptr || outputBufferLength != 0) {
throw std::logic_error("Backup without previous Next in " + getName());
throw CompressionError("Backup without previous Next in " + getName());
}
outputBuffer -= static_cast<size_t>(count);
outputBufferLength = static_cast<size_t>(count);
Expand Down Expand Up @@ -699,13 +699,17 @@ namespace orc {
case Z_OK:
break;
case Z_MEM_ERROR:
throw std::logic_error("Memory error from inflateInit2");
throw CompressionError(
"Memory error from ZlibDecompressionStream::ZlibDecompressionStream inflateInit2");
case Z_VERSION_ERROR:
throw std::logic_error("Version error from inflateInit2");
throw CompressionError(
"Version error from ZlibDecompressionStream::ZlibDecompressionStream inflateInit2");
case Z_STREAM_ERROR:
throw std::logic_error("Stream error from inflateInit2");
throw CompressionError(
"Stream error from ZlibDecompressionStream::ZlibDecompressionStream inflateInit2");
default:
throw std::logic_error("Unknown error from inflateInit2");
throw CompressionError(
"Unknown error from ZlibDecompressionStream::ZlibDecompressionStream inflateInit2");
}
}

Expand All @@ -726,7 +730,7 @@ namespace orc {
zstream_.next_out = reinterpret_cast<Bytef*>(const_cast<char*>(outputBuffer));
zstream_.avail_out = static_cast<uInt>(outputDataBuffer.capacity());
if (inflateReset(&zstream_) != Z_OK) {
throw std::logic_error(
throw CompressionError(
"Bad inflateReset in "
"ZlibDecompressionStream::NextDecompress");
}
Expand All @@ -746,19 +750,19 @@ namespace orc {
case Z_STREAM_END:
break;
case Z_BUF_ERROR:
throw std::logic_error(
throw CompressionError(
"Buffer error in "
"ZlibDecompressionStream::NextDecompress");
case Z_DATA_ERROR:
throw std::logic_error(
throw CompressionError(
"Data error in "
"ZlibDecompressionStream::NextDecompress");
case Z_STREAM_ERROR:
throw std::logic_error(
throw CompressionError(
"Stream error in "
"ZlibDecompressionStream::NextDecompress");
default:
throw std::logic_error(
throw CompressionError(
"Unknown error in "
"ZlibDecompressionStream::NextDecompress");
}
Expand Down Expand Up @@ -864,7 +868,7 @@ namespace orc {
}

if (outLength > maxOutputLength) {
throw std::logic_error("Snappy length exceeds block size");
throw CompressionError("Snappy length exceeds block size");
}

if (!snappy::RawUncompress(input, length, output)) {
Expand Down Expand Up @@ -966,7 +970,7 @@ namespace orc {

// Shrinks bufferSize by `count`.
// Throws when `count` is larger than the current bufferSize.
void BlockCompressionStream::BackUp(int count) {
if (count > bufferSize) {
// NOTE(review): two consecutive throw statements; only the first can ever
// execute. This looks like the before/after pair of a diff rendered without
// +/- markers -- confirm which one the file actually keeps.
throw std::logic_error("Can't backup that much!");
throw CompressionError("Can't backup that much!");
}
bufferSize -= count;
}
Expand All @@ -975,7 +979,7 @@ namespace orc {
void* data;
int size;
if (!Next(&data, &size)) {
throw std::runtime_error("Failed to flush compression buffer.");
throw CompressionError("Failed to flush compression buffer.");
}
BufferedOutputStream::BackUp(outputSize - outputPosition);
bufferSize = outputSize = outputPosition = 0;
Expand Down Expand Up @@ -1058,15 +1062,15 @@ namespace orc {
reinterpret_cast<char*>(compressorBuffer.data()), bufferSize,
static_cast<int>(compressorBuffer.size()), level);
if (result == 0) {
throw std::runtime_error("Error during block compression using lz4.");
throw CompressionError("Error during block compression using lz4.");
}
return static_cast<uint64_t>(result);
}

// Obtains the lz4 stream state from LZ4_createStream() and stores it in state_.
// Throws when the call yields a null state.
void Lz4CompressionSteam::init() {
state_ = LZ4_createStream();
if (!state_) {
// NOTE(review): two consecutive throw statements; only the first can ever
// execute. This looks like the before/after pair of a diff rendered without
// +/- markers -- confirm which one the file actually keeps.
throw std::runtime_error("Error while allocating state for lz4.");
throw CompressionError("Error while allocating state for lz4.");
}
}

Expand Down Expand Up @@ -1154,7 +1158,7 @@ namespace orc {
// Obtains the zstd compression context from ZSTD_createCCtx() and stores it in cctx_.
// Throws when the call yields a null context.
void ZSTDCompressionStream::init() {
cctx_ = ZSTD_createCCtx();
if (!cctx_) {
// NOTE(review): two consecutive throw statements; only the first can ever
// execute. This looks like the before/after pair of a diff rendered without
// +/- markers -- confirm which one the file actually keeps.
throw std::runtime_error("Error while calling ZSTD_createCCtx() for zstd.");
throw CompressionError("Error while calling ZSTD_createCCtx() for zstd.");
}
}

Expand Down Expand Up @@ -1211,7 +1215,7 @@ namespace orc {
// Obtains the zstd decompression context from ZSTD_createDCtx() and stores it in dctx_.
// Throws when the call yields a null context.
void ZSTDDecompressionStream::init() {
dctx_ = ZSTD_createDCtx();
if (!dctx_) {
// NOTE(review): two consecutive throw statements; only the first can ever
// execute. This looks like the before/after pair of a diff rendered without
// +/- markers -- confirm which one the file actually keeps.
throw std::runtime_error("Error while calling ZSTD_createDCtx() for zstd.");
throw CompressionError("Error while calling ZSTD_createDCtx() for zstd.");
}
}

Expand Down
16 changes: 16 additions & 0 deletions c++/src/Exceptions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,20 @@ namespace orc {
SchemaEvolutionError::~SchemaEvolutionError() noexcept {
// PASS
}

// Builds the error from a std::string message; the text is handed unchanged
// to the std::runtime_error base.
CompressionError::CompressionError(const std::string& whatArg)
    : std::runtime_error(whatArg) {}

// Builds the error from a C-string message; the text is handed unchanged
// to the std::runtime_error base.
CompressionError::CompressionError(const char* whatArg)
    : std::runtime_error(whatArg) {}

// Copy constructor: copies the std::runtime_error base of `other`; the class
// adds no state of its own in this constructor.
CompressionError::CompressionError(const CompressionError& other)
    : std::runtime_error(other) {}

// Out-of-line destructor with nothing to release beyond what the base handles.
CompressionError::~CompressionError() noexcept = default;
} // namespace orc
4 changes: 2 additions & 2 deletions c++/test/TestDecompression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ namespace orc {
*getDefaultPool(), getDefaultReaderMetrics());
const void* ptr;
int length;
ASSERT_THROW(result->BackUp(20), std::logic_error);
ASSERT_THROW(result->BackUp(20), CompressionError);
ASSERT_EQ(true, result->Next(&ptr, &length));
ASSERT_EQ(30, length);
for (int i = 0; i < 10; ++i) {
Expand All @@ -554,7 +554,7 @@ namespace orc {
}
}
result->BackUp(10);
ASSERT_THROW(result->BackUp(2), std::logic_error);
ASSERT_THROW(result->BackUp(2), CompressionError);
ASSERT_EQ(true, result->Next(&ptr, &length));
ASSERT_EQ(10, length);
for (int i = 0; i < 10; ++i) {
Expand Down

0 comments on commit 833817b

Please sign in to comment.