-
Notifications
You must be signed in to change notification settings - Fork 911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for progressive parquet chunked reading. #14079
Support for progressive parquet chunked reading. #14079
Conversation
…amount of memory used for decompression and other scratch space when decoding, which causes the reader to make multiple 'passes' over the set of row groups to be read. Signed-off-by: db <[email protected]>
…g a vector instead of referencing it.
a519c12
to
3772e7a
Compare
_impl = std::make_unique<impl>( | ||
chunk_read_limit, pass_read_limit, std::move(sources), options, stream, mr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this into the initializer list of this constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't initialize a parent's member as part of your own initialization list.
void load_global_chunk_info(); | ||
void compute_input_pass_row_group_info(); | ||
void setup_pass(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed a lot of member functions in our previous work for chunked reader, because they can be just free functions. So if possible, please remove such declaration and make them free functions. Having them as member functions, you will have to maintain their signatures here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If these were made free functions they would all have to be passed large numbers of parameters, which would the code a lot harder to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
review.flush()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments on first pass. I'll make another one tomorrow with a clear head.
* | ||
* @param chunk_read_limit Limit on total number of bytes to be returned per read, | ||
* or `0` if there is no limit | ||
* @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Above it says decompression limit, but here it says decompress and reading. I am also concerned by the soft limit. This seems like the thing that you have hard limits. Should it explode on over limit or is the OOM the explosion and that is why it is considered a soft limit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a soft limit because it will attempt to continue even if it can't meet the limit. For example, if the user specified 1 MB
and it can't fit even one row group (say 50 MB
) into that size, it will still attempt to read/decompress one row group at a time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another flush, mostly comments coming from difficulty to understand the code.
Still got some parts to dig into.
…hostdevice_vector as it was unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
few more small suggestions, looks good overall.
I think the biggest issue for me is the fragility of the test, which we discussed offline and found a solution for.
…encoding to keep the hardcoded uncompressed size predictable.
/merge |
Previously, the parquet chunked reader operated by controlling the size of output chunks only. It would still ingest the entire input file and decompress it, which can take up a considerable amount of memory. With this new 'progressive' support, we also 'chunk' at the input level. Specifically, the user can pass a
pass_read_limit
value which controls how much memory is used for storing compressed/decompressed data. The reader will make multiple passes over the file, reading as many row groups as it can to attempt to fit within this limit. Within each pass, chunks are emitted as before.From the external user's perspective, the chunked read mechanism is the same. You call
has_next()
andread_chunk()
. If the user has specified a value forpass_read_limit
the set of chunks produced might end up being different (although the concatenation of all of them will still be the same).The core idea of the code change is to add the idea of the internal
pass
. Previously we had afile_intermediate_data
which held data acrossread_chunk()
calls. There is now apass_intermediate_data
struct which holds information specific to a given pass. Many of the invariant things from the file level before (row groups and chunks to process) are now stored in the pass intermediate data. As we begin each pass, we take the subset of global row groups and chunks that we are going to process for this pass, copy them to out intermediate data, and the remainder of the reader reference this instead of the file-level data.In order to avoid breaking pre-existing interfaces, there's a new contructor for the
chunked_parquet_reader
class:Checklist