From 5a56d2f314ce0cc3882642bcc9c1caeeab50b813 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Fri, 17 Nov 2023 05:55:06 +0000 Subject: [PATCH] add IAA gzip codec --- CMake/Findqpl.cmake | 56 ++++ CMakeLists.txt | 5 + velox/CMakeLists.txt | 4 + velox/common/compression/v2/Compression.cpp | 13 +- .../common/compression/v2/iaa/CMakeLists.txt | 17 ++ .../compression/v2/iaa/IaaCompression.cpp | 289 ++++++++++++++++++ .../compression/v2/iaa/IaaCompression.h | 42 +++ .../common/compression/v2/iaa/QplJobPool.cpp | 156 ++++++++++ velox/common/compression/v2/iaa/QplJobPool.h | 88 ++++++ 9 files changed, 668 insertions(+), 2 deletions(-) create mode 100644 CMake/Findqpl.cmake create mode 100644 velox/common/compression/v2/iaa/CMakeLists.txt create mode 100644 velox/common/compression/v2/iaa/IaaCompression.cpp create mode 100644 velox/common/compression/v2/iaa/IaaCompression.h create mode 100644 velox/common/compression/v2/iaa/QplJobPool.cpp create mode 100644 velox/common/compression/v2/iaa/QplJobPool.h diff --git a/CMake/Findqpl.cmake b/CMake/Findqpl.cmake new file mode 100644 index 0000000000000..72836646334df --- /dev/null +++ b/CMake/Findqpl.cmake @@ -0,0 +1,56 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+include(ExternalProject) + +macro(build_qpl) + message(STATUS "Building QPL from source") + set(QPL_BUILD_VERSION "v1.3.1") + set(QPL_BUILD_SHA256_CHECKSUM + "6ae537f9b84c222212e1ca8edaa275d1e1923d179a691353da38856ed8f4e5c4") + set(QPL_SOURCE_URL + "https://github.com/intel/qpl/archive/refs/tags/v1.3.1.tar.gz") + set(QPL_LIB_NAME "qpl") + + set(QPL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qpl_ep-install") + set(QPL_SOURCE_DIR "${QPL_PREFIX}/src/qpl_ep") + set(QPL_INCLUDE_DIR "${QPL_PREFIX}/include") + set(QPL_LIB_DIR "${QPL_PREFIX}/lib") + set(QPL_STATIC_LIB_NAME + "${CMAKE_STATIC_LIBRARY_PREFIX}${QPL_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX}${QPL_STATIC_LIB_MAJOR_VERSION}" + ) + set(QPL_STATIC_LIB_TARGETS "${QPL_LIB_DIR}/${QPL_STATIC_LIB_NAME}") + ExternalProject_Add( + qpl_ep + PREFIX ${QPL_PREFIX} + URL ${QPL_SOURCE_URL} + URL_HASH "SHA256=${QPL_BUILD_SHA256_CHECKSUM}" + SOURCE_DIR ${QPL_SOURCE_DIR} + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${QPL_PREFIX} + -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DQPL_BUILD_TESTS=OFF + -DLOG_HW_INIT=ON + BUILD_BYPRODUCTS ${QPL_STATIC_LIB_TARGETS}) + + # The include directory must exist before it is referenced by a target. 
+ file(MAKE_DIRECTORY "${QPL_INCLUDE_DIR}") + + add_library(qpl::qpl STATIC IMPORTED) + set_target_properties( + qpl::qpl + PROPERTIES IMPORTED_LOCATION "${QPL_LIB_DIR}/${QPL_STATIC_LIB_NAME}" + INTERFACE_INCLUDE_DIRECTORIES "${QPL_INCLUDE_DIR}") + + add_dependencies(qpl::qpl qpl_ep) +endmacro() + +build_qpl() diff --git a/CMakeLists.txt b/CMakeLists.txt index 31c14788aa1c6..8507b9ca23e24 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -265,6 +265,11 @@ if(VELOX_ENABLE_REMOTE_FUNCTIONS) find_package(FBThrift CONFIG REQUIRED) endif() +if(VELOX_ENABLE_IAA) + find_package(qpl REQUIRED) + add_definitions(-DVELOX_ENABLE_IAA) +endif() + # define processor variable for conditional compilation if(${VELOX_CODEGEN_SUPPORT}) add_compile_definitions(CODEGEN_ENABLED=1) diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index ea35aa1ea59ca..8957f6d4233bc 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -79,3 +79,7 @@ endif() if(${VELOX_ENABLE_SUBSTRAIT}) add_subdirectory(substrait) endif() + +if(${VELOX_ENABLE_IAA}) + add_subdirectory(common/compression/v2/iaa) +endif() diff --git a/velox/common/compression/v2/Compression.cpp b/velox/common/compression/v2/Compression.cpp index dcb01c324e557..5d39b204f582e 100644 --- a/velox/common/compression/v2/Compression.cpp +++ b/velox/common/compression/v2/Compression.cpp @@ -27,6 +27,10 @@ #include "velox/common/compression/v2/SnappyCompression.h" #include "velox/common/compression/v2/ZstdCompression.h" +#ifdef VELOX_ENABLE_IAA +#include "velox/common/compression/v2/iaa/IaaCompression.h" +#endif + namespace facebook::velox::common { namespace { @@ -127,11 +131,18 @@ std::unique_ptr Codec::create( codec = makeLz4HadoopRawCodec(); break; case CompressionKind::CompressionKind_GZIP: { - auto opt = dynamic_cast(&codecOptions); - if (opt) { + if (auto opt = dynamic_cast(&codecOptions)) { codec = makeGzipCodec(compressionLevel, opt->format, opt->windowBits); break; } +#ifdef VELOX_ENABLE_IAA + if (auto opt = +
dynamic_cast(&codecOptions)) { + codec = iaa::makeIaaGzipCodec(compressionLevel, opt->maxJobNumber); + // Keep the IAA codec; falling through would replace it with the zlib one. + break; + } +#endif codec = makeGzipCodec(compressionLevel); break; } diff --git a/velox/common/compression/v2/iaa/CMakeLists.txt b/velox/common/compression/v2/iaa/CMakeLists.txt new file mode 100644 index 0000000000000..0a41be0db2176 --- /dev/null +++ b/velox/common/compression/v2/iaa/CMakeLists.txt @@ -0,0 +1,17 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +target_sources(velox_common_compression_v2 PRIVATE IaaCompression.cpp + QplJobPool.cpp) +target_link_libraries(velox_common_compression_v2 qpl::qpl) diff --git a/velox/common/compression/v2/iaa/IaaCompression.cpp b/velox/common/compression/v2/iaa/IaaCompression.cpp new file mode 100644 index 0000000000000..fae7f23d97755 --- /dev/null +++ b/velox/common/compression/v2/iaa/IaaCompression.cpp @@ -0,0 +1,289 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/compression/v2/iaa/IaaCompression.h" +#include "velox/common/compression/v2/iaa/QplJobPool.h" + +namespace facebook::velox::common::iaa { +namespace { +constexpr int64_t kHwCodecError = -1; + +void logHWFallback(const std::string& source, qpl_status status) { + VLOG(1) << "DeflateQpl hardware codec failed, " + << "falling back to software codec. " + << "(Details: " << source << " failed, status: " << status + << " - Please refer to qpl_status in " + << "./contrib/qpl/include/qpl/c_api/status.h)"; +} + +void qplSWError(const std::string& source, qpl_status status) { + VELOX_FAIL( + "Execution of DeflateQpl software fallback codec failed. " + "(Details: {} failed, status: {}" + " - Please refer to qpl_status in " + "./contrib/qpl/include/qpl/c_api/status.h)", + source, + status); +} +} // namespace + +class HardwareCodecDeflateQpl { + public: + // Hardware codec; its methods return kHwCodecError when the caller must fall back to the software codec. + explicit HardwareCodecDeflateQpl(qpl_compression_levels compressionLevel) + : compressionLevel_(compressionLevel){}; + + int64_t doCompressData( + uint32_t inputLength, + const uint8_t* input, + uint32_t outputLength, + uint8_t* output) const { + uint32_t job_id = 0; + qpl_job* jobPtr; + if (!(jobPtr = QplJobHWPool::getInstance().acquireJob(job_id))) { + VLOG(1) << "DeflateQpl HW codec failed, falling back to SW codec. "
+ << "(Details: doCompressData->AcquireJob fail, " + << "probably job pool exhausted)"; + return kHwCodecError; + } + + jobPtr->op = qpl_op_compress; + jobPtr->next_in_ptr = const_cast(input); + jobPtr->next_out_ptr = output; + jobPtr->available_in = inputLength; + jobPtr->level = compressionLevel_; + jobPtr->available_out = outputLength; + jobPtr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST | + QPL_FLAG_OMIT_VERIFY; + + if (auto status = qpl_execute_job(jobPtr); status == QPL_STS_OK) { + uint32_t compressedSize = jobPtr->total_out; + QplJobHWPool::getInstance().releaseJob(job_id); + return compressedSize; + } else { + logHWFallback("doCompressData->qpl_execute_job", status); + QplJobHWPool::getInstance().releaseJob(job_id); + return kHwCodecError; + } + } + + // Submits the decompression job to the IAA hardware and waits until it + // completes. Returns the decompressed size, or kHwCodecError on failure. + int64_t doDecompressData( + uint32_t inputLength, + const uint8_t* input, + uint32_t outputLength, + uint8_t* output) { + uint32_t job_id = 0; + qpl_job* jobPtr; + if (!(jobPtr = QplJobHWPool::getInstance().acquireJob(job_id))) { + VLOG(1) << "DeflateQpl HW codec failed, falling back to SW codec. " + << "(Details: doDecompressData->AcquireJob fail, " + << "probably job pool exhausted)"; + return kHwCodecError; + } + + // Performing a decompression operation.
+ jobPtr->op = qpl_op_decompress; + jobPtr->next_in_ptr = const_cast(input); + jobPtr->next_out_ptr = output; + jobPtr->available_in = inputLength; + jobPtr->available_out = outputLength; + jobPtr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST; + + if (auto status = qpl_execute_job(jobPtr); status == QPL_STS_OK) { + uint32_t decompressedSize = jobPtr->total_out; + QplJobHWPool::getInstance().releaseJob(job_id); + return decompressedSize; + } else { + logHWFallback("doDecompressData->qpl_execute_job", status); + QplJobHWPool::getInstance().releaseJob(job_id); + return kHwCodecError; + } + } + + private: + qpl_compression_levels compressionLevel_ = qpl_default_level; +}; + +class SoftwareCodecDeflateQpl final { + public: + explicit SoftwareCodecDeflateQpl(qpl_compression_levels compressionLevel) + : compressionLevel_(compressionLevel){}; + + ~SoftwareCodecDeflateQpl() { + if (swJob) { + qpl_fini_job(swJob); + } + } + + int64_t doCompressData( + uint32_t inputLength, + const uint8_t* input, + uint32_t outputLength, + uint8_t* output) { + qpl_job* jobPtr = getJobCodecPtr(); + // Performing a compression operation + jobPtr->op = qpl_op_compress; + jobPtr->next_in_ptr = const_cast(input); + jobPtr->next_out_ptr = output; + jobPtr->available_in = inputLength; + jobPtr->level = compressionLevel_; + jobPtr->available_out = outputLength; + jobPtr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST | + QPL_FLAG_OMIT_VERIFY; + + if (auto status = qpl_execute_job(jobPtr); status != QPL_STS_OK) { + qplSWError("doCompressData->qpl_execute_job", status); + } + + return jobPtr->total_out; + } + + int64_t doDecompressData( + uint32_t inputLength, + const uint8_t* input, + uint32_t outputLength, + uint8_t* output) { + qpl_job* jobPtr = getJobCodecPtr(); + + // Performing a decompression operation + jobPtr->op = qpl_op_decompress; + jobPtr->next_in_ptr = const_cast(input); + jobPtr->next_out_ptr = output; + jobPtr->available_in = inputLength; + jobPtr->available_out =
outputLength; + jobPtr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST; + + if (auto status = qpl_execute_job(jobPtr); status != QPL_STS_OK) { + qplSWError("doDecompressData->qpl_execute_job", status); + } + return jobPtr->total_out; + } + + private: + qpl_job* swJob = nullptr; + std::unique_ptr sw_buffer; + qpl_compression_levels compressionLevel_ = qpl_default_level; + + qpl_job* getJobCodecPtr() { + if (!swJob) { + uint32_t size = 0; + qpl_get_job_size(qpl_path_software, &size); + + sw_buffer = std::make_unique(size); + swJob = reinterpret_cast(sw_buffer.get()); + + // Job initialization + if (auto status = qpl_init_job(qpl_path_software, swJob); + status != QPL_STS_OK) + VELOX_FAIL( + "Initialization of DeflateQpl software fallback codec failed. " + "(Details: qpl_init_job with error code: {}" + " - Please refer to qpl_status in " + "./contrib/qpl/include/qpl/c_api/status.h)", + status); + } + return swJob; + } +}; + +class QplGzipCodec : public Codec { + public: + explicit QplGzipCodec(qpl_compression_levels compressionLevel) + : hwCodec_(std::make_unique(compressionLevel)), + swCodec_(std::make_unique(compressionLevel)) {} + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override { + auto ret = kHwCodecError; + if (QplJobHWPool::getInstance().jobPoolReady()) { + ret = hwCodec_->doCompressData( + (uint32_t)inputLength, input, (uint32_t)outputLength, output); + } + if (ret == kHwCodecError) { + return static_cast(swCodec_->doCompressData( + (uint32_t)inputLength, input, (uint32_t)outputLength, output)); + } + return static_cast(ret); + } + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override { + auto ret = kHwCodecError; + if (QplJobHWPool::getInstance().jobPoolReady()) { + ret = hwCodec_->doDecompressData( + (uint32_t)inputLength, input, (uint32_t)outputLength, output); + } + if (ret == kHwCodecError) { + return
swCodec_->doDecompressData( + (uint32_t)inputLength, input, (uint32_t)outputLength, output); + } + return ret; + } + + uint64_t maxCompressedLength(uint64_t inputLength) override { + // Same worst-case bound formula as zlib's compressBound(). + return ( + (inputLength) + ((inputLength) >> 12) + ((inputLength) >> 14) + + ((inputLength) >> 25) + 13); + } + + std::shared_ptr makeCompressor() override { + VELOX_UNSUPPORTED("Streaming compression unsupported with IAA"); + } + + std::shared_ptr makeDecompressor() override { + VELOX_UNSUPPORTED("Streaming decompression unsupported with IAA"); + } + + CompressionKind compressionKind() const override { + return CompressionKind::CompressionKind_GZIP; + } + + int32_t minimumCompressionLevel() const override { + return qpl_level_1; + } + int32_t maximumCompressionLevel() const override { + return qpl_high_level; + } + int32_t defaultCompressionLevel() const override { + return qpl_default_level; + } + + private: + std::unique_ptr hwCodec_; + std::unique_ptr swCodec_; +}; + +std::unique_ptr makeIaaGzipCodec( + int32_t compressionLevel, + uint32_t maxJobNumber) { + QplJobHWPool::initialize(maxJobNumber); + return std::make_unique( + static_cast(compressionLevel)); +} + +} // namespace facebook::velox::common::iaa diff --git a/velox/common/compression/v2/iaa/IaaCompression.h b/velox/common/compression/v2/iaa/IaaCompression.h new file mode 100644 index 0000000000000..c55dd5c19205f --- /dev/null +++ b/velox/common/compression/v2/iaa/IaaCompression.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include "velox/common/compression/v2/Compression.h" +#include "velox/common/compression/v2/iaa/QplJobPool.h" + +namespace facebook::velox::common::iaa { + +class IaaGzipCodecOptions : public CodecOptions { + public: + IaaGzipCodecOptions( + int32_t compressionLevel, + uint32_t maxJobNumber = kMaxQplJobNumber) + : CodecOptions(compressionLevel), maxJobNumber(maxJobNumber) {} + + uint32_t maxJobNumber; +}; + +std::unique_ptr makeIaaGzipCodec( + int32_t compressionLevel = qpl_default_level, + uint32_t maxJobNumber = kMaxQplJobNumber); + +} // namespace facebook::velox::common::iaa diff --git a/velox/common/compression/v2/iaa/QplJobPool.cpp b/velox/common/compression/v2/iaa/QplJobPool.cpp new file mode 100644 index 0000000000000..2cb51264dcfcf --- /dev/null +++ b/velox/common/compression/v2/iaa/QplJobPool.cpp @@ -0,0 +1,156 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/compression/v2/iaa/QplJobPool.h" + +namespace facebook::velox::common::iaa { + +namespace { +inline void checkJobIndex(uint32_t index) { + auto maxJobNumber = QplJobHWPool::getInstance().maxJobNumber(); + if (index >= maxJobNumber) { + VELOX_FAIL( + "Job index {} overflow. Must be in range [0, {})", index, maxJobNumber); + } +} +} // namespace + +uint32_t QplJobHWPool::maxJobNumber_; +std::once_flag QplJobHWPool::initFlag_; +bool QplJobHWPool::initialized_; + +void QplJobHWPool::initialize(uint32_t maxJobNumber) { + std::call_once(initFlag_, [&] { + if (maxJobNumber == 0 || maxJobNumber > kMaxQplJobNumber) { + VELOX_FAIL( + "Job number {} must be in range [1, {}]", + maxJobNumber, + kMaxQplJobNumber); + } + maxJobNumber_ = maxJobNumber; + initialized_ = true; + }); +} + +QplJobHWPool& QplJobHWPool::getInstance() { + std::call_once(initFlag_, [] { + if (!initialized_) { + VELOX_FAIL("QplJobHWPool not initialized"); + } + }); + static QplJobHWPool pool; + return pool; +} + +QplJobHWPool::QplJobHWPool() + : randomEngine_(std::random_device()()), + distribution_(0, maxJobNumber_ - 1) { + initJobPool(); +} + +QplJobHWPool::~QplJobHWPool() { + for (auto i = 0; i < maxJobNumber_; ++i) { + if (jobPool_[i]) { + while (!tryLockJob(i)) + ; + qpl_fini_job(jobPool_[i]); + unLockJob(i); + jobPool_[i] = nullptr; + } + } +} + +void QplJobHWPool::initJobPool() { + distribution_ = std::uniform_int_distribution(0, maxJobNumber_ - 1); + + uint32_t jobSize = 0; + const char* qpl_version = qpl_get_library_version(); + + // Get size required for saving a single qpl job object. + qpl_get_job_size(qpl_path_hardware, &jobSize); + // Allocate entire buffer for storing all job objects. + hwJobsBuffer_.resize(jobSize * maxJobNumber_); + // Initialize pool for storing all job object pointers. + // Job i lives at byte offset i * jobSize, so take the ADDRESS of that byte.
+ for (uint32_t index = 0; index < maxJobNumber_; ++index) { + qpl_job* qplJobPtr = + reinterpret_cast(&hwJobsBuffer_[index * jobSize]); + if (auto status = qpl_init_job(qpl_path_hardware, qplJobPtr); + status != QPL_STS_OK) { + jobPoolReady_ = false; + VLOG(1) + << "Initialization of hardware-assisted Qpl codec failed at index: " + << index << ", falling back to SW codec. " + << "(Details: QplJobHWPool->qpl_init_job failed, status: " << status + << " - Please refer to qpl_status in " + << "./contrib/qpl/include/qpl/c_api/status.h). " + << "Please check if Intel In-Memory Analytics Accelerator (IAA) " + << "is properly set up. QPL Version: " << qpl_version; + return; + } + jobPool_[index] = qplJobPtr; + jobLocks_[index].store(false); + } + jobPoolReady_ = true; +} + +qpl_job* QplJobHWPool::acquireJob(uint32_t& jobId) { + if (!jobPoolReady()) { + return nullptr; + } + uint32_t retry = 0; + auto index = distribution_(randomEngine_); + while (!tryLockJob(index)) { + index = distribution_(randomEngine_); + retry++; + if (retry > maxJobNumber_) { + return nullptr; + } + } + jobId = maxJobNumber_ - index; + return jobPool_[index]; +} + +void QplJobHWPool::releaseJob(uint32_t jobId) { + if (jobPoolReady()) { + auto index = maxJobNumber_ - jobId; + unLockJob(index); + } +} + +bool QplJobHWPool::tryLockJob(uint32_t index) { + checkJobIndex(index); + bool expected = false; + return jobLocks_[index].compare_exchange_strong(expected, true); +} + +void QplJobHWPool::unLockJob(uint32_t index) { + checkJobIndex(index); + jobLocks_[index].store(false); +} + +const bool& QplJobHWPool::jobPoolReady() const { + return jobPoolReady_; +} + +uint32_t QplJobHWPool::maxJobNumber() { + return maxJobNumber_; +} + +} // namespace facebook::velox::common::iaa diff --git a/velox/common/compression/v2/iaa/QplJobPool.h b/velox/common/compression/v2/iaa/QplJobPool.h new file mode 100644 index 0000000000000..96208548bdb36 --- /dev/null +++ b/velox/common/compression/v2/iaa/QplJobPool.h @@ -0,0
+1,88 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace facebook::velox::common::iaa { + +static constexpr auto kMaxQplJobNumber = 1024; + +/// QplJobHWPool is a resource pool that provides QPL job objects, which are +/// used for storing context information while a job executes. +/// Memory for the QPL jobs is allocated when the QplJobHWPool instance is +/// created. +/// A QPL job can offload RLE-decoding/Filter/(De)compression work to the +/// hardware accelerator. +class QplJobHWPool { + public: + static QplJobHWPool& getInstance(); + + /// Sets the maximum job number. + /// Must be called before the first call to getInstance(). + static void initialize(uint32_t maxJobNumber); + + /// Acquires a QPL job. + /// \param jobId Receives the QPL job id; pass it to releaseJob() afterwards. + /// \return Pointer to the QPL job, or nullptr if no job could be acquired. + qpl_job* acquireJob(uint32_t& jobId); + + // Releases the QPL job identified by jobId. + void releaseJob(uint32_t jobId); + + // Returns whether all hardware jobs in the pool were initialized successfully. + const bool& jobPoolReady() const; + + uint32_t maxJobNumber(); + + // Deleted copy constructor and assignment operator. + QplJobHWPool(const QplJobHWPool&) = delete; + QplJobHWPool& operator=(const QplJobHWPool&) = delete; + + private: + // Single buffer that stores all job objects (job size * max job number bytes).
+ std::vector hwJobsBuffer_; + // Job pool storing a pointer to each job object. + std::array jobPool_; + // Per-job lock flags guarding access to each job object. + std::array jobLocks_; + + bool jobPoolReady_{false}; + std::mt19937 randomEngine_; + std::uniform_int_distribution distribution_; + + static uint32_t maxJobNumber_; + static std::once_flag initFlag_; + static bool initialized_; + + QplJobHWPool(); + ~QplJobHWPool(); + + void initJobPool(); + bool tryLockJob(uint32_t index); + void unLockJob(uint32_t index); +}; + +} // namespace facebook::velox::common::iaa