Add peek_next_page_offset to SerializedPageReader #6945

Open · wants to merge 2 commits into main
142 changes: 142 additions & 0 deletions parquet/src/file/serialized_reader.rs
@@ -568,6 +568,63 @@ impl<R: ChunkReader> SerializedPageReader<R> {
physical_type: meta.column_type(),
})
}

/// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
/// Unlike page metadata, an offset uniquely identifies a page.
///
/// This is used when reading Parquet with a row filter, where we want to avoid decompressing the same page twice.
/// It lets us check whether the next page has already been cached or read.
#[cfg(test)]
fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
} => {
loop {
if *remaining_bytes == 0 {
Contributor:
My only real concern is that this body has so much duplication with peek_next_page (especially in the SerializedPageReaderState::Values block).

It is also somewhat strange that it is in a different impl block than peek_next_page (I would have expected it to be next to it), but maybe I missed some generic subtlety.

I tried a few ways to avoid the duplication and didn't really find a good way to do so.

Contributor Author:

> has so much duplication with peek_next_page

Agree. I tried to make peek_next_page return an offset as well, but had no luck doing it easily.

> in a different impl block than peek_next_page

I think it's because peek_next_page is in the PageReader trait.
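
For illustration only (not part of this PR), a minimal, self-contained sketch of the shared-helper shape discussed above; Pager, peek_entry, peek_meta, and peek_offset are all made-up names. In the real reader the helper would take &mut self and return Result, since peeking lazily reads the next page header.

// Hypothetical refactor sketch: one helper owns the peek logic, and the
// two public methods become thin projections over it.
struct Pager {
    /// (offset, metadata) pairs standing in for the real page state.
    pages: Vec<(usize, &'static str)>,
    pos: usize,
}

impl Pager {
    /// The single place that owns the peek logic.
    fn peek_entry(&self) -> Option<(usize, &'static str)> {
        self.pages.get(self.pos).copied()
    }

    /// Analogue of `peek_next_page`: project out the metadata.
    fn peek_meta(&self) -> Option<&'static str> {
        self.peek_entry().map(|(_, meta)| meta)
    }

    /// Analogue of `peek_next_page_offset`: project out the offset.
    fn peek_offset(&self) -> Option<usize> {
        self.peek_entry().map(|(offset, _)| offset)
    }
}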

return Ok(None);
}
return if let Some(header) = next_page_header.as_ref() {
if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
Ok(Some(*offset))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
*next_page_header = None;
continue;
}
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining_bytes -= header_len;
let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
Ok(Some(*offset))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
};
*next_page_header = Some(Box::new(header));
page_meta
};
}
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
..
} => {
if let Some(page) = dictionary_page {
Ok(Some(page.offset as usize))
} else if let Some(page) = page_locations.front() {
Ok(Some(page.offset as usize))
} else {
Ok(None)
}
}
}
}
}
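
As a sketch of the caching use case the doc comment describes (checking by offset whether a page was already decompressed), here is a hypothetical caller-side illustration; PageCache and get_or_decompress are made-up names, not arrow-rs APIs.

use std::collections::HashMap;

// Hypothetical caller-side cache keyed by page offset. An offset uniquely
// identifies a page within the file, which page metadata alone cannot.
struct PageCache {
    decompressed: HashMap<usize, Vec<u8>>,
}

impl PageCache {
    fn new() -> Self {
        Self { decompressed: HashMap::new() }
    }

    /// Run `decompress` only if the page at `offset` has not been seen yet.
    fn get_or_decompress(
        &mut self,
        offset: usize,
        decompress: impl FnOnce() -> Vec<u8>,
    ) -> &[u8] {
        self.decompressed.entry(offset).or_insert_with(decompress).as_slice()
    }
}

A caller would pair this with peek_next_page_offset: peek the offset first, and only fetch and decompress the page when the cache has no entry for that offset.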

impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
@@ -763,6 +820,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

#[cfg(test)]
mod tests {
use std::collections::HashSet;

use bytes::Buf;

use crate::file::properties::{EnabledStatistics, WriterProperties};
@@ -1068,6 +1127,89 @@ mod tests {
assert_eq!(page_count, 2);
}

fn get_serialized_page_reader<R: ChunkReader>(
file_reader: &SerializedFileReader<R>,
row_group: usize,
column: usize,
) -> Result<SerializedPageReader<R>> {
let row_group = {
let row_group_metadata = file_reader.metadata.row_group(row_group);
let props = Arc::clone(&file_reader.props);
let f = Arc::clone(&file_reader.chunk_reader);
SerializedRowGroupReader::new(
f,
row_group_metadata,
file_reader
.metadata
.offset_index()
.map(|x| x[row_group].as_slice()),
props,
)?
};

let col = row_group.metadata.column(column);

let page_locations = row_group
.offset_index
.map(|x| x[column].page_locations.clone());

let props = Arc::clone(&row_group.props);
SerializedPageReader::new_with_properties(
Arc::clone(&row_group.chunk_reader),
col,
row_group.metadata.num_rows() as usize,
page_locations,
props,
)
}

#[test]
fn test_peek_next_page_offset_matches_actual() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let reader = SerializedFileReader::new(test_file)?;

let mut offset_set = HashSet::new();
let num_row_groups = reader.metadata.num_row_groups();
for row_group in 0..num_row_groups {
let num_columns = reader.metadata.row_group(row_group).num_columns();
for column in 0..num_columns {
let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
match &page_reader.state {
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
..
} => {
if let Some(page) = dictionary_page {
assert_eq!(page.offset as usize, page_offset);
} else if let Some(page) = page_locations.front() {
assert_eq!(page.offset as usize, page_offset);
} else {
unreachable!()
}
}
SerializedPageReaderState::Values {
offset,
next_page_header,
..
} => {
assert!(next_page_header.is_some());
assert_eq!(*offset, page_offset);
}
}
let page = page_reader.get_next_page()?;
assert!(page.is_some());
let newly_inserted = offset_set.insert(page_offset);
assert!(newly_inserted);
}
}
}

Ok(())
}

#[test]
fn test_page_iterator() {
let file = get_test_file("alltypes_plain.parquet");