diff --git a/velox/common/compression/v2/Compression.cpp b/velox/common/compression/v2/Compression.cpp index 78b1bc1d305f1..9b745baff3441 100644 --- a/velox/common/compression/v2/Compression.cpp +++ b/velox/common/compression/v2/Compression.cpp @@ -14,13 +14,12 @@ * limitations under the License. */ -// Adapted from Apache Arrow. - -#include "velox/common/compression/v2/Compression.h" +// Derived from Apache Arrow. #include #include -#include + #include "velox/common/base/Exceptions.h" +#include "velox/common/compression/v2/Compression.h" #include "velox/common/compression/v2/Lz4Compression.h" namespace facebook::velox::common { @@ -126,7 +125,6 @@ std::unique_ptr Codec::create( return codec; } -// use compression level to create Codec std::unique_ptr Codec::create( CompressionKind kind, int32_t compressionLevel) { diff --git a/velox/common/compression/v2/Compression.h b/velox/common/compression/v2/Compression.h index d6ef89b9017c2..1816ca4e7007f 100644 --- a/velox/common/compression/v2/Compression.h +++ b/velox/common/compression/v2/Compression.h @@ -30,7 +30,7 @@ namespace facebook::velox::common { static constexpr int32_t kUseDefaultCompressionLevel = std::numeric_limits::min(); -/// Streaming compressor interface. +// Streaming compressor interface. class Compressor { public: virtual ~Compressor() = default; @@ -71,7 +71,7 @@ class Compressor { virtual EndResult end(uint64_t outputLength, uint8_t* output) = 0; }; -/// Streaming decompressor interface +// Streaming decompressor interface class Decompressor { public: virtual ~Decompressor() = default; @@ -91,14 +91,14 @@ class Decompressor { uint64_t outputLength, uint8_t* output) = 0; - /// Return whether the compressed stream is finished. + // Return whether the compressed stream is finished. virtual bool isFinished() = 0; - /// Reinitialize decompressor, making it ready for a new compressed stream. + // Reinitialize decompressor, making it ready for a new compressed stream. virtual void reset() = 0; }; -/// Compression codec options +// Compression codec options class CodecOptions { public: explicit CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel) @@ -109,7 +109,7 @@ class CodecOptions { int32_t compressionLevel; }; -/// Compression codec +// Compression codec class Codec { public: virtual ~Codec() = default; @@ -118,27 +118,27 @@ class Codec { /// should use its default compression level. static int32_t useDefaultCompressionLevel(); - /// Create a kind for the given compression algorithm with CodecOptions. + // Create a kind for the given compression algorithm with CodecOptions. static std::unique_ptr create( CompressionKind kind, const CodecOptions& codecOptions = CodecOptions{}); - /// Create a kind for the given compression algorithm. + // Create a kind for the given compression algorithm. static std::unique_ptr create( CompressionKind kind, int32_t compressionLevel); - /// Return true if support for indicated kind has been enabled. + // Return true if support for indicated kind has been enabled. static bool isAvailable(CompressionKind kind); /// Return true if indicated kind supports extracting uncompressed length /// from compressed data. static bool supportsGetUncompressedLength(CompressionKind kind); - /// Return true if indicated kind supports setting a compression level. + // Return true if indicated kind supports setting a compression level. static bool supportsCompressionLevel(CompressionKind kind); - /// Return true if indicated kind supports creating streaming de/compressor. + // Return true if indicated kind supports creating streaming de/compressor. static bool supportsStreamingCompression(CompressionKind kind); /// Return the smallest supported compression level for the kind @@ -153,13 +153,13 @@ class Codec { /// Note: This function creates a temporary Codec instance. static int32_t defaultCompressionLevel(CompressionKind kind); - /// Return the smallest supported compression level. + // Return the smallest supported compression level. virtual int32_t minimumCompressionLevel() const = 0; - /// Return the largest supported compression level. + // Return the largest supported compression level. virtual int32_t maximumCompressionLevel() const = 0; - /// Return the default compression level. + // Return the default compression level. virtual int32_t defaultCompressionLevel() const = 0; /// One-shot decompression function. @@ -201,7 +201,7 @@ class Codec { uint64_t outputLength, uint8_t* output); - /// Maximum compressed length of given input length. + // Maximum compressed length of given input length. virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; /// Extracts the uncompressed length from the compressed data if possible. @@ -215,27 +215,27 @@ class Codec { const uint8_t* input, std::optional uncompressedLength = std::nullopt) const; - /// Create a streaming compressor instance. + // Create a streaming compressor instance. virtual std::shared_ptr makeCompressor() = 0; - /// Create a streaming compressor instance. + // Create a streaming compressor instance. virtual std::shared_ptr makeDecompressor() = 0; - /// This Codec's compression type. + // This Codec's compression type. virtual CompressionKind compressionKind() const = 0; - /// The name of this Codec's compression type. + // The name of this Codec's compression type. std::string name() const { return compressionKindToString(compressionKind()); } - /// This Codec's compression level, if applicable. + // This Codec's compression level, if applicable. virtual int32_t compressionLevel() const { return kUseDefaultCompressionLevel; } private: - /// Initializes the codec's resources. + // Initializes the codec's resources. virtual void init(); virtual std::optional doGetUncompressedLength( diff --git a/velox/common/compression/v2/HadoopCompressionFormat.cpp b/velox/common/compression/v2/HadoopCompressionFormat.cpp index d2b6e4852a708..4f11884dfb553 100644 --- a/velox/common/compression/v2/HadoopCompressionFormat.cpp +++ b/velox/common/compression/v2/HadoopCompressionFormat.cpp @@ -48,14 +48,14 @@ bool HadoopCompressionFormat::tryDecompressHadoop( inputLength -= kPrefixLength; if (inputLength < expectedCompressedSize) { - // Not enough bytes for Hadoop "frame" + // Not enough bytes for Hadoop "frame". return false; } if (outputLength < expectedDecompressedSize) { - // Not enough bytes to hold advertised output => probably not Hadoop + // Not enough bytes to hold advertised output => probably not Hadoop. return false; } - // Try decompressing and compare with expected decompressed length + // Try decompressing and compare with expected decompressed length. try { auto decompressedSize = decompressInternal( expectedCompressedSize, input, outputLength, output); diff --git a/velox/common/compression/v2/Lz4Compression.cpp b/velox/common/compression/v2/Lz4Compression.cpp index baf45c4335e41..8d98be72fe903 100644 --- a/velox/common/compression/v2/Lz4Compression.cpp +++ b/velox/common/compression/v2/Lz4Compression.cpp @@ -23,7 +23,7 @@ namespace facebook::velox::common { namespace { -void lz4Error(LZ4F_errorCode_t errorCode, const char* prefixMessage) { +void lz4Error(const char* prefixMessage, LZ4F_errorCode_t errorCode) { VELOX_FAIL(prefixMessage, LZ4F_getErrorName(errorCode)); } @@ -111,7 +111,7 @@ void LZ4Compressor::init() { ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); if (LZ4F_isError(ret)) { - lz4Error(ret, "LZ4 init failed: "); + lz4Error("LZ4 init failed: ", ret); } } @@ -139,7 +139,7 @@ Compressor::CompressResult LZ4Compressor::compress( auto numBytesOrError = LZ4F_compressUpdate( ctx_, output, outputSize, input, inputSize, nullptr /* options */); if (LZ4F_isError(numBytesOrError)) { - lz4Error(numBytesOrError, "LZ4 compress update failed: "); + lz4Error("LZ4 compress update failed: ", numBytesOrError); } bytesWritten += static_cast(numBytesOrError); DCHECK_LE(bytesWritten, outputSize); @@ -168,7 +168,7 @@ Compressor::FlushResult LZ4Compressor::flush( auto numBytesOrError = LZ4F_flush(ctx_, output, outputSize, nullptr /* options */); if (LZ4F_isError(numBytesOrError)) { - lz4Error(numBytesOrError, "LZ4 flush failed: "); + lz4Error("LZ4 flush failed: ", numBytesOrError); } bytesWritten += static_cast(numBytesOrError); DCHECK_LE(bytesWritten, outputLength); @@ -197,7 +197,7 @@ Compressor::EndResult LZ4Compressor::end( auto numBytesOrError = LZ4F_compressEnd(ctx_, output, outputSize, nullptr /* options */); if (LZ4F_isError(numBytesOrError)) { - lz4Error(numBytesOrError, "LZ4 end failed: "); + lz4Error("LZ4 end failed: ", numBytesOrError); } bytesWritten += static_cast(numBytesOrError); DCHECK_LE(bytesWritten, outputLength); @@ -210,7 +210,7 @@ void LZ4Compressor::compressBegin( uint64_t& bytesWritten) { auto numBytesOrError = LZ4F_compressBegin(ctx_, output, outputLen, &prefs_); if (LZ4F_isError(numBytesOrError)) { - lz4Error(numBytesOrError, "LZ4 compress begin failed: "); + lz4Error("LZ4 compress begin failed: ", numBytesOrError); } firstTime_ = false; output += numBytesOrError; @@ -224,7 +224,7 @@ void common::LZ4Decompressor::init() { ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); if (LZ4F_isError(ret)) { - lz4Error(ret, "LZ4 init failed: "); + lz4Error("LZ4 init failed: ", ret); } } @@ -253,7 +253,7 @@ Decompressor::DecompressResult LZ4Decompressor::decompress( auto ret = LZ4F_decompress( ctx_, output, &outputSize, input, &inputSize, nullptr /* options */); if (LZ4F_isError(ret)) { - lz4Error(ret, "LZ4 decompress failed: "); + lz4Error("LZ4 decompress failed: ", ret); } finished_ = (ret == 0); return DecompressResult{ @@ -313,7 +313,7 @@ uint64_t Lz4FrameCodec::compress( static_cast(inputLength), &prefs_); if (LZ4F_isError(ret)) { - lz4Error(ret, "Lz4 compression failure: "); + lz4Error("Lz4 compression failure: ", ret); } return static_cast(ret); } diff --git a/velox/common/compression/v2/tests/CMakeLists.txt b/velox/common/compression/v2/tests/CMakeLists.txt index 983fd6c81fda6..61c6cdffdd76e 100644 --- a/velox/common/compression/v2/tests/CMakeLists.txt +++ b/velox/common/compression/v2/tests/CMakeLists.txt @@ -16,6 +16,8 @@ add_executable(velox_common_compression_v2_test CompressionTest.cpp) add_test(velox_common_compression_v2_test velox_common_compression_v2_test) target_link_libraries( velox_common_compression_v2_test - PUBLIC velox_link_libs - PRIVATE velox_common_compression_v2 velox_exception glog::glog gtest - gtest_main) + velox_link_libs + velox_common_compression_v2 + velox_exception + gtest + gtest_main) diff --git a/velox/common/compression/v2/tests/CompressionTest.cpp b/velox/common/compression/v2/tests/CompressionTest.cpp index 8c6dc5ce7ae6c..8f311a09f511b 100644 --- a/velox/common/compression/v2/tests/CompressionTest.cpp +++ b/velox/common/compression/v2/tests/CompressionTest.cpp @@ -210,7 +210,7 @@ void streamingDecompress( decompressed.resize(decompressedSize); } -// Check the streaming compressor against one-shot decompression +// Check the streaming compressor against one-shot decompression. void checkStreamingCompressor(Codec* codec, const std::vector& data) { // Run streaming compression. std::vector compressed;