From b41f92647d400fc00cf56244dbc92cd00e1aa7df Mon Sep 17 00:00:00 2001
From: Rong Ma
Date: Fri, 17 Nov 2023 07:28:42 +0000
Subject: [PATCH] add async codec

Add an AsyncCodec interface whose compressAsync()/decompressAsync() return
a folly::SemiFuture, together with an IAA (QPL) backed implementation.

- Compression.{h,cpp}: declare AsyncCodec (virtually derived from Codec)
  with isAvailable() and a static create() factory. create() throws for
  kinds isAvailable() rejects and returns nullptr for CompressionKind_NONE
  and for GZIP when the IAA codec is not compiled in or not selected by the
  options. Codec::init() becomes protected so the factory can call it.
  Codec::create() now breaks out of the GZIP case after building the IAA
  codec instead of continuing to makeGzipCodec().
- GzipCompression.h: add DummyGzipAsyncCodec, which wraps the synchronous
  GzipCodec calls in a deferred SemiFuture callback.
- IaaCompression.{h,cpp}: move job acquisition and setup out of
  doCompressData()/doDecompressData() into acquireCompressionJob()/
  acquireDecompressionJob(); add doCompressDataAsync()/
  doDecompressDataAsync(), which submit the job with qpl_submit_job() and
  return it (nullptr, with the job released, on failure); make QplGzipCodec
  derive virtually from Codec with protected members; add QplGzipAsyncCodec
  and makeIaaGzipAsyncCodec(). The async callbacks wait with qpl_wait_job()
  and fall back to the software codec when the wait fails; a failed submit
  yields a future holding std::runtime_error("Cannot submit job.").

---
Review notes on this revision:
- The callbacks passed to deferValue() used to capture with [&]. They run
  after the enclosing member function has returned, so the references to
  its parameters and locals (jobId, jobPtr) dangled. They now capture
  `this` and the arguments by value; the codec and both buffers must
  outlive the returned future.
- AsyncCodec::create() is now declared static, matching its use as a
  factory on an abstract class.
- QplGzipAsyncCodec::compressAsync() is marked override, and the fallback
  log labels now name qpl_wait_job and the correct direction.
- NOTE(review): text between angle brackets (template arguments, #include
  targets, the author address) is absent from this copy of the patch and is
  reproduced as found. Restore it from the original commit before applying.
- NOTE(review): the "index" blob ids were not recomputed for the edited
  files.
- NOTE(review): GzipCodec derives from Codec non-virtually (see the hunk
  header in GzipCompression.h) while AsyncCodec derives virtually, so
  DummyGzipAsyncCodec appears to get two Codec subobjects. Confirm before
  instantiating it.

 velox/common/compression/v2/Compression.cpp   |  48 ++++
 velox/common/compression/v2/Compression.h     |  35 ++-
 velox/common/compression/v2/GzipCompression.h |  35 +++
 .../compression/v2/iaa/IaaCompression.cpp     | 209 +++++++++++++++---
 .../compression/v2/iaa/IaaCompression.h       |   3 +
 5 files changed, 293 insertions(+), 37 deletions(-)

diff --git a/velox/common/compression/v2/Compression.cpp b/velox/common/compression/v2/Compression.cpp
index 5d39b204f582e..bdc9fb0787ba7 100644
--- a/velox/common/compression/v2/Compression.cpp
+++ b/velox/common/compression/v2/Compression.cpp
@@ -139,6 +139,7 @@ std::unique_ptr Codec::create(
       if (auto opt =
               dynamic_cast(&codecOptions)) {
         codec = iaa::makeIaaGzipCodec(compressionLevel, opt->maxJobNumber);
+        break;
       }
 #endif
       codec = makeGzipCodec(compressionLevel);
@@ -237,4 +238,51 @@ uint64_t Codec::compressPartial(
     uint8_t* output) {
   VELOX_UNSUPPORTED("'{}' doesn't support partial compression", name());
 }
+
+bool AsyncCodec::isAvailable(CompressionKind kind) {
+  switch (kind) {
+    case CompressionKind::CompressionKind_NONE:
+    case CompressionKind::CompressionKind_GZIP:
+      return true;
+    default:
+      return false;
+  }
+}
+
+std::unique_ptr AsyncCodec::create(
+    CompressionKind kind,
+    const CodecOptions& codecOptions) {
+  if (!isAvailable(kind)) {
+    auto name = compressionKindToString(kind);
+    if (folly::StringPiece({name}).startsWith("unknown")) {
+      VELOX_UNSUPPORTED("Unrecognized codec '{}'", name);
+    }
+    VELOX_UNSUPPORTED("Support for codec '{}' not implemented.", name);
+  }
+
+  auto compressionLevel = codecOptions.compressionLevel;
+  if (compressionLevel != kUseDefaultCompressionLevel) {
+    checkSupportsCompressionLevel(kind);
+  }
+
+  std::unique_ptr codec;
+  switch (kind) {
+    case CompressionKind::CompressionKind_NONE:
+      return nullptr;
+    case CompressionKind::CompressionKind_GZIP:
+#ifdef VELOX_ENABLE_IAA
+      if (auto opt =
+              dynamic_cast(&codecOptions)) {
+        codec = iaa::makeIaaGzipAsyncCodec(compressionLevel, opt->maxJobNumber);
+        break;
+      }
+#endif
+      return nullptr;
+    default:
+      VELOX_UNREACHABLE("Unknown compression kind: {}", kind);
+  }
+
+  codec->init();
+  return codec;
+}
 } // namespace facebook::velox::common
