Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental parquet decoder with first-class selection pushdown support #6921

Open
wants to merge 23 commits into
base: main
Choose a base branch
from

Conversation

XiangpengHao
Copy link
Contributor

@XiangpengHao XiangpengHao commented Dec 30, 2024

Which issue does this PR close?

Many long lasting issues in DataFusion and Parquet. Note that this PR may or may not close these issues, but (imo) it will be the foundation to future more optimizations (e.g., more aggressive selection pushdown as described in this paper).

Why selection pushdown?

Selection pushdown (or late materialization or row-filter or filter pushdown) is great in concept, but can be tricky to implement efficiently. For example, current straightforward implementation actually slow down many queries, which prevents query engine like DataFusion to enable filter pushdown by default. The goal of a super fast row-filter pushdown parquet reader is described by @alamb in #5523 (comment):

is that evaluating predicates in ArrowFilter (aka pushed down predicates) is never worse than decoding the columns first and then filtering them with the filter kernel

Previous discussions have listed many potential optimizations to current selection pushdown, like the ones in #5523 (comment).
However, it's not clear how we can incorporate those optimizations into the current implementation. After thinking more carefully about the design spaces and the implications, I believe the only way to reach that goal is to re-structure the parquet reading pipline, and also reuse as much existing implementation as possible.

Current implementation and the problems

We currently implement a two phase decoding:
Phase 1: Build selectors on each predicate

Empty selector -> decode column 1 -> selection 1
Selection 1 -> decode column 2 -> selection 2
…
Selection K

Phase 2: Decode parquet data using the selector

Selection K -> decode column 1
Selection K -> decode column 2
…
Selection K -> decode column N

The problem is that we have to decode the predicate column twice, for example, if column 1 is being filtered, we need to first decode column 1 while evaluating the predicate, then decode it again to build the array.

Caching is the solution but not that simple

The high level intuition is that, if the problem is decoding twice, we simply cache the first decoding results and reuse later.

Here are the nuances:

  1. In the first stage of decoding, we build the filter over the entire column. To cache the decoded array, we need to cache the entire decoded column in memory, which can be prohibitively expensive.
  2. Each predicate inherit the selection from the previous evaluation, meaning that each predicates have different selection, which is different from the final selection. It's very non-trivial to convert the cached intermediate record batch to the final record batch, as the selection is completely different
  3. The columns being filtered may not be the columns in the final output, and it is especially challenging when we have nested columns, which requires us to do sub-tree matching.

Proposed solutions

The solution consists two parts:

  1. A new decoding pipeline that pushdown the predicate evaluation down to record batch decoding.
  2. **A carefully designed cache that consumes constant amount of memory (up to 2 page per column) to the column size. The extra memory overhead is therefore negligible. **

The pipeline looks like this:

Load predicate columns for batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1
Load predicate columns for batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2
...
Load predicate columns for batch N -> evaluate predicates -> filter N -> load & emit batch N

Once we have this pipeline, we can cache the predicate columns for batch N and reuse it when load & emit batch N, this avoids double decoding.

Due to the difficulties mentioned above, this PR cache the decompressed pages, rather than decoded arrow arrays. As some research suggests decompressing pages costs up to twice as much as decoding arrow, if we can cache the decompressed pages, then we only need to decode arrow twice, which might be good enough. Caching decompressed pages is much simpler to implement, as we can reuse the current array_readers and just implement a new PageReader.

What changes are included in this PR?

This PR only implements a reader for async record batch stream. Sync version is left as future work, and should be straightforward based on the async version.

Are there any user-facing changes?

No. The same ParquetRecordBatchStream, will automatically benefit from the changes.

@github-actions github-actions bot added parquet Changes to the parquet crate arrow Changes to the arrow crate labels Dec 30, 2024
@XiangpengHao
Copy link
Contributor Author

Would be great if @alamb @tustvold can take a look and let me know your thoughts!

@alamb
Copy link
Contributor

alamb commented Dec 30, 2024

Would be great if @alamb @tustvold can take a look and let me know your thoughts!

😍 -- looks amazing. I plan to work on arrow-rs reviews tomorrow. I;ll put this one on the top of the list

