Skip to content

Commit

Permalink
add async codec
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 17, 2023
1 parent 5a56d2f commit b41f926
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 37 deletions.
48 changes: 48 additions & 0 deletions velox/common/compression/v2/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ std::unique_ptr<Codec> Codec::create(
if (auto opt =
dynamic_cast<const iaa::IaaGzipCodecOptions*>(&codecOptions)) {
codec = iaa::makeIaaGzipCodec(compressionLevel, opt->maxJobNumber);
break;
}
#endif
codec = makeGzipCodec(compressionLevel);
Expand Down Expand Up @@ -237,4 +238,51 @@ uint64_t Codec::compressPartial(
uint8_t* output) {
VELOX_UNSUPPORTED("'{}' doesn't support partial compression", name());
}

bool AsyncCodec::isAvailable(CompressionKind kind) {
switch (kind) {
case CompressionKind::CompressionKind_NONE:
case CompressionKind::CompressionKind_GZIP:
return true;
default:
return false;
}
}

std::unique_ptr<AsyncCodec> AsyncCodec::create(
CompressionKind kind,
const CodecOptions& codecOptions) {
if (!isAvailable(kind)) {
auto name = compressionKindToString(kind);
if (folly::StringPiece({name}).startsWith("unknown")) {
VELOX_UNSUPPORTED("Unrecognized codec '{}'", name);
}
VELOX_UNSUPPORTED("Support for codec '{}' not implemented.", name);
}

auto compressionLevel = codecOptions.compressionLevel;
if (compressionLevel != kUseDefaultCompressionLevel) {
checkSupportsCompressionLevel(kind);
}

std::unique_ptr<AsyncCodec> codec;
switch (kind) {
case CompressionKind::CompressionKind_NONE:
return nullptr;
case CompressionKind::CompressionKind_GZIP:
#ifdef VELOX_ENABLE_IAA
if (auto opt =
dynamic_cast<const iaa::IaaGzipCodecOptions*>(&codecOptions)) {
codec = iaa::makeIaaGzipAsyncCodec(compressionLevel, opt->maxJobNumber);
break;
}
#endif
return nullptr;
default:
VELOX_UNREACHABLE("Unknown compression kind: {}", kind);
}

codec->init();
return codec;
}
} // namespace facebook::velox::common
26 changes: 25 additions & 1 deletion velox/common/compression/v2/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#pragma once

#include <folly/futures/Future.h>
#include <cstdint>
#include <limits>
#include <memory>
Expand Down Expand Up @@ -234,7 +235,7 @@ class Codec {
return kUseDefaultCompressionLevel;
}

private:
protected:
/// Initializes the codec's resources.
virtual void init();

Expand All @@ -243,4 +244,27 @@ class Codec {
const uint8_t* input,
std::optional<uint64_t> uncompressedLength) const;
};

class AsyncCodec : public virtual Codec {
public:
virtual ~AsyncCodec() override = default;

static bool isAvailable(CompressionKind kind);

std::unique_ptr<AsyncCodec> create(
CompressionKind kind,
const CodecOptions& codecOptions);

virtual folly::SemiFuture<uint64_t> decompressAsync(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) = 0;

virtual folly::SemiFuture<uint64_t> compressAsync(
uint64_t inputLength,
const uint8_t* input,
uint64_t outputLength,
uint8_t* output) = 0;
};
} // namespace facebook::velox::common
29 changes: 29 additions & 0 deletions velox/common/compression/v2/GzipCompression.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,35 @@ class GzipCodec : public Codec {
bool decompressorInitialized_{false};
};

class DummyGzipAsyncCodec : public GzipCodec, public AsyncCodec {
public:
DummyGzipAsyncCodec()
: GzipCodec(
kGzipDefaultCompressionLevel,
GzipFormat::kGzip,
kGzip4KBWindowBits) {}

folly::SemiFuture<uint64_t> decompressAsync(
uint64_t srcLength,
const uint8_t* src,
uint64_t destLength,
uint8_t* dest) override {
return folly::makeSemiFuture().deferValue([&](auto&&) -> uint64_t {
return GzipCodec::decompress(srcLength, src, destLength, dest);
});
}

folly::SemiFuture<uint64_t> compressAsync(
uint64_t srcLength,
const uint8_t* src,
uint64_t destLength,
uint8_t* dest) override {
return folly::makeSemiFuture().deferValue([&](auto&&) -> uint64_t {
return GzipCodec::compress(srcLength, src, destLength, dest);
});
}
};

std::unique_ptr<Codec> makeGzipCodec(
int compressionLevel = kGzipDefaultCompressionLevel,
GzipFormat format = GzipFormat::kGzip,
Expand Down
Loading

0 comments on commit b41f926

Please sign in to comment.