diff --git a/velox/common/compression/v2/Compression.h b/velox/common/compression/v2/Compression.h
index d6ef89b9017c2..6654d4663e448 100644
--- a/velox/common/compression/v2/Compression.h
+++ b/velox/common/compression/v2/Compression.h
@@ -18,6 +18,7 @@
 
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -234,7 +235,7 @@ class Codec {
     return kUseDefaultCompressionLevel;
   }
 
- private:
+ protected:
   /// Initializes the codec's resources.
   virtual void init();
 
@@ -243,4 +244,36 @@ class Codec {
       const uint8_t* input,
       std::optional uncompressedLength) const;
 };
+
+/// Codec interface whose compression and decompression calls return a
+/// folly::SemiFuture.
+class AsyncCodec : public virtual Codec {
+ public:
+  ~AsyncCodec() override = default;
+
+  /// Returns true for the kinds that create() accepts.
+  static bool isAvailable(CompressionKind kind);
+
+  /// Creates an asynchronous codec for 'kind'. May return nullptr when no
+  /// asynchronous implementation exists for an otherwise accepted kind.
+  static std::unique_ptr create(
+      CompressionKind kind,
+      const CodecOptions& codecOptions);
+
+  /// Decompresses 'input' into 'output'; the future yields the output size.
+  /// Both buffers must stay valid until the future completes.
+  virtual folly::SemiFuture decompressAsync(
+      uint64_t inputLength,
+      const uint8_t* input,
+      uint64_t outputLength,
+      uint8_t* output) = 0;
+
+  /// Compresses 'input' into 'output'; the future yields the output size.
+  /// Both buffers must stay valid until the future completes.
+  virtual folly::SemiFuture compressAsync(
+      uint64_t inputLength,
+      const uint8_t* input,
+      uint64_t outputLength,
+      uint8_t* output) = 0;
+};
 } // namespace facebook::velox::common
diff --git a/velox/common/compression/v2/GzipCompression.h b/velox/common/compression/v2/GzipCompression.h
index 48f841f08b64c..14cbf591d0626 100644
--- a/velox/common/compression/v2/GzipCompression.h
+++ b/velox/common/compression/v2/GzipCompression.h
@@ -125,6 +125,41 @@ class GzipCodec : public Codec {
   bool decompressorInitialized_{false};
 };
 
+/// Software stand-in for an asynchronous gzip codec: each call wraps the
+/// matching GzipCodec call in a deferred SemiFuture callback. The callback
+/// runs after the member function has returned, so it captures its arguments
+/// by value; the codec and both buffers must outlive the returned future.
+class DummyGzipAsyncCodec : public GzipCodec, public AsyncCodec {
+ public:
+  DummyGzipAsyncCodec()
+      : GzipCodec(
+            kGzipDefaultCompressionLevel,
+            GzipFormat::kGzip,
+            kGzip4KBWindowBits) {}
+
+  folly::SemiFuture decompressAsync(
+      uint64_t srcLength,
+      const uint8_t* src,
+      uint64_t destLength,
+      uint8_t* dest) override {
+    return folly::makeSemiFuture().deferValue(
+        [this, srcLength, src, destLength, dest](auto&&) -> uint64_t {
+          return GzipCodec::decompress(srcLength, src, destLength, dest);
+        });
+  }
+
+  folly::SemiFuture compressAsync(
+      uint64_t srcLength,
+      const uint8_t* src,
+      uint64_t destLength,
+      uint8_t* dest) override {
+    return folly::makeSemiFuture().deferValue(
+        [this, srcLength, src, destLength, dest](auto&&) -> uint64_t {
+          return GzipCodec::compress(srcLength, src, destLength, dest);
+        });
+  }
+};
+
 std::unique_ptr makeGzipCodec(
     int compressionLevel = kGzipDefaultCompressionLevel,
     GzipFormat format = GzipFormat::kGzip,
diff --git a/velox/common/compression/v2/iaa/IaaCompression.cpp b/velox/common/compression/v2/iaa/IaaCompression.cpp
index fae7f23d97755..ea462933329db 100644
--- a/velox/common/compression/v2/iaa/IaaCompression.cpp
+++ b/velox/common/compression/v2/iaa/IaaCompression.cpp
@@ -49,20 +49,43 @@ class HardwareCodecDeflateQpl {
   explicit HardwareCodecDeflateQpl(qpl_compression_levels compressionLevel)
       : compressionLevel_(compressionLevel){};
 
-  int64_t doCompressData(
+  qpl_job* acquireDecompressionJob(
+      uint32_t& jobId,
       uint32_t inputLength,
       const uint8_t* input,
       uint32_t outputLength,
-      uint8_t* output) const {
-    uint32_t job_id;
+      uint8_t* output) {
     qpl_job* jobPtr;
-    if (!(jobPtr = QplJobHWPool::getInstance().acquireJob(job_id))) {
+    if (!(jobPtr = QplJobHWPool::getInstance().acquireJob(jobId))) {
       VLOG(1) << "DeflateQpl HW codec failed, falling back to SW codec. "
-              << "(Details: doCompressData->AcquireJob fail, "
+              << "(Details: doDecompressData->AcquireJob fail, "
               << "probably job pool exhausted)";
-      return kHwCodecError;
+      return nullptr;
     }
 
+    // Setup qpl decompression job.
+    jobPtr->op = qpl_op_decompress;
+    jobPtr->next_in_ptr = const_cast(input);
+    jobPtr->next_out_ptr = output;
+    jobPtr->available_in = inputLength;
+    jobPtr->available_out = outputLength;
+    jobPtr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST;
+    return jobPtr;
+  }
+  qpl_job* acquireCompressionJob(
+      uint32_t& jobId,
+      uint32_t inputLength,
+      const uint8_t* input,
+      uint32_t outputLength,
+      uint8_t* output) {
+    qpl_job* jobPtr;
+    if (!(jobPtr = QplJobHWPool::getInstance().acquireJob(jobId))) {
+      VLOG(1) << "DeflateQpl HW codec failed, falling back to SW codec. "
+              << "(Details: doCompressData->AcquireJob fail, "
+              << "probably job pool exhausted)";
+      return nullptr;
+    }
+    // Setup qpl compression job.
     jobPtr->op = qpl_op_compress;
     jobPtr->next_in_ptr = const_cast(input);
     jobPtr->next_out_ptr = output;
@@ -71,16 +94,45 @@ class HardwareCodecDeflateQpl {
     jobPtr->available_out = outputLength;
     jobPtr->flags = QPL_FLAG_FIRST | QPL_FLAG_DYNAMIC_HUFFMAN | QPL_FLAG_LAST |
         QPL_FLAG_OMIT_VERIFY;
+    return jobPtr;
+  }
 
-    if (auto status = qpl_execute_job(jobPtr); status == QPL_STS_OK) {
-      uint32_t compressedSize = jobPtr->total_out;
-      QplJobHWPool::getInstance().releaseJob(job_id);
-      return compressedSize;
-    } else {
+  int64_t doCompressData(
+      uint32_t inputLength,
+      const uint8_t* input,
+      uint32_t outputLength,
+      uint8_t* output) {
+    uint32_t jobId;
+    if (auto jobPtr = acquireCompressionJob(
+            jobId, inputLength, input, outputLength, output)) {
+      auto status = qpl_execute_job(jobPtr);
+      if (status == QPL_STS_OK) {
+        uint32_t compressedSize = jobPtr->total_out;
+        QplJobHWPool::getInstance().releaseJob(jobId);
+        return compressedSize;
+      }
       logHWFallback("doCompressData->qpl_execute_job", status);
-      QplJobHWPool::getInstance().releaseJob(job_id);
+      QplJobHWPool::getInstance().releaseJob(jobId);
       return kHwCodecError;
     }
+    return kHwCodecError;
+  }
+
+  qpl_job* doCompressDataAsync(
+      uint32_t& jobId,
+      uint32_t inputLength,
+      const uint8_t* input,
+      uint32_t outputLength,
+      uint8_t* output) {
+    if (auto jobPtr = acquireCompressionJob(
+            jobId, inputLength, input, outputLength, output)) {
+      auto status = qpl_submit_job(jobPtr);
+      if (status == QPL_STS_OK) {
+        return jobPtr;
+      }
+      QplJobHWPool::getInstance().releaseJob(jobId);
+    }
+    return nullptr;
   }
 
   // Submit job request to the IAA hardware and then busy waiting till it
@@ -90,32 +142,37 @@ class HardwareCodecDeflateQpl {
       const uint8_t* input,
       uint32_t outputLength,
       uint8_t* output) {
-    uint32_t job_id = 0;
-    qpl_job* jobPtr;
-    if (!(jobPtr = QplJobHWPool::getInstance().acquireJob(job_id))) {
-      VLOG(1) << "DeflateQpl HW codec failed, falling back to SW codec. "
-              << "(Details: doDecompressData->AcquireJob fail, "
-              << "probably job pool exhausted)";
+    uint32_t jobId = 0;
+    if (auto jobPtr = acquireDecompressionJob(
+            jobId, inputLength, input, outputLength, output)) {
+      auto status = qpl_execute_job(jobPtr);
+      if (status == QPL_STS_OK) {
+        uint32_t decompressedSize = jobPtr->total_out;
+        QplJobHWPool::getInstance().releaseJob(jobId);
+        return decompressedSize;
+      }
+      logHWFallback("doDecompressData->qpl_execute_job", status);
+      QplJobHWPool::getInstance().releaseJob(jobId);
       return kHwCodecError;
     }
+    return kHwCodecError;
+  }
 
-    // Performing a decompression operation.
-    jobPtr->op = qpl_op_decompress;
-    jobPtr->next_in_ptr = const_cast(input);
-    jobPtr->next_out_ptr = output;
-    jobPtr->available_in = inputLength;
-    jobPtr->available_out = outputLength;
-    jobPtr->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST;
-
-    if (auto status = qpl_execute_job(jobPtr); status == QPL_STS_OK) {
-      uint32_t decompressedSize = jobPtr->total_out;
-      QplJobHWPool::getInstance().releaseJob(job_id);
-      return decompressedSize;
-    } else {
-      logHWFallback("doDecompressData->qpl_execute_job", status);
-      QplJobHWPool::getInstance().releaseJob(job_id);
-      return kHwCodecError;
+  qpl_job* doDecompressDataAsync(
+      uint32_t& jobId,
+      uint32_t inputLength,
+      const uint8_t* input,
+      uint32_t outputLength,
+      uint8_t* output) {
+    if (auto jobPtr = acquireDecompressionJob(
+            jobId, inputLength, input, outputLength, output)) {
+      auto status = qpl_submit_job(jobPtr);
+      if (status == QPL_STS_OK) {
+        return jobPtr;
+      }
+      QplJobHWPool::getInstance().releaseJob(jobId);
     }
+    return nullptr;
   }
 
  private:
@@ -204,7 +261,7 @@ class SoftwareCodecDeflateQpl final {
   }
 };
 
-class QplGzipCodec : public Codec {
+class QplGzipCodec : public virtual Codec {
  public:
   explicit QplGzipCodec(qpl_compression_levels compressionLevel)
       : hwCodec_(std::make_unique(compressionLevel)),
@@ -273,11 +330,84 @@ class QplGzipCodec : public Codec {
     return qpl_default_level;
   }
 
- private:
+ protected:
   std::unique_ptr hwCodec_;
   std::unique_ptr swCodec_;
 };
 
+/// Asynchronous variant of QplGzipCodec: a job is submitted right away and
+/// waited on inside the callback of the returned SemiFuture. The codec and
+/// both buffers must outlive the returned future.
+class QplGzipAsyncCodec : public QplGzipCodec, public AsyncCodec {
+ public:
+  explicit QplGzipAsyncCodec(qpl_compression_levels compressionLevel)
+      : QplGzipCodec(compressionLevel) {}
+
+  folly::SemiFuture decompressAsync(
+      uint64_t inputLength,
+      const uint8_t* input,
+      uint64_t outputLength,
+      uint8_t* output) override {
+    uint32_t jobId = 0;
+    if (auto jobPtr = hwCodec_->doDecompressDataAsync(
+            jobId, inputLength, input, outputLength, output)) {
+      // If `qpl_submit_job` succeeds, add `qpl_wait_job` callback. It runs
+      // after this function returns, so everything is captured by value.
+      return folly::makeSemiFuture().deferValue(
+          [this, jobPtr, jobId, inputLength, input, outputLength, output](
+              auto&&) -> uint64_t {
+            auto status = qpl_wait_job(jobPtr);
+            if (status == QPL_STS_OK) {
+              uint32_t decompressedSize = jobPtr->total_out;
+              QplJobHWPool::getInstance().releaseJob(jobId);
+              return static_cast(decompressedSize);
+            }
+            // If `qpl_wait_job` fails, falling back SW code path.
+            QplJobHWPool::getInstance().releaseJob(jobId);
+            logHWFallback("doDecompressData->qpl_wait_job", status);
+            auto swDecompressedLength = swCodec_->doDecompressData(
+                (uint32_t)inputLength, input, (uint32_t)outputLength, output);
+            return static_cast(swDecompressedLength);
+          });
+    }
+    // If it fails, return SemiFuture ready with exception.
+    return folly::makeSemiFutureWith(
+        []() -> uint64_t { throw std::runtime_error("Cannot submit job."); });
+  }
+
+  folly::SemiFuture compressAsync(
+      uint64_t inputLength,
+      const uint8_t* input,
+      uint64_t outputLength,
+      uint8_t* output) override {
+    uint32_t jobId = 0;
+    if (auto jobPtr = hwCodec_->doCompressDataAsync(
+            jobId, inputLength, input, outputLength, output)) {
+      // If `qpl_submit_job` succeeds, add `qpl_wait_job` callback. It runs
+      // after this function returns, so everything is captured by value.
+      return folly::makeSemiFuture().deferValue(
+          [this, jobPtr, jobId, inputLength, input, outputLength, output](
+              auto&&) -> uint64_t {
+            auto status = qpl_wait_job(jobPtr);
+            if (status == QPL_STS_OK) {
+              uint32_t compressedSize = jobPtr->total_out;
+              QplJobHWPool::getInstance().releaseJob(jobId);
+              return static_cast(compressedSize);
+            }
+            // If `qpl_wait_job` fails, falling back SW code path.
+            QplJobHWPool::getInstance().releaseJob(jobId);
+            logHWFallback("doCompressData->qpl_wait_job", status);
+            auto swCompressedLength = swCodec_->doCompressData(
+                (uint32_t)inputLength, input, (uint32_t)outputLength, output);
+            return static_cast(swCompressedLength);
+          });
+    }
+    // If it fails, return SemiFuture ready with exception.
+    return folly::makeSemiFutureWith(
+        []() -> uint64_t { throw std::runtime_error("Cannot submit job."); });
+  }
+};
+
 std::unique_ptr makeIaaGzipCodec(
     int32_t compressionLevel,
     uint32_t maxJobNumber) {
@@ -286,4 +416,11 @@ std::unique_ptr makeIaaGzipCodec(
       static_cast(compressionLevel));
 }
 
+std::unique_ptr makeIaaGzipAsyncCodec(
+    int32_t compressionLevel,
+    uint32_t maxJobNumber) {
+  QplJobHWPool::initialize(maxJobNumber);
+  return std::make_unique(
+      static_cast(compressionLevel));
+}
 } // namespace facebook::velox::common::iaa
diff --git a/velox/common/compression/v2/iaa/IaaCompression.h b/velox/common/compression/v2/iaa/IaaCompression.h
index c55dd5c19205f..d155740891f70 100644
--- a/velox/common/compression/v2/iaa/IaaCompression.h
+++ b/velox/common/compression/v2/iaa/IaaCompression.h
@@ -39,4 +39,7 @@ std::unique_ptr makeIaaGzipCodec(
     int32_t compressionLevel = qpl_default_level,
     uint32_t maxJobNumber = kMaxQplJobNumber);
 
+std::unique_ptr makeIaaGzipAsyncCodec(
+    int32_t compressionLevel = qpl_default_level,
+    uint32_t maxJobNumber = kMaxQplJobNumber);
 } // namespace facebook::velox::common::iaa