@XiangpengHao
Copy link
Contributor Author

Implemented some more optimizations and tuning, here are ClickBench numbers on my machine. TLDR: about 15% total time reduction.

We first compare no-pushdown vs our new push down implementation. Only Q27 has meaningful slow down, other queries are either similar or much faster.

The fix for Q27 requires us to actually switch to a boolean mask-based selector implementation, like the one in #6624

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ no-pushdown ┃ new-pushdown ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │      0.47ms │       0.43ms │ +1.10x faster │
│ QQuery 1     │     51.10ms │      50.10ms │     no change │
│ QQuery 2     │     68.23ms │      64.49ms │ +1.06x faster │
│ QQuery 3     │     90.68ms │      86.73ms │     no change │
│ QQuery 4     │    458.93ms │     458.59ms │     no change │
│ QQuery 5     │    522.06ms │     478.50ms │ +1.09x faster │
│ QQuery 6     │     49.84ms │      49.94ms │     no change │
│ QQuery 7     │     55.09ms │      55.77ms │     no change │
│ QQuery 8     │    565.26ms │     556.95ms │     no change │
│ QQuery 9     │    575.83ms │     575.05ms │     no change │
│ QQuery 10    │    164.56ms │     178.23ms │  1.08x slower │
│ QQuery 11    │    177.20ms │     191.32ms │  1.08x slower │
│ QQuery 12    │    591.05ms │     569.92ms │     no change │
│ QQuery 13    │    861.06ms │     848.59ms │     no change │
│ QQuery 14    │    596.20ms │     580.73ms │     no change │
│ QQuery 15    │    554.96ms │     548.77ms │     no change │
│ QQuery 16    │   1175.08ms │    1146.07ms │     no change │
│ QQuery 17    │   1150.45ms │    1121.49ms │     no change │
│ QQuery 18    │   2634.75ms │    2494.07ms │ +1.06x faster │
│ QQuery 19    │     90.15ms │      89.24ms │     no change │
│ QQuery 20    │    620.15ms │     591.67ms │     no change │
│ QQuery 21    │    782.38ms │     703.15ms │ +1.11x faster │
│ QQuery 22    │   1927.94ms │    1404.35ms │ +1.37x faster │
│ QQuery 23    │   8104.11ms │    3610.76ms │ +2.24x faster │
│ QQuery 24    │    360.79ms │     330.55ms │ +1.09x faster │
│ QQuery 25    │    290.61ms │     252.54ms │ +1.15x faster │
│ QQuery 26    │    395.18ms │     362.72ms │ +1.09x faster │
│ QQuery 27    │    891.76ms │     959.39ms │  1.08x slower │
│ QQuery 28    │   4059.54ms │    4137.37ms │     no change │
│ QQuery 29    │    235.88ms │     228.99ms │     no change │
│ QQuery 30    │    564.22ms │     584.65ms │     no change │
│ QQuery 31    │    741.20ms │     757.87ms │     no change │
│ QQuery 32    │   2652.48ms │    2574.19ms │     no change │
│ QQuery 33    │   2373.71ms │    2327.10ms │     no change │
│ QQuery 34    │   2391.00ms │    2342.15ms │     no change │
│ QQuery 35    │    700.79ms │     694.51ms │     no change │
│ QQuery 36    │    151.51ms │     152.93ms │     no change │
│ QQuery 37    │    108.18ms │      86.03ms │ +1.26x faster │
│ QQuery 38    │    114.64ms │     106.22ms │ +1.08x faster │
│ QQuery 39    │    260.80ms │     239.13ms │ +1.09x faster │
│ QQuery 40    │     60.74ms │      73.29ms │  1.21x slower │
│ QQuery 41    │     58.75ms │      67.85ms │  1.15x slower │
│ QQuery 42    │     65.49ms │      68.11ms │     no change │
└──────────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (no-pushdown)    │ 38344.79ms │
│ Total Time (new-pushdown)   │ 32800.50ms │
│ Average Time (no-pushdown)  │   891.74ms │
│ Average Time (new-pushdown) │   762.80ms │
│ Queries Faster              │         13 │
│ Queries Slower              │          5 │
│ Queries with No Change      │         25 │
└─────────────────────────────┴────────────┘

