diff --git a/CMake/FindQAT.cmake b/CMake/FindQAT.cmake new file mode 100644 index 000000000000..ab4845b01c3c --- /dev/null +++ b/CMake/FindQAT.cmake @@ -0,0 +1,97 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +include_guard(GLOBAL) + +include(ExternalProject) + +macro(build_qatzip) + message(STATUS "Building QATzip from source") + set(QATZIP_BUILD_VERSION "v1.1.2") + set(QATZIP_BUILD_SHA256_CHECKSUM + "31419fa4b42d217b3e55a70a34545582cbf401a4f4d44738d21b4a3944b1e1ef") + set(QATZIP_SOURCE_URL + "https://github.com/intel/QATzip/archive/refs/tags/${QATZIP_BUILD_VERSION}.tar.gz" + ) + set(QATZIP_LIB_NAME "qatzip") + + set(QATZIP_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qatzip_ep-install") + set(QATZIP_SOURCE_DIR "${QATZIP_PREFIX}/src/qatzip_ep") + set(QATZIP_INCLUDE_DIR "${QATZIP_SOURCE_DIR}/include") + set(QATZIP_STATIC_LIB_NAME + "${CMAKE_STATIC_LIBRARY_PREFIX}${QATZIP_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX}" + ) + set(QATZIP_STATIC_LIB_TARGETS + "${QATZIP_SOURCE_DIR}/src/.libs/${QATZIP_STATIC_LIB_NAME}") + set(QATZIP_CONFIGURE_ARGS "--prefix=${QATZIP_PREFIX}" "--with-pic" + "--with-ICP_ROOT=${ICP_ROOT}") + + ExternalProject_Add( + qatzip_ep + PREFIX ${QATZIP_PREFIX} + URL ${QATZIP_SOURCE_URL} + URL_HASH "SHA256=${QATZIP_BUILD_SHA256_CHECKSUM}" + SOURCE_DIR ${QATZIP_SOURCE_DIR} + CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env QZ_ROOT=${QATZIP_SOURCE_DIR} + ./configure ${QATZIP_CONFIGURE_ARGS} + BUILD_COMMAND ${MAKE_PROGRAM} all + BUILD_BYPRODUCTS ${QATZIP_STATIC_LIB_TARGETS} + BUILD_IN_SOURCE 1) + + ExternalProject_Add_Step( + qatzip_ep pre-configure + COMMAND ./autogen.sh + DEPENDEES download + DEPENDERS configure + WORKING_DIRECTORY ${QATZIP_SOURCE_DIR}) + + # The include directory must exist before it is referenced by a target. + file(MAKE_DIRECTORY "${QATZIP_INCLUDE_DIR}") + + set(QATZIP_LINK_LIBRARIES + ZLIB::ZLIB lz4::lz4 "${UDEV_LIBRARY}" "${USDM_DRV_LIBRARY}" + "${QAT_S_LIBRARY}" Threads::Threads) + + add_library(qatzip::qatzip STATIC IMPORTED) + set_target_properties( + qatzip::qatzip + PROPERTIES IMPORTED_LOCATION "${QATZIP_STATIC_LIB_TARGETS}" + INTERFACE_INCLUDE_DIRECTORIES "${QATZIP_INCLUDE_DIR}" + INTERFACE_LINK_LIBRARIES "${QATZIP_LINK_LIBRARIES}") + + add_dependencies(qatzip::qatzip qatzip_ep) +endmacro() + +set(ICP_ROOT $ENV{ICP_ROOT}) +set(THREADS_PREFER_PTHREAD_FLAG ON) + +find_package(Threads REQUIRED) +find_program(MAKE_PROGRAM make REQUIRED) + +find_library(UDEV_LIBRARY REQUIRED NAMES udev) +find_library( + USDM_DRV_LIBRARY REQUIRED + NAMES usdm_drv_s + PATHS "${ICP_ROOT}/build" + NO_DEFAULT_PATH) +find_library( + QAT_S_LIBRARY REQUIRED + NAMES qat_s + PATHS "${ICP_ROOT}/build" + NO_DEFAULT_PATH) + +message(STATUS "Found udev: ${UDEV_LIBRARY}") +message(STATUS "Found usdm_drv: ${USDM_DRV_LIBRARY}") +message(STATUS "Found qat_s: ${QAT_S_LIBRARY}") + +build_qatzip() diff --git a/CMakeLists.txt b/CMakeLists.txt index 669356dab8a8..e5699fab95e1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -98,6 +98,7 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF) option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF) option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF) option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON) +option(VELOX_ENABLE_QAT "Enable Intel QuickAssist Technology support" OFF) option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF) option(VELOX_BUILD_PYTHON_PACKAGE "Builds Velox Python bindings" OFF) @@ -257,6 +258,11 @@ if(VELOX_ENABLE_PARQUET) set(VELOX_ENABLE_ARROW ON) endif() +if(VELOX_ENABLE_QAT) + find_package(QAT REQUIRED) + add_definitions(-DVELOX_ENABLE_QAT) +endif() + if(VELOX_ENABLE_REMOTE_FUNCTIONS) # TODO: Move this to use resolve_dependency(). For some reason, FBThrift # requires clients to explicitly install fizz and wangle. @@ -424,20 +430,18 @@ resolve_dependency(glog) set_source(fmt) resolve_dependency(fmt) -if(NOT ${VELOX_BUILD_MINIMAL}) - find_package(ZLIB REQUIRED) - find_package(lz4 REQUIRED) - find_package(lzo2 REQUIRED) - find_package(zstd REQUIRED) - find_package(Snappy REQUIRED) - if(NOT TARGET zstd::zstd) - if(TARGET zstd::libzstd_static) - set(ZSTD_TYPE static) - else() - set(ZSTD_TYPE shared) - endif() - add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE}) +find_package(ZLIB REQUIRED) +find_package(lz4 REQUIRED) +find_package(lzo2 REQUIRED) +find_package(zstd REQUIRED) +find_package(Snappy REQUIRED) +if(NOT TARGET zstd::zstd) + if(TARGET zstd::libzstd_static) + set(ZSTD_TYPE static) + else() + set(ZSTD_TYPE shared) endif() + add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE}) endif() set_source(re2) diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index ea35aa1ea59c..0b71efba8136 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -79,3 +79,7 @@ endif() if(${VELOX_ENABLE_SUBSTRAIT}) add_subdirectory(substrait) endif() + +if(${VELOX_ENABLE_QAT}) + add_subdirectory(common/compression/v2/qat) +endif() diff --git a/velox/common/CMakeLists.txt b/velox/common/CMakeLists.txt index e96b44b9d2c6..adfe61bac562 100644 --- a/velox/common/CMakeLists.txt +++ b/velox/common/CMakeLists.txt @@ -14,6 +14,7 @@ add_subdirectory(base) add_subdirectory(caching) add_subdirectory(compression) +add_subdirectory(compression/v2) add_subdirectory(config) add_subdirectory(encode) add_subdirectory(file) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 3843f5902a98..21b966922a0c 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -76,6 +76,12 @@ std::string compressionKindToString(CompressionKind kind) { return "lz4"; case CompressionKind_GZIP: return "gzip"; + case CompressionKind_LZ4RAW: + return "lz4_raw"; + case CompressionKind_LZ4HADOOP: + return "lz4_hadoop"; + case CompressionKind_LZOHADOOP: + return "lzo_hadoop"; } return folly::to("unknown - ", kind); } @@ -89,7 +95,10 @@ CompressionKind stringToCompressionKind(const std::string& kind) { {"lzo", CompressionKind_LZO}, {"zstd", CompressionKind_ZSTD}, {"lz4", CompressionKind_LZ4}, - {"gzip", CompressionKind_GZIP}}; + {"gzip", CompressionKind_GZIP}, + {"lz4_raw", CompressionKind_LZ4RAW}, + {"lz4_hadoop", CompressionKind_LZ4HADOOP}, + {"lzo_hadoop", CompressionKind_LZOHADOOP}}; auto iter = stringToCompressionKindMap.find(kind); if (iter != stringToCompressionKindMap.end()) { return iter->second; diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 317f9717a2ae..3e5191e83533 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -29,6 +29,9 @@ enum CompressionKind { CompressionKind_ZSTD = 4, CompressionKind_LZ4 = 5, CompressionKind_GZIP = 6, + CompressionKind_LZ4RAW = 7, + CompressionKind_LZ4HADOOP = 8, + CompressionKind_LZOHADOOP = 9, CompressionKind_MAX = INT64_MAX }; diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index 0b036c55c512..bad9231f7a7e 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -31,6 +31,9 @@ TEST_F(CompressionTest, testCompressionNames) { EXPECT_EQ("lzo", compressionKindToString(CompressionKind_LZO)); EXPECT_EQ("lz4", compressionKindToString(CompressionKind_LZ4)); EXPECT_EQ("zstd", compressionKindToString(CompressionKind_ZSTD)); + EXPECT_EQ("gzip", compressionKindToString(CompressionKind_GZIP)); + EXPECT_EQ("lz4_raw", compressionKindToString(CompressionKind_LZ4RAW)); + EXPECT_EQ("lz4_hadoop", compressionKindToString(CompressionKind_LZ4HADOOP)); EXPECT_EQ( "unknown - 99", compressionKindToString(static_cast(99))); @@ -56,6 +59,8 @@ TEST_F(CompressionTest, stringToCompressionKind) { EXPECT_EQ(stringToCompressionKind("lz4"), CompressionKind_LZ4); EXPECT_EQ(stringToCompressionKind("zstd"), CompressionKind_ZSTD); EXPECT_EQ(stringToCompressionKind("gzip"), CompressionKind_GZIP); + EXPECT_EQ(stringToCompressionKind("lz4_raw"), CompressionKind_LZ4RAW); + EXPECT_EQ(stringToCompressionKind("lz4_hadoop"), CompressionKind_LZ4HADOOP); VELOX_ASSERT_THROW( stringToCompressionKind("bz2"), "Not support compression kind bz2"); } diff --git a/velox/common/compression/v2/CMakeLists.txt b/velox/common/compression/v2/CMakeLists.txt new file mode 100644 index 000000000000..9f342f8791d4 --- /dev/null +++ b/velox/common/compression/v2/CMakeLists.txt @@ -0,0 +1,37 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) +endif() + +add_library( + velox_common_compression_v2 + Compression.cpp + GzipCompression.cpp + HadoopCompressionFormat.cpp + Lz4Compression.cpp + LzoCompression.cpp + SnappyCompression.cpp + ZstdCompression.cpp) + +target_link_libraries( + velox_common_compression_v2 + velox_common_compression + velox_common_base + Folly::folly + Snappy::snappy + zstd::zstd + ZLIB::ZLIB + lz4::lz4) diff --git a/velox/common/compression/v2/Compression.cpp b/velox/common/compression/v2/Compression.cpp new file mode 100644 index 000000000000..df39f099640c --- /dev/null +++ b/velox/common/compression/v2/Compression.cpp @@ -0,0 +1,240 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Adapted from Apache Arrow. + +#include "velox/common/compression/v2/Compression.h" +#include +#include +#include "velox/common/base/Exceptions.h" +#include "velox/common/compression/v2/GzipCompression.h" +#include "velox/common/compression/v2/Lz4Compression.h" +#include "velox/common/compression/v2/LzoCompression.h" +#include "velox/common/compression/v2/SnappyCompression.h" +#include "velox/common/compression/v2/ZstdCompression.h" + +#ifdef VELOX_ENABLE_QAT +#include "velox/common/compression/v2/qat/QatCompression.h" +#endif + +namespace facebook::velox::common { + +namespace { +void checkSupportsCompressionLevel(CompressionKind kind) { + VELOX_USER_CHECK( + Codec::supportsCompressionLevel(kind), + "Codec '" + compressionKindToString(kind) + + "' doesn't support setting a compression level."); +} +} // namespace + +int32_t Codec::useDefaultCompressionLevel() { + return kUseDefaultCompressionLevel; +} + +void Codec::init() {} + +bool Codec::supportsGetUncompressedLength(CompressionKind kind) { + switch (kind) { + case CompressionKind_ZSTD: + case CompressionKind_SNAPPY: + return true; + default: + return false; + } +} + +bool Codec::supportsCompressionLevel(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_LZ4: + case CompressionKind::CompressionKind_LZ4RAW: + case CompressionKind::CompressionKind_GZIP: + case CompressionKind::CompressionKind_ZLIB: + case CompressionKind::CompressionKind_ZSTD: + return true; + default: + return false; + } +} + +bool Codec::supportsStreamingCompression(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_LZ4: + case CompressionKind::CompressionKind_GZIP: + case CompressionKind::CompressionKind_ZLIB: + case CompressionKind::CompressionKind_ZSTD: + return true; + default: + return false; + } +} + +int32_t Codec::maximumCompressionLevel(CompressionKind kind) { + checkSupportsCompressionLevel(kind); + auto codec = Codec::create(kind); + return codec->maximumCompressionLevel(); +} + +int32_t Codec::minimumCompressionLevel(CompressionKind kind) { + checkSupportsCompressionLevel(kind); + auto codec = Codec::create(kind); + return codec->minimumCompressionLevel(); +} + +int32_t Codec::defaultCompressionLevel(CompressionKind kind) { + checkSupportsCompressionLevel(kind); + auto codec = Codec::create(kind); + return codec->defaultCompressionLevel(); +} + +std::unique_ptr Codec::create( + CompressionKind kind, + const CodecOptions& codecOptions) { + if (!isAvailable(kind)) { + auto name = compressionKindToString(kind); + if (folly::StringPiece({name}).startsWith("unknown")) { + VELOX_UNSUPPORTED("Unrecognized codec '{}'", name); + } + VELOX_UNSUPPORTED("Support for codec '{}' not implemented.", name); + } + + auto compressionLevel = codecOptions.compressionLevel; + if (compressionLevel != kUseDefaultCompressionLevel) { + checkSupportsCompressionLevel(kind); + } + + std::unique_ptr codec; + switch (kind) { + case CompressionKind::CompressionKind_NONE: + return nullptr; + case CompressionKind::CompressionKind_LZ4: + codec = makeLz4FrameCodec(compressionLevel); + break; + case CompressionKind::CompressionKind_LZ4RAW: + codec = makeLz4RawCodec(compressionLevel); + break; + case CompressionKind::CompressionKind_LZ4HADOOP: + codec = makeLz4HadoopRawCodec(); + break; + case CompressionKind::CompressionKind_GZIP: { + if (auto opt = dynamic_cast(&codecOptions)) { + codec = makeGzipCodec(compressionLevel, opt->format, opt->windowBits); + break; + } +#ifdef VELOX_ENABLE_QAT + if (auto opt = + dynamic_cast(&codecOptions)) { + codec = qat::makeQatGzipCodec(compressionLevel, opt->pollingMode); + break; + } +#endif + codec = makeGzipCodec(compressionLevel); + break; + } + case CompressionKind::CompressionKind_ZLIB: { + auto opt = dynamic_cast(&codecOptions); + if (opt) { + codec = makeZlibCodec(compressionLevel, opt->windowBits); + break; + } + codec = makeZlibCodec(compressionLevel); + break; + } + case CompressionKind::CompressionKind_ZSTD: + codec = makeZstdCodec(compressionLevel); + break; + case CompressionKind::CompressionKind_SNAPPY: + codec = makeSnappyCodec(); + break; + case CompressionKind::CompressionKind_LZO: + codec = makeLzoCodec(); + default: + break; + } + + codec->init(); + + return codec; +} + +// use compression level to create Codec +std::unique_ptr Codec::create( + CompressionKind kind, + int32_t compressionLevel) { + return create(kind, CodecOptions{compressionLevel}); +} + +bool Codec::isAvailable(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_NONE: + case CompressionKind::CompressionKind_LZ4: + case CompressionKind::CompressionKind_LZ4RAW: + case CompressionKind::CompressionKind_LZ4HADOOP: + case CompressionKind::CompressionKind_GZIP: + case CompressionKind::CompressionKind_ZLIB: + case CompressionKind::CompressionKind_ZSTD: + case CompressionKind::CompressionKind_SNAPPY: + case CompressionKind::CompressionKind_LZO: + return true; + default: + return false; + } +} + +std::optional Codec::getUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const { + if (inputLength == 0) { + if (uncompressedLength.value_or(0) != 0) { + VELOX_USER_CHECK_EQ( + uncompressedLength.value_or(0), + 0, + "Invalid uncompressed length: {}.", + *uncompressedLength); + } + return 0; + } + auto actualLength = + doGetUncompressedLength(inputLength, input, uncompressedLength); + if (actualLength) { + if (uncompressedLength) { + VELOX_USER_CHECK_EQ( + *actualLength, + *uncompressedLength, + "Invalid uncompressed length: {}.", + *uncompressedLength); + } + return actualLength; + } + return uncompressedLength; +} + +std::optional Codec::doGetUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const { + return uncompressedLength; +} + +uint64_t Codec::compressPartial( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + VELOX_UNSUPPORTED("'{}' doesn't support partial compression", name()); +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/Compression.h b/velox/common/compression/v2/Compression.h new file mode 100644 index 000000000000..d6ef89b9017c --- /dev/null +++ b/velox/common/compression/v2/Compression.h @@ -0,0 +1,246 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Derived from Apache Arrow. + +#pragma once + +#include +#include +#include +#include +#include +#include "velox/common/compression/Compression.h" + +namespace facebook::velox::common { + +static constexpr int32_t kUseDefaultCompressionLevel = + std::numeric_limits::min(); + +/// Streaming compressor interface. +class Compressor { + public: + virtual ~Compressor() = default; + + struct CompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + struct FlushResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + struct EndResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Compress some input. + /// If bytes_read is 0 on return, then a larger output buffer should be + /// supplied. + virtual CompressResult compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + /// Flush part of the compressed output. + /// If outputTooSmall is true on return, flush() should be called again + /// with a larger buffer. + virtual FlushResult flush(uint64_t outputLength, uint8_t* output) = 0; + + /// End compressing, doing whatever is necessary to end the stream. + /// If outputTooSmall is true on return, end() should be called again + /// with a larger buffer. Otherwise, the Compressor should not be used + /// anymore. + /// end() implies flush(). + virtual EndResult end(uint64_t outputLength, uint8_t* output) = 0; +}; + +/// Streaming decompressor interface +class Decompressor { + public: + virtual ~Decompressor() = default; + + struct DecompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Decompress some input. + /// If outputTooSmall is true on return, a larger output buffer needs + /// to be supplied. + virtual DecompressResult decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + /// Return whether the compressed stream is finished. + virtual bool isFinished() = 0; + + /// Reinitialize decompressor, making it ready for a new compressed stream. + virtual void reset() = 0; +}; + +/// Compression codec options +class CodecOptions { + public: + explicit CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel) + : compressionLevel(compressionLevel) {} + + virtual ~CodecOptions() = default; + + int32_t compressionLevel; +}; + +/// Compression codec +class Codec { + public: + virtual ~Codec() = default; + + /// Return special value to indicate that a codec implementation + /// should use its default compression level. + static int32_t useDefaultCompressionLevel(); + + /// Create a kind for the given compression algorithm with CodecOptions. + static std::unique_ptr create( + CompressionKind kind, + const CodecOptions& codecOptions = CodecOptions{}); + + /// Create a kind for the given compression algorithm. + static std::unique_ptr create( + CompressionKind kind, + int32_t compressionLevel); + + /// Return true if support for indicated kind has been enabled. + static bool isAvailable(CompressionKind kind); + + /// Return true if indicated kind supports extracting uncompressed length + /// from compressed data. + static bool supportsGetUncompressedLength(CompressionKind kind); + + /// Return true if indicated kind supports setting a compression level. + static bool supportsCompressionLevel(CompressionKind kind); + + /// Return true if indicated kind supports creating streaming de/compressor. + static bool supportsStreamingCompression(CompressionKind kind); + + /// Return the smallest supported compression level for the kind + /// Note: This function creates a temporary Codec instance. + static int32_t minimumCompressionLevel(CompressionKind kind); + + /// Return the largest supported compression level for the kind + /// Note: This function creates a temporary Codec instance. + static int32_t maximumCompressionLevel(CompressionKind kind); + + /// Return the default compression level. + /// Note: This function creates a temporary Codec instance. + static int32_t defaultCompressionLevel(CompressionKind kind); + + /// Return the smallest supported compression level. + virtual int32_t minimumCompressionLevel() const = 0; + + /// Return the largest supported compression level. + virtual int32_t maximumCompressionLevel() const = 0; + + /// Return the default compression level. + virtual int32_t defaultCompressionLevel() const = 0; + + /// One-shot decompression function. + /// `outputLength` must be correct and therefore be obtained in advance. + /// The actual decompressed length is returned. + /// Note: One-shot decompression is not always compatible with streaming + /// compression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + /// Performs one-shot compression. + /// `outputLength` must first have been computed using maxCompressedLength(). + /// The actual compressed length is returned. + /// Note: One-shot compression is not always compatible with streaming + /// decompression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + /// Performs one-shot compression. + /// This function compresses data and writes the output up to the specified + /// outputLength. If outputLength is too small to hold all the compressed + /// data, the function doesn't fail. Instead, it returns the number of bytes + /// actually written to the output buffer. Any remaining data that couldn't + /// be written in this call will be written in subsequent calls to this + /// function. This is useful when fixed-size compression blocks are required + /// by the caller. + /// Note: Only Gzip and Zstd codec supports this function. + virtual uint64_t compressPartial( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output); + + /// Maximum compressed length of given input length. + virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; + + /// Extracts the uncompressed length from the compressed data if possible. + /// If the codec doesn't store the uncompressed length, or the data is + /// corrupted it returns the given uncompressedLength. + /// If the uncompressed length is stored in the compressed data and + /// uncompressedLength is not none and they do not match a std::runtime_error + /// is thrown. + std::optional getUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength = std::nullopt) const; + + /// Create a streaming compressor instance. + virtual std::shared_ptr makeCompressor() = 0; + + /// Create a streaming compressor instance. + virtual std::shared_ptr makeDecompressor() = 0; + + /// This Codec's compression type. + virtual CompressionKind compressionKind() const = 0; + + /// The name of this Codec's compression type. + std::string name() const { + return compressionKindToString(compressionKind()); + } + + /// This Codec's compression level, if applicable. + virtual int32_t compressionLevel() const { + return kUseDefaultCompressionLevel; + } + + private: + /// Initializes the codec's resources. + virtual void init(); + + virtual std::optional doGetUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const; +}; +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/GzipCompression.cpp b/velox/common/compression/v2/GzipCompression.cpp new file mode 100644 index 000000000000..1a5b4140db8d --- /dev/null +++ b/velox/common/compression/v2/GzipCompression.cpp @@ -0,0 +1,522 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/v2/GzipCompression.h" +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::common { + +namespace { + +constexpr uint64_t kGzipBufferLimit = + static_cast(std::numeric_limits::max()); + +// Determine if this is zlib or gzip from header. +int32_t getCompressionWindowBits(GzipFormat format, int32_t windowBits) { + switch (format) { + case GzipFormat::kDeflate: + windowBits = -windowBits; + break; + case GzipFormat::kGzip: + windowBits += 16; + break; + case GzipFormat::kZlib: + break; + } + return windowBits; +} + +int32_t getDecompressionWindowBits(GzipFormat format, int32_t windowBits) { + if (format == GzipFormat::kDeflate) { + return -windowBits; + } else { + // If not deflate, autodetect format from header. + return windowBits | 32; + } +} + +void zlibError(const char* prefix, const char* detail) { + std::string msg(detail); + VELOX_FAIL(prefix, msg.empty() ? msg : "(unknown error)"); +} +} // namespace + +class GZipDecompressor : public Decompressor { + public: + explicit GZipDecompressor(GzipFormat format, int32_t windowBits); + + ~GZipDecompressor() override; + + void init(); + + void reset() override; + + DecompressResult decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + bool isFinished() { + return finished_; + } + + protected: + z_stream stream_{}; + GzipFormat format_; + int32_t windowBits_; + bool initialized_{false}; + bool finished_{false}; +}; + +class GzipCompressor : public Compressor { + public: + explicit GzipCompressor(int32_t compressionLevel); + + ~GzipCompressor() override; + + void init(GzipFormat format, int32_t windowBits); + + CompressResult compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + FlushResult flush(uint64_t outputLength, uint8_t* output) override; + + EndResult end(uint64_t outputLength, uint8_t* output) override; + + protected: + z_stream stream_{}; + int32_t compressionLevel_; + bool initialized_{false}; +}; + +GZipDecompressor::GZipDecompressor(GzipFormat format, int32_t windowBits) + : format_(format), windowBits_(windowBits) {} + +GZipDecompressor::~GZipDecompressor() { + if (initialized_) { + inflateEnd(&stream_); + } +} + +void GZipDecompressor::init() { + VELOX_CHECK(!initialized_, "Called on initialized stream."); + memset(&stream_, 0, sizeof(stream_)); + finished_ = false; + + auto windowBits = getDecompressionWindowBits(format_, windowBits_); + auto ret = inflateInit2(&stream_, windowBits); + if (ret != Z_OK) { + zlibError("zlib inflateInit failed: ", stream_.msg); + } + initialized_ = true; +} + +void GZipDecompressor::reset() { + DCHECK(initialized_); + finished_ = false; + auto ret = inflateReset(&stream_); + if (ret != Z_OK) { + zlibError("zlib inflateReset failed: ", stream_.msg); + } +} + +Decompressor::DecompressResult GZipDecompressor::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + VELOX_CHECK(initialized_, "Called on non-initialized stream."); + + stream_.next_in = const_cast(reinterpret_cast(input)); + stream_.avail_in = static_cast(std::min(inputLength, kGzipBufferLimit)); + stream_.next_out = reinterpret_cast(output); + stream_.avail_out = + static_cast(std::min(outputLength, kGzipBufferLimit)); + + auto ret = inflate(&stream_, Z_SYNC_FLUSH); + if (ret == Z_DATA_ERROR || ret == Z_STREAM_ERROR || ret == Z_MEM_ERROR) { + zlibError("Zlib inflate failed: ", stream_.msg); + } + if (ret == Z_NEED_DICT) { + zlibError("Zlib inflate failed (need preset dictionary): ", stream_.msg); + } + if (ret == Z_BUF_ERROR) { + // No progress was possible or output is too small. + return DecompressResult{0, 0, true}; + } + VELOX_CHECK( + ret == Z_OK || ret == Z_STREAM_END, + "Invalid return code from zlib: {}", + ret); + finished_ = (ret == Z_STREAM_END); + return DecompressResult{ + inputLength - stream_.avail_in, outputLength - stream_.avail_out, false}; +} + +GzipCompressor::GzipCompressor(int32_t compressionLevel) + : compressionLevel_(compressionLevel) {} + +GzipCompressor::~GzipCompressor() { + if (initialized_) { + deflateEnd(&stream_); + } +} + +void GzipCompressor::init(GzipFormat format, int32_t windowBits) { + VELOX_CHECK(!initialized_, "Called on initialized stream."); + memset(&stream_, 0, sizeof(stream_)); + + // Initialize to run specified format + int32_t windowBitsForFormat = getCompressionWindowBits(format, windowBits); + auto ret = deflateInit2( + &stream_, + Z_DEFAULT_COMPRESSION, + Z_DEFLATED, + windowBitsForFormat, + compressionLevel_, + Z_DEFAULT_STRATEGY); + if (ret != Z_OK) { + zlibError("Zlib deflateInit failed: ", stream_.msg); + } + initialized_ = true; +} + +Compressor::CompressResult GzipCompressor::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + VELOX_CHECK(initialized_, "Called on non-initialized stream."); + + stream_.next_in = const_cast(reinterpret_cast(input)); + stream_.avail_in = static_cast(std::min(inputLength, kGzipBufferLimit)); + stream_.next_out = reinterpret_cast(output); + stream_.avail_out = + static_cast(std::min(outputLength, kGzipBufferLimit)); + + auto ret = deflate(&stream_, Z_NO_FLUSH); + if (ret == Z_STREAM_ERROR) { + zlibError("Zlib compress failed: ", stream_.msg); + } + if (ret == Z_OK) { + // Some progress has been made. + // If deflate returns Z_OK and with zero avail_out, it must be called again + // after making room in the output buffer because there might be more output + // pending. + return CompressResult{ + inputLength - stream_.avail_in, + outputLength - stream_.avail_out, + stream_.avail_out == 0}; + } + // No progress was possible, need to increase output buffer size. + VELOX_CHECK_EQ(ret, Z_BUF_ERROR, "Invalid return code from zlib: {}", ret); + return CompressResult{0, 0, true}; +} + +Compressor::FlushResult GzipCompressor::flush( + uint64_t outputLength, + uint8_t* output) { + VELOX_CHECK(initialized_, "Called on non-initialized stream."); + + static constexpr auto kInputLimit = + static_cast(std::numeric_limits::max()); + + stream_.avail_in = 0; + stream_.next_out = reinterpret_cast(output); + stream_.avail_out = static_cast(std::min(outputLength, kInputLimit)); + + auto ret = deflate(&stream_, Z_SYNC_FLUSH); + if (ret == Z_STREAM_ERROR) { + zlibError("Zlib flush failed: ", stream_.msg); + } + uint64_t bytesWritten; + if (ret == Z_OK) { + bytesWritten = outputLength - stream_.avail_out; + } else { + VELOX_CHECK_EQ(ret, Z_BUF_ERROR, "Invalid return code from zlib: {}", ret); + bytesWritten = 0; + } + // "If deflate returns with avail_out == 0, this function must be called + // again with the same value of the flush parameter and more output space + // (updated avail_out), until the flush is complete (deflate returns + // with non-zero avail_out)." + // "Note that Z_BUF_ERROR is not fatal, and deflate() can be called again + // with more input and more output space to continue compressing." + return FlushResult{bytesWritten, stream_.avail_out == 0}; +} + +Compressor::EndResult GzipCompressor::end( + uint64_t outputLength, + uint8_t* output) { + VELOX_CHECK(initialized_, "Called on non-initialized stream"); + + stream_.avail_in = 0; + stream_.next_out = reinterpret_cast(output); + stream_.avail_out = + static_cast(std::min(outputLength, kGzipBufferLimit)); + + auto ret = deflate(&stream_, Z_FINISH); + if (ret == Z_STREAM_ERROR) { + zlibError("Zlib flush failed: ", stream_.msg); + } + uint64_t bytesWritten = outputLength - stream_.avail_out; + if (ret == Z_STREAM_END) { + // Flush complete, we can now end the stream + initialized_ = false; + ret = deflateEnd(&stream_); + if (ret == Z_OK) { + return EndResult{bytesWritten, false}; + } + zlibError("Zlib end failed: ", stream_.msg); + } + // Not everything could be flushed, need to increase output buffer size. + return EndResult{bytesWritten, true}; +} + +GzipCodec::GzipCodec( + int32_t compressionLevel, + GzipFormat format, + int32_t windowBits) + : format_(format), windowBits_(windowBits) { + compressionLevel_ = compressionLevel == kUseDefaultCompressionLevel + ? kGzipDefaultCompressionLevel + : compressionLevel; +} + +GzipCodec::~GzipCodec() { + endCompressor(); + endDecompressor(); +} + +std::shared_ptr GzipCodec::makeCompressor() { + auto ptr = std::make_shared(compressionLevel_); + ptr->init(format_, windowBits_); + return ptr; +} + +std::shared_ptr GzipCodec::makeDecompressor() { + auto ptr = std::make_shared(format_, windowBits_); + ptr->init(); + return ptr; +} + +void GzipCodec::initCompressor() { + endDecompressor(); + memset(&stream_, 0, sizeof(stream_)); + + // Initialize to run specified format + int32_t windowBits = getCompressionWindowBits(format_, windowBits_); + auto ret = deflateInit2( + &stream_, + Z_DEFAULT_COMPRESSION, + Z_DEFLATED, + windowBits, + compressionLevel_, + Z_DEFAULT_STRATEGY); + if (ret != Z_OK) { + zlibError("zlib deflateInit failed: ", stream_.msg); + } + compressorInitialized_ = true; +} + +void GzipCodec::endCompressor() { + if (compressorInitialized_) { + (void)deflateEnd(&stream_); + } + compressorInitialized_ = false; +} + +void GzipCodec::initDecompressor() { + endCompressor(); + memset(&stream_, 0, sizeof(stream_)); + + // Initialize to run either deflate or zlib/gzip format + int32_t windowBits = getDecompressionWindowBits(format_, windowBits_); + auto ret = inflateInit2(&stream_, windowBits); + if (ret != Z_OK) { + zlibError("zlib inflateInit failed: ", stream_.msg); + } + decompressorInitialized_ = true; +} + +void GzipCodec::endDecompressor() { + if (decompressorInitialized_) { + (void)inflateEnd(&stream_); + } + decompressorInitialized_ = false; +} + +uint64_t GzipCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + if (!decompressorInitialized_) { + initDecompressor(); + } + if (outputLength == 0) { + // If input doesn't contain compressed data, outputLength is 0. + // The zlib library does not allow *output to be NULL, even when + // outputLength is 0 (inflate() will return Z_STREAM_ERROR). We + // don't consider this an error, so bail early if no output is expected. + // Note that we don't signal an error if the input actually contains + // compressed data. + return 0; + } + + // Reset the stream for this block + if (inflateReset(&stream_) != Z_OK) { + zlibError("zlib inflateReset failed: ", stream_.msg); + } + + stream_.next_in = const_cast(reinterpret_cast(input)); + stream_.avail_in = static_cast(inputLength); + stream_.next_out = reinterpret_cast(output); + stream_.avail_out = static_cast(outputLength); + + auto ret = inflate(&stream_, Z_FINISH); + if (ret != Z_STREAM_END) { + if (ret == Z_OK) { + // Z_OK (and stream.msg NOT set) indicates stream.avail_out is too + // small. + VELOX_FAIL("zlib inflate failed, output buffer too small"); + } + zlibError("Zlib inflate failed: ", stream_.msg); + } + + return stream_.total_out; +} + +uint64_t GzipCodec::maxCompressedLength(uint64_t inputLength) { + // Must be in compression mode. + if (!compressorInitialized_) { + initCompressor(); + } + uint64_t maxLength = deflateBound(&stream_, static_cast(inputLength)); + // ARROW-3514: return a more pessimistic estimate to account for bugs + // in old zlib versions. + return maxLength + 12; +} + +uint64_t GzipCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + if (!compressorInitialized_) { + initCompressor(); + } + + if (deflateReset(&stream_) != Z_OK) { + zlibError("Zlib deflateReset failed: ", stream_.msg); + } + + stream_.next_in = const_cast(reinterpret_cast(input)); + stream_.avail_in = static_cast(inputLength); + stream_.next_out = reinterpret_cast(output); + stream_.avail_out = static_cast(outputLength); + + auto ret = deflate(&stream_, Z_FINISH); + if (ret != Z_STREAM_END) { + if (ret == Z_OK) { + // Z_OK (and stream.msg NOT set) indicates stream.avail_out is too + // small. + VELOX_FAIL("zlib deflate failed, output buffer too small"); + } + zlibError("Zlib deflate failed: ", stream_.msg); + } + + return stream_.total_out; +} + +uint64_t GzipCodec::compressPartial( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + if (!compressorInitialized_) { + initCompressor(); + } + + if (deflateReset(&stream_) != Z_OK) { + zlibError("Zlib deflateReset failed: ", stream_.msg); + } + + stream_.next_in = const_cast(reinterpret_cast(input)); + stream_.avail_in = static_cast(inputLength); + stream_.next_out = reinterpret_cast(output); + stream_.avail_out = static_cast(outputLength); + + auto ret = deflate(&stream_, Z_FINISH); + if (ret != Z_STREAM_END && ret != Z_OK && ret != Z_BUF_ERROR) { + zlibError("Zlib deflate failed: ", stream_.msg); + } + + return stream_.total_out; +} + +void GzipCodec::init() { + if (windowBits_ < kGzipMinWindowBits || windowBits_ > kGzipMaxWindowBits) { + VELOX_USER_FAIL( + "GZip window bits should be between {} and {}", + kGzipMinWindowBits, + kGzipMaxWindowBits); + } + initCompressor(); + initDecompressor(); +} + +CompressionKind GzipCodec::compressionKind() const { + if (format_ == GzipFormat::kZlib) { + return CompressionKind_ZLIB; + } + return CompressionKind_GZIP; +} + +int32_t GzipCodec::compressionLevel() const { + return compressionLevel_; +} + +int32_t GzipCodec::minimumCompressionLevel() const { + return kGZipMinCompressionLevel; +} + +int32_t GzipCodec::maximumCompressionLevel() const { + return kGZipMaxCompressionLevel; +} + +int32_t GzipCodec::defaultCompressionLevel() const { + return kGzipDefaultCompressionLevel; +} + +std::unique_ptr makeGzipCodec( + int compressionLevel, + GzipFormat format, + std::optional windowBits) { + return std::make_unique( + compressionLevel, format, windowBits.value_or(kGzipDefaultWindowBits)); +} + +std::unique_ptr makeZlibCodec( + int compressionLevel, + std::optional windowBits) { + return makeGzipCodec(compressionLevel, GzipFormat::kZlib, windowBits); +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/GzipCompression.h b/velox/common/compression/v2/GzipCompression.h new file mode 100644 index 000000000000..48f841f08b64 --- /dev/null +++ b/velox/common/compression/v2/GzipCompression.h @@ -0,0 +1,137 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include "velox/common/compression/v2/Compression.h" + +namespace facebook::velox::common { + +enum GzipFormat { + kZlib, + kDeflate, + kGzip, +}; + +// Compression levels. +constexpr int32_t kGzipDefaultCompressionLevel = 9; +constexpr int32_t kGZipMinCompressionLevel = 1; +constexpr int32_t kGZipMaxCompressionLevel = 9; + +// Maximum window size. +static constexpr int32_t kGzipMaxWindowBits = 15; +// Minimum window size. +static constexpr int32_t kGzipMinWindowBits = 9; +// Default window size. +static constexpr int32_t kGzipDefaultWindowBits = 15; +// 4KB window size. +static constexpr int32_t kGzip4KBWindowBits = 12; + +class GzipCodecOptions : public CodecOptions { + public: + explicit GzipCodecOptions( + int32_t compressionLevel = kUseDefaultCompressionLevel, + GzipFormat format = GzipFormat::kGzip, + std::optional windowBits = kGzipDefaultWindowBits) + : CodecOptions(compressionLevel), + format(format), + windowBits(windowBits) {} + + GzipFormat format; + std::optional windowBits; +}; + +class GzipCodec : public Codec { + public: + GzipCodec(int32_t compressionLevel, GzipFormat format, int32_t windowBits); + + ~GzipCodec() override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t compressPartial( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + void init() override; + + CompressionKind compressionKind() const override; + + int32_t compressionLevel() const override; + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + private: + void initCompressor(); + + void endCompressor(); + + void initDecompressor(); + + void endDecompressor(); + + // zlib is stateful and the z_stream state variable must be initialized + // before + z_stream stream_{}; + + // Realistically, this will always be GZIP, but we leave the option open to + // configure + GzipFormat format_; + + int32_t windowBits_; + int32_t compressionLevel_; + // These variables are mutually exclusive. When the codec is in "compressor" + // state, compressorInitialized_ is true while decompressorInitialized_ is + // false. When it's decompressing, the opposite is true. + bool compressorInitialized_{false}; + bool decompressorInitialized_{false}; +}; + +std::unique_ptr makeGzipCodec( + int compressionLevel = kGzipDefaultCompressionLevel, + GzipFormat format = GzipFormat::kGzip, + std::optional windowBits = std::nullopt); + +std::unique_ptr makeZlibCodec( + int compressionLevel = kGzipDefaultCompressionLevel, + std::optional windowBits = std::nullopt); + +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/HadoopCompressionFormat.cpp b/velox/common/compression/v2/HadoopCompressionFormat.cpp new file mode 100644 index 000000000000..d2b6e4852a70 --- /dev/null +++ b/velox/common/compression/v2/HadoopCompressionFormat.cpp @@ -0,0 +1,81 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/v2/HadoopCompressionFormat.h" +#include "velox/common/base/Exceptions.h" + +#include + +namespace facebook::velox::common { + +bool HadoopCompressionFormat::tryDecompressHadoop( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output, + uint64_t& actualDecompressedSize) { + // Parquet files written with the Hadoop Lz4RawCodec use their own framing. + // The input buffer can contain an arbitrary number of "frames", each + // with the following structure: + // - bytes 0..3: big-endian uint32_t representing the frame decompressed + // size + // - bytes 4..7: big-endian uint32_t representing the frame compressed size + // - bytes 8...: frame compressed data + // + // The Hadoop Lz4Codec source code can be found here: + // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc + uint64_t totalDecompressedSize = 0; + + while (inputLength >= kPrefixLength) { + const uint32_t expectedDecompressedSize = + folly::Endian::big(folly::loadUnaligned(input)); + const uint32_t expectedCompressedSize = folly::Endian::big( + folly::loadUnaligned(input + sizeof(uint32_t))); + input += kPrefixLength; + inputLength -= kPrefixLength; + + if (inputLength < expectedCompressedSize) { + // Not enough bytes for Hadoop "frame" + return false; + } + if (outputLength < expectedDecompressedSize) { + // Not enough bytes to hold advertised output => probably not Hadoop + return false; + } + // Try decompressing and compare with expected decompressed length + try { + auto decompressedSize = decompressInternal( + expectedCompressedSize, input, outputLength, output); + if (decompressedSize != expectedDecompressedSize) { + return false; + } + } catch (const VeloxException& e) { + return false; + } + input += expectedCompressedSize; + inputLength -= expectedCompressedSize; + output += expectedDecompressedSize; + outputLength -= expectedDecompressedSize; + totalDecompressedSize += expectedDecompressedSize; + } + + if (inputLength == 0) { + actualDecompressedSize = totalDecompressedSize; + return true; + } + return false; +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/HadoopCompressionFormat.h b/velox/common/compression/v2/HadoopCompressionFormat.h new file mode 100644 index 000000000000..aaf5e71e97a3 --- /dev/null +++ b/velox/common/compression/v2/HadoopCompressionFormat.h @@ -0,0 +1,41 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace facebook::velox::common { + +class HadoopCompressionFormat { + protected: + bool tryDecompressHadoop( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output, + uint64_t& actualDecompressedSize); + + virtual uint64_t decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + // Offset starting at which page data can be read/written. + static constexpr uint64_t kPrefixLength = sizeof(uint32_t) * 2; +}; +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/Lz4Compression.cpp b/velox/common/compression/v2/Lz4Compression.cpp new file mode 100644 index 000000000000..d7440ccea77e --- /dev/null +++ b/velox/common/compression/v2/Lz4Compression.cpp @@ -0,0 +1,527 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/v2/Lz4Compression.h" +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::common { + +namespace { + +void lz4Error(LZ4F_errorCode_t errorCode, const char* prefixMessage) { + VELOX_FAIL(prefixMessage, LZ4F_getErrorName(errorCode)); +} + +LZ4F_preferences_t defaultPreferences() { + LZ4F_preferences_t prefs; + memset(&prefs, 0, sizeof(prefs)); + return prefs; +} + +LZ4F_preferences_t defaultPreferences(int compressionLevel) { + LZ4F_preferences_t prefs = defaultPreferences(); + prefs.compressionLevel = compressionLevel; + return prefs; +} +} // namespace + +class LZ4Compressor : public Compressor { + public: + explicit LZ4Compressor(int32_t compressionLevel); + + ~LZ4Compressor() override; + + void init(); + + CompressResult compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + FlushResult flush(uint64_t outputLength, uint8_t* output) override; + + EndResult end(uint64_t outputLength, uint8_t* output) override; + + protected: + void + compressBegin(uint8_t* output, size_t& outputLen, uint64_t& bytesWritten); + + int compressionLevel_; + LZ4F_compressionContext_t ctx_{nullptr}; + LZ4F_preferences_t prefs_; + bool firstTime_; +}; + +class LZ4Decompressor : public Decompressor { + public: + LZ4Decompressor() {} + + ~LZ4Decompressor() override { + if (ctx_ != nullptr) { + LZ4F_freeDecompressionContext(ctx_); + } + } + + void init(); + + void reset() override; + + DecompressResult decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + bool isFinished() override; + + protected: + LZ4F_decompressionContext_t ctx_ = nullptr; + bool finished_; +}; + +LZ4Compressor::LZ4Compressor(int32_t compressionLevel) + : compressionLevel_(compressionLevel) {} + +LZ4Compressor::~LZ4Compressor() { + if (ctx_ != nullptr) { + LZ4F_freeCompressionContext(ctx_); + } +} + +void LZ4Compressor::init() { + LZ4F_errorCode_t ret; + prefs_ = defaultPreferences(compressionLevel_); + firstTime_ = true; + + ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); + if (LZ4F_isError(ret)) { + lz4Error(ret, "LZ4 init failed: "); + } +} + +Compressor::CompressResult LZ4Compressor::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto inputSize = static_cast(inputLength); + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return CompressResult{0, 0, true}; + } + compressBegin(output, outputSize, bytesWritten); + } + + if (outputSize < LZ4F_compressBound(inputSize, &prefs_)) { + // Output too small to compress into. + return CompressResult{0, bytesWritten, true}; + } + auto numBytesOrError = LZ4F_compressUpdate( + ctx_, output, outputSize, input, inputSize, nullptr /* options */); + if (LZ4F_isError(numBytesOrError)) { + lz4Error(numBytesOrError, "LZ4 compress update failed: "); + } + bytesWritten += static_cast(numBytesOrError); + DCHECK_LE(bytesWritten, outputSize); + return CompressResult{inputLength, bytesWritten, false}; +} + +Compressor::FlushResult LZ4Compressor::flush( + uint64_t outputLength, + uint8_t* output) { + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return FlushResult{0, true}; + } + compressBegin(output, outputSize, bytesWritten); + } + + if (outputSize < LZ4F_compressBound(0, &prefs_)) { + // Output too small to flush into. + return FlushResult{bytesWritten, true}; + } + + auto numBytesOrError = + LZ4F_flush(ctx_, output, outputSize, nullptr /* options */); + if (LZ4F_isError(numBytesOrError)) { + lz4Error(numBytesOrError, "LZ4 flush failed: "); + } + bytesWritten += static_cast(numBytesOrError); + DCHECK_LE(bytesWritten, outputLength); + return FlushResult{bytesWritten, false}; +} + +Compressor::EndResult LZ4Compressor::end( + uint64_t outputLength, + uint8_t* output) { + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return EndResult{0, true}; + } + compressBegin(output, outputSize, bytesWritten); + } + + if (outputSize < LZ4F_compressBound(0, &prefs_)) { + // Output too small to end frame into. + return EndResult{bytesWritten, true}; + } + + auto numBytesOrError = + LZ4F_compressEnd(ctx_, output, outputSize, nullptr /* options */); + if (LZ4F_isError(numBytesOrError)) { + lz4Error(numBytesOrError, "LZ4 end failed: "); + } + bytesWritten += static_cast(numBytesOrError); + DCHECK_LE(bytesWritten, outputLength); + return EndResult{bytesWritten, false}; +} + +void LZ4Compressor::compressBegin( + uint8_t* output, + size_t& outputLen, + uint64_t& bytesWritten) { + auto numBytesOrError = LZ4F_compressBegin(ctx_, output, outputLen, &prefs_); + if (LZ4F_isError(numBytesOrError)) { + lz4Error(numBytesOrError, "LZ4 compress begin failed: "); + } + firstTime_ = false; + output += numBytesOrError; + outputLen -= numBytesOrError; + bytesWritten += static_cast(numBytesOrError); +} + +void common::LZ4Decompressor::init() { + LZ4F_errorCode_t ret; + finished_ = false; + + ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); + if (LZ4F_isError(ret)) { + lz4Error(ret, "LZ4 init failed: "); + } +} + +void LZ4Decompressor::reset() { +#if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800 + // LZ4F_resetDecompressionContext appeared in 1.8.0 + DCHECK_NE(ctx_, nullptr); + LZ4F_resetDecompressionContext(ctx_); + finished_ = false; +#else + if (ctx_ != nullptr) { + LZ4F_freeDecompressionContext(ctx_); + } + init(); +#endif +} + +Decompressor::DecompressResult LZ4Decompressor::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto inputSize = static_cast(inputLength); + auto outputSize = static_cast(outputLength); + + auto ret = LZ4F_decompress( + ctx_, output, &outputSize, input, &inputSize, nullptr /* options */); + if (LZ4F_isError(ret)) { + lz4Error(ret, "LZ4 decompress failed: "); + } + finished_ = (ret == 0); + return DecompressResult{ + static_cast(inputSize), + static_cast(outputSize), + (inputSize == 0 && outputSize == 0)}; +} + +bool LZ4Decompressor::isFinished() { + return finished_; +} + +Lz4CodecBase::Lz4CodecBase(int32_t compressionLevel) + : compressionLevel_( + compressionLevel == kUseDefaultCompressionLevel + ? kLz4DefaultCompressionLevel + : compressionLevel) {} + +int32_t Lz4CodecBase::minimumCompressionLevel() const { + return kLz4MinCompressionLevel; +} + +int32_t Lz4CodecBase::maximumCompressionLevel() const { +#if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) + return 12; +#else + return LZ4F_compressionLevel_max(); +#endif +} + +int32_t Lz4CodecBase::defaultCompressionLevel() const { + return kLz4DefaultCompressionLevel; +} + +int32_t Lz4CodecBase::compressionLevel() const { + return compressionLevel_; +} + +Lz4FrameCodec::Lz4FrameCodec(int32_t compressionLevel) + : Lz4CodecBase(compressionLevel), + prefs_(defaultPreferences(compressionLevel_)) {} + +uint64_t Lz4FrameCodec::maxCompressedLength(uint64_t inputLen) { + return static_cast( + LZ4F_compressFrameBound(static_cast(inputLen), &prefs_)); +} + +uint64_t Lz4FrameCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto ret = LZ4F_compressFrame( + output, + static_cast(outputLength), + input, + static_cast(inputLength), + &prefs_); + if (LZ4F_isError(ret)) { + lz4Error(ret, "Lz4 compression failure: "); + } + return static_cast(ret); +} + +uint64_t Lz4FrameCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto decompressor = makeDecompressor(); + + uint64_t bytesWritten = 0; + while (!decompressor->isFinished() && inputLength != 0) { + auto result = + decompressor->decompress(inputLength, input, outputLength, output); + input += result.bytesRead; + inputLength -= result.bytesRead; + output += result.bytesWritten; + outputLength -= result.bytesWritten; + bytesWritten += result.bytesWritten; + if (result.outputTooSmall) { + VELOX_FAIL("Lz4 decompression buffer too small."); + } + } + if (!decompressor->isFinished()) { + VELOX_FAIL("Lz4 compressed input contains less than one frame."); + } + if (inputLength != 0) { + VELOX_FAIL("Lz4 compressed input contains more than one frame."); + } + return bytesWritten; +} + +std::shared_ptr Lz4FrameCodec::makeCompressor() { + auto ptr = std::make_shared(compressionLevel_); + ptr->init(); + return ptr; +} + +std::shared_ptr Lz4FrameCodec::makeDecompressor() { + auto ptr = std::make_shared(); + ptr->init(); + return ptr; +} + +CompressionKind Lz4FrameCodec::compressionKind() const { + return CompressionKind::CompressionKind_LZ4; +} + +Lz4RawCodec::Lz4RawCodec(int32_t compressionLevel) + : Lz4CodecBase(compressionLevel) {} + +uint64_t Lz4RawCodec::maxCompressedLength(uint64_t inputLength) { + return static_cast( + LZ4_compressBound(static_cast(inputLength))); +} + +uint64_t Lz4RawCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto decompressedSize = LZ4_decompress_safe( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength)); + if (decompressedSize < 0) { + VELOX_FAIL("Corrupt Lz4 compressed data."); + } + return static_cast(decompressedSize); +} + +uint64_t Lz4RawCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + uint64_t compressedSize; +#ifdef LZ4HC_CLEVEL_MIN + constexpr int kMinHcClevel = LZ4HC_CLEVEL_MIN; +#else // For older versions of the lz4 library. + constexpr int kMinHcClevel = 3; +#endif + if (compressionLevel_ < kMinHcClevel) { + compressedSize = LZ4_compress_default( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength)); + } else { + compressedSize = LZ4_compress_HC( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength), + compressionLevel_); + } + if (compressedSize == 0) { + VELOX_FAIL("Lz4 compression failure."); + } + return static_cast(compressedSize); +} + +std::shared_ptr Lz4RawCodec::makeCompressor() { + VELOX_UNSUPPORTED( + "Streaming compression unsupported with LZ4 raw format. " + "Try using LZ4 frame format instead."); +} + +std::shared_ptr Lz4RawCodec::makeDecompressor() { + VELOX_UNSUPPORTED( + "Streaming decompression unsupported with LZ4 raw format. " + "Try using LZ4 frame format instead."); +} + +CompressionKind Lz4RawCodec::compressionKind() const { + return CompressionKind::CompressionKind_LZ4RAW; +} + +Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kUseDefaultCompressionLevel) {} + +uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { + return kPrefixLength + Lz4RawCodec::maxCompressedLength(inputLength); +} + +uint64_t Lz4HadoopCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + if (outputLength < kPrefixLength) { + VELOX_FAIL("Output buffer too small for Lz4HadoopCodec compression."); + } + + uint64_t compressedSize = Lz4RawCodec::compress( + inputLength, input, outputLength - kPrefixLength, output + kPrefixLength); + + // Prepend decompressed size in bytes and compressed size in bytes + // to be compatible with Hadoop Lz4RawCodec. + const uint32_t decompressedLength = + folly::Endian::big(static_cast(inputLength)); + const uint32_t compressedLength = + folly::Endian::big(static_cast(compressedSize)); + folly::storeUnaligned(output, decompressedLength); + folly::storeUnaligned(output + sizeof(uint32_t), compressedLength); + + return kPrefixLength + compressedSize; +} + +uint64_t Lz4HadoopCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + uint64_t decompressedSize; + if (tryDecompressHadoop( + inputLength, input, outputLength, output, decompressedSize)) { + return decompressedSize; + } + // Fall back on raw LZ4 codec (for files produces by earlier versions of + // Parquet C++). + return Lz4RawCodec::decompress(inputLength, input, outputLength, output); +} + +std::shared_ptr Lz4HadoopCodec::makeCompressor() { + VELOX_UNSUPPORTED( + "Streaming compression unsupported with LZ4 Hadoop raw format. " + "Try using LZ4 frame format instead."); +} + +std::shared_ptr Lz4HadoopCodec::makeDecompressor() { + VELOX_UNSUPPORTED( + "Streaming decompression unsupported with LZ4 Hadoop raw format. " + "Try using LZ4 frame format instead."); +} + +CompressionKind Lz4HadoopCodec::compressionKind() const { + return CompressionKind::CompressionKind_LZ4HADOOP; +} + +int32_t Lz4HadoopCodec::minimumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t Lz4HadoopCodec::maximumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t Lz4HadoopCodec::defaultCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +uint64_t Lz4HadoopCodec::decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + return Lz4RawCodec::decompress(inputLength, input, outputLength, output); +} + +std::unique_ptr makeLz4FrameCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); +} + +std::unique_ptr makeLz4RawCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); +} + +std::unique_ptr makeLz4HadoopRawCodec() { + return std::make_unique(); +} +} // namespace facebook::velox::common \ No newline at end of file diff --git a/velox/common/compression/v2/Lz4Compression.h b/velox/common/compression/v2/Lz4Compression.h new file mode 100644 index 000000000000..576657eba007 --- /dev/null +++ b/velox/common/compression/v2/Lz4Compression.h @@ -0,0 +1,148 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include "velox/common/compression/v2/Compression.h" +#include "velox/common/compression/v2/HadoopCompressionFormat.h" + +namespace facebook::velox::common { + +static constexpr int32_t kLz4DefaultCompressionLevel = 1; +static constexpr int32_t kLz4MinCompressionLevel = 1; + +class Lz4CodecBase : public Codec { + public: + explicit Lz4CodecBase(int32_t compressionLevel); + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + int32_t compressionLevel() const override; + + protected: + const int compressionLevel_; +}; + +class Lz4FrameCodec : public Lz4CodecBase { + public: + explicit Lz4FrameCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; + + protected: + const LZ4F_preferences_t prefs_; +}; + +class Lz4RawCodec : public Lz4CodecBase { + public: + explicit Lz4RawCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; +}; + +class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { + public: + Lz4HadoopCodec(); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + private: + uint64_t decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; +}; + +// Lz4 frame format codec. +std::unique_ptr makeLz4FrameCodec( + int32_t compressionLevel = kLz4DefaultCompressionLevel); + +// Lz4 "raw" format codec. +std::unique_ptr makeLz4RawCodec( + int32_t compressionLevel = kLz4DefaultCompressionLevel); + +// Lz4 "Hadoop" format codec (== Lz4 raw codec prefixed with lengths header) +std::unique_ptr makeLz4HadoopRawCodec(); +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/LzoCompression.cpp b/velox/common/compression/v2/LzoCompression.cpp new file mode 100644 index 000000000000..795c047a2d2b --- /dev/null +++ b/velox/common/compression/v2/LzoCompression.cpp @@ -0,0 +1,97 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/v2/LzoCompression.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/compression/LzoDecompressor.h" + +namespace facebook::velox::common { + +LzoCodec::LzoCodec() = default; + +uint64_t LzoCodec::maxCompressedLength(uint64_t inputLength) { + VELOX_UNSUPPORTED("LZO compression is not supported."); +} + +uint64_t LzoCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + VELOX_UNSUPPORTED("LZO compression is not supported."); +} + +uint64_t LzoCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + const char* inputAddress = + reinterpret_cast(const_cast(input)); + char* outputAddress = reinterpret_cast(output); + return velox::common::compression::lzoDecompress( + inputAddress, + inputAddress + inputLength, + outputAddress, + outputAddress + outputLength); +} + +std::shared_ptr LzoCodec::makeCompressor() { + VELOX_UNSUPPORTED("Streaming compression unsupported with LZO"); +} + +std::shared_ptr LzoCodec::makeDecompressor() { + VELOX_UNSUPPORTED("Streaming decompression unsupported with LZO"); +} + +CompressionKind LzoCodec::compressionKind() const { + return CompressionKind_LZO; +} + +int32_t LzoCodec::minimumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t LzoCodec::maximumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t LzoCodec::defaultCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +LzoHadoopCodec::LzoHadoopCodec() = default; + +CompressionKind LzoHadoopCodec::compressionKind() const { + return CompressionKind_LZOHADOOP; +} + +uint64_t LzoHadoopCodec::decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + return LzoCodec::decompress(inputLength, input, outputLength, output); +} + +std::unique_ptr makeLzoCodec() { + return std::make_unique(); +} + +std::unique_ptr makeLzoHadoopCodec() { + return std::make_unique(); +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/LzoCompression.h b/velox/common/compression/v2/LzoCompression.h new file mode 100644 index 000000000000..36bd45a167de --- /dev/null +++ b/velox/common/compression/v2/LzoCompression.h @@ -0,0 +1,75 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "velox/common/compression/v2/Compression.h" +#include "velox/common/compression/v2/HadoopCompressionFormat.h" + +namespace facebook::velox::common { + +class LzoCodec : public Codec { + public: + LzoCodec(); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; +}; + +class LzoHadoopCodec : public LzoCodec, public HadoopCompressionFormat { + public: + LzoHadoopCodec(); + + CompressionKind compressionKind() const; + + private: + uint64_t decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output); +}; + +// Lzo format codec. +std::unique_ptr makeLzoCodec(); + +// Lzo "Hadoop" format codec. +std::unique_ptr makeLzoHadoopCodec(); +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/SnappyCompression.cpp b/velox/common/compression/v2/SnappyCompression.cpp new file mode 100644 index 000000000000..d0387e3d6d1c --- /dev/null +++ b/velox/common/compression/v2/SnappyCompression.cpp @@ -0,0 +1,105 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/v2/SnappyCompression.h" +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::common { + +uint64_t SnappyCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + size_t decompressedSize; + VELOX_CHECK( + snappy::GetUncompressedLength( + reinterpret_cast(input), + static_cast(inputLength), + &decompressedSize), + "Corrupt snappy compressed data."); + VELOX_CHECK_GE(outputLength, decompressedSize, "Output length is too small"); + VELOX_CHECK( + snappy::RawUncompress( + reinterpret_cast(input), + static_cast(inputLength), + reinterpret_cast(output)), + "Corrupt snappy compressed data."); + return static_cast(decompressedSize); +} + +uint64_t SnappyCodec::maxCompressedLength(uint64_t inputLength) { + DCHECK_GE(inputLength, 0); + return static_cast( + snappy::MaxCompressedLength(static_cast(inputLength))); +} + +uint64_t SnappyCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + size_t output_size; + snappy::RawCompress( + reinterpret_cast(input), + static_cast(inputLength), + reinterpret_cast(output), + &output_size); + return static_cast(output_size); +} + +std::shared_ptr SnappyCodec::makeCompressor() { + VELOX_UNSUPPORTED("Streaming compression unsupported with Snappy"); +} + +std::shared_ptr SnappyCodec::makeDecompressor() { + VELOX_UNSUPPORTED("Streaming decompression unsupported with Snappy"); +} + +CompressionKind SnappyCodec::compressionKind() const { + return CompressionKind_SNAPPY; +} + +int32_t SnappyCodec::minimumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t SnappyCodec::maximumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t SnappyCodec::defaultCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +std::optional SnappyCodec::doGetUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const { + size_t decompressedSize; + if (!snappy::GetUncompressedLength( + reinterpret_cast(input), + static_cast(inputLength), + &decompressedSize)) { + return uncompressedLength; + } + return static_cast(decompressedSize); +} + +std::unique_ptr makeSnappyCodec() { + return std::make_unique(); +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/SnappyCompression.h b/velox/common/compression/v2/SnappyCompression.h new file mode 100644 index 000000000000..045f1f5de5bf --- /dev/null +++ b/velox/common/compression/v2/SnappyCompression.h @@ -0,0 +1,64 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Derived from Apache Arrow. + +#include +#include +#include +#include +#include "velox/common/compression/v2/Compression.h" + +namespace facebook::velox::common { + +class SnappyCodec : public Codec { + public: + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + private: + std::optional doGetUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const override; +}; + +std::unique_ptr makeSnappyCodec(); + +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/ZstdCompression.cpp b/velox/common/compression/v2/ZstdCompression.cpp new file mode 100644 index 000000000000..0555f13bd907 --- /dev/null +++ b/velox/common/compression/v2/ZstdCompression.cpp @@ -0,0 +1,317 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/v2/ZstdCompression.h" +#include +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::common { +namespace { +void zstdError(size_t errorCode, const char* prefixMessage) { + VELOX_FAIL(prefixMessage, ZSTD_getErrorName(errorCode)); +} +} // namespace + +class ZstdDecompressor : public Decompressor { + public: + ZstdDecompressor(); + + ~ZstdDecompressor() override; + + void init(); + + DecompressResult decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + void reset() override; + + bool isFinished() override; + + private: + ZSTD_DStream* stream_; + bool finished_{false}; +}; + +class ZstdCompressor : public Compressor { + public: + explicit ZstdCompressor(int32_t compressionLevel); + + ~ZstdCompressor() override; + + void init(); + + CompressResult compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + FlushResult flush(uint64_t outputLength, uint8_t* output) override; + + EndResult end(uint64_t outputLength, uint8_t* output) override; + + private: + ZSTD_CStream* stream_; + int32_t compressionLevel_; +}; + +ZstdDecompressor::ZstdDecompressor() : stream_(ZSTD_createDStream()) {} + +ZstdDecompressor::~ZstdDecompressor() { + ZSTD_freeDStream(stream_); +} + +void ZstdDecompressor::init() { + finished_ = false; + size_t ret = ZSTD_initDStream(stream_); + if (ZSTD_isError(ret)) { + zstdError(ret, "ZSTD init failed: "); + } +} + +Decompressor::DecompressResult ZstdDecompressor::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + ZSTD_inBuffer inBuffer; + ZSTD_outBuffer outBuffer; + + inBuffer.src = input; + inBuffer.size = static_cast(inputLength); + inBuffer.pos = 0; + outBuffer.dst = output; + outBuffer.size = static_cast(outputLength); + outBuffer.pos = 0; + + auto ret = ZSTD_decompressStream(stream_, &outBuffer, &inBuffer); + if (ZSTD_isError(ret)) { + zstdError(ret, "ZSTD decompress failed: "); + } + finished_ = (ret == 0); + return DecompressResult{ + static_cast(inBuffer.pos), + static_cast(outBuffer.pos), + inBuffer.pos == 0 && outBuffer.pos == 0}; +} + +void ZstdDecompressor::reset() { + return init(); +} + +bool ZstdDecompressor::isFinished() { + return finished_; +} + +ZstdCompressor::ZstdCompressor(int32_t compressionLevel) + : stream_(ZSTD_createCStream()), compressionLevel_(compressionLevel) {} + +ZstdCompressor::~ZstdCompressor() { + ZSTD_freeCStream(stream_); +} + +void ZstdCompressor::init() { + auto ret = ZSTD_initCStream(stream_, compressionLevel_); + if (ZSTD_isError(ret)) { + zstdError(ret, "ZSTD init failed: "); + } +} + +Compressor::CompressResult ZstdCompressor::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + ZSTD_inBuffer inBuffer; + ZSTD_outBuffer outBuffer; + + inBuffer.src = input; + inBuffer.size = static_cast(inputLength); + inBuffer.pos = 0; + outBuffer.dst = output; + outBuffer.size = static_cast(outputLength); + outBuffer.pos = 0; + + auto ret = ZSTD_compressStream(stream_, &outBuffer, &inBuffer); + if (ZSTD_isError(ret)) { + zstdError(ret, "ZSTD compress failed: "); + } + return CompressResult{ + static_cast(inBuffer.pos), + static_cast(outBuffer.pos), + inBuffer.pos == 0}; +} + +Compressor::FlushResult ZstdCompressor::flush( + uint64_t outputLength, + uint8_t* output) { + ZSTD_outBuffer outBuffer; + + outBuffer.dst = output; + outBuffer.size = static_cast(outputLength); + outBuffer.pos = 0; + + auto ret = ZSTD_flushStream(stream_, &outBuffer); + if (ZSTD_isError(ret)) { + zstdError(ret, "ZSTD flush failed: "); + } + return FlushResult{static_cast(outBuffer.pos), ret > 0}; +} + +Compressor::EndResult ZstdCompressor::end( + uint64_t outputLength, + uint8_t* output) { + ZSTD_outBuffer outBuffer; + + outBuffer.dst = output; + outBuffer.size = static_cast(outputLength); + outBuffer.pos = 0; + + auto ret = ZSTD_endStream(stream_, &outBuffer); + if (ZSTD_isError(ret)) { + zstdError(ret, "ZSTD end failed: "); + } + return EndResult{static_cast(outBuffer.pos), ret > 0}; +} + +ZstdCodec::ZstdCodec(int32_t compressionLevel) + : compressionLevel_( + compressionLevel == kUseDefaultCompressionLevel + ? kZSTDDefaultCompressionLevel + : compressionLevel) {} + +uint64_t ZstdCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + if (output == nullptr) { + // We may pass a NULL 0-byte output buffer but some zstd versions demand + // a valid pointer: https://github.com/facebook/zstd/issues/1385 + static uint8_t emptyBuffer; + VELOX_DCHECK_EQ(outputLength, 0); + output = &emptyBuffer; + } + + auto ret = ZSTD_decompress( + output, + static_cast(outputLength), + input, + static_cast(inputLength)); + if (ZSTD_isError(ret)) { + zstdError(ret, "ZSTD decompression failed: "); + } + VELOX_CHECK_EQ( + static_cast(ret), + outputLength, + "Corrupt ZSTD compressed data."); + return static_cast(ret); +} + +uint64_t ZstdCodec::maxCompressedLength(uint64_t inputLength) { + return ZSTD_compressBound(static_cast(inputLength)); +} + +std::optional ZstdCodec::doGetUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const { + // Read decompressed size from frame if available in input. + auto decompressedSize = ZSTD_getFrameContentSize(input, inputLength); + if (decompressedSize == ZSTD_CONTENTSIZE_UNKNOWN || + decompressedSize == ZSTD_CONTENTSIZE_ERROR) { + return uncompressedLength; + } + return decompressedSize; +} + +uint64_t ZstdCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputBufferLength, + uint8_t* output) { + auto ret = ZSTD_compress( + output, + static_cast(outputBufferLength), + input, + static_cast(inputLength), + compressionLevel_); + if (ZSTD_isError(ret)) { + zstdError(ret, "ZSTD compression failed: "); + } + return static_cast(ret); +} + +uint64_t ZstdCodec::compressPartial( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputBufferLength, + uint8_t* output) { + auto ret = ZSTD_compress( + output, + static_cast(outputBufferLength), + input, + static_cast(inputLength), + compressionLevel_); + if (ZSTD_isError(ret)) { + // It's fine to hit dest size too small. + if (ZSTD_getErrorCode(ret) == ZSTD_ErrorCode::ZSTD_error_dstSize_tooSmall) { + return outputBufferLength; + } + zstdError(ret, "ZSTD compression failed: "); + } + return static_cast(ret); +} + +std::shared_ptr ZstdCodec::makeCompressor() { + auto ptr = std::make_shared(compressionLevel_); + ptr->init(); + return ptr; +} + +std::shared_ptr ZstdCodec::makeDecompressor() { + auto ptr = std::make_shared(); + ptr->init(); + return ptr; +} + +CompressionKind ZstdCodec::compressionKind() const { + return CompressionKind_ZSTD; +} + +int32_t ZstdCodec::minimumCompressionLevel() const { + return ZSTD_minCLevel(); +} + +int32_t ZstdCodec::maximumCompressionLevel() const { + return ZSTD_maxCLevel(); +} + +int32_t ZstdCodec::defaultCompressionLevel() const { + return kZSTDDefaultCompressionLevel; +} + +int32_t ZstdCodec::compressionLevel() const { + return compressionLevel_; +} + +std::unique_ptr makeZstdCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/ZstdCompression.h b/velox/common/compression/v2/ZstdCompression.h new file mode 100644 index 000000000000..45c18c435317 --- /dev/null +++ b/velox/common/compression/v2/ZstdCompression.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include "velox/common/compression/v2/Compression.h" + +namespace facebook::velox::common { + +constexpr int kZSTDDefaultCompressionLevel = 1; + +class ZstdCodec : public Codec { + public: + explicit ZstdCodec(int32_t compressionLevel); + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputBufferLength, + uint8_t* output) override; + + uint64_t compressPartial( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputBufferLength, + uint8_t* output) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + int32_t compressionLevel() const override; + + private: + std::optional doGetUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const override; + + int32_t compressionLevel_; +}; + +std::unique_ptr makeZstdCodec(int32_t compressionLevel); + +}; // namespace facebook::velox::common diff --git a/velox/common/compression/v2/qat/CMakeLists.txt b/velox/common/compression/v2/qat/CMakeLists.txt new file mode 100644 index 000000000000..825941a16486 --- /dev/null +++ b/velox/common/compression/v2/qat/CMakeLists.txt @@ -0,0 +1,16 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +target_sources(velox_common_compression_v2 PRIVATE QatCompression.cpp) +target_link_libraries(velox_common_compression_v2 qatzip::qatzip) diff --git a/velox/common/compression/v2/qat/QatCompression.cpp b/velox/common/compression/v2/qat/QatCompression.cpp new file mode 100644 index 000000000000..addeb410afce --- /dev/null +++ b/velox/common/compression/v2/qat/QatCompression.cpp @@ -0,0 +1,133 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/compression/v2/qat/QatCompression.h" + +namespace facebook::velox::common::qat { + +#define QZ_INIT_FAIL(rc) (QZ_OK != rc && QZ_DUPLICATE != rc) + +#define QZ_SETUP_SESSION_FAIL(rc) \ + (QZ_PARAMS == rc || QZ_NOSW_NO_HW == rc || QZ_NOSW_LOW_MEM == rc) + +class QatGZipCodec final : public Codec { + public: + QatGZipCodec(int32_t compressionLevel, QzPollingMode_T pollingMode) + : compressionLevel_(compressionLevel) { + auto rc = qzInit(&qzSession_, /* sw_backup = */ 1); + if (QZ_INIT_FAIL(rc)) { + VLOG(1) << "QAT hardware init failed with error: " << rc; + } else { + QzSessionParamsDeflate_T params; + // Get the default parameters. + qzGetDefaultsDeflate(¶ms); + params.common_params.polling_mode = pollingMode; + params.common_params.comp_lvl = compressionLevel; + rc = qzSetupSessionDeflate(&qzSession_, ¶ms); + if (QZ_SETUP_SESSION_FAIL(rc)) { + VLOG(1) << "QAT setup session failed with error: " << rc; + } + } + } + + ~QatGZipCodec() { + (void)qzTeardownSession(&qzSession_); + (void)qzClose(&qzSession_); + } + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override { + auto compressedSize = static_cast(inputLength); + auto uncompressedSize = static_cast(outputLength); + auto ret = qzDecompress( + &qzSession_, input, &compressedSize, output, &uncompressedSize); + if (ret == QZ_OK) { + return static_cast(uncompressedSize); + } else if (ret == QZ_PARAMS) { + VELOX_FAIL("QAT decompression failure: params is invalid"); + } else if (ret == QZ_FAIL) { + VELOX_FAIL("QAT decompression failure: Function did not succeed"); + } else { + VELOX_FAIL("QAT decompression failure with error: {}", ret); + } + } + + uint64_t maxCompressedLength(uint64_t inputLen) override { + return qzMaxCompressedLength(static_cast(inputLen), &qzSession_); + } + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override { + auto uncompressedSize = static_cast(inputLength); + auto compressedSize = static_cast(outputLength); + auto ret = qzCompress( + &qzSession_, input, &uncompressedSize, output, &compressedSize, 1); + if (ret == QZ_OK) { + return static_cast(compressedSize); + } else if (ret == QZ_PARAMS) { + VELOX_FAIL("QAT compression failure: params is invalid"); + } else if (ret == QZ_FAIL) { + VELOX_FAIL("QAT compression failure: function did not succeed"); + } else { + VELOX_FAIL("QAT compression failure with error: {}", ret); + } + } + + std::shared_ptr makeCompressor() override { + VELOX_UNSUPPORTED("Streaming compression unsupported with QAT"); + } + + std::shared_ptr makeDecompressor() override { + VELOX_UNSUPPORTED("Streaming decompression unsupported with QAT"); + } + + int32_t compressionLevel() const override { + return compressionLevel_; + } + + CompressionKind compressionKind() const override { + return CompressionKind_GZIP; + } + + int32_t minimumCompressionLevel() const override { + return QZ_DEFLATE_COMP_LVL_MINIMUM; + } + int32_t maximumCompressionLevel() const override { + return QZ_DEFLATE_COMP_LVL_MAXIMUM; + } + int32_t defaultCompressionLevel() const override { + return QZ_COMP_LEVEL_DEFAULT; + } + + private: + int32_t compressionLevel_; + QzSession_T qzSession_ = {0}; +}; + +std::unique_ptr makeQatGzipCodec( + int32_t compressionLevel, + QzPollingMode_T pollingMode) { + return std::make_unique(compressionLevel, pollingMode); +} +} // namespace facebook::velox::common::qat \ No newline at end of file diff --git a/velox/common/compression/v2/qat/QatCompression.h b/velox/common/compression/v2/qat/QatCompression.h new file mode 100644 index 000000000000..c556b0ed1b31 --- /dev/null +++ b/velox/common/compression/v2/qat/QatCompression.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include "velox/common/compression/v2/Compression.h" + +namespace facebook::velox::common::qat { + +class QatGzipCodecOptions : public CodecOptions { + public: + QatGzipCodecOptions( + int32_t compressionLevel = kUseDefaultCompressionLevel, + QzPollingMode_T pollingMode = QZ_BUSY_POLLING) + : CodecOptions(compressionLevel), pollingMode(pollingMode) {} + + QzPollingMode_T pollingMode; +}; + +std::unique_ptr makeQatGzipCodec( + int32_t compressionLevel = QZ_COMP_LEVEL_DEFAULT, + QzPollingMode_T pollingMode = QZ_BUSY_POLLING); + +} // namespace facebook::velox::common::qat \ No newline at end of file diff --git a/velox/common/compression/v2/tests/CMakeLists.txt b/velox/common/compression/v2/tests/CMakeLists.txt new file mode 100644 index 000000000000..983fd6c81fda --- /dev/null +++ b/velox/common/compression/v2/tests/CMakeLists.txt @@ -0,0 +1,21 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_common_compression_v2_test CompressionTest.cpp) +add_test(velox_common_compression_v2_test velox_common_compression_v2_test) +target_link_libraries( + velox_common_compression_v2_test + PUBLIC velox_link_libs + PRIVATE velox_common_compression_v2 velox_exception glog::glog gtest + gtest_main) diff --git a/velox/common/compression/v2/tests/CompressionTest.cpp b/velox/common/compression/v2/tests/CompressionTest.cpp new file mode 100644 index 000000000000..f25af7af4296 --- /dev/null +++ b/velox/common/compression/v2/tests/CompressionTest.cpp @@ -0,0 +1,501 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include "velox/common/base/VeloxException.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/compression/v2/Compression.h" +#include "velox/common/compression/v2/GzipCompression.h" + +namespace facebook::velox::common { + +namespace { + +const std::shared_ptr kDefaultCodecOptions = + std::make_shared(); + +struct TestParam { + CompressionKind compressionKind; + std::shared_ptr codecOptions; + + TestParam( + common::CompressionKind compressionKind, + std::shared_ptr codecOptions = kDefaultCodecOptions) + : compressionKind(compressionKind), codecOptions(codecOptions) {} +}; + +std::vector makeRandomData(size_t n) { + std::vector data(n); + std::default_random_engine engine(42); + std::uniform_int_distribution dist(0, 255); + std::generate(data.begin(), data.end(), [&]() { return dist(engine); }); + return data; +} + +std::vector makeCompressibleData(size_t size) { + std::string baseData = "The quick brown fox jumps over the lazy dog"; + auto repeats = static_cast(1 + size / baseData.size()); + + std::vector data(baseData.size() * repeats); + for (int i = 0; i < repeats; ++i) { + std::memcpy( + data.data() + i * baseData.size(), baseData.data(), baseData.size()); + } + data.resize(size); + return data; +} + +std::function makeRandomInputSize() { + std::default_random_engine engine(42); + std::uniform_int_distribution sizeDistribution(10, 40); + return [=]() mutable -> uint64_t { return sizeDistribution(engine); }; +} + +// Check roundtrip of one-shot compression and decompression functions. +void checkCodecRoundtrip( + Codec* c1, + Codec* c2, + const std::vector& data) { + auto maxCompressedLen = + static_cast(c1->maxCompressedLength(data.size())); + std::vector compressed(maxCompressedLen); + std::vector decompressed(data.size()); + + // Compress with codec c1. + auto compressedSize = c1->compress( + data.size(), data.data(), maxCompressedLen, compressed.data()); + compressed.resize(compressedSize); + + // Decompress with codec c2. + auto decompressedSize = c2->decompress( + compressed.size(), + compressed.data(), + decompressed.size(), + decompressed.data()); + + ASSERT_EQ(data, decompressed); + ASSERT_EQ(data.size(), decompressedSize); +} + +// Use same codec for both compression and decompression. +void checkCodecRoundtrip( + const std::unique_ptr& codec, + const std::vector& data) { + checkCodecRoundtrip(codec.get(), codec.get(), data); +} + +// Compress with codec c1 and decompress with codec c2. +void checkCodecRoundtrip( + const std::unique_ptr& c1, + const std::unique_ptr& c2, + const std::vector& data) { + checkCodecRoundtrip(c1.get(), c2.get(), data); +} + +void streamingCompress( + const std::shared_ptr& compressor, + const std::vector& uncompressed, + std::vector& compressed) { + const uint8_t* input = uncompressed.data(); + uint64_t remaining = uncompressed.size(); + uint64_t compressedSize = 0; + compressed.resize(10); + bool doFlush = false; + // Generate small random input buffer size. + auto randomInputSize = makeRandomInputSize(); + // Continue decompressing until consuming all compressed data . + while (remaining > 0) { + // Feed a small amount each time. + auto inputLength = std::min(remaining, randomInputSize()); + auto outputLength = compressed.size() - compressedSize; + uint8_t* output = compressed.data() + compressedSize; + // Compress once. + auto compressResult = + compressor->compress(inputLength, input, outputLength, output); + ASSERT_LE(compressResult.bytesRead, inputLength); + ASSERT_LE(compressResult.bytesWritten, outputLength); + // Update result. + compressedSize += compressResult.bytesWritten; + input += compressResult.bytesRead; + remaining -= compressResult.bytesRead; + // Grow compressed buffer if it's too small. + if (compressResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + // Once every two iterations, do a flush. + if (doFlush) { + Compressor::FlushResult flushResult; + do { + outputLength = compressed.size() - compressedSize; + output = compressed.data() + compressedSize; + flushResult = compressor->flush(outputLength, output); + ASSERT_LE(flushResult.bytesWritten, outputLength); + compressedSize += flushResult.bytesWritten; + if (flushResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + } while (flushResult.outputTooSmall); + } + doFlush = !doFlush; + } + // End the compressed stream. + Compressor::EndResult endResult; + do { + int64_t output_len = compressed.size() - compressedSize; + uint8_t* output = compressed.data() + compressedSize; + endResult = compressor->end(output_len, output); + ASSERT_LE(endResult.bytesWritten, output_len); + compressedSize += endResult.bytesWritten; + if (endResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + } while (endResult.outputTooSmall); + compressed.resize(compressedSize); +} + +void streamingDecompress( + const std::shared_ptr& decompressor, + const std::vector& compressed, + std::vector& decompressed) { + const uint8_t* input = compressed.data(); + uint64_t remaining = compressed.size(); + uint64_t decompressedSize = 0; + decompressed.resize(10); + // Generate small random input buffer size. + auto ramdomInputSize = makeRandomInputSize(); + // Continue decompressing until finishes. + while (!decompressor->isFinished()) { + // Feed a small amount each time. + auto inputLength = std::min(remaining, ramdomInputSize()); + auto outputLength = decompressed.size() - decompressedSize; + uint8_t* output = decompressed.data() + decompressedSize; + // Decompress once. + auto result = + decompressor->decompress(inputLength, input, outputLength, output); + ASSERT_LE(result.bytesRead, inputLength); + ASSERT_LE(result.bytesWritten, outputLength); + ASSERT_TRUE( + result.outputTooSmall || result.bytesWritten > 0 || + result.bytesRead > 0) + << "Decompression not progressing anymore"; + // Update result. + decompressedSize += result.bytesWritten; + input += result.bytesRead; + remaining -= result.bytesRead; + // Grow decompressed buffer if it's too small. + if (result.outputTooSmall) { + decompressed.resize(decompressed.capacity() * 2); + } + } + ASSERT_TRUE(decompressor->isFinished()); + ASSERT_EQ(remaining, 0); + decompressed.resize(decompressedSize); +} + +// Check the streaming compressor against one-shot decompression +void checkStreamingCompressor(Codec* codec, const std::vector& data) { + // Run streaming compression. + std::vector compressed; + streamingCompress(codec->makeCompressor(), data, compressed); + // Check decompressing the compressed data. + std::vector decompressed(data.size()); + ASSERT_NO_THROW(codec->decompress( + compressed.size(), + compressed.data(), + decompressed.size(), + decompressed.data())); + ASSERT_EQ(data, decompressed); +} + +// Check the streaming decompressor against one-shot compression. +void checkStreamingDecompressor( + Codec* codec, + const std::vector& data) { + // Create compressed data. + auto maxCompressedLen = codec->maxCompressedLength(data.size()); + std::vector compressed(maxCompressedLen); + auto compressedSize = codec->compress( + data.size(), data.data(), maxCompressedLen, compressed.data()); + compressed.resize(compressedSize); + // Run streaming decompression. + std::vector decompressed; + streamingDecompress(codec->makeDecompressor(), compressed, decompressed); + // Check the decompressed data. + ASSERT_EQ(data.size(), decompressed.size()); + ASSERT_EQ(data, decompressed); +} + +// Check the streaming compressor and decompressor together. +void checkStreamingRoundtrip( + const std::shared_ptr& compressor, + const std::shared_ptr& decompressor, + const std::vector& data) { + std::vector compressed; + streamingCompress(compressor, data, compressed); + std::vector decompressed; + streamingDecompress(decompressor, compressed, decompressed); + ASSERT_EQ(data, decompressed); +} + +void checkStreamingRoundtrip(Codec* codec, const std::vector& data) { + checkStreamingRoundtrip( + codec->makeCompressor(), codec->makeDecompressor(), data); +} +} // namespace + +class CodecTest : public ::testing::TestWithParam { + protected: + static CompressionKind getCompressionKind() { + return GetParam().compressionKind; + } + + static const CodecOptions& getCodecOptions() { + return *GetParam().codecOptions; + } + + static std::unique_ptr makeCodec() { + return Codec::create(getCompressionKind(), getCodecOptions()); + } +}; + +TEST_P(CodecTest, specifyCompressionLevel) { + std::vector data = makeRandomData(2000); + const auto kind = getCompressionKind(); + if (!Codec::isAvailable(kind)) { + // Support for this codec hasn't been built. + VELOX_ASSERT_THROW( + Codec::create(kind, Codec::useDefaultCompressionLevel()), + "Support for codec '" + compressionKindToString(kind) + + "' not implemented."); + } else if (!Codec::supportsCompressionLevel(kind)) { + VELOX_ASSERT_THROW( + Codec::create(kind, 1), + fmt::format( + "Codec '{}' doesn't support setting a compression level.", + compressionKindToString(kind))); + } else { + auto codec = Codec::create(kind, Codec::minimumCompressionLevel(kind)); + checkCodecRoundtrip(codec, data); + } +} + +TEST_P(CodecTest, getUncompressedLength) { + auto codec = makeCodec(); + // Test non-empty input. + { + auto inputLength = 100; + auto input = makeRandomData(inputLength); + std::vector compressed(codec->maxCompressedLength(input.size())); + auto compressedLength = codec->compress( + inputLength, input.data(), compressed.size(), compressed.data()); + compressed.resize(compressedLength); + + if (Codec::supportsGetUncompressedLength(getCompressionKind())) { + auto uncompressedLength = + codec->getUncompressedLength(compressedLength, compressed.data()); + ASSERT_EQ( + codec->getUncompressedLength(compressedLength, compressed.data()), + inputLength); + ASSERT_EQ( + codec->getUncompressedLength( + compressedLength, compressed.data(), inputLength), + inputLength); + ASSERT_EQ( + codec->getUncompressedLength( + compressedLength, compressed.data(), std::nullopt), + inputLength); + VELOX_ASSERT_THROW( + codec->getUncompressedLength( + compressedLength, compressed.data(), inputLength + 1), + fmt::format("Invalid uncompressed length: {}", inputLength + 1)); + } else { + ASSERT_EQ( + codec->getUncompressedLength(input.size(), input.data()), + std::nullopt); + ASSERT_EQ( + codec->getUncompressedLength( + input.size(), input.data(), std::nullopt), + std::nullopt); + ASSERT_EQ(codec->getUncompressedLength(input.size(), input.data(), 0), 0); + ASSERT_EQ(codec->getUncompressedLength(input.size(), input.data(), 2), 2); + } + } + // Test empty input. + { + std::vector input{}; + ASSERT_EQ(codec->getUncompressedLength(0, input.data(), 0), 0); + ASSERT_EQ(codec->getUncompressedLength(0, input.data(), std::nullopt), 0); + VELOX_ASSERT_THROW( + codec->getUncompressedLength(0, input.data(), 1), + fmt::format("Invalid uncompressed length: {}", 1)); + } +} + +TEST_P(CodecTest, codecRoundtrip) { + auto codec = makeCodec(); + for (int dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(codec, makeRandomData(dataSize)); + checkCodecRoundtrip(codec, makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, minMaxCompressionLevel) { + auto type = getCompressionKind(); + auto codec = makeCodec(); + auto notSupportCompressionLevel = [](CompressionKind kind) { + return fmt::format( + "Codec '{}' doesn't support setting a compression level.", + compressionKindToString(kind)); + }; + + if (Codec::supportsCompressionLevel(type)) { + auto minLevel = Codec::minimumCompressionLevel(type); + auto maxLevel = Codec::maximumCompressionLevel(type); + auto defaultLevel = Codec::defaultCompressionLevel(type); + ASSERT_NE(minLevel, Codec::useDefaultCompressionLevel()); + ASSERT_NE(maxLevel, Codec::useDefaultCompressionLevel()); + ASSERT_NE(defaultLevel, Codec::useDefaultCompressionLevel()); + ASSERT_LT(minLevel, maxLevel); + ASSERT_EQ(minLevel, codec->minimumCompressionLevel()); + ASSERT_EQ(maxLevel, codec->maximumCompressionLevel()); + ASSERT_GE(defaultLevel, minLevel); + ASSERT_LE(defaultLevel, maxLevel); + } else { + VELOX_ASSERT_THROW( + Codec::minimumCompressionLevel(type), notSupportCompressionLevel(type)); + VELOX_ASSERT_THROW( + Codec::maximumCompressionLevel(type), notSupportCompressionLevel(type)); + VELOX_ASSERT_THROW( + Codec::defaultCompressionLevel(type), notSupportCompressionLevel(type)); + ASSERT_EQ( + codec->minimumCompressionLevel(), Codec::useDefaultCompressionLevel()); + ASSERT_EQ( + codec->maximumCompressionLevel(), Codec::useDefaultCompressionLevel()); + ASSERT_EQ( + codec->defaultCompressionLevel(), Codec::useDefaultCompressionLevel()); + } +} + +TEST_P(CodecTest, streamingCompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingCompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingCompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingDecompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingDecompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingRoundtrip) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingRoundtrip(codec.get(), makeRandomData(dataSize)); + checkStreamingRoundtrip(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressorReuse) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + auto codec = makeCodec(); + auto decompressor = codec->makeDecompressor(); + checkStreamingRoundtrip( + codec->makeCompressor(), decompressor, makeRandomData(100)); + + // Decompressor::reset() should allow reusing decompressor for a new stream. + decompressor->reset(); + checkStreamingRoundtrip( + codec->makeCompressor(), decompressor, makeRandomData(200)); +} + +std::vector getGzipTestParams() { + std::vector params; + for (auto windowBits : {kGzipDefaultWindowBits, kGzip4KBWindowBits}) { + for (auto format : {GzipFormat::kGzip, GzipFormat::kDeflate}) { + params.emplace_back( + CompressionKind::CompressionKind_ZLIB, + std::make_shared( + kUseDefaultCompressionLevel, format, windowBits)); + } + params.emplace_back( + CompressionKind::CompressionKind_ZLIB, + std::make_shared( + kUseDefaultCompressionLevel, GzipFormat::kZlib, windowBits)); + } + return params; +} + +INSTANTIATE_TEST_SUITE_P( + TestLZ4Frame, + CodecTest, + ::testing::Values(CompressionKind::CompressionKind_LZ4)); +INSTANTIATE_TEST_SUITE_P( + TestLZ4Raw, + CodecTest, + ::testing::Values(CompressionKind::CompressionKind_LZ4RAW)); +INSTANTIATE_TEST_SUITE_P( + TestLZ4Hadoop, + CodecTest, + ::testing::Values(CompressionKind::CompressionKind_LZ4HADOOP)); +INSTANTIATE_TEST_SUITE_P( + TestGzip, + CodecTest, + ::testing::ValuesIn(getGzipTestParams())); +INSTANTIATE_TEST_SUITE_P( + TestZstd, + CodecTest, + ::testing::Values(CompressionKind::CompressionKind_ZSTD)); +INSTANTIATE_TEST_SUITE_P( + TestSnappy, + CodecTest, + ::testing::Values(CompressionKind::CompressionKind_SNAPPY)); + +TEST(CodecLZ4HadoopTest, compatibility) { + // LZ4 Hadoop codec should be able to read back LZ4 raw blocks. + auto c1 = Codec::create(CompressionKind::CompressionKind_LZ4RAW); + auto c2 = Codec::create(CompressionKind::CompressionKind_LZ4HADOOP); + + for (auto dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(c1, c2, makeRandomData(dataSize)); + } +} +} // namespace facebook::velox::common