From dbe4f44cb6bbf5afd784c94eab2303affbd770bc Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 16 Nov 2023 14:04:48 +0000 Subject: [PATCH] add QAT gzip codec --- CMake/FindQAT.cmake | 97 +++++++++++++ CMakeLists.txt | 6 + velox/CMakeLists.txt | 4 + velox/common/compression/v2/Compression.cpp | 15 +- .../common/compression/v2/qat/CMakeLists.txt | 16 +++ .../compression/v2/qat/QatCompression.cpp | 133 ++++++++++++++++++ .../compression/v2/qat/QatCompression.h | 38 +++++ 7 files changed, 306 insertions(+), 3 deletions(-) create mode 100644 CMake/FindQAT.cmake create mode 100644 velox/common/compression/v2/qat/CMakeLists.txt create mode 100644 velox/common/compression/v2/qat/QatCompression.cpp create mode 100644 velox/common/compression/v2/qat/QatCompression.h diff --git a/CMake/FindQAT.cmake b/CMake/FindQAT.cmake new file mode 100644 index 000000000000..ab4845b01c3c --- /dev/null +++ b/CMake/FindQAT.cmake @@ -0,0 +1,97 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +include_guard(GLOBAL) + +include(ExternalProject) + +macro(build_qatzip) + message(STATUS "Building QATzip from source") + set(QATZIP_BUILD_VERSION "v1.1.2") + set(QATZIP_BUILD_SHA256_CHECKSUM + "31419fa4b42d217b3e55a70a34545582cbf401a4f4d44738d21b4a3944b1e1ef") + set(QATZIP_SOURCE_URL + "https://github.com/intel/QATzip/archive/refs/tags/${QATZIP_BUILD_VERSION}.tar.gz" + ) + set(QATZIP_LIB_NAME "qatzip") + + set(QATZIP_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qatzip_ep-install") + set(QATZIP_SOURCE_DIR "${QATZIP_PREFIX}/src/qatzip_ep") + set(QATZIP_INCLUDE_DIR "${QATZIP_SOURCE_DIR}/include") + set(QATZIP_STATIC_LIB_NAME + "${CMAKE_STATIC_LIBRARY_PREFIX}${QATZIP_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX}" + ) + set(QATZIP_STATIC_LIB_TARGETS + "${QATZIP_SOURCE_DIR}/src/.libs/${QATZIP_STATIC_LIB_NAME}") + set(QATZIP_CONFIGURE_ARGS "--prefix=${QATZIP_PREFIX}" "--with-pic" + "--with-ICP_ROOT=${ICP_ROOT}") + + ExternalProject_Add( + qatzip_ep + PREFIX ${QATZIP_PREFIX} + URL ${QATZIP_SOURCE_URL} + URL_HASH "SHA256=${QATZIP_BUILD_SHA256_CHECKSUM}" + SOURCE_DIR ${QATZIP_SOURCE_DIR} + CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env QZ_ROOT=${QATZIP_SOURCE_DIR} + ./configure ${QATZIP_CONFIGURE_ARGS} + BUILD_COMMAND ${MAKE_PROGRAM} all + BUILD_BYPRODUCTS ${QATZIP_STATIC_LIB_TARGETS} + BUILD_IN_SOURCE 1) + + ExternalProject_Add_Step( + qatzip_ep pre-configure + COMMAND ./autogen.sh + DEPENDEES download + DEPENDERS configure + WORKING_DIRECTORY ${QATZIP_SOURCE_DIR}) + + # The include directory must exist before it is referenced by a target. + file(MAKE_DIRECTORY "${QATZIP_INCLUDE_DIR}") + + set(QATZIP_LINK_LIBRARIES + ZLIB::ZLIB lz4::lz4 "${UDEV_LIBRARY}" "${USDM_DRV_LIBRARY}" + "${QAT_S_LIBRARY}" Threads::Threads) + + add_library(qatzip::qatzip STATIC IMPORTED) + set_target_properties( + qatzip::qatzip + PROPERTIES IMPORTED_LOCATION "${QATZIP_STATIC_LIB_TARGETS}" + INTERFACE_INCLUDE_DIRECTORIES "${QATZIP_INCLUDE_DIR}" + INTERFACE_LINK_LIBRARIES "${QATZIP_LINK_LIBRARIES}") + + add_dependencies(qatzip::qatzip qatzip_ep) +endmacro() + +set(ICP_ROOT $ENV{ICP_ROOT}) +set(THREADS_PREFER_PTHREAD_FLAG ON) + +find_package(Threads REQUIRED) +find_program(MAKE_PROGRAM make REQUIRED) + +find_library(UDEV_LIBRARY REQUIRED NAMES udev) +find_library( + USDM_DRV_LIBRARY REQUIRED + NAMES usdm_drv_s + PATHS "${ICP_ROOT}/build" + NO_DEFAULT_PATH) +find_library( + QAT_S_LIBRARY REQUIRED + NAMES qat_s + PATHS "${ICP_ROOT}/build" + NO_DEFAULT_PATH) + +message(STATUS "Found udev: ${UDEV_LIBRARY}") +message(STATUS "Found usdm_drv: ${USDM_DRV_LIBRARY}") +message(STATUS "Found qat_s: ${QAT_S_LIBRARY}") + +build_qatzip() diff --git a/CMakeLists.txt b/CMakeLists.txt index 31c14788aa1c..e5699fab95e1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -98,6 +98,7 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF) option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF) option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF) option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON) +option(VELOX_ENABLE_QAT "Enable Intel QuickAssist Technology support" OFF) option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF) option(VELOX_BUILD_PYTHON_PACKAGE "Builds Velox Python bindings" OFF) @@ -257,6 +258,11 @@ if(VELOX_ENABLE_PARQUET) set(VELOX_ENABLE_ARROW ON) endif() +if(VELOX_ENABLE_QAT) + find_package(QAT REQUIRED) + add_definitions(-DVELOX_ENABLE_QAT) +endif() + if(VELOX_ENABLE_REMOTE_FUNCTIONS) # TODO: Move this to use resolve_dependency(). For some reason, FBThrift # requires clients to explicitly install fizz and wangle. diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index ea35aa1ea59c..0b71efba8136 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -79,3 +79,7 @@ endif() if(${VELOX_ENABLE_SUBSTRAIT}) add_subdirectory(substrait) endif() + +if(${VELOX_ENABLE_QAT}) + add_subdirectory(common/compression/v2/qat) +endif() diff --git a/velox/common/compression/v2/Compression.cpp b/velox/common/compression/v2/Compression.cpp index dcb01c324e55..df39f099640c 100644 --- a/velox/common/compression/v2/Compression.cpp +++ b/velox/common/compression/v2/Compression.cpp @@ -19,7 +19,6 @@ #include "velox/common/compression/v2/Compression.h" #include #include -#include #include "velox/common/base/Exceptions.h" #include "velox/common/compression/v2/GzipCompression.h" #include "velox/common/compression/v2/Lz4Compression.h" @@ -27,6 +26,10 @@ #include "velox/common/compression/v2/SnappyCompression.h" #include "velox/common/compression/v2/ZstdCompression.h" +#ifdef VELOX_ENABLE_QAT +#include "velox/common/compression/v2/qat/QatCompression.h" +#endif + namespace facebook::velox::common { namespace { @@ -127,11 +130,17 @@ std::unique_ptr Codec::create( codec = makeLz4HadoopRawCodec(); break; case CompressionKind::CompressionKind_GZIP: { - auto opt = dynamic_cast(&codecOptions); - if (opt) { + if (auto opt = dynamic_cast(&codecOptions)) { codec = makeGzipCodec(compressionLevel, opt->format, opt->windowBits); break; } +#ifdef VELOX_ENABLE_QAT + if (auto opt = + dynamic_cast(&codecOptions)) { + codec = qat::makeQatGzipCodec(compressionLevel, opt->pollingMode); + break; + } +#endif codec = makeGzipCodec(compressionLevel); break; } diff --git a/velox/common/compression/v2/qat/CMakeLists.txt b/velox/common/compression/v2/qat/CMakeLists.txt new file mode 100644 index 000000000000..825941a16486 --- /dev/null +++ b/velox/common/compression/v2/qat/CMakeLists.txt @@ -0,0 +1,16 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +target_sources(velox_common_compression_v2 PRIVATE QatCompression.cpp) +target_link_libraries(velox_common_compression_v2 qatzip::qatzip) diff --git a/velox/common/compression/v2/qat/QatCompression.cpp b/velox/common/compression/v2/qat/QatCompression.cpp new file mode 100644 index 000000000000..addeb410afce --- /dev/null +++ b/velox/common/compression/v2/qat/QatCompression.cpp @@ -0,0 +1,133 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/compression/v2/qat/QatCompression.h" + +namespace facebook::velox::common::qat { + +#define QZ_INIT_FAIL(rc) (QZ_OK != rc && QZ_DUPLICATE != rc) + +#define QZ_SETUP_SESSION_FAIL(rc) \ + (QZ_PARAMS == rc || QZ_NOSW_NO_HW == rc || QZ_NOSW_LOW_MEM == rc) + +class QatGZipCodec final : public Codec { + public: + QatGZipCodec(int32_t compressionLevel, QzPollingMode_T pollingMode) + : compressionLevel_(compressionLevel) { + auto rc = qzInit(&qzSession_, /* sw_backup = */ 1); + if (QZ_INIT_FAIL(rc)) { + VLOG(1) << "QAT hardware init failed with error: " << rc; + } else { + QzSessionParamsDeflate_T params; + // Get the default parameters. + qzGetDefaultsDeflate(¶ms); + params.common_params.polling_mode = pollingMode; + params.common_params.comp_lvl = compressionLevel; + rc = qzSetupSessionDeflate(&qzSession_, ¶ms); + if (QZ_SETUP_SESSION_FAIL(rc)) { + VLOG(1) << "QAT setup session failed with error: " << rc; + } + } + } + + ~QatGZipCodec() { + (void)qzTeardownSession(&qzSession_); + (void)qzClose(&qzSession_); + } + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override { + auto compressedSize = static_cast(inputLength); + auto uncompressedSize = static_cast(outputLength); + auto ret = qzDecompress( + &qzSession_, input, &compressedSize, output, &uncompressedSize); + if (ret == QZ_OK) { + return static_cast(uncompressedSize); + } else if (ret == QZ_PARAMS) { + VELOX_FAIL("QAT decompression failure: params is invalid"); + } else if (ret == QZ_FAIL) { + VELOX_FAIL("QAT decompression failure: Function did not succeed"); + } else { + VELOX_FAIL("QAT decompression failure with error: {}", ret); + } + } + + uint64_t maxCompressedLength(uint64_t inputLen) override { + return qzMaxCompressedLength(static_cast(inputLen), &qzSession_); + } + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override { + auto uncompressedSize = static_cast(inputLength); + auto compressedSize = static_cast(outputLength); + auto ret = qzCompress( + &qzSession_, input, &uncompressedSize, output, &compressedSize, 1); + if (ret == QZ_OK) { + return static_cast(compressedSize); + } else if (ret == QZ_PARAMS) { + VELOX_FAIL("QAT compression failure: params is invalid"); + } else if (ret == QZ_FAIL) { + VELOX_FAIL("QAT compression failure: function did not succeed"); + } else { + VELOX_FAIL("QAT compression failure with error: {}", ret); + } + } + + std::shared_ptr makeCompressor() override { + VELOX_UNSUPPORTED("Streaming compression unsupported with QAT"); + } + + std::shared_ptr makeDecompressor() override { + VELOX_UNSUPPORTED("Streaming decompression unsupported with QAT"); + } + + int32_t compressionLevel() const override { + return compressionLevel_; + } + + CompressionKind compressionKind() const override { + return CompressionKind_GZIP; + } + + int32_t minimumCompressionLevel() const override { + return QZ_DEFLATE_COMP_LVL_MINIMUM; + } + int32_t maximumCompressionLevel() const override { + return QZ_DEFLATE_COMP_LVL_MAXIMUM; + } + int32_t defaultCompressionLevel() const override { + return QZ_COMP_LEVEL_DEFAULT; + } + + private: + int32_t compressionLevel_; + QzSession_T qzSession_ = {0}; +}; + +std::unique_ptr makeQatGzipCodec( + int32_t compressionLevel, + QzPollingMode_T pollingMode) { + return std::make_unique(compressionLevel, pollingMode); +} +} // namespace facebook::velox::common::qat \ No newline at end of file diff --git a/velox/common/compression/v2/qat/QatCompression.h b/velox/common/compression/v2/qat/QatCompression.h new file mode 100644 index 000000000000..c556b0ed1b31 --- /dev/null +++ b/velox/common/compression/v2/qat/QatCompression.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include "velox/common/compression/v2/Compression.h" + +namespace facebook::velox::common::qat { + +class QatGzipCodecOptions : public CodecOptions { + public: + QatGzipCodecOptions( + int32_t compressionLevel = kUseDefaultCompressionLevel, + QzPollingMode_T pollingMode = QZ_BUSY_POLLING) + : CodecOptions(compressionLevel), pollingMode(pollingMode) {} + + QzPollingMode_T pollingMode; +}; + +std::unique_ptr makeQatGzipCodec( + int32_t compressionLevel = QZ_COMP_LEVEL_DEFAULT, + QzPollingMode_T pollingMode = QZ_BUSY_POLLING); + +} // namespace facebook::velox::common::qat \ No newline at end of file