Now we compare our new implementation with the old pushdown implementation -- only Q23 is a bit slower, others are either faster or similar.

We do need some extra work to get the optimal performance of Q23. Nonetheless, we are faster than no-pushdown. I believe getting a fix for Q23 does not require foundamental changes to the existing decoding pipeline.

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃  pushdown ┃ new-pushdown ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    0.48ms │       0.43ms │ +1.12x faster │
│ QQuery 1     │   51.49ms │      50.10ms │     no change │
│ QQuery 2     │   67.83ms │      64.49ms │     no change │
│ QQuery 3     │   89.68ms │      86.73ms │     no change │
│ QQuery 4     │  469.88ms │     458.59ms │     no change │
│ QQuery 5     │  523.97ms │     478.50ms │ +1.10x faster │
│ QQuery 6     │   50.37ms │      49.94ms │     no change │
│ QQuery 7     │   56.89ms │      55.77ms │     no change │
│ QQuery 8     │  560.69ms │     556.95ms │     no change │
│ QQuery 9     │  583.14ms │     575.05ms │     no change │
│ QQuery 10    │  155.75ms │     178.23ms │  1.14x slower │
│ QQuery 11    │  170.31ms │     191.32ms │  1.12x slower │
│ QQuery 12    │  723.13ms │     569.92ms │ +1.27x faster │
│ QQuery 13    │ 1181.34ms │     848.59ms │ +1.39x faster │
│ QQuery 14    │  736.95ms │     580.73ms │ +1.27x faster │
│ QQuery 15    │  551.74ms │     548.77ms │     no change │
│ QQuery 16    │ 1171.99ms │    1146.07ms │     no change │
│ QQuery 17    │ 1152.34ms │    1121.49ms │     no change │
│ QQuery 18    │ 2555.82ms │    2494.07ms │     no change │
│ QQuery 19    │   84.20ms │      89.24ms │  1.06x slower │
│ QQuery 20    │  606.77ms │     591.67ms │     no change │
│ QQuery 21    │  704.86ms │     703.15ms │     no change │
│ QQuery 22    │ 1633.53ms │    1404.35ms │ +1.16x faster │
│ QQuery 23    │ 2691.84ms │    3610.76ms │  1.34x slower │
│ QQuery 24    │  528.09ms │     330.55ms │ +1.60x faster │
│ QQuery 25    │  465.38ms │     252.54ms │ +1.84x faster │
│ QQuery 26    │  562.40ms │     362.72ms │ +1.55x faster │
│ QQuery 27    │ 1121.76ms │     959.39ms │ +1.17x faster │
│ QQuery 28    │ 4455.16ms │    4137.37ms │ +1.08x faster │
│ QQuery 29    │  234.18ms │     228.99ms │     no change │
│ QQuery 30    │  596.22ms │     584.65ms │     no change │
│ QQuery 31    │  754.21ms │     757.87ms │     no change │
│ QQuery 32    │ 2570.52ms │    2574.19ms │     no change │
│ QQuery 33    │ 2357.37ms │    2327.10ms │     no change │
│ QQuery 34    │ 2377.89ms │    2342.15ms │     no change │
│ QQuery 35    │  703.78ms │     694.51ms │     no change │
│ QQuery 36    │  162.29ms │     152.93ms │ +1.06x faster │
│ QQuery 37    │  129.96ms │      86.03ms │ +1.51x faster │
│ QQuery 38    │   90.79ms │     106.22ms │  1.17x slower │
│ QQuery 39    │  220.71ms │     239.13ms │  1.08x slower │
│ QQuery 40    │   72.87ms │      73.29ms │     no change │
│ QQuery 41    │   70.04ms │      67.85ms │     no change │
│ QQuery 42    │   68.17ms │      68.11ms │     no change │
└──────────────┴───────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (pushdown)       │ 34116.80ms │
│ Total Time (new-pushdown)   │ 32800.50ms │
│ Average Time (pushdown)     │   793.41ms │
│ Average Time (new-pushdown) │   762.80ms │
│ Queries Faster              │         13 │
│ Queries Slower              │          6 │
│ Queries with No Change      │         24 │
└─────────────────────────────┴────────────┘

