feat: Add unified compression API and lz4_frame/lz4_raw/lz4_hadoop codec #7589
base: main
Conversation
Force-pushed from 70fd61a to 26dbdea.
@mbasmanova Could you help review? Thanks!
ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
if (LZ4F_isError(ret)) {
  lz4Error(ret, "LZ4 init failed: ");
Is any content missing from the error message?
lz4Error will try to expand the error message for the return code. Switched the parameter order to make it clearer for the reader.
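As a rough illustration of what such a helper might look like (a sketch only; the actual helper in the PR may differ, and VELOX_FAIL is assumed as the error macro here):

#include <lz4frame.h> // LZ4F_errorCode_t, LZ4F_getErrorName
#include "velox/common/base/Exceptions.h" // VELOX_FAIL

namespace {
// Sketch: prepend the caller's context string, then let LZ4F_getErrorName()
// expand the numeric return code into readable text.
void lz4Error(LZ4F_errorCode_t errorCode, const char* prefixMessage) {
  VELOX_FAIL("{}{}", prefixMessage, LZ4F_getErrorName(errorCode));
}
} // namespace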
Force-pushed from 60d52b2 to 6f462b9.
@mbasmanova Could you help review? Thanks!
The asynchronous compression API is not in this PR, right?
@yaqi-zhao Yes. As suggested in #7471 (comment), we shall add the synchronous API first.
Hi @pedroerp, @mbasmanova, @marin-ma, may we prioritize the async-mode interface implementation at the same time? Some key partners are waiting for this optimization to do Velox PoC validation. Thanks!
I don't think so. Let's settle on a good design first and then implement the features step by step, as we discussed. It's a bad idea to merge immature code first and refactor it later. It doesn't block the customer PoC if they want to use Yaqi's draft; currently the issue blocking the customer PoC is how to generate data that IAA can decompress, not the PRs.
@FelixYBW The blocking issue is not data generation; #7437 is merged, so there is no blocker there.
@marin-ma Some high-level comments.
@majetideepak Thank you for the review.
This was based on the discussion in #7471 (comment). The new folder "v2" is intended to introduce the new API first. Next, I will replace the compression API used in the parquet and dwio modules with the new API. Meanwhile,
This is a user-level API, which can be useful when users want to set different compression levels, such as in a Parquet writer. Given that the minimum and maximum compression levels can vary among different compression codecs,
This should indeed make the review process more straightforward. However, I'm unsure about the practicality of replacing only one codec with the new API while keeping the original ones for the rest. Does this mean that we should temporarily disable other codecs until they can be integrated individually? Or perhaps you have a more efficient suggestion for this replacement process?
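To illustrate the compression-level use case mentioned above, here is a hedged sketch of how a caller such as a Parquet writer might pass a level. CodecOptions, Codec::create, and the level-range getters are assumed names for illustration, not necessarily the final API:

// Hypothetical usage sketch; names are illustrative assumptions.
CodecOptions options;
options.compressionLevel = 4;
auto codec = Codec::create(CompressionKind_ZSTD, options);
// Because valid levels differ per codec, the codec could expose its range:
//   codec->minimumCompressionLevel(), codec->maximumCompressionLevel()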
Force-pushed from 6f462b9 to 22f7435.
Thanks for this pointer. Let's continue with the steps outlined in that issue. I will make a pass today.
Force-pushed from 7232970 to 8fb0bea.
@pedroerp @mbasmanova Can you review the PR?
Some comments on the main Compression.h/cpp API.
I will look at the Hadoop and Lz4 compression code next.
uint64_t bytesWritten;
bool outputTooSmall;
};
struct FlushResult {
nit: add a new line above struct FlushResult and struct EndResult.
};

/// Compress some input.
/// If bytes_read is 0 on return, then a larger output buffer should be
CompressResult.bytesRead
uint8_t* output) = 0;

/// Flush part of the compressed output.
/// If outputTooSmall is true on return, flush() should be called again
FlushResult.outputTooSmall
virtual FlushResult flush(uint64_t outputLength, uint8_t* output) = 0;

/// End compressing, doing whatever is necessary to end the stream.
/// If outputTooSmall is true on return, end() should be called again
EndResult.outputTooSmall
/// If outputTooSmall is true on return, end() should be called again
/// with a larger buffer. Otherwise, the Compressor should not be used
/// anymore.
/// end() implies flush().
Can you clarify what "end() implies flush()" means?
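To make the intended contract concrete, here is an illustrative driver loop based only on the doc comments quoted above. It is not code from the PR; the end() signature and the bytesWritten field on EndResult are assumptions:

// Illustration: compress() consumes input; end() finishes the stream and
// also writes out whatever the compressor still buffers, which is what
// "end() implies flush()" presumably says. flush() is only needed to obtain
// compressed bytes before the stream is ended.
void compressAll(
    Compressor& compressor,
    const uint8_t* input,
    uint64_t inputSize,
    std::vector<uint8_t>& output) {
  uint64_t inputOffset = 0;
  uint64_t outputOffset = 0;
  output.resize(1024);
  while (inputOffset < inputSize) {
    auto r = compressor.compress(
        inputSize - inputOffset,
        input + inputOffset,
        output.size() - outputOffset,
        output.data() + outputOffset);
    inputOffset += r.bytesRead;
    outputOffset += r.bytesWritten;
    if (r.bytesRead == 0) {
      // Per the doc comment, a larger output buffer should be supplied.
      output.resize(output.size() * 2);
    }
  }
  while (true) {
    auto e = compressor.end(
        output.size() - outputOffset, output.data() + outputOffset);
    outputOffset += e.bytesWritten;
    if (!e.outputTooSmall) {
      break;
    }
    output.resize(output.size() * 2);
  }
  output.resize(outputOffset);
}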
}
auto actualLength =
    doGetUncompressedLength(inputLength, input, uncompressedLength);
if (actualLength) {
actualLength > 0
auto actualLength =
    doGetUncompressedLength(inputLength, input, uncompressedLength);
if (actualLength) {
  if (uncompressedLength) {
uncompressedLength > 0
VELOX_USER_CHECK_EQ(
    *actualLength,
    *uncompressedLength,
    "Invalid uncompressed length: {}.",
Clarify in the message that the expected uncompressed length is {uncompressedLength} while the actual length is {actualLength}.
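That is, something along these lines (a sketch of the suggested wording; the exact message is up to the author):

VELOX_USER_CHECK_EQ(
    *actualLength,
    *uncompressedLength,
    "Invalid uncompressed length: expected {}, actual {}.",
    *uncompressedLength,
    *actualLength);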
/// be written in this call will be written in subsequent calls to this
/// function. This is useful when fixed-size compression blocks are required
/// by the caller.
/// Note: Only the Gzip and Zstd codecs support this function.
Should we have an API supportsPartialCompression?
/// function. This is useful when fixed-size compression blocks are required
/// by the caller.
/// Note: Only the Gzip and Zstd codecs support this function.
virtual uint64_t compressPartial(
The API name compressPartial is misleading since this is still a one-shot compression. But I can't think of an alternative name either :)
@@ -76,6 +76,10 @@ std::string compressionKindToString(CompressionKind kind) {
      return "lz4";
    case CompressionKind_GZIP:
      return "gzip";
    case CompressionKind_LZ4RAW:
nit: can you please group these two with the lz4 above?
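For example, keeping the LZ4 variants adjacent in the switch (the string names returned for the new kinds are assumptions here):

case CompressionKind_LZ4:
  return "lz4";
case CompressionKind_LZ4RAW:
  return "lz4_raw";
case CompressionKind_LZ4HADOOP:
  return "lz4_hadoop";
case CompressionKind_GZIP:
  return "gzip";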
@@ -0,0 +1,29 @@
# Copyright (c) Facebook, Inc. and its affiliates.
Is the v2 folder supposed to replace its parent in the future? This structure is confusing; for example, you put Lz4Compression in this folder, while the original lzoDecompressor is in the parent directory. Also, how are you going to organize compressionKindToCodec, etc.? If the intention is to replace the current compress/decompress interface, it would be better to just make the changes in the parent (velox/common/compression) folder, so we can see what the structure of the interfaces is.
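For reference, one way the mapping mentioned above could look once consolidated into velox/common/compression. This is only a sketch; the factory-function names are assumptions:

// Hypothetical factory sketch: map the existing CompressionKind enum to the
// new Codec interface in a single place, regardless of which folder the
// implementations live in.
std::unique_ptr<Codec> compressionKindToCodec(CompressionKind kind) {
  switch (kind) {
    case CompressionKind_LZ4:
      return makeLz4FrameCodec();
    case CompressionKind_LZ4RAW:
      return makeLz4RawCodec();
    case CompressionKind_LZ4HADOOP:
      return makeLz4HadoopCodec();
    default:
      VELOX_UNSUPPORTED(
          "Unsupported compression kind: {}", compressionKindToString(kind));
  }
}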
/// If bytes_read is 0 on return, then a larger output buffer should be
/// supplied.
virtual CompressResult compress(
    uint64_t inputLength,
I think the Velox convention is to have the array first and the length second, for both input and output.
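That is, something like the following ordering (sketch of the suggestion; the const on the input pointer is assumed):

// Suggested parameter order per Velox convention: buffer first, length
// second, for both input and output.
virtual CompressResult compress(
    const uint8_t* input,
    uint64_t inputLength,
    uint8_t* output,
    uint64_t outputLength) = 0;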
std::numeric_limits<int32_t>::min();

// Streaming compressor interface.
class Compressor {
I actually think the naming of Compressor and Codec is a bit confusing unless you are familiar with Arrow. In velox::common there is an "encode" folder which contains integer encoding/decoding. Compression codecs don't belong there, but they are also named codecs. And people won't expect "Compressor" to be specifically for streaming compression. With Velox naming conventions, I think it's better to name them StreamingCompressor and Compressor. Actually, even in Arrow a codec is referred to as a "decompressor" or "compressor", e.g. in column_reader:
decompressor_ = GetCodec(codec);
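In other words, roughly the following (a sketch of the suggested naming; the signatures are approximated from the diff excerpts above and may not match the PR exactly):

// Suggested naming: make the streaming nature explicit and reserve
// "Compressor"/"Decompressor" for the one-shot case.
class StreamingCompressor {
 public:
  virtual ~StreamingCompressor() = default;

  virtual CompressResult compress(
      uint64_t inputLength,
      const uint8_t* input,
      uint64_t outputLength,
      uint8_t* output) = 0;

  virtual FlushResult flush(uint64_t outputLength, uint8_t* output) = 0;

  virtual EndResult end(uint64_t outputLength, uint8_t* output) = 0;
};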
uint64_t compressedSize = 0;
compressed.resize(10);
bool doFlush = false;
// Generate small random input buffer size.
Add an empty line above all comments in this function to improve readability.
uint64_t remaining = compressed.size();
uint64_t decompressedSize = 0;
decompressed.resize(10);
// Generate small random input buffer size.
Add an empty line above all comments in this function to improve readability.
// Check the streaming compressor against one-shot decompression.
void checkStreamingCompressor(Codec* codec, const std::vector<uint8_t>& data) {
  // Run streaming compression.
Add an empty line above all comments in this function to improve readability. Same for the next function.
I actually don't find this approach very clear. For one thing, we don't know which file will end up in which folder. Also, I think this Compression.h/cpp should be merged with the ones in the velox/common/compression folder. I think you can achieve the same goal without disrupting the Velox code base by just putting the code where it belongs.
Force-pushed from be335e4 to 6e1b203.
Force-pushed from 25a90c5 to 8253b53.
@pedroerp Could you help review again? Thanks!
if(VELOX_ENABLE_COMPRESSION_LZ4)
  list(APPEND VELOX_COMMON_COMPRESSION_SRCS Lz4Compression.cpp
       HadoopCompressionFormat.cpp)
  list(APPEND VELOX_COMMON_COMPRESSION_LINK_LIBS lz4::lz4)
endif()
Please avoid using variables in this way. Use velox_target_sources and velox_link_libraries instead. Both can be called multiple times on the same target.
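For example, a sketch of that pattern applied here (assuming the target is velox_common_compression and that the velox_* helpers take the same arguments as the underlying CMake commands):

# Sketch: add sources and libraries directly to the target; both helpers can
# be called multiple times on the same target.
if(VELOX_ENABLE_COMPRESSION_LZ4)
  velox_target_sources(velox_common_compression PRIVATE Lz4Compression.cpp
                       HadoopCompressionFormat.cpp)
  velox_link_libraries(velox_common_compression PUBLIC lz4::lz4)
endif()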
CMakeLists.txt (outdated)
if(VELOX_ENABLE_COMPRESSION_LZ4)
  add_definitions(-DVELOX_ENABLE_COMPRESSION_LZ4)
endif()
Move this into common/compression and use velox_compile_definitions(velox_common_compression PUBLIC VELOX_ENABLE_COMPRESSION_LZ4) (the -D is stripped and re-added anyway).
Force-pushed from 0724c0b to 41dd6e7.
@assignUser Could you help review this again? It seems that the UT failures are not related to this PR. Thanks!
    HadoopCompressionFormat.cpp)
velox_link_libraries(velox_common_compression PUBLIC lz4::lz4)
velox_compile_definitions(velox_common_compression
                          PRIVATE VELOX_ENABLE_COMPRESSION_LZ4)
Ah it's not used in a header so PRIVATE is better, good catch!
@@ -117,6 +117,7 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON)
option(VELOX_ENABLE_COMPRESSION_LZ4 "Enable Lz4 compression support." OFF)
We probably do not need this option.
Force-pushed from 4f69737 to 6963bb7.
@pedroerp Could you help review again? Meanwhile, I'm working on the next patch to add more compression codecs. Thanks!
Force-pushed from 6963bb7 to 56f9540.
@pedroerp I have addressed the comments regarding the
Force-pushed from 1aa55c1 to 472cfe9.
Initial implementation of the proposed unified compression API. This patch defines the compression Codec API inspired by Apache Arrow and adds missing functions used in Velox. Adds support for the LZ4_FRAME, LZ4_RAW, and LZ4_HADOOP codecs. Includes unit tests.
Discussion: #7471
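For readers skimming the thread, a hedged sketch of how the one-shot side of the unified API is meant to be used; method names follow the Arrow-inspired design discussed above and may differ from the final Velox API:

// Hypothetical one-shot usage; length-before-buffer argument order follows
// the diff excerpts quoted in this thread.
auto codec = Codec::create(CompressionKind_LZ4);

std::vector<uint8_t> compressed(codec->maxCompressedLength(input.size()));
auto compressedSize = codec->compress(
    input.size(), input.data(), compressed.size(), compressed.data());
compressed.resize(compressedSize);

std::vector<uint8_t> decompressed(input.size());
codec->decompress(
    compressed.size(), compressed.data(),
    decompressed.size(), decompressed.data());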