From ea5785bf77034b2e1bdf04a5c65a60624a7f96b4 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Tue, 14 Nov 2023 09:06:07 +0000 Subject: [PATCH] use existing lzo decompression --- velox/common/compression/Compression.cpp | 5 +- velox/common/compression/Compression.h | 1 + velox/common/compression/v2/CMakeLists.txt | 2 + velox/common/compression/v2/Compression.cpp | 11 ++- .../common/compression/v2/GzipCompression.cpp | 3 +- .../common/compression/v2/LzoCompression.cpp | 97 +++++++++++++++++++ velox/common/compression/v2/LzoCompression.h | 75 ++++++++++++++ 7 files changed, 186 insertions(+), 8 deletions(-) create mode 100644 velox/common/compression/v2/LzoCompression.cpp create mode 100644 velox/common/compression/v2/LzoCompression.h diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 779ed08a6d84..21b966922a0c 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -80,6 +80,8 @@ std::string compressionKindToString(CompressionKind kind) { return "lz4_raw"; case CompressionKind_LZ4HADOOP: return "lz4_hadoop"; + case CompressionKind_LZOHADOOP: + return "lzo_hadoop"; } return folly::to("unknown - ", kind); } @@ -95,7 +97,8 @@ CompressionKind stringToCompressionKind(const std::string& kind) { {"lz4", CompressionKind_LZ4}, {"gzip", CompressionKind_GZIP}, {"lz4_raw", CompressionKind_LZ4RAW}, - {"lz4_hadoop", CompressionKind_LZ4HADOOP}}; + {"lz4_hadoop", CompressionKind_LZ4HADOOP}, + {"lzo_hadoop", CompressionKind_LZOHADOOP}}; auto iter = stringToCompressionKindMap.find(kind); if (iter != stringToCompressionKindMap.end()) { return iter->second; diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 072c59147edc..3e5191e83533 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -31,6 +31,7 @@ enum CompressionKind { CompressionKind_GZIP = 6, CompressionKind_LZ4RAW = 7, 
CompressionKind_LZ4HADOOP = 8, + CompressionKind_LZOHADOOP = 9, CompressionKind_MAX = INT64_MAX }; diff --git a/velox/common/compression/v2/CMakeLists.txt b/velox/common/compression/v2/CMakeLists.txt index fdb50e1b21a2..9f342f8791d4 100644 --- a/velox/common/compression/v2/CMakeLists.txt +++ b/velox/common/compression/v2/CMakeLists.txt @@ -22,11 +22,13 @@ add_library( GzipCompression.cpp HadoopCompressionFormat.cpp Lz4Compression.cpp + LzoCompression.cpp SnappyCompression.cpp ZstdCompression.cpp) target_link_libraries( velox_common_compression_v2 + velox_common_compression velox_common_base Folly::folly Snappy::snappy diff --git a/velox/common/compression/v2/Compression.cpp b/velox/common/compression/v2/Compression.cpp index 6cf15776a507..dcb01c324e55 100644 --- a/velox/common/compression/v2/Compression.cpp +++ b/velox/common/compression/v2/Compression.cpp @@ -23,6 +23,7 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/compression/v2/GzipCompression.h" #include "velox/common/compression/v2/Lz4Compression.h" +#include "velox/common/compression/v2/LzoCompression.h" #include "velox/common/compression/v2/SnappyCompression.h" #include "velox/common/compression/v2/ZstdCompression.h" @@ -114,6 +115,8 @@ std::unique_ptr Codec::create( std::unique_ptr codec; switch (kind) { + case CompressionKind::CompressionKind_NONE: + return nullptr; case CompressionKind::CompressionKind_LZ4: codec = makeLz4FrameCodec(compressionLevel); break; @@ -147,14 +150,18 @@ std::unique_ptr Codec::create( case CompressionKind::CompressionKind_SNAPPY: codec = makeSnappyCodec(); break; + case CompressionKind::CompressionKind_LZO: + codec = makeLzoCodec(); + break; default: break; } - if (codec == nullptr) { - VELOX_UNSUPPORTED("LZO codec not implemented"); - } + if (codec == nullptr) { + VELOX_UNSUPPORTED( + "Unsupported compression kind: {}", compressionKindToString(kind)); + } codec->init(); return codec; @@ -177,8 +184,8 @@ bool Codec::isAvailable(CompressionKind kind) { case CompressionKind::CompressionKind_ZLIB: case CompressionKind::CompressionKind_ZSTD: case CompressionKind::CompressionKind_SNAPPY: 
- return true; case CompressionKind::CompressionKind_LZO: + return true; default: return false; } diff --git a/velox/common/compression/v2/GzipCompression.cpp b/velox/common/compression/v2/GzipCompression.cpp index 943a98b03029..1a5b4140db8d 100644 --- a/velox/common/compression/v2/GzipCompression.cpp +++ b/velox/common/compression/v2/GzipCompression.cpp @@ -14,8 +14,8 @@ * limitations under the License. */ -#include "velox/common/base/Exceptions.h" #include "velox/common/compression/v2/GzipCompression.h" +#include "velox/common/base/Exceptions.h" namespace facebook::velox::common { @@ -520,4 +520,3 @@ std::unique_ptr makeZlibCodec( return makeGzipCodec(compressionLevel, GzipFormat::kZlib, windowBits); } } // namespace facebook::velox::common - diff --git a/velox/common/compression/v2/LzoCompression.cpp b/velox/common/compression/v2/LzoCompression.cpp new file mode 100644 index 000000000000..795c047a2d2b --- /dev/null +++ b/velox/common/compression/v2/LzoCompression.cpp @@ -0,0 +1,97 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/common/compression/v2/LzoCompression.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/compression/LzoDecompressor.h" + +namespace facebook::velox::common { + +LzoCodec::LzoCodec() = default; + +uint64_t LzoCodec::maxCompressedLength(uint64_t inputLength) { + VELOX_UNSUPPORTED("LZO compression is not supported."); +} + +uint64_t LzoCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + VELOX_UNSUPPORTED("LZO compression is not supported."); +} + +uint64_t LzoCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + const char* inputAddress = + reinterpret_cast(const_cast(input)); + char* outputAddress = reinterpret_cast(output); + return velox::common::compression::lzoDecompress( + inputAddress, + inputAddress + inputLength, + outputAddress, + outputAddress + outputLength); +} + +std::shared_ptr LzoCodec::makeCompressor() { + VELOX_UNSUPPORTED("Streaming compression unsupported with LZO"); +} + +std::shared_ptr LzoCodec::makeDecompressor() { + VELOX_UNSUPPORTED("Streaming decompression unsupported with LZO"); +} + +CompressionKind LzoCodec::compressionKind() const { + return CompressionKind_LZO; +} + +int32_t LzoCodec::minimumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t LzoCodec::maximumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t LzoCodec::defaultCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +LzoHadoopCodec::LzoHadoopCodec() = default; + +CompressionKind LzoHadoopCodec::compressionKind() const { + return CompressionKind_LZOHADOOP; +} + +uint64_t LzoHadoopCodec::decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + return LzoCodec::decompress(inputLength, input, outputLength, output); +} + +std::unique_ptr makeLzoCodec() { + return std::make_unique(); +} + 
+std::unique_ptr makeLzoHadoopCodec() { + return std::make_unique(); +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/LzoCompression.h b/velox/common/compression/v2/LzoCompression.h new file mode 100644 index 000000000000..36bd45a167de --- /dev/null +++ b/velox/common/compression/v2/LzoCompression.h @@ -0,0 +1,75 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "velox/common/compression/v2/Compression.h" +#include "velox/common/compression/v2/HadoopCompressionFormat.h" + +namespace facebook::velox::common { + +class LzoCodec : public Codec { + public: + LzoCodec(); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; +}; + +class LzoHadoopCodec : public LzoCodec, public HadoopCompressionFormat { + public: + LzoHadoopCodec(); + + CompressionKind 
compressionKind() const override; + + private: + uint64_t decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output); +}; + +// Lzo format codec. +std::unique_ptr makeLzoCodec(); + +// Lzo "Hadoop" format codec. +std::unique_ptr makeLzoHadoopCodec(); +} // namespace facebook::velox::common