@XiangpengHao
Copy link
Contributor Author

XiangpengHao commented Jan 1, 2025

The implementation of course lacks tons of tests (I tried to mannually verify the clickbench results). If the high level stuff looks good, I'll try to send break down PRs that has tests and documentations.

Like most performance related PRs, some of the code changes can be very non-intuitive, please let me know and I'll try my best to explain why some codes has to implement in that way

@alamb
Copy link
Contributor

alamb commented Jan 1, 2025

Starting to check it out

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @XiangpengHao -- TLDR I think this POC looks really nice and the overall structure makes sense to me. I am willing to help review this PR as it moves closer to reality

There are obvious ways to break this PR up into pieces, which is a nice bonus -- the core caching logic is fairly localized

cc @thinkharderdev @tustvold @Dandandan @etseidl for your comments / reviews as well

I also think the description on the PR is quite good and easy to follow -- thank you for that

(todo: cite myself)

😆 my favorite part of the description

if we can cache the decompressed pages, then we only need to decode arrow twice, which might be good enough.

We can also consider caching arrow as a follow on PR / project. If this initial PR effectively avoids decompressing each page twice (though it still decodes each page to arrow twice) that still seems better than the current main branch which decompresses and decodes twice.

parquet/Cargo.toml Show resolved Hide resolved
parquet/src/arrow/array_reader/byte_view_array.rs Outdated Show resolved Hide resolved
parquet/src/arrow/mod.rs Outdated Show resolved Hide resolved
parquet/src/file/serialized_reader.rs Outdated Show resolved Hide resolved
parquet/src/arrow/async_reader/mod.rs Outdated Show resolved Hide resolved
arrow-buffer/src/buffer/boolean.rs Outdated Show resolved Hide resolved
parquet/src/arrow/async_reader/arrow_reader.rs Outdated Show resolved Hide resolved
parquet/src/arrow/async_reader/arrow_reader.rs Outdated Show resolved Hide resolved
parquet/src/arrow/async_reader/arrow_reader.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@thinkharderdev thinkharderdev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice @XiangpengHao. I think this makes a lot of sense.

parquet/src/arrow/async_reader/arrow_reader.rs Outdated Show resolved Hide resolved
parquet/src/arrow/async_reader/arrow_reader.rs Outdated Show resolved Hide resolved
@XiangpengHao XiangpengHao changed the title [POC] Experimental parquet decoder with first-class filter pushdown support [POC] Experimental parquet decoder with first-class selection pushdown support Jan 9, 2025
@XiangpengHao XiangpengHao marked this pull request as ready for review January 10, 2025 16:56
@XiangpengHao
Copy link
Contributor Author

Now that we have most of the leaf changes merged, it's finally time for the big change here! I have renovated the PR description as well as many documents added. Please let me know anything I can help to clarify!

@XiangpengHao XiangpengHao changed the title [POC] Experimental parquet decoder with first-class selection pushdown support Experimental parquet decoder with first-class selection pushdown support Jan 10, 2025
@alamb
Copy link
Contributor

alamb commented Jan 12, 2025

Thanks @XiangpengHao -- I plan to review this more carefully tomorrow, but I first want to finish up / merge

(aka #6668 is before this PR in my queue)

@Rachelint
Copy link
Contributor

As some research suggests decompressing pages costs up to twice as much as decoding arrow

I agree. We found decompression in arrow is really expensive and designed a specific cache for it in our production enviroment, and decode is very effecient actually.

@@ -683,7 +685,7 @@ impl ByteViewArrayDecoderDelta {

/// Check that `val` is a valid UTF-8 sequence
pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
match std::str::from_utf8(val) {
match simdutf8::basic::from_utf8(val) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #7014

Copy link
Contributor

@alamb alamb Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is something wrong with how github is displaying this diff -- this callsite was changed to use simdutf8 in this PR

The version on main doesn't have any calls to from_utf8:
https://github.com/apache/arrow-rs/blob/main/parquet/src/arrow/array_reader/byte_view_array.rs#L381

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just checked and this function goes away if main is merged in.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants