diff --git a/velox/common/CMakeLists.txt b/velox/common/CMakeLists.txt index 57175798fa9c1..e345d71976200 100644 --- a/velox/common/CMakeLists.txt +++ b/velox/common/CMakeLists.txt @@ -14,7 +14,6 @@ add_subdirectory(base) add_subdirectory(caching) add_subdirectory(compression) -add_subdirectory(compression/v2) add_subdirectory(encode) add_subdirectory(file) add_subdirectory(hyperloglog) diff --git a/velox/common/compression/CMakeLists.txt b/velox/common/compression/CMakeLists.txt index e429485151e35..d352763e8161e 100644 --- a/velox/common/compression/CMakeLists.txt +++ b/velox/common/compression/CMakeLists.txt @@ -16,8 +16,10 @@ if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() -add_library(velox_common_compression Compression.cpp LzoDecompressor.cpp) +add_library( + velox_common_compression Compression.cpp LzoDecompressor.cpp + Lz4Compression.cpp HadoopCompressionFormat.cpp) target_link_libraries( velox_common_compression - PUBLIC Folly::folly + PUBLIC Folly::folly lz4::lz4 PRIVATE velox_exception) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 3843f5902a980..22933d04d274f 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -16,6 +16,7 @@ #include "velox/common/compression/Compression.h" #include "velox/common/base/Exceptions.h" +#include "velox/common/compression/Lz4Compression.h" #include @@ -97,4 +98,124 @@ CompressionKind stringToCompressionKind(const std::string& kind) { VELOX_UNSUPPORTED("Not support compression kind {}", kind); } } + +void Codec::init() {} + +bool Codec::supportsGetUncompressedLength(CompressionKind kind) { + switch (kind) { + default: + return false; + } +} + +bool Codec::supportsStreamingCompression(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_LZ4: + return true; + default: + return false; + } +} + +bool Codec::supportsCompressFixedLength(CompressionKind kind) { + switch (kind) { + default: + return false; + } +} + +int32_t Codec::maximumCompressionLevel(CompressionKind kind) { + auto codec = Codec::create(kind); + return codec->maximumCompressionLevel(); +} + +int32_t Codec::minimumCompressionLevel(CompressionKind kind) { + auto codec = Codec::create(kind); + return codec->minimumCompressionLevel(); +} + +int32_t Codec::defaultCompressionLevel(CompressionKind kind) { + auto codec = Codec::create(kind); + return codec->defaultCompressionLevel(); +} + +std::unique_ptr Codec::create( + CompressionKind kind, + const CodecOptions& codecOptions) { + if (!isAvailable(kind)) { + auto name = compressionKindToString(kind); + if (folly::StringPiece({name}).startsWith("unknown")) { + VELOX_UNSUPPORTED("Unrecognized codec '{}'", name); + } + VELOX_UNSUPPORTED("Support for codec '{}' not implemented.", name); + } + + auto compressionLevel = codecOptions.compressionLevel; + std::unique_ptr codec; + switch (kind) { + case CompressionKind::CompressionKind_LZ4: + if (auto options = dynamic_cast(&codecOptions)) { + switch (options->type) { + case Lz4CodecOptions::kLz4Frame: + codec = makeLz4FrameCodec(compressionLevel); + break; + case Lz4CodecOptions::kLz4Raw: + codec = makeLz4RawCodec(compressionLevel); + break; + case Lz4CodecOptions::kLz4Hadoop: + codec = makeLz4HadoopCodec(); + break; + } + } + // By default, create LZ4 Frame codec. + codec = makeLz4FrameCodec(compressionLevel); + break; + default: + break; + } + + if (codec == nullptr) { + VELOX_UNSUPPORTED( + "{} codec not implemented", compressionKindToString(kind)); + } + + codec->init(); + + return codec; +} + +std::unique_ptr Codec::create( + CompressionKind kind, + int32_t compressionLevel) { + return create(kind, CodecOptions{compressionLevel}); +} + +bool Codec::isAvailable(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_NONE: + case CompressionKind::CompressionKind_LZ4: + return true; + case CompressionKind::CompressionKind_SNAPPY: + case CompressionKind::CompressionKind_GZIP: + case CompressionKind::CompressionKind_ZLIB: + case CompressionKind::CompressionKind_ZSTD: + case CompressionKind::CompressionKind_LZO: + default: + return false; + } +} + +std::optional Codec::getUncompressedLength( + uint64_t inputLength, + const uint8_t* input) const { + return std::nullopt; +} + +uint64_t Codec::compressFixedLength( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + VELOX_UNSUPPORTED("'{}' doesn't support fixed-length compression", name()); +} } // namespace facebook::velox::common diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 317f9717a2ae5..78f5e56d2c1c5 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -45,4 +45,212 @@ CompressionKind stringToCompressionKind(const std::string& kind); constexpr uint64_t DEFAULT_COMPRESSION_BLOCK_SIZE = 256 * 1024; +static constexpr int32_t kUseDefaultCompressionLevel = + std::numeric_limits::min(); + +class StreamingCompressor { + public: + virtual ~StreamingCompressor() = default; + + struct CompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + struct FlushResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + + struct EndResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Compress some input. + /// If CompressResult.outputTooSmall is true on return, then a larger output + /// buffer should be supplied. + virtual CompressResult compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Flush part of the compressed output. + /// If FlushResult.outputTooSmall is true on return, flush() should be called + /// again with a larger buffer. + virtual FlushResult flush(uint8_t* output, uint64_t outputLength) = 0; + + /// End compressing, doing whatever is necessary to end the stream. + /// If EndResult.outputTooSmall is true on return, end() should be called + /// again with a larger buffer. Otherwise, the StreamingCompressor should not + /// be used anymore. end() will flush the compressed output. + virtual EndResult end(uint8_t* output, uint64_t outputLength) = 0; +}; + +class StreamingDecompressor { + public: + virtual ~StreamingDecompressor() = default; + + struct DecompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Decompress some input. + /// If outputTooSmall is true on return, a larger output buffer needs + /// to be supplied. + virtual DecompressResult decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Return whether the compressed stream is finished. + virtual bool isFinished() = 0; + + /// Reinitialize decompressor, making it ready for a new compressed stream. + virtual void reset() = 0; +}; + +struct CodecOptions { + int32_t compressionLevel; + + CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel) + : compressionLevel(compressionLevel) {} + + virtual ~CodecOptions() = default; +}; + +class Codec { + public: + virtual ~Codec() = default; + + /// Create a kind for the given compression algorithm with CodecOptions. + static std::unique_ptr create( + CompressionKind kind, + const CodecOptions& codecOptions = CodecOptions{}); + + /// Create a kind for the given compression algorithm. + static std::unique_ptr create( + CompressionKind kind, + int32_t compressionLevel); + + /// Return true if support for indicated kind has been enabled. + static bool isAvailable(CompressionKind kind); + + /// Return true if indicated kind supports extracting uncompressed length + /// from compressed data. + static bool supportsGetUncompressedLength(CompressionKind kind); + + /// Return true if indicated kind supports one-shot compression with fixed + /// compressed length. + static bool supportsCompressFixedLength(CompressionKind kind); + + /// Return true if indicated kind supports creating streaming de/compressor. + static bool supportsStreamingCompression(CompressionKind kind); + + /// Return the smallest supported compression level for the kind. + /// Note: This function creates a temporary Codec instance. + static int32_t minimumCompressionLevel(CompressionKind kind); + + /// Return the largest supported compression level for the kind + /// Note: This function creates a temporary Codec instance. + static int32_t maximumCompressionLevel(CompressionKind kind); + + /// Return the default compression level. + /// Note: This function creates a temporary Codec instance. + static int32_t defaultCompressionLevel(CompressionKind kind); + + /// Return the smallest supported compression level. + /// If the codec doesn't support compression level, + /// `kUseDefaultCompressionLevel` will be returned. + virtual int32_t minimumCompressionLevel() const = 0; + + /// Return the largest supported compression level. + /// If the codec doesn't support compression level, + /// `kUseDefaultCompressionLevel` will be returned. + virtual int32_t maximumCompressionLevel() const = 0; + + /// Return the default compression level. + /// If the codec doesn't support compression level, + /// `kUseDefaultCompressionLevel` will be returned. + virtual int32_t defaultCompressionLevel() const = 0; + + /// One-shot decompression function. + /// `outputLength` must be correct and therefore be obtained in advance. + /// The actual decompressed length is returned. + /// Note: One-shot decompression is not always compatible with streaming + /// compression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual uint64_t decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Performs one-shot compression. + /// `outputLength` must first have been computed using maxCompressedLength(). + /// The actual compressed length is returned. + /// Note: One-shot compression is not always compatible with streaming + /// decompression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual uint64_t compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Performs one-shot compression. + /// This function compresses data and writes the output up to the specified + /// outputLength. If outputLength is too small to hold all the compressed + /// data, the function doesn't fail. Instead, it returns the number of bytes + /// actually written to the output buffer. Any remaining data that couldn't + /// be written in this call will be written in subsequent calls to this + /// function. This is useful when fixed-length compression blocks are required + /// by the caller. + /// Note: Only Gzip and Zstd codec supports this function. + virtual uint64_t compressFixedLength( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength); + + /// Maximum compressed length of given input length. + virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; + + /// Retrieves the actual uncompressed length of data using the specified + /// compression library. + /// Note: This functionality is not universally supported by all compression + /// libraries. If not supported, `std::nullopt` will be returned. + std::optional getUncompressedLength( + uint64_t inputLength, + const uint8_t* input) const; + + /// Create a streaming compressor instance. + virtual std::shared_ptr makeStreamingCompressor() = 0; + + /// Create a streaming compressor instance. + virtual std::shared_ptr + makeStreamingDecompressor() = 0; + + /// This Codec's compression type. + virtual CompressionKind compressionKind() const = 0; + + /// The name of this Codec's compression type. + std::string name() const { + return compressionKindToString(compressionKind()); + } + + /// This Codec's compression level, if applicable. + virtual int32_t compressionLevel() const { + return kUseDefaultCompressionLevel; + } + + private: + /// Initializes the codec's resources. + virtual void init(); +}; } // namespace facebook::velox::common diff --git a/velox/common/compression/v2/HadoopCompressionFormat.cpp b/velox/common/compression/HadoopCompressionFormat.cpp similarity index 97% rename from velox/common/compression/v2/HadoopCompressionFormat.cpp rename to velox/common/compression/HadoopCompressionFormat.cpp index df0f7034ea417..bda5a756c7c07 100644 --- a/velox/common/compression/v2/HadoopCompressionFormat.cpp +++ b/velox/common/compression/HadoopCompressionFormat.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "velox/common/compression/v2/HadoopCompressionFormat.h" +#include "velox/common/compression/HadoopCompressionFormat.h" #include "velox/common/base/Exceptions.h" #include diff --git a/velox/common/compression/v2/HadoopCompressionFormat.h b/velox/common/compression/HadoopCompressionFormat.h similarity index 100% rename from velox/common/compression/v2/HadoopCompressionFormat.h rename to velox/common/compression/HadoopCompressionFormat.h diff --git a/velox/common/compression/v2/Lz4Compression.cpp b/velox/common/compression/Lz4Compression.cpp similarity index 97% rename from velox/common/compression/v2/Lz4Compression.cpp rename to velox/common/compression/Lz4Compression.cpp index 0afc3c6f258fa..1742c71b2a912 100644 --- a/velox/common/compression/v2/Lz4Compression.cpp +++ b/velox/common/compression/Lz4Compression.cpp @@ -14,13 +14,15 @@ * limitations under the License. */ -#include "velox/common/compression/v2/Lz4Compression.h" +#include "velox/common/compression/Lz4Compression.h" #include "velox/common/base/Exceptions.h" namespace facebook::velox::common { - namespace { +constexpr int32_t kLz4DefaultCompressionLevel = 1; +constexpr int32_t kLz4MinCompressionLevel = 1; + void lz4Error(const char* prefixMessage, LZ4F_errorCode_t errorCode) { VELOX_FAIL(prefixMessage, LZ4F_getErrorName(errorCode)); } @@ -428,8 +430,7 @@ Lz4RawCodec::makeStreamingDecompressor() { "Try using LZ4 frame format instead."); } -Lz4HadoopCodec::Lz4HadoopCodec(int32_t compressionLevel) - : Lz4RawCodec(compressionLevel) {} +Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kLz4DefaultCompressionLevel) {} uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { return kPrefixLength + Lz4RawCodec::maxCompressedLength(inputLength); @@ -515,7 +516,7 @@ std::unique_ptr makeLz4RawCodec(int32_t compressionLevel) { return std::make_unique(compressionLevel); } -std::unique_ptr makeLz4HadoopRawCodec(int32_t compressionLevel) { - return std::make_unique(compressionLevel); +std::unique_ptr makeLz4HadoopCodec() { + return std::make_unique(); } } // namespace facebook::velox::common diff --git a/velox/common/compression/v2/Lz4Compression.h b/velox/common/compression/Lz4Compression.h similarity index 85% rename from velox/common/compression/v2/Lz4Compression.h rename to velox/common/compression/Lz4Compression.h index a915c0fe8caad..001a265fe8893 100644 --- a/velox/common/compression/v2/Lz4Compression.h +++ b/velox/common/compression/Lz4Compression.h @@ -20,22 +20,18 @@ #include #include #include -#include "velox/common/compression/v2/Compression.h" -#include "velox/common/compression/v2/HadoopCompressionFormat.h" +#include "velox/common/compression/Compression.h" +#include "velox/common/compression/HadoopCompressionFormat.h" namespace facebook::velox::common { -static constexpr int32_t kLz4DefaultCompressionLevel = 1; -static constexpr int32_t kLz4MinCompressionLevel = 1; - -class Lz4CodecOptions : public CodecOptions { - public: +struct Lz4CodecOptions : CodecOptions { enum Type { kLz4Frame, kLz4Raw, kLz4Hadoop }; - explicit Lz4CodecOptions( + Lz4CodecOptions( Lz4CodecOptions::Type type, int32_t compressionLevel = kUseDefaultCompressionLevel) - : type(type), CodecOptions(compressionLevel) {} + : CodecOptions(compressionLevel), type(type) {} Lz4CodecOptions::Type type; }; @@ -109,7 +105,7 @@ class Lz4RawCodec : public Lz4CodecBase { class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { public: - Lz4HadoopCodec(int32_t compressionLevel); + Lz4HadoopCodec(); uint64_t maxCompressedLength(uint64_t inputLength) override; @@ -145,13 +141,12 @@ class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { // Lz4 frame format codec. std::unique_ptr makeLz4FrameCodec( - int32_t compressionLevel = kLz4DefaultCompressionLevel); + int32_t compressionLevel = kUseDefaultCompressionLevel); // Lz4 "raw" format codec. std::unique_ptr makeLz4RawCodec( - int32_t compressionLevel = kLz4DefaultCompressionLevel); + int32_t compressionLevel = kUseDefaultCompressionLevel); // Lz4 "Hadoop" format codec (== Lz4 raw codec prefixed with lengths header) -std::unique_ptr makeLz4HadoopRawCodec( - int32_t compressionLevel = kLz4DefaultCompressionLevel); +std::unique_ptr makeLz4HadoopCodec(); } // namespace facebook::velox::common diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index 312c103e2a98c..d39be5da20cd0 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -14,14 +14,274 @@ * limitations under the License. */ +#include +#include +#include +#include +#include +#include + #include #include "velox/common/base/VeloxException.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/compression/Compression.h" +#include "velox/common/compression/Lz4Compression.h" namespace facebook::velox::common { +namespace { +const std::shared_ptr kDefaultCodecOptions = + std::make_shared(); + +struct TestParams { + CompressionKind compressionKind; + std::shared_ptr codecOptions; + + explicit TestParams( + common::CompressionKind compressionKind, + std::shared_ptr codecOptions = kDefaultCodecOptions) + : compressionKind(compressionKind), + codecOptions(std::move(codecOptions)) {} +}; + +std::vector makeRandomData(size_t n) { + std::vector data(n); + std::default_random_engine engine(42); + std::uniform_int_distribution dist(0, 255); + std::generate(data.begin(), data.end(), [&]() { return dist(engine); }); + return data; +} + +std::vector makeCompressibleData(size_t size) { + std::string baseData = "The quick brown fox jumps over the lazy dog"; + auto repeats = static_cast(1 + size / baseData.size()); + + std::vector data(baseData.size() * repeats); + for (int i = 0; i < repeats; ++i) { + std::memcpy( + data.data() + i * baseData.size(), baseData.data(), baseData.size()); + } + data.resize(size); + return data; +} + +std::function makeRandomInputSize() { + std::default_random_engine engine(42); + std::uniform_int_distribution sizeDistribution(10, 40); + return [=]() mutable -> uint64_t { return sizeDistribution(engine); }; +} + +// Check roundtrip of one-shot compression and decompression functions. +void checkCodecRoundtrip( + Codec* c1, + Codec* c2, + const std::vector& data) { + auto maxCompressedLen = + static_cast(c1->maxCompressedLength(data.size())); + std::vector compressed(maxCompressedLen); + std::vector decompressed(data.size()); + + // Compress with codec c1. + auto compressedSize = c1->compress( + data.data(), data.size(), compressed.data(), maxCompressedLen); + compressed.resize(compressedSize); + + // Decompress with codec c2. + auto decompressedSize = c2->decompress( + compressed.data(), + compressed.size(), + decompressed.data(), + decompressed.size()); + + ASSERT_EQ(data, decompressed); + ASSERT_EQ(data.size(), decompressedSize); +} + +// Use same codec for both compression and decompression. +void checkCodecRoundtrip( + const std::unique_ptr& codec, + const std::vector& data) { + checkCodecRoundtrip(codec.get(), codec.get(), data); +} + +// Compress with codec c1 and decompress with codec c2. +void checkCodecRoundtrip( + const std::unique_ptr& c1, + const std::unique_ptr& c2, + const std::vector& data) { + checkCodecRoundtrip(c1.get(), c2.get(), data); +} + +void streamingCompress( + const std::shared_ptr& compressor, + const std::vector& uncompressed, + std::vector& compressed) { + const uint8_t* input = uncompressed.data(); + uint64_t remaining = uncompressed.size(); + uint64_t compressedSize = 0; + compressed.resize(10); + bool doFlush = false; + + // Generate small random input buffer size. + auto randomInputSize = makeRandomInputSize(); + + // Continue decompressing until consuming all compressed data . + while (remaining > 0) { + // Feed a small amount each time. + auto inputLength = std::min(remaining, randomInputSize()); + auto outputLength = compressed.size() - compressedSize; + uint8_t* output = compressed.data() + compressedSize; + + // Compress once. + auto compressResult = + compressor->compress(input, inputLength, output, outputLength); + ASSERT_LE(compressResult.bytesRead, inputLength); + ASSERT_LE(compressResult.bytesWritten, outputLength); + + // Update result. + compressedSize += compressResult.bytesWritten; + input += compressResult.bytesRead; + remaining -= compressResult.bytesRead; + + // Grow compressed buffer if it's too small. + if (compressResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + + // Once every two iterations, do a flush. + if (doFlush) { + StreamingCompressor::FlushResult flushResult; + do { + outputLength = compressed.size() - compressedSize; + output = compressed.data() + compressedSize; + flushResult = compressor->flush(output, outputLength); + ASSERT_LE(flushResult.bytesWritten, outputLength); + compressedSize += flushResult.bytesWritten; + if (flushResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + } while (flushResult.outputTooSmall); + } + doFlush = !doFlush; + } + + // End the compressed stream. + StreamingCompressor::EndResult endResult; + do { + int64_t outputLength = compressed.size() - compressedSize; + uint8_t* output = compressed.data() + compressedSize; + endResult = compressor->end(output, outputLength); + ASSERT_LE(endResult.bytesWritten, outputLength); + compressedSize += endResult.bytesWritten; + if (endResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + } while (endResult.outputTooSmall); + compressed.resize(compressedSize); +} + +void streamingDecompress( + const std::shared_ptr& decompressor, + const std::vector& compressed, + std::vector& decompressed) { + const uint8_t* input = compressed.data(); + uint64_t remaining = compressed.size(); + uint64_t decompressedSize = 0; + decompressed.resize(10); + + // Generate small random input buffer size. + auto ramdomInputSize = makeRandomInputSize(); + + // Continue decompressing until finishes. + while (!decompressor->isFinished()) { + // Feed a small amount each time. + auto inputLength = std::min(remaining, ramdomInputSize()); + auto outputLength = decompressed.size() - decompressedSize; + uint8_t* output = decompressed.data() + decompressedSize; + + // Decompress once. + auto result = + decompressor->decompress(input, inputLength, output, outputLength); + ASSERT_LE(result.bytesRead, inputLength); + ASSERT_LE(result.bytesWritten, outputLength); + ASSERT_TRUE( + result.outputTooSmall || result.bytesWritten > 0 || + result.bytesRead > 0) + << "Decompression not progressing anymore"; + + // Update result. + decompressedSize += result.bytesWritten; + input += result.bytesRead; + remaining -= result.bytesRead; + + // Grow decompressed buffer if it's too small. + if (result.outputTooSmall) { + decompressed.resize(decompressed.capacity() * 2); + } + } + ASSERT_TRUE(decompressor->isFinished()); + ASSERT_EQ(remaining, 0); + decompressed.resize(decompressedSize); +} + +// Check the streaming compressor against one-shot decompression. +void checkStreamingCompressor(Codec* codec, const std::vector& data) { + // Run streaming compression. + std::vector compressed; + streamingCompress(codec->makeStreamingCompressor(), data, compressed); + + // Check decompressing the compressed data. + std::vector decompressed(data.size()); + ASSERT_NO_THROW(codec->decompress( + compressed.data(), + compressed.size(), + decompressed.data(), + decompressed.size())); + ASSERT_EQ(data, decompressed); +} + +// Check the streaming decompressor against one-shot compression. +void checkStreamingDecompressor( + Codec* codec, + const std::vector& data) { + // Create compressed data. + auto maxCompressedLen = codec->maxCompressedLength(data.size()); + std::vector compressed(maxCompressedLen); + auto compressedSize = codec->compress( + data.data(), data.size(), compressed.data(), maxCompressedLen); + compressed.resize(compressedSize); + + // Run streaming decompression. + std::vector decompressed; + streamingDecompress( + codec->makeStreamingDecompressor(), compressed, decompressed); + + // Check the decompressed data. + ASSERT_EQ(data.size(), decompressed.size()); + ASSERT_EQ(data, decompressed); +} + +// Check the streaming compressor and decompressor together. +void checkStreamingRoundtrip( + const std::shared_ptr& compressor, + const std::shared_ptr& decompressor, + const std::vector& data) { + std::vector compressed; + streamingCompress(compressor, data, compressed); + std::vector decompressed; + streamingDecompress(decompressor, compressed, decompressed); + ASSERT_EQ(data, decompressed); +} + +void checkStreamingRoundtrip(Codec* codec, const std::vector& data) { + checkStreamingRoundtrip( + codec->makeStreamingCompressor(), + codec->makeStreamingDecompressor(), + data); +} +} // namespace + class CompressionTest : public testing::Test {}; TEST_F(CompressionTest, testCompressionNames) { @@ -60,4 +320,156 @@ TEST_F(CompressionTest, stringToCompressionKind) { VELOX_ASSERT_THROW( stringToCompressionKind("bz2"), "Not support compression kind bz2"); } + +class CodecTest : public ::testing::TestWithParam { + protected: + static CompressionKind getCompressionKind() { + return GetParam().compressionKind; + } + + static const CodecOptions& getCodecOptions() { + return *GetParam().codecOptions; + } + + static std::unique_ptr makeCodec() { + return Codec::create(getCompressionKind(), getCodecOptions()); + } +}; + +TEST_P(CodecTest, specifyCompressionLevel) { + std::vector data = makeRandomData(2000); + const auto kind = getCompressionKind(); + if (!Codec::isAvailable(kind)) { + // Support for this codec hasn't been built. + VELOX_ASSERT_THROW( + Codec::create(kind, kUseDefaultCompressionLevel), + "Support for codec '" + compressionKindToString(kind) + + "' not implemented."); + return; + } + auto compressionLevels = { + Codec::defaultCompressionLevel(kind), + Codec::minimumCompressionLevel(kind), + Codec::maximumCompressionLevel(kind)}; + for (auto compressionLevel : compressionLevels) { + auto codec = Codec::create(kind, compressionLevel); + checkCodecRoundtrip(codec, data); + } +} + +TEST_P(CodecTest, getUncompressedLength) { + auto codec = makeCodec(); + auto inputLength = 100; + auto input = makeRandomData(inputLength); + std::vector compressed(codec->maxCompressedLength(input.size())); + auto compressedLength = codec->compress( + input.data(), inputLength, compressed.data(), compressed.size()); + compressed.resize(compressedLength); + + if (Codec::supportsGetUncompressedLength(getCompressionKind())) { + ASSERT_EQ( + codec->getUncompressedLength(compressedLength, compressed.data()), + inputLength); + } else { + ASSERT_EQ( + codec->getUncompressedLength(compressedLength, compressed.data()), + std::nullopt); + } +} + +TEST_P(CodecTest, codecRoundtrip) { + auto codec = makeCodec(); + for (int dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(codec, makeRandomData(dataSize)); + checkCodecRoundtrip(codec, makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingCompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingCompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingCompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingDecompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingDecompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingRoundtrip) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingRoundtrip(codec.get(), makeRandomData(dataSize)); + checkStreamingRoundtrip(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressorReuse) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + auto codec = makeCodec(); + auto decompressor = codec->makeStreamingDecompressor(); + checkStreamingRoundtrip( + codec->makeStreamingCompressor(), decompressor, makeRandomData(100)); + + // StreamingDecompressor::reset() should allow reusing decompressor for a + // new stream. + decompressor->reset(); + checkStreamingRoundtrip( + codec->makeStreamingCompressor(), decompressor, makeRandomData(200)); +} + +INSTANTIATE_TEST_SUITE_P( + TestLz4Frame, + CodecTest, + ::testing::Values(TestParams{ + CompressionKind::CompressionKind_LZ4, + std::make_shared(Lz4CodecOptions::kLz4Frame)})); + +INSTANTIATE_TEST_SUITE_P( + TestLz4Raw, + CodecTest, + ::testing::Values(TestParams{ + CompressionKind::CompressionKind_LZ4, + std::make_shared(Lz4CodecOptions::kLz4Raw)})); + +INSTANTIATE_TEST_SUITE_P( + TestLz4Hadoop, + CodecTest, + ::testing::Values(TestParams{ + CompressionKind::CompressionKind_LZ4, + std::make_shared(Lz4CodecOptions::kLz4Hadoop)})); + +TEST(CodecLZ4HadoopTest, compatibility) { + // LZ4 Hadoop codec should be able to read back LZ4 raw blocks. + auto c1 = Codec::create( + CompressionKind::CompressionKind_LZ4, + Lz4CodecOptions{Lz4CodecOptions::kLz4Raw}); + auto c2 = Codec::create( + CompressionKind::CompressionKind_LZ4, + Lz4CodecOptions{Lz4CodecOptions::kLz4Hadoop}); + + for (auto dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(c1, c2, makeRandomData(dataSize)); + } +} } // namespace facebook::velox::common diff --git a/velox/common/compression/v2/CMakeLists.txt b/velox/common/compression/v2/CMakeLists.txt deleted file mode 100644 index 9354b35f294a1..0000000000000 --- a/velox/common/compression/v2/CMakeLists.txt +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright (c) Facebook, Inc. and its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -if(${VELOX_BUILD_TESTING}) - add_subdirectory(tests) -endif() - -add_library(velox_common_compression_v2 - Compression.cpp HadoopCompressionFormat.cpp Lz4Compression.cpp) - -target_link_libraries( - velox_common_compression_v2 - velox_common_base - Folly::folly - Snappy::snappy - zstd::zstd - ZLIB::ZLIB - lz4::lz4) diff --git a/velox/common/compression/v2/Compression.cpp b/velox/common/compression/v2/Compression.cpp deleted file mode 100644 index 357dad8e5f9f0..0000000000000 --- a/velox/common/compression/v2/Compression.cpp +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Derived from Apache Arrow. -#include -#include - -#include "velox/common/base/Exceptions.h" -#include "velox/common/compression/v2/Compression.h" -#include "velox/common/compression/v2/Lz4Compression.h" - -namespace facebook::velox::common { - -namespace { -void checkSupportsCompressionLevel(CompressionKind kind) { - VELOX_USER_CHECK( - Codec::supportsCompressionLevel(kind), - "Codec '" + compressionKindToString(kind) + - "' doesn't support setting a compression level."); -} -} // namespace - -int32_t Codec::useDefaultCompressionLevel() { - return kUseDefaultCompressionLevel; -} - -void Codec::init() {} - -bool Codec::supportsGetUncompressedLength(CompressionKind kind) { - switch (kind) { - default: - return false; - } -} - -bool Codec::supportsCompressionLevel(CompressionKind kind) { - switch (kind) { - case CompressionKind::CompressionKind_LZ4: - return true; - default: - return false; - } -} - -bool Codec::supportsStreamingCompression(CompressionKind kind) { - switch (kind) { - case CompressionKind::CompressionKind_LZ4: - return true; - default: - return false; - } -} - -bool Codec::supportsCompressFixedLength(CompressionKind kind) { - switch (kind) { - default: - return false; - } -} - -int32_t Codec::maximumCompressionLevel(CompressionKind kind) { - checkSupportsCompressionLevel(kind); - auto codec = Codec::create(kind); - return codec->maximumCompressionLevel(); -} - -int32_t Codec::minimumCompressionLevel(CompressionKind kind) { - checkSupportsCompressionLevel(kind); - auto codec = Codec::create(kind); - return codec->minimumCompressionLevel(); -} - -int32_t Codec::defaultCompressionLevel(CompressionKind kind) { - checkSupportsCompressionLevel(kind); - auto codec = Codec::create(kind); - return codec->defaultCompressionLevel(); -} - -std::unique_ptr Codec::create( - CompressionKind kind, - const CodecOptions& codecOptions) { - if (!isAvailable(kind)) { - auto name = compressionKindToString(kind); - if (folly::StringPiece({name}).startsWith("unknown")) { - VELOX_UNSUPPORTED("Unrecognized codec '{}'", name); - } - VELOX_UNSUPPORTED("Support for codec '{}' not implemented.", name); - } - - auto compressionLevel = codecOptions.compressionLevel; - if (compressionLevel != kUseDefaultCompressionLevel) { - checkSupportsCompressionLevel(kind); - } - - std::unique_ptr codec; - switch (kind) { - case CompressionKind::CompressionKind_LZ4: - if (auto options = dynamic_cast(&codecOptions)) { - switch (options->type) { - case Lz4CodecOptions::kLz4Frame: - codec = makeLz4FrameCodec(compressionLevel); - break; - case Lz4CodecOptions::kLz4Raw: - codec = makeLz4RawCodec(compressionLevel); - break; - case Lz4CodecOptions::kLz4Hadoop: - codec = makeLz4HadoopRawCodec(compressionLevel); - break; - } - } - // By default, create LZ4 Frame codec. - codec = makeLz4FrameCodec(compressionLevel); - break; - default: - break; - } - - if (codec == nullptr) { - VELOX_UNSUPPORTED( - "{} codec not implemented", compressionKindToString(kind)); - } - - codec->init(); - - return codec; -} - -std::unique_ptr Codec::create( - CompressionKind kind, - int32_t compressionLevel) { - return create(kind, CodecOptions{compressionLevel}); -} - -bool Codec::isAvailable(CompressionKind kind) { - switch (kind) { - case CompressionKind::CompressionKind_NONE: - case CompressionKind::CompressionKind_LZ4: - return true; - case CompressionKind::CompressionKind_SNAPPY: - case CompressionKind::CompressionKind_GZIP: - case CompressionKind::CompressionKind_ZLIB: - case CompressionKind::CompressionKind_ZSTD: - case CompressionKind::CompressionKind_LZO: - default: - return false; - } -} - -std::optional Codec::getUncompressedLength( - uint64_t inputLength, - const uint8_t* input) const { - return std::nullopt; -} - -uint64_t Codec::compressFixedLength( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) { - VELOX_UNSUPPORTED("'{}' doesn't support partial compression", name()); -} -} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/Compression.h b/velox/common/compression/v2/Compression.h deleted file mode 100644 index da8c4ddd91ca1..0000000000000 --- a/velox/common/compression/v2/Compression.h +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Derived from Apache Arrow. - -#pragma once - -#include -#include -#include -#include -#include -#include "velox/common/compression/Compression.h" - -namespace facebook::velox::common { - -static constexpr int32_t kUseDefaultCompressionLevel = - std::numeric_limits::min(); - -class StreamingCompressor { - public: - virtual ~StreamingCompressor() = default; - - struct CompressResult { - uint64_t bytesRead; - uint64_t bytesWritten; - bool outputTooSmall; - }; - - struct FlushResult { - uint64_t bytesWritten; - bool outputTooSmall; - }; - - struct EndResult { - uint64_t bytesWritten; - bool outputTooSmall; - }; - - /// Compress some input. - /// If CompressResult.outputTooSmall is true on return, then a larger output - /// buffer should be supplied. - virtual CompressResult compress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) = 0; - - /// Flush part of the compressed output. - /// If FlushResult.outputTooSmall is true on return, flush() should be called - /// again with a larger buffer. - virtual FlushResult flush(uint8_t* output, uint64_t outputLength) = 0; - - /// End compressing, doing whatever is necessary to end the stream. - /// If EndResult.outputTooSmall is true on return, end() should be called - /// again with a larger buffer. Otherwise, the StreamingCompressor should not - /// be used anymore. end() will flush the compressed output. - virtual EndResult end(uint8_t* output, uint64_t outputLength) = 0; -}; - -class StreamingDecompressor { - public: - virtual ~StreamingDecompressor() = default; - - struct DecompressResult { - uint64_t bytesRead; - uint64_t bytesWritten; - bool outputTooSmall; - }; - - /// Decompress some input. - /// If outputTooSmall is true on return, a larger output buffer needs - /// to be supplied. - virtual DecompressResult decompress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) = 0; - - /// Return whether the compressed stream is finished. - virtual bool isFinished() = 0; - - /// Reinitialize decompressor, making it ready for a new compressed stream. - virtual void reset() = 0; -}; - -class CodecOptions { - public: - explicit CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel) - : compressionLevel(compressionLevel) {} - - virtual ~CodecOptions() = default; - - int32_t compressionLevel; -}; - -class Codec { - public: - virtual ~Codec() = default; - - /// Return special value to indicate that a codec implementation - /// should use its default compression level. - static int32_t useDefaultCompressionLevel(); - - /// Create a kind for the given compression algorithm with CodecOptions. - static std::unique_ptr create( - CompressionKind kind, - const CodecOptions& codecOptions = CodecOptions{}); - - /// Create a kind for the given compression algorithm. - static std::unique_ptr create( - CompressionKind kind, - int32_t compressionLevel); - - /// Return true if support for indicated kind has been enabled. - static bool isAvailable(CompressionKind kind); - - /// Return true if indicated kind supports extracting uncompressed length - /// from compressed data. - static bool supportsGetUncompressedLength(CompressionKind kind); - - /// Return true if indicated kind supports setting a compression level. - static bool supportsCompressionLevel(CompressionKind kind); - - /// Return true if indicated kind supports creating streaming de/compressor. - static bool supportsStreamingCompression(CompressionKind kind); - - /// Return true if indicated kind supports one-shot compression with fixed - /// compressed length. - static bool supportsCompressFixedLength(CompressionKind kind); - - /// Return the smallest supported compression level for the kind - /// Note: This function creates a temporary Codec instance. - static int32_t minimumCompressionLevel(CompressionKind kind); - - /// Return the largest supported compression level for the kind - /// Note: This function creates a temporary Codec instance. - static int32_t maximumCompressionLevel(CompressionKind kind); - - /// Return the default compression level. - /// Note: This function creates a temporary Codec instance. - static int32_t defaultCompressionLevel(CompressionKind kind); - - /// Return the smallest supported compression level. - virtual int32_t minimumCompressionLevel() const = 0; - - /// Return the largest supported compression level. - virtual int32_t maximumCompressionLevel() const = 0; - - /// Return the default compression level. - virtual int32_t defaultCompressionLevel() const = 0; - - /// One-shot decompression function. - /// `outputLength` must be correct and therefore be obtained in advance. - /// The actual decompressed length is returned. - /// Note: One-shot decompression is not always compatible with streaming - /// compression. Depending on the codec (e.g. LZ4), different formats may - /// be used. - virtual uint64_t decompress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) = 0; - - /// Performs one-shot compression. - /// `outputLength` must first have been computed using maxCompressedLength(). - /// The actual compressed length is returned. - /// Note: One-shot compression is not always compatible with streaming - /// decompression. Depending on the codec (e.g. LZ4), different formats may - /// be used. - virtual uint64_t compress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) = 0; - - /// Performs one-shot compression. - /// This function compresses data and writes the output up to the specified - /// outputLength. If outputLength is too small to hold all the compressed - /// data, the function doesn't fail. Instead, it returns the number of bytes - /// actually written to the output buffer. Any remaining data that couldn't - /// be written in this call will be written in subsequent calls to this - /// function. This is useful when fixed-length compression blocks are required - /// by the caller. - /// Note: Only Gzip and Zstd codec supports this function. - virtual uint64_t compressFixedLength( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength); - - /// Maximum compressed length of given input length. - virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; - - /// Retrieves the actual uncompressed length of data using the specified - /// compression library. - /// Note: This functionality is not universally supported by all compression - /// libraries. If not supported, `std::nullopt` will be returned. - std::optional getUncompressedLength( - uint64_t inputLength, - const uint8_t* input) const; - - /// Create a streaming compressor instance. - virtual std::shared_ptr makeStreamingCompressor() = 0; - - /// Create a streaming compressor instance. - virtual std::shared_ptr - makeStreamingDecompressor() = 0; - - /// This Codec's compression type. - virtual CompressionKind compressionKind() const = 0; - - /// The name of this Codec's compression type. - std::string name() const { - return compressionKindToString(compressionKind()); - } - - /// This Codec's compression level, if applicable. - virtual int32_t compressionLevel() const { - return kUseDefaultCompressionLevel; - } - - private: - /// Initializes the codec's resources. - virtual void init(); -}; -} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/tests/CMakeLists.txt b/velox/common/compression/v2/tests/CMakeLists.txt deleted file mode 100644 index 03b61f7807bd0..0000000000000 --- a/velox/common/compression/v2/tests/CMakeLists.txt +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright (c) Facebook, Inc. and its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -add_executable(velox_common_compression_v2_test CompressionTest.cpp) -add_test(velox_common_compression_v2_test velox_common_compression_v2_test) -target_link_libraries( - velox_common_compression_v2_test velox_link_libs velox_common_compression_v2 - velox_exception gtest gtest_main) diff --git a/velox/common/compression/v2/tests/CompressionTest.cpp b/velox/common/compression/v2/tests/CompressionTest.cpp deleted file mode 100644 index c8d135f8c0f9a..0000000000000 --- a/velox/common/compression/v2/tests/CompressionTest.cpp +++ /dev/null @@ -1,472 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include - -#include "velox/common/base/VeloxException.h" -#include "velox/common/base/tests/GTestUtils.h" -#include "velox/common/compression/v2/Compression.h" -#include "velox/common/compression/v2/Lz4Compression.h" - -namespace facebook::velox::common { - -namespace { - -const std::shared_ptr kDefaultCodecOptions = - std::make_shared(); - -struct TestParams { - CompressionKind compressionKind; - std::shared_ptr codecOptions; - - explicit TestParams( - common::CompressionKind compressionKind, - std::shared_ptr codecOptions = kDefaultCodecOptions) - : compressionKind(compressionKind), - codecOptions(std::move(codecOptions)) {} -}; - -std::vector makeRandomData(size_t n) { - std::vector data(n); - std::default_random_engine engine(42); - std::uniform_int_distribution dist(0, 255); - std::generate(data.begin(), data.end(), [&]() { return dist(engine); }); - return data; -} - -std::vector makeCompressibleData(size_t size) { - std::string baseData = "The quick brown fox jumps over the lazy dog"; - auto repeats = static_cast(1 + size / baseData.size()); - - std::vector data(baseData.size() * repeats); - for (int i = 0; i < repeats; ++i) { - std::memcpy( - data.data() + i * baseData.size(), baseData.data(), baseData.size()); - } - data.resize(size); - return data; -} - -std::function makeRandomInputSize() { - std::default_random_engine engine(42); - std::uniform_int_distribution sizeDistribution(10, 40); - return [=]() mutable -> uint64_t { return sizeDistribution(engine); }; -} - -// Check roundtrip of one-shot compression and decompression functions. -void checkCodecRoundtrip( - Codec* c1, - Codec* c2, - const std::vector& data) { - auto maxCompressedLen = - static_cast(c1->maxCompressedLength(data.size())); - std::vector compressed(maxCompressedLen); - std::vector decompressed(data.size()); - - // Compress with codec c1. - auto compressedSize = c1->compress( - data.data(), data.size(), compressed.data(), maxCompressedLen); - compressed.resize(compressedSize); - - // Decompress with codec c2. - auto decompressedSize = c2->decompress( - compressed.data(), - compressed.size(), - decompressed.data(), - decompressed.size()); - - ASSERT_EQ(data, decompressed); - ASSERT_EQ(data.size(), decompressedSize); -} - -// Use same codec for both compression and decompression. -void checkCodecRoundtrip( - const std::unique_ptr& codec, - const std::vector& data) { - checkCodecRoundtrip(codec.get(), codec.get(), data); -} - -// Compress with codec c1 and decompress with codec c2. -void checkCodecRoundtrip( - const std::unique_ptr& c1, - const std::unique_ptr& c2, - const std::vector& data) { - checkCodecRoundtrip(c1.get(), c2.get(), data); -} - -void streamingCompress( - const std::shared_ptr& compressor, - const std::vector& uncompressed, - std::vector& compressed) { - const uint8_t* input = uncompressed.data(); - uint64_t remaining = uncompressed.size(); - uint64_t compressedSize = 0; - compressed.resize(10); - bool doFlush = false; - - // Generate small random input buffer size. - auto randomInputSize = makeRandomInputSize(); - - // Continue decompressing until consuming all compressed data . - while (remaining > 0) { - // Feed a small amount each time. - auto inputLength = std::min(remaining, randomInputSize()); - auto outputLength = compressed.size() - compressedSize; - uint8_t* output = compressed.data() + compressedSize; - - // Compress once. - auto compressResult = - compressor->compress(input, inputLength, output, outputLength); - ASSERT_LE(compressResult.bytesRead, inputLength); - ASSERT_LE(compressResult.bytesWritten, outputLength); - - // Update result. - compressedSize += compressResult.bytesWritten; - input += compressResult.bytesRead; - remaining -= compressResult.bytesRead; - - // Grow compressed buffer if it's too small. - if (compressResult.outputTooSmall) { - compressed.resize(compressed.capacity() * 2); - } - - // Once every two iterations, do a flush. - if (doFlush) { - StreamingCompressor::FlushResult flushResult; - do { - outputLength = compressed.size() - compressedSize; - output = compressed.data() + compressedSize; - flushResult = compressor->flush(output, outputLength); - ASSERT_LE(flushResult.bytesWritten, outputLength); - compressedSize += flushResult.bytesWritten; - if (flushResult.outputTooSmall) { - compressed.resize(compressed.capacity() * 2); - } - } while (flushResult.outputTooSmall); - } - doFlush = !doFlush; - } - - // End the compressed stream. - StreamingCompressor::EndResult endResult; - do { - int64_t outputLength = compressed.size() - compressedSize; - uint8_t* output = compressed.data() + compressedSize; - endResult = compressor->end(output, outputLength); - ASSERT_LE(endResult.bytesWritten, outputLength); - compressedSize += endResult.bytesWritten; - if (endResult.outputTooSmall) { - compressed.resize(compressed.capacity() * 2); - } - } while (endResult.outputTooSmall); - compressed.resize(compressedSize); -} - -void streamingDecompress( - const std::shared_ptr& decompressor, - const std::vector& compressed, - std::vector& decompressed) { - const uint8_t* input = compressed.data(); - uint64_t remaining = compressed.size(); - uint64_t decompressedSize = 0; - decompressed.resize(10); - - // Generate small random input buffer size. - auto ramdomInputSize = makeRandomInputSize(); - - // Continue decompressing until finishes. - while (!decompressor->isFinished()) { - // Feed a small amount each time. - auto inputLength = std::min(remaining, ramdomInputSize()); - auto outputLength = decompressed.size() - decompressedSize; - uint8_t* output = decompressed.data() + decompressedSize; - - // Decompress once. - auto result = - decompressor->decompress(input, inputLength, output, outputLength); - ASSERT_LE(result.bytesRead, inputLength); - ASSERT_LE(result.bytesWritten, outputLength); - ASSERT_TRUE( - result.outputTooSmall || result.bytesWritten > 0 || - result.bytesRead > 0) - << "Decompression not progressing anymore"; - - // Update result. - decompressedSize += result.bytesWritten; - input += result.bytesRead; - remaining -= result.bytesRead; - - // Grow decompressed buffer if it's too small. - if (result.outputTooSmall) { - decompressed.resize(decompressed.capacity() * 2); - } - } - ASSERT_TRUE(decompressor->isFinished()); - ASSERT_EQ(remaining, 0); - decompressed.resize(decompressedSize); -} - -// Check the streaming compressor against one-shot decompression. -void checkStreamingCompressor(Codec* codec, const std::vector& data) { - // Run streaming compression. - std::vector compressed; - streamingCompress(codec->makeStreamingCompressor(), data, compressed); - - // Check decompressing the compressed data. - std::vector decompressed(data.size()); - ASSERT_NO_THROW(codec->decompress( - compressed.data(), - compressed.size(), - decompressed.data(), - decompressed.size())); - ASSERT_EQ(data, decompressed); -} - -// Check the streaming decompressor against one-shot compression. -void checkStreamingDecompressor( - Codec* codec, - const std::vector& data) { - // Create compressed data. - auto maxCompressedLen = codec->maxCompressedLength(data.size()); - std::vector compressed(maxCompressedLen); - auto compressedSize = codec->compress( - data.data(), data.size(), compressed.data(), maxCompressedLen); - compressed.resize(compressedSize); - - // Run streaming decompression. - std::vector decompressed; - streamingDecompress( - codec->makeStreamingDecompressor(), compressed, decompressed); - - // Check the decompressed data. - ASSERT_EQ(data.size(), decompressed.size()); - ASSERT_EQ(data, decompressed); -} - -// Check the streaming compressor and decompressor together. -void checkStreamingRoundtrip( - const std::shared_ptr& compressor, - const std::shared_ptr& decompressor, - const std::vector& data) { - std::vector compressed; - streamingCompress(compressor, data, compressed); - std::vector decompressed; - streamingDecompress(decompressor, compressed, decompressed); - ASSERT_EQ(data, decompressed); -} - -void checkStreamingRoundtrip(Codec* codec, const std::vector& data) { - checkStreamingRoundtrip( - codec->makeStreamingCompressor(), - codec->makeStreamingDecompressor(), - data); -} -} // namespace - -class CodecTest : public ::testing::TestWithParam { - protected: - static CompressionKind getCompressionKind() { - return GetParam().compressionKind; - } - - static const CodecOptions& getCodecOptions() { - return *GetParam().codecOptions; - } - - static std::unique_ptr makeCodec() { - return Codec::create(getCompressionKind(), getCodecOptions()); - } -}; - -TEST_P(CodecTest, specifyCompressionLevel) { - std::vector data = makeRandomData(2000); - const auto kind = getCompressionKind(); - if (!Codec::isAvailable(kind)) { - // Support for this codec hasn't been built. - VELOX_ASSERT_THROW( - Codec::create(kind, Codec::useDefaultCompressionLevel()), - "Support for codec '" + compressionKindToString(kind) + - "' not implemented."); - } else if (!Codec::supportsCompressionLevel(kind)) { - VELOX_ASSERT_THROW( - Codec::create(kind, 1), - fmt::format( - "Codec '{}' doesn't support setting a compression level.", - compressionKindToString(kind))); - } else { - auto codec = Codec::create(kind, Codec::minimumCompressionLevel(kind)); - checkCodecRoundtrip(codec, data); - } -} - -TEST_P(CodecTest, getUncompressedLength) { - auto codec = makeCodec(); - auto inputLength = 100; - auto input = makeRandomData(inputLength); - std::vector compressed(codec->maxCompressedLength(input.size())); - auto compressedLength = codec->compress( - input.data(), inputLength, compressed.data(), compressed.size()); - compressed.resize(compressedLength); - - if (Codec::supportsGetUncompressedLength(getCompressionKind())) { - ASSERT_EQ( - codec->getUncompressedLength(compressedLength, compressed.data()), - inputLength); - } else { - ASSERT_EQ( - codec->getUncompressedLength(compressedLength, compressed.data()), - std::nullopt); - } -} - -TEST_P(CodecTest, codecRoundtrip) { - auto codec = makeCodec(); - for (int dataSize : {0, 10, 10000, 100000}) { - checkCodecRoundtrip(codec, makeRandomData(dataSize)); - checkCodecRoundtrip(codec, makeCompressibleData(dataSize)); - } -} - -TEST_P(CodecTest, minMaxCompressionLevel) { - auto type = getCompressionKind(); - auto codec = makeCodec(); - auto notSupportCompressionLevel = [](CompressionKind kind) { - return fmt::format( - "Codec '{}' doesn't support setting a compression level.", - compressionKindToString(kind)); - }; - - if (Codec::supportsCompressionLevel(type)) { - auto minLevel = Codec::minimumCompressionLevel(type); - auto maxLevel = Codec::maximumCompressionLevel(type); - auto defaultLevel = Codec::defaultCompressionLevel(type); - ASSERT_NE(minLevel, Codec::useDefaultCompressionLevel()); - ASSERT_NE(maxLevel, Codec::useDefaultCompressionLevel()); - ASSERT_NE(defaultLevel, Codec::useDefaultCompressionLevel()); - ASSERT_LT(minLevel, maxLevel); - ASSERT_EQ(minLevel, codec->minimumCompressionLevel()); - ASSERT_EQ(maxLevel, codec->maximumCompressionLevel()); - ASSERT_GE(defaultLevel, minLevel); - ASSERT_LE(defaultLevel, maxLevel); - } else { - VELOX_ASSERT_THROW( - Codec::minimumCompressionLevel(type), notSupportCompressionLevel(type)); - VELOX_ASSERT_THROW( - Codec::maximumCompressionLevel(type), notSupportCompressionLevel(type)); - VELOX_ASSERT_THROW( - Codec::defaultCompressionLevel(type), notSupportCompressionLevel(type)); - ASSERT_EQ( - codec->minimumCompressionLevel(), Codec::useDefaultCompressionLevel()); - ASSERT_EQ( - codec->maximumCompressionLevel(), Codec::useDefaultCompressionLevel()); - ASSERT_EQ( - codec->defaultCompressionLevel(), Codec::useDefaultCompressionLevel()); - } -} - -TEST_P(CodecTest, streamingCompressor) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { - return; - } - - for (auto dataSize : {0, 10, 10000, 100000}) { - auto codec = makeCodec(); - checkStreamingCompressor(codec.get(), makeRandomData(dataSize)); - checkStreamingCompressor(codec.get(), makeCompressibleData(dataSize)); - } -} - -TEST_P(CodecTest, streamingDecompressor) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { - return; - } - - for (auto dataSize : {0, 10, 10000, 100000}) { - auto codec = makeCodec(); - checkStreamingDecompressor(codec.get(), makeRandomData(dataSize)); - checkStreamingDecompressor(codec.get(), makeCompressibleData(dataSize)); - } -} - -TEST_P(CodecTest, streamingRoundtrip) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { - return; - } - - for (auto dataSize : {0, 10, 10000, 100000}) { - auto codec = makeCodec(); - checkStreamingRoundtrip(codec.get(), makeRandomData(dataSize)); - checkStreamingRoundtrip(codec.get(), makeCompressibleData(dataSize)); - } -} - -TEST_P(CodecTest, streamingDecompressorReuse) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { - return; - } - - auto codec = makeCodec(); - auto decompressor = codec->makeStreamingDecompressor(); - checkStreamingRoundtrip( - codec->makeStreamingCompressor(), decompressor, makeRandomData(100)); - - // StreamingDecompressor::reset() should allow reusing decompressor for a new - // stream. - decompressor->reset(); - checkStreamingRoundtrip( - codec->makeStreamingCompressor(), decompressor, makeRandomData(200)); -} - -INSTANTIATE_TEST_SUITE_P( - TestLz4Frame, - CodecTest, - ::testing::Values(TestParams{ - CompressionKind::CompressionKind_LZ4, - std::make_shared(Lz4CodecOptions::kLz4Frame)})); - -INSTANTIATE_TEST_SUITE_P( - TestLz4Raw, - CodecTest, - ::testing::Values(TestParams{ - CompressionKind::CompressionKind_LZ4, - std::make_shared(Lz4CodecOptions::kLz4Raw)})); - -INSTANTIATE_TEST_SUITE_P( - TestLz4Hadoop, - CodecTest, - ::testing::Values(TestParams{ - CompressionKind::CompressionKind_LZ4, - std::make_shared(Lz4CodecOptions::kLz4Hadoop)})); - -TEST(CodecLZ4HadoopTest, compatibility) { - // LZ4 Hadoop codec should be able to read back LZ4 raw blocks. - auto c1 = Codec::create( - CompressionKind::CompressionKind_LZ4, - Lz4CodecOptions{Lz4CodecOptions::kLz4Raw}); - auto c2 = Codec::create( - CompressionKind::CompressionKind_LZ4, - Lz4CodecOptions{Lz4CodecOptions::kLz4Hadoop}); - - for (auto dataSize : {0, 10, 10000, 100000}) { - checkCodecRoundtrip(c1, c2, makeRandomData(dataSize)); - } -} -} // namespace facebook::velox::common