ORC-262: [C++] Support async io prefetch for orc c++ lib #2048
base: main
Conversation
It is entirely up to the user to decide whether to prefetch the whole ORC file, one or more columns in a single stripe, or a single column in one or more stripes. It is better to let the user invoke
@wgtmac That's great work. We could make further improvements on IO latency hiding after it is merged.
I have just finished the initial review. Thanks @taiyang-li! Please see my inline comments. My main concern is usability: it requires the user to call preBuffer instead of automatically prefetching the required data.
Could you resolve the conflicts, @taiyang-li?
Done.
I still have some concerns about the public API. Please see my inline comments.
BTW, I believe void preBuffer(const std::vector<int>& stripes, const std::list<uint64_t>& includeTypes) is a little coarse. We need to think about how it will work together with selective reads (e.g. when predicate pushdown is able to filter out most rows).
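For reference, the caller side with the current signature looks roughly like this (a minimal sketch; the file name and type ids are illustrative, preBuffer is the new API from this PR):

```cpp
#include <orc/OrcFile.hh>

// Sketch of the current calling pattern: every stripe and type id has to be
// enumerated up front, which is what makes the granularity feel coarse once
// predicate pushdown can discard most row groups.
void prefetchExample() {
  auto reader = orc::createReader(orc::readLocalFile("example.orc"), orc::ReaderOptions{});
  reader->preBuffer(/*stripes=*/{0, 1}, /*includeTypes=*/{1, 3});
}
```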
Thanks for the update and benchmark! I think we should not add the readAsync function which returns a DataBuffer. Please see my inline comments.
Force-pushed from c8016c0 to 0e14b04.
@wgtmac Thanks for your advice. I have finished the requested changes. Do you think the PR is ready to be merged?
@@ -624,6 +647,21 @@ namespace orc {
     */
    virtual std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
        uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const = 0;

    /**
     * Trigger IO prefetch and cache the prefetched contents asynchronously.
Please add the expectation for when it is called multiple times, with or without overlapping ranges. It would also be good to mention that it is thread safe.
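Something along these lines, to be adjusted to whatever the implementation actually guarantees:

```cpp
/**
 * Trigger IO prefetch and cache the prefetched contents asynchronously.
 *
 * (Suggested wording, to be checked against the implementation:)
 * It is safe to call this multiple times; ranges already cached by an earlier
 * call are not fetched again, and overlapping ranges are coalesced before the
 * IO is issued. This method is thread safe.
 */
```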
@@ -19,6 +19,7 @@
#include "StripeStream.hh"
#include "RLE.hh"
#include "Reader.hh"
#include "io/Cache.hh"
Please sort alphabetically.
@@ -37,7 +38,8 @@ namespace orc {
        stripeStart_(stripeStart),
        input_(input),
        writerTimezone_(writerTimezone),
-       readerTimezone_(readerTimezone) {
+       readerTimezone_(readerTimezone),
+       readCache_(reader.getReadCache()) {
You might directly call RowReaderImpl.getFileContents() to get readCache as suggested above.
for (size_t i = 0; i < num_stripes; ++i) {
  stripes.push_back(i);
}
reader->preBuffer(stripes, {0});
Please add a test case where preBuffer is called multiple times and with different stripes/columns, etc.
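For example, something along these lines in the same test (a sketch; the stripe and type ids are placeholders and assume the test file has at least two stripes and two non-root columns):

```cpp
// Repeated preBuffer calls with overlapping and disjoint stripe/column selections.
reader->preBuffer(/*stripes=*/{0}, /*includeTypes=*/{1});     // first call
reader->preBuffer(/*stripes=*/{0}, /*includeTypes=*/{1, 2});  // same stripe, overlapping columns
reader->preBuffer(/*stripes=*/{1}, /*includeTypes=*/{2});     // different stripe
// The rows read afterwards should match a reader that never called preBuffer.
```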
Co-authored-by: Gang Wu <[email protected]>
bool hit_cache = false;
if (it != entries_.end() && it->range.contains(range)) {
  hit_cache = it->future.valid();
  it->future.get();
If it->future.valid() returns false, we might encounter an exception here.
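One way to guard it, reusing the member names from this PR (a sketch, not necessarily the exact fix):

```cpp
bool hit_cache = false;
if (it != entries_.end() && it->range.contains(range)) {
  // get() on a future whose valid() returns false is undefined behavior,
  // so only wait on futures that actually hold a shared state.
  if (it->future.valid()) {
    it->future.get();  // may still rethrow an error stored by the background read
    hit_cache = true;
  }
}
```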
  std::vector<ReadRange> coalesce(std::vector<ReadRange> ranges) const;
};

std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit,
What about moving coalesceReadRanges into struct ReadRangeCombiner as a static function? Actually I think a separate coalesceReadRanges function is redundant.
std::vector<ReadRange> coalesceReadRanges(std::vector<ReadRange> ranges, uint64_t holeSizeLimit,
                                          uint64_t rangeSizeLimit);
struct RangeCacheEntry {
  using BufferPtr = InputStream::BufferPtr;
We can directly use std::shared_ptr now.
  BufferPtr buffer;
  std::shared_future<void> future;

  RangeCacheEntry() = default;
Should it be delete?
  RangeCacheEntry(const ReadRange& range, BufferPtr buffer, std::future<void> future)
      : range(range), buffer(std::move(buffer)), future(std::move(future).share()) {}

  friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) {
Why friend?
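A plain member (or a free function next to the struct) works just as well, since both operands are RangeCacheEntry and friend grants no extra access here. For example (a sketch; the comparison key is a guess based on how the entries are searched):

```cpp
bool operator<(const RangeCacheEntry& other) const {
  // Entries are kept sorted by the start offset of their range.
  return range.offset < other.range.offset;
}
```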
  using Buffer = InputStream::Buffer;
  using BufferPtr = InputStream::BufferPtr;

  struct BufferSlice {
To be consistent, struct BufferSlice should not be a nested class either.
 private:
  std::vector<RangeCacheEntry> makeCacheEntries(const std::vector<ReadRange>& ranges);

  InputStream* stream_;
Please either remove blank lines or add a blank line between member variables to keep consistency.
  entries_.erase(entries_.begin(), it);
}

std::vector<RangeCacheEntry> ReadRangeCache::makeCacheEntries(
nit: make it const or static
  auto itr = ranges.begin();
  // Ensure ranges is not empty.
  assert(itr <= ranges.end());
this assert is unnecessary
  uint64_t coalescedStart = itr->offset;
  uint64_t coalescedEnd = coalescedStart + itr->length;

  for (++itr; itr < ranges.end(); ++itr) {
We have iterated over the ranges three times: line 31, line 41, and here. It can be done in a single pass after sorting.
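Roughly the single-pass shape I have in mind (a sketch that reuses ReadRange, holeSizeLimit, and rangeSizeLimit from this PR; it assumes ranges is non-empty, ReadRange is constructible from {offset, length}, and the overlap check from the earlier passes can be folded into the same loop):

```cpp
// Sort by start offset, then coalesce adjacent/near ranges in one sweep.
std::sort(ranges.begin(), ranges.end(),
          [](const ReadRange& a, const ReadRange& b) { return a.offset < b.offset; });

std::vector<ReadRange> coalesced;
uint64_t start = ranges.front().offset;
uint64_t end = start + ranges.front().length;
for (size_t i = 1; i < ranges.size(); ++i) {
  const ReadRange& next = ranges[i];
  if (next.offset <= end + holeSizeLimit && (end - start) < rangeSizeLimit) {
    // Close enough to the current run and the run is still small: extend it.
    end = std::max(end, next.offset + next.length);
  } else {
    coalesced.push_back({start, end - start});
    start = next.offset;
    end = next.offset + next.length;
  }
}
coalesced.push_back({start, end - start});
```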
bool hit_cache = false;
if (it != entries_.end() && it->range.contains(range)) {
  hit_cache = it->future.valid();
  it->future.get();
Should we catch and rethrow an orc::Exception?
Should we use a timeout here to fall back to a direct read?
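Both could look roughly like this (a sketch only; kReadTimeout and the fallback path are placeholders, not part of this PR, and the exception translation is one option among several):

```cpp
constexpr auto kReadTimeout = std::chrono::milliseconds(100);  // placeholder value

bool hit_cache = false;
if (it != entries_.end() && it->range.contains(range) && it->future.valid()) {
  if (it->future.wait_for(kReadTimeout) == std::future_status::ready) {
    try {
      it->future.get();  // surfaces any error from the background read
      hit_cache = true;
    } catch (const std::exception& e) {
      throw ParseError(std::string("async prefetch failed: ") + e.what());
    }
  }
}
if (!hit_cache) {
  // fall back to a direct synchronous InputStream::read of the requested range
}
```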
What changes were proposed in this pull request?
Support async IO prefetch for the ORC C++ library. Closes https://issues.apache.org/jira/browse/ORC-262.
Changes:
- Add InputStream::readAsync (default unimplemented), which reads IO asynchronously within the specified range.
- Add ReadRangeCache to cache async IO results. This borrows from a similar design in the Parquet reader in https://github.com/apache/arrow.
- Add Reader::preBuffer to trigger IO prefetch. In the concrete implementation, ReaderImpl::preBuffer calculates the IO ranges from the selected stripes and columns, merges and sorts those ranges, and then calls ReadRangeCache::cache to trigger the asynchronous IO in the background, ready for later use by the upper layer.
- Add Reader::releaseBuffer, which releases all cached IO ranges before a given offset.
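Putting it together, the intended usage from an application looks roughly like this (a sketch; the file name, type ids, and batch size are illustrative, preBuffer/releaseBuffer are the new APIs from this PR, and releaseBuffer is assumed to take a file offset as described above):

```cpp
#include <orc/OrcFile.hh>
#include <vector>

int main() {
  auto reader = orc::createReader(orc::readLocalFile("example.orc"), orc::ReaderOptions{});

  // Kick off asynchronous prefetch for the root struct and column 1 of every stripe.
  std::vector<int> stripes;
  for (uint64_t i = 0; i < reader->getNumberOfStripes(); ++i) {
    stripes.push_back(static_cast<int>(i));
  }
  reader->preBuffer(stripes, {0, 1});

  // Normal row reading; stream reads are served from the prefetch cache when possible.
  auto rowReader = reader->createRowReader(orc::RowReaderOptions{});
  auto batch = rowReader->createRowBatch(1024);
  while (rowReader->next(*batch)) {
    // ... process batch ...
  }

  // Drop everything that was cached, now that reading is finished.
  reader->releaseBuffer(reader->getFileLength());
  return 0;
}
```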
Why are the changes needed?
Async IO prefetch can hide IO latency while reading ORC files, which improves the performance of scan operators in ClickHouse.
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?