Skip to content

Commit

Permalink
use existing lzo decompression
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 17, 2023
1 parent 3647955 commit ea5785b
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 8 deletions.
5 changes: 4 additions & 1 deletion velox/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ std::string compressionKindToString(CompressionKind kind) {
return "lz4_raw";
case CompressionKind_LZ4HADOOP:
return "lz4_hadoop";
case CompressionKind_LZOHADOOP:
return "lzo_hadoop";
}
return folly::to<std::string>("unknown - ", kind);
}
Expand All @@ -95,7 +97,8 @@ CompressionKind stringToCompressionKind(const std::string& kind) {
{"lz4", CompressionKind_LZ4},
{"gzip", CompressionKind_GZIP},
{"lz4_raw", CompressionKind_LZ4RAW},
{"lz4_hadoop", CompressionKind_LZ4HADOOP}};
{"lz4_hadoop", CompressionKind_LZ4HADOOP},
{"lzo_hadoop", CompressionKind_LZOHADOOP}};
auto iter = stringToCompressionKindMap.find(kind);
if (iter != stringToCompressionKindMap.end()) {
return iter->second;
Expand Down
1 change: 1 addition & 0 deletions velox/common/compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ enum CompressionKind {
CompressionKind_GZIP = 6,
CompressionKind_LZ4RAW = 7,
CompressionKind_LZ4HADOOP = 8,
CompressionKind_LZOHADOOP = 9,
CompressionKind_MAX = INT64_MAX
};

Expand Down
2 changes: 2 additions & 0 deletions velox/common/compression/v2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ add_library(
GzipCompression.cpp
HadoopCompressionFormat.cpp
Lz4Compression.cpp
LzoCompression.cpp
SnappyCompression.cpp
ZstdCompression.cpp)

target_link_libraries(
velox_common_compression_v2
velox_common_compression
velox_common_base
Folly::folly
Snappy::snappy
Expand Down
11 changes: 6 additions & 5 deletions velox/common/compression/v2/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/common/base/Exceptions.h"
#include "velox/common/compression/v2/GzipCompression.h"
#include "velox/common/compression/v2/Lz4Compression.h"
#include "velox/common/compression/v2/LzoCompression.h"
#include "velox/common/compression/v2/SnappyCompression.h"
#include "velox/common/compression/v2/ZstdCompression.h"

Expand Down Expand Up @@ -114,6 +115,8 @@ std::unique_ptr<Codec> Codec::create(

std::unique_ptr<Codec> codec;
switch (kind) {
case CompressionKind::CompressionKind_NONE:
return nullptr;
case CompressionKind::CompressionKind_LZ4:
codec = makeLz4FrameCodec(compressionLevel);
break;
Expand Down Expand Up @@ -147,14 +150,12 @@ std::unique_ptr<Codec> Codec::create(
case CompressionKind::CompressionKind_SNAPPY:
codec = makeSnappyCodec();
break;
case CompressionKind::CompressionKind_LZO:
codec = makeLzoCodec();
default:
break;
}

if (codec == nullptr) {
VELOX_UNSUPPORTED("LZO codec not implemented");
}

codec->init();

return codec;
Expand All @@ -177,8 +178,8 @@ bool Codec::isAvailable(CompressionKind kind) {
case CompressionKind::CompressionKind_ZLIB:
case CompressionKind::CompressionKind_ZSTD:
case CompressionKind::CompressionKind_SNAPPY:
return true;
case CompressionKind::CompressionKind_LZO:
return true;
default:
return false;
}
Expand Down
3 changes: 1 addition & 2 deletions velox/common/compression/v2/GzipCompression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

#include "velox/common/base/Exceptions.h"
#include "velox/common/compression/v2/GzipCompression.h"
#include "velox/common/base/Exceptions.h"

namespace facebook::velox::common {

Expand Down Expand Up @@ -520,4 +520,3 @@ std::unique_ptr<Codec> makeZlibCodec(
return makeGzipCodec(compressionLevel, GzipFormat::kZlib, windowBits);
}
} // namespace facebook::velox::common

97 changes: 97 additions & 0 deletions velox/common/compression/v2/LzoCompression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/compression/v2/LzoCompression.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/compression/LzoDecompressor.h"

namespace facebook::velox::common {

LzoCodec::LzoCodec() = default;

uint64_t LzoCodec::maxCompressedLength(uint64_t inputLength) {
VELOX_UNSUPPORTED("LZO compression is not supported.");
}

uint64_t LzoCodec::compress(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) {
VELOX_UNSUPPORTED("LZO compression is not supported.");
}

uint64_t LzoCodec::decompress(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) {
const char* inputAddress =
reinterpret_cast<const char*>(const_cast<uint8_t*>(input));
char* outputAddress = reinterpret_cast<char*>(output);
return velox::common::compression::lzoDecompress(
inputAddress,
inputAddress + inputLength,
outputAddress,
outputAddress + outputLength);
}

std::shared_ptr<Compressor> LzoCodec::makeCompressor() {
VELOX_UNSUPPORTED("Streaming compression unsupported with LZO");
}

std::shared_ptr<Decompressor> LzoCodec::makeDecompressor() {
VELOX_UNSUPPORTED("Streaming decompression unsupported with LZO");
}

CompressionKind LzoCodec::compressionKind() const {
return CompressionKind_LZO;
}

int32_t LzoCodec::minimumCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

int32_t LzoCodec::maximumCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

int32_t LzoCodec::defaultCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

LzoHadoopCodec::LzoHadoopCodec() = default;

CompressionKind LzoHadoopCodec::compressionKind() const {
return CompressionKind_LZOHADOOP;
}

uint64_t LzoHadoopCodec::decompressInternal(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) {
return LzoCodec::decompress(inputLength, input, outputLength, output);
}

std::unique_ptr<Codec> makeLzoCodec() {
return std::make_unique<LzoCodec>();
}

std::unique_ptr<Codec> makeLzoHadoopCodec() {
return std::make_unique<LzoHadoopCodec>();
}
} // namespace facebook::velox::common
75 changes: 75 additions & 0 deletions velox/common/compression/v2/LzoCompression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include "velox/common/compression/v2/Compression.h"
#include "velox/common/compression/v2/HadoopCompressionFormat.h"

namespace facebook::velox::common {

class LzoCodec : public Codec {
public:
LzoCodec();

uint64_t maxCompressedLength(uint64_t inputLength) override;

uint64_t compress(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) override;

uint64_t decompress(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) override;

std::shared_ptr<Compressor> makeCompressor() override;

std::shared_ptr<Decompressor> makeDecompressor() override;

CompressionKind compressionKind() const override;

int32_t minimumCompressionLevel() const override;

int32_t maximumCompressionLevel() const override;

int32_t defaultCompressionLevel() const override;
};

class LzoHadoopCodec : public LzoCodec, public HadoopCompressionFormat {
public:
LzoHadoopCodec();

CompressionKind compressionKind() const;

private:
uint64_t decompressInternal(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output);
};

// Lzo format codec.
std::unique_ptr<Codec> makeLzoCodec();

// Lzo "Hadoop" format codec.
std::unique_ptr<Codec> makeLzoHadoopCodec();
} // namespace facebook::velox::common

0 comments on commit ea5785b

Please sign in to comment.