Skip to content

Commit

Permalink
add QAT gzip codec
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 16, 2023
1 parent e2ce69d commit e1a5423
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 3 deletions.
91 changes: 91 additions & 0 deletions CMake/FindQAT.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
include_guard(GLOBAL)

include(ExternalProject)

macro(build_qatzip)
message(STATUS "Building QATzip from source")
set(QATZIP_BUILD_VERSION "v1.1.2")
set(QATZIP_BUILD_SHA256_CHECKSUM "31419fa4b42d217b3e55a70a34545582cbf401a4f4d44738d21b4a3944b1e1ef")
set(QATZIP_SOURCE_URL
"https://github.com/intel/QATzip/archive/refs/tags/${QATZIP_BUILD_VERSION}.tar.gz")
set(QATZIP_LIB_NAME "qatzip")

set(QATZIP_PREFIX
"${CMAKE_CURRENT_BINARY_DIR}/qatzip_ep-install")
set(QATZIP_SOURCE_DIR "${QATZIP_PREFIX}/src/qatzip_ep")
set(QATZIP_INCLUDE_DIR "${QATZIP_SOURCE_DIR}/include")
set(QATZIP_STATIC_LIB_NAME "${CMAKE_STATIC_LIBRARY_PREFIX}${QATZIP_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(QATZIP_STATIC_LIB_TARGETS
"${QATZIP_SOURCE_DIR}/src/.libs/${QATZIP_STATIC_LIB_NAME}")
set(QATZIP_CONFIGURE_ARGS
"--prefix=${QATZIP_PREFIX}"
"--with-pic"
"--with-ICP_ROOT=${ICP_ROOT}")

ExternalProject_Add(qatzip_ep
PREFIX ${QATZIP_PREFIX}
URL ${QATZIP_SOURCE_URL}
URL_HASH "SHA256=${QATZIP_BUILD_SHA256_CHECKSUM}"
SOURCE_DIR ${QATZIP_SOURCE_DIR}
CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env QZ_ROOT=${QATZIP_SOURCE_DIR} ./configure ${QATZIP_CONFIGURE_ARGS}
BUILD_COMMAND ${MAKE_PROGRAM} all
BUILD_BYPRODUCTS ${QATZIP_STATIC_LIB_TARGETS}
BUILD_IN_SOURCE 1)

ExternalProject_Add_Step(qatzip_ep pre-configure
COMMAND ./autogen.sh
DEPENDEES download
DEPENDERS configure
WORKING_DIRECTORY ${QATZIP_SOURCE_DIR})

# The include directory must exist before it is referenced by a target.
file(MAKE_DIRECTORY "${QATZIP_INCLUDE_DIR}")

set(QATZIP_LINK_LIBRARIES
ZLIB::ZLIB
lz4::lz4
"${UDEV_LIBRARY}"
"${USDM_DRV_LIBRARY}"
"${QAT_S_LIBRARY}"
Threads::Threads)

add_library(qatzip::qatzip STATIC IMPORTED)
set_target_properties(qatzip::qatzip
PROPERTIES IMPORTED_LOCATION
"${QATZIP_STATIC_LIB_TARGETS}"
INTERFACE_INCLUDE_DIRECTORIES
"${QATZIP_INCLUDE_DIR}"
INTERFACE_LINK_LIBRARIES
"${QATZIP_LINK_LIBRARIES}")

add_dependencies(qatzip::qatzip qatzip_ep)
endmacro()

set(ICP_ROOT $ENV{ICP_ROOT})
set(THREADS_PREFER_PTHREAD_FLAG ON)

find_package(Threads REQUIRED)
find_program(MAKE_PROGRAM make REQUIRED)

find_library(UDEV_LIBRARY REQUIRED NAMES udev)
find_library(USDM_DRV_LIBRARY REQUIRED NAMES usdm_drv_s PATHS "${ICP_ROOT}/build" NO_DEFAULT_PATH)
find_library(QAT_S_LIBRARY REQUIRED NAMES qat_s PATHS "${ICP_ROOT}/build" NO_DEFAULT_PATH)

message(STATUS "Found udev: ${UDEV_LIBRARY}")
message(STATUS "Found usdm_drv: ${USDM_DRV_LIBRARY}")
message(STATUS "Found qat_s: ${QAT_S_LIBRARY}")

build_qatzip()
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON)
option(VELOX_ENABLE_QAT "Enable Intel QuickAssist Technology support" OFF)

option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF)
option(VELOX_BUILD_PYTHON_PACKAGE "Builds Velox Python bindings" OFF)
Expand Down Expand Up @@ -257,6 +258,11 @@ if(VELOX_ENABLE_PARQUET)
set(VELOX_ENABLE_ARROW ON)
endif()

if(VELOX_ENABLE_QAT)
find_package(QAT REQUIRED)
add_definitions(-DVELOX_ENABLE_QAT)
endif()

if(VELOX_ENABLE_REMOTE_FUNCTIONS)
# TODO: Move this to use resolve_dependency(). For some reason, FBThrift
# requires clients to explicitly install fizz and wangle.
Expand Down
4 changes: 4 additions & 0 deletions velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ endif()
if(${VELOX_ENABLE_SUBSTRAIT})
add_subdirectory(substrait)
endif()

if(${VELOX_ENABLE_QAT})
add_subdirectory(common/compression/v2/qat)
endif()
15 changes: 12 additions & 3 deletions velox/common/compression/v2/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
#include "velox/common/compression/v2/Compression.h"
#include <memory>
#include <string>
#include <utility>
#include "velox/common/base/Exceptions.h"
#include "velox/common/compression/v2/GzipCompression.h"
#include "velox/common/compression/v2/Lz4Compression.h"
#include "velox/common/compression/v2/LzoCompression.h"
#include "velox/common/compression/v2/SnappyCompression.h"
#include "velox/common/compression/v2/ZstdCompression.h"

#ifdef VELOX_ENABLE_QAT
#include "velox/common/compression/v2/qat/QatCompression.h"
#endif

namespace facebook::velox::common {

namespace {
Expand Down Expand Up @@ -127,11 +130,17 @@ std::unique_ptr<Codec> Codec::create(
codec = makeLz4HadoopRawCodec();
break;
case CompressionKind::CompressionKind_GZIP: {
auto opt = dynamic_cast<const GzipCodecOptions*>(&codecOptions);
if (opt) {
if (auto opt = dynamic_cast<const GzipCodecOptions*>(&codecOptions)) {
codec = makeGzipCodec(compressionLevel, opt->format, opt->windowBits);
break;
}
#ifdef VELOX_ENABLE_QAT
if (auto opt =
dynamic_cast<const qat::QatGzipCodecOptions*>(&codecOptions)) {
codec = qat::makeQatGzipCodec(compressionLevel, opt->pollingMode);
break;
}
#endif
codec = makeGzipCodec(compressionLevel);
break;
}
Expand Down
16 changes: 16 additions & 0 deletions velox/common/compression/v2/qat/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

target_sources(velox_common_compression_v2 PRIVATE QatCompression.cpp)
target_link_libraries(velox_common_compression_v2 qatzip::qatzip)
127 changes: 127 additions & 0 deletions velox/common/compression/v2/qat/QatCompression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/compression/v2/qat/QatCompression.h"
#include "velox/common/base/Exceptions.h"

namespace facebook::velox::common::qat {

#define QZ_INIT_FAIL(rc) (QZ_OK != rc && QZ_DUPLICATE != rc)

#define QZ_SETUP_SESSION_FAIL(rc) \
(QZ_PARAMS == rc || QZ_NOSW_NO_HW == rc || QZ_NOSW_LOW_MEM == rc)

class QatGZipCodec final : public Codec {
public:
QatGZipCodec(int32_t compressionLevel, QzPollingMode_T pollingMode)
: compressionLevel_(compressionLevel) {
auto rc = qzInit(&qzSession_, /* sw_backup = */ 1);
if (QZ_INIT_FAIL(rc)) {
VLOG(1) << "QAT hardware init failed with error: " << rc;
} else {
QzSessionParamsDeflate_T params;
// Get the default parameters.
qzGetDefaultsDeflate(&params);
params.common_params.polling_mode = pollingMode;
params.common_params.comp_lvl = compressionLevel;
rc = qzSetupSessionDeflate(&qzSession_, &params);
if (QZ_SETUP_SESSION_FAIL(rc)) {
VLOG(1) << "QAT setup session failed with error: " << rc;
}
}
}

uint64_t decompress(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) override {
auto compressedSize = static_cast<uint32_t>(inputLength);
auto uncompressedSize = static_cast<uint32_t>(outputLength);
auto ret = qzDecompress(
&qzSession_, input, &compressedSize, output, &uncompressedSize);
if (ret == QZ_OK) {
return static_cast<uint64_t>(uncompressedSize);
} else if (ret == QZ_PARAMS) {
VELOX_FAIL("QAT decompression failure: params is invalid");
} else if (ret == QZ_FAIL) {
VELOX_FAIL("QAT decompression failure: Function did not succeed");
} else {
VELOX_FAIL("QAT decompression failure with error: {}", ret);
}
}

uint64_t maxCompressedLength(uint64_t inputLen) override {
return qzMaxCompressedLength(static_cast<size_t>(inputLen), &qzSession_);
}

uint64_t compress(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) override {
auto uncompressedSize = static_cast<uint32_t>(inputLength);
auto compressedSize = static_cast<uint32_t>(outputLength);
auto ret = qzCompress(
&qzSession_, input, &uncompressedSize, output, &compressedSize, 1);
if (ret == QZ_OK) {
return static_cast<uint64_t>(compressedSize);
} else if (ret == QZ_PARAMS) {
VELOX_FAIL("QAT compression failure: params is invalid");
} else if (ret == QZ_FAIL) {
VELOX_FAIL("QAT compression failure: function did not succeed");
} else {
VELOX_FAIL("QAT compression failure with error: {}", ret);
}
}

std::shared_ptr<Compressor> makeCompressor() override {
VELOX_UNSUPPORTED("Streaming compression unsupported with QAT");
}

std::shared_ptr<Decompressor> makeDecompressor() override {
VELOX_UNSUPPORTED("Streaming decompression unsupported with QAT");
}

int32_t compressionLevel() const override {
return compressionLevel_;
}

CompressionKind compressionKind() const override {
return CompressionKind_GZIP;
}

int32_t minimumCompressionLevel() const override {
return QZ_DEFLATE_COMP_LVL_MINIMUM;
}
int32_t maximumCompressionLevel() const override {
return QZ_DEFLATE_COMP_LVL_MAXIMUM;
}
int32_t defaultCompressionLevel() const override {
return QZ_COMP_LEVEL_DEFAULT;
}

private:
int32_t compressionLevel_;
QzSession_T qzSession_ = {0};
};

std::unique_ptr<Codec> makeQatGzipCodec(
int32_t compressionLevel,
QzPollingMode_T pollingMode) {
return std::make_unique<QatGZipCodec>(compressionLevel, pollingMode);
}
} // namespace facebook::velox::common::qat
38 changes: 38 additions & 0 deletions velox/common/compression/v2/qat/QatCompression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <qatzip.h>
#include <vector>
#include "velox/common/compression/v2/Compression.h"

namespace facebook::velox::common::qat {

class QatGzipCodecOptions : public CodecOptions {
public:
QatGzipCodecOptions(
int32_t compressionLevel = kUseDefaultCompressionLevel,
QzPollingMode_T pollingMode = QZ_BUSY_POLLING)
: CodecOptions(compressionLevel), pollingMode(pollingMode) {}

QzPollingMode_T pollingMode;
};

std::unique_ptr<Codec> makeQatGzipCodec(
int32_t compressionLevel = QZ_COMP_LEVEL_DEFAULT,
QzPollingMode_T pollingMode = QZ_BUSY_POLLING);

} // namespace facebook::velox::common::qat

0 comments on commit e1a5423

Please sign in to comment.