Skip to content

Commit

Permalink
add snappy codec
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 16, 2023
1 parent a5aa603 commit 3647955
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 1 deletion.
1 change: 1 addition & 0 deletions velox/common/compression/v2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_library(
GzipCompression.cpp
HadoopCompressionFormat.cpp
Lz4Compression.cpp
SnappyCompression.cpp
ZstdCompression.cpp)

target_link_libraries(
Expand Down
7 changes: 6 additions & 1 deletion velox/common/compression/v2/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/common/base/Exceptions.h"
#include "velox/common/compression/v2/GzipCompression.h"
#include "velox/common/compression/v2/Lz4Compression.h"
#include "velox/common/compression/v2/SnappyCompression.h"
#include "velox/common/compression/v2/ZstdCompression.h"

namespace facebook::velox::common {
Expand All @@ -45,6 +46,7 @@ void Codec::init() {}
bool Codec::supportsGetUncompressedLength(CompressionKind kind) {
switch (kind) {
case CompressionKind_ZSTD:
case CompressionKind_SNAPPY:
return true;
default:
return false;
Expand Down Expand Up @@ -142,6 +144,9 @@ std::unique_ptr<Codec> Codec::create(
case CompressionKind::CompressionKind_ZSTD:
codec = makeZstdCodec(compressionLevel);
break;
case CompressionKind::CompressionKind_SNAPPY:
codec = makeSnappyCodec();
break;
default:
break;
}
Expand Down Expand Up @@ -171,8 +176,8 @@ bool Codec::isAvailable(CompressionKind kind) {
case CompressionKind::CompressionKind_GZIP:
case CompressionKind::CompressionKind_ZLIB:
case CompressionKind::CompressionKind_ZSTD:
return true;
case CompressionKind::CompressionKind_SNAPPY:
return true;
case CompressionKind::CompressionKind_LZO:
default:
return false;
Expand Down
105 changes: 105 additions & 0 deletions velox/common/compression/v2/SnappyCompression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/compression/v2/SnappyCompression.h"
#include "velox/common/base/Exceptions.h"

namespace facebook::velox::common {

// Decompresses one raw snappy block from `input` into `output`.
//
// inputLength:  number of compressed bytes readable at `input`.
// input:        the compressed block.
// outputLength: capacity of `output`, in bytes.
// output:       destination buffer for the uncompressed bytes.
//
// Returns the uncompressed size that snappy::GetUncompressedLength reported
// for the block.
//
// Three conditions are checked with VELOX_CHECK / VELOX_CHECK_GE (presumably
// these raise a Velox exception on failure - confirm in Exceptions.h):
//   1. the uncompressed length can be read from the input,
//   2. `outputLength` is at least that length,
//   3. snappy::RawUncompress reports success.
uint64_t SnappyCodec::decompress(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) {
// Only assigned by GetUncompressedLength; it is not read unless that call
// succeeded, because the check below fails otherwise.
size_t decompressedSize;
VELOX_CHECK(
snappy::GetUncompressedLength(
reinterpret_cast<const char*>(input),
static_cast<size_t>(inputLength),
&decompressedSize),
"Corrupt snappy compressed data.");
// RawUncompress is not given the output capacity, so it is validated here
// before any byte is written.
VELOX_CHECK_GE(outputLength, decompressedSize, "Output length is too small");
VELOX_CHECK(
snappy::RawUncompress(
reinterpret_cast<const char*>(input),
static_cast<size_t>(inputLength),
reinterpret_cast<char*>(output)),
"Corrupt snappy compressed data.");
return static_cast<uint64_t>(decompressedSize);
}

// Returns the value of snappy::MaxCompressedLength for an input of
// `inputLength` bytes, i.e. the output capacity a caller should reserve before
// calling compress().
//
// No lower-bound assertion is made on `inputLength`: the parameter is
// unsigned, so a ">= 0" check can never fail and only produces a
// signed/unsigned comparison.
uint64_t SnappyCodec::maxCompressedLength(uint64_t inputLength) {
  return static_cast<uint64_t>(
      snappy::MaxCompressedLength(static_cast<size_t>(inputLength)));
}

uint64_t SnappyCodec::compress(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) {
size_t output_size;
snappy::RawCompress(
reinterpret_cast<const char*>(input),
static_cast<size_t>(inputLength),
reinterpret_cast<char*>(output),
&output_size);
return static_cast<uint64_t>(output_size);
}

// Streaming compression is not provided by this codec: the body consists
// solely of VELOX_UNSUPPORTED and has no return statement (presumably the
// macro throws - confirm in Exceptions.h).
std::shared_ptr<Compressor> SnappyCodec::makeCompressor() {
VELOX_UNSUPPORTED("Streaming compression unsupported with Snappy");
}

// Streaming decompression is not provided by this codec: the body consists
// solely of VELOX_UNSUPPORTED and has no return statement (presumably the
// macro throws - confirm in Exceptions.h).
std::shared_ptr<Decompressor> SnappyCodec::makeDecompressor() {
VELOX_UNSUPPORTED("Streaming decompression unsupported with Snappy");
}

// Reports the compression kind this codec implements.
CompressionKind SnappyCodec::compressionKind() const {
  return CompressionKind::CompressionKind_SNAPPY;
}

// Returns kUseDefaultCompressionLevel. This codec reports the same sentinel
// for its minimum, maximum and default level.
int32_t SnappyCodec::minimumCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

// Returns kUseDefaultCompressionLevel. This codec reports the same sentinel
// for its minimum, maximum and default level.
int32_t SnappyCodec::maximumCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

// Returns kUseDefaultCompressionLevel. This codec reports the same sentinel
// for its minimum, maximum and default level.
int32_t SnappyCodec::defaultCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

// Determines the uncompressed size of a snappy block.
//
// Asks snappy::GetUncompressedLength to read the size from `input`. When that
// succeeds the value read from the data is returned; otherwise the
// caller-supplied `uncompressedLength` is handed back unchanged.
std::optional<uint64_t> SnappyCodec::doGetUncompressedLength(
    uint64_t inputLength,
    const uint8_t* input,
    std::optional<uint64_t> uncompressedLength) const {
  const auto* compressed = reinterpret_cast<const char*>(input);
  const auto compressedSize = static_cast<size_t>(inputLength);

  size_t lengthFromData = 0;
  const bool parsed =
      snappy::GetUncompressedLength(compressed, compressedSize, &lengthFromData);
  if (parsed) {
    return static_cast<uint64_t>(lengthFromData);
  }
  return uncompressedLength;
}

// Factory for the Snappy codec: builds a SnappyCodec and hands ownership to
// the caller through the Codec base type.
std::unique_ptr<Codec> makeSnappyCodec() {
  std::unique_ptr<Codec> codec = std::make_unique<SnappyCodec>();
  return codec;
}
} // namespace facebook::velox::common
64 changes: 64 additions & 0 deletions velox/common/compression/v2/SnappyCompression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Derived from Apache Arrow.

#pragma once

#include <snappy.h>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include "velox/common/compression/v2/Compression.h"

namespace facebook::velox::common {

/// Codec backed by the snappy library's raw, one-shot block API.
///
/// Only whole-buffer compress/decompress is offered; makeCompressor() and
/// makeDecompressor() report streaming as unsupported. All compression level
/// accessors return kUseDefaultCompressionLevel.
class SnappyCodec : public Codec {
 public:
  /// Decompresses one raw snappy block.
  /// @param inputLength Number of compressed bytes readable at `input`.
  /// @param input Compressed block.
  /// @param outputLength Capacity of `output` in bytes; must be at least the
  /// uncompressed size recorded in the block.
  /// @param output Destination buffer.
  /// @return Number of uncompressed bytes.
  uint64_t decompress(
      uint64_t inputLength,
      const uint8_t* input,
      uint64_t outputLength,
      uint8_t* output) override;

  /// Compresses `input` into one raw snappy block.
  /// @param inputLength Number of bytes to compress.
  /// @param input Uncompressed data.
  /// @param outputLength Capacity of `output` in bytes; callers should size
  /// it with maxCompressedLength(inputLength).
  /// @param output Destination buffer.
  /// @return Number of compressed bytes written to `output`.
  uint64_t compress(
      uint64_t inputLength,
      const uint8_t* input,
      uint64_t outputLength,
      uint8_t* output) override;

  /// Returns snappy::MaxCompressedLength for an input of `inputLength` bytes.
  uint64_t maxCompressedLength(uint64_t inputLength) override;

  /// Streaming compression is unsupported for this codec.
  std::shared_ptr<Compressor> makeCompressor() override;

  /// Streaming decompression is unsupported for this codec.
  std::shared_ptr<Decompressor> makeDecompressor() override;

  /// Returns CompressionKind_SNAPPY.
  CompressionKind compressionKind() const override;

  /// Returns kUseDefaultCompressionLevel.
  int32_t minimumCompressionLevel() const override;

  /// Returns kUseDefaultCompressionLevel.
  int32_t maximumCompressionLevel() const override;

  /// Returns kUseDefaultCompressionLevel.
  int32_t defaultCompressionLevel() const override;

 private:
  /// Returns the uncompressed size read from the snappy block at `input`, or
  /// the caller-supplied `uncompressedLength` when the size cannot be read.
  std::optional<uint64_t> doGetUncompressedLength(
      uint64_t inputLength,
      const uint8_t* input,
      std::optional<uint64_t> uncompressedLength) const override;
};

/// Creates a SnappyCodec, returned through the Codec base type.
std::unique_ptr<Codec> makeSnappyCodec();

} // namespace facebook::velox::common
4 changes: 4 additions & 0 deletions velox/common/compression/v2/tests/CompressionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,10 @@ INSTANTIATE_TEST_SUITE_P(
TestZstd,
CodecTest,
::testing::Values(CompressionKind::CompressionKind_ZSTD));
// Instantiates the parameterized CodecTest suite for the Snappy codec.
INSTANTIATE_TEST_SUITE_P(
TestSnappy,
CodecTest,
::testing::Values(CompressionKind::CompressionKind_SNAPPY));

TEST(CodecLZ4HadoopTest, compatibility) {
// LZ4 Hadoop codec should be able to read back LZ4 raw blocks.
Expand Down

0 comments on commit 3647955

Please sign in to comment.