Parquet: omit min/max for interval columns when writing stats #5147

Merged · 2 commits · Nov 30, 2023

Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
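
For context on the diff below: after this change, a column whose converted type is INTERVAL no longer gets min/max values in its statistics, at either page or chunk level. The following is a minimal end-to-end sketch of that effect (not part of the PR), assuming the crate's public low-level API (`SerializedFileWriter` / `SerializedFileReader`); the output path and single-column schema are illustrative only:

use std::{fs::File, sync::Arc};

use parquet::{
    data_type::{ByteArray, FixedLenByteArray, FixedLenByteArrayType},
    file::{
        properties::WriterProperties,
        reader::{FileReader, SerializedFileReader},
        writer::SerializedFileWriter,
    },
    schema::parser::parse_message_type,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // An INTERVAL column is a 12-byte FIXED_LEN_BYTE_ARRAY annotated with the
    // INTERVAL converted type.
    let schema = Arc::new(parse_message_type(
        "message schema { required fixed_len_byte_array(12) col (INTERVAL); }",
    )?);

    // Illustrative output path.
    let path = "interval_stats.parquet";
    let mut writer = SerializedFileWriter::new(
        File::create(path)?,
        schema,
        Arc::new(WriterProperties::builder().build()),
    )?;

    // Three 12-byte interval values differing only in the last byte.
    let values: Vec<FixedLenByteArray> = (0u8..3)
        .map(|i| {
            let mut bytes = vec![0u8; 12];
            bytes[11] = i;
            ByteArray::from(bytes).into()
        })
        .collect();

    let mut row_group = writer.next_row_group()?;
    if let Some(mut col_writer) = row_group.next_column()? {
        col_writer
            .typed::<FixedLenByteArrayType>()
            .write_batch(&values, None, None)?;
        col_writer.close()?;
    }
    row_group.close()?;
    writer.close()?;

    // The column chunk statistics should now carry no min/max for this column.
    let reader = SerializedFileReader::new(File::open(path)?)?;
    let stats = reader.metadata().row_group(0).column(0).statistics();
    assert!(stats.map_or(true, |s| !s.has_min_max_set()));
    Ok(())
}
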
parquet/src/column/writer/encoder.rs (7 changes: 5 additions & 2 deletions)

@@ -18,7 +18,7 @@
 use bytes::Bytes;
 use half::f16;
 
-use crate::basic::{Encoding, LogicalType, Type};
+use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
 use crate::bloom_filter::Sbbf;
 use crate::column::writer::{
     compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
@@ -137,7 +137,10 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
 
 impl<T: DataType> ColumnValueEncoderImpl<T> {
     fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
-        if self.statistics_enabled == EnabledStatistics::Page {
+        if self.statistics_enabled == EnabledStatistics::Page
+            // INTERVAL has undefined sort order, so don't write min/max stats for it
+            && self.descr.converted_type() != ConvertedType::INTERVAL
+        {
             if let Some((min, max)) = self.min_max(slice, None) {
                 update_min(&self.descr, &min, &mut self.min_value);
                 update_max(&self.descr, &max, &mut self.max_value);

parquet/src/column/writer/mod.rs (59 changes: 50 additions & 9 deletions)

@@ -332,7 +332,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // If only computing chunk-level statistics compute them here, page-level statistics
         // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
         // [`Self::add_data_page`]
-        if self.statistics_enabled == EnabledStatistics::Chunk {
+        if self.statistics_enabled == EnabledStatistics::Chunk
+            // INTERVAL has undefined sort order, so don't write min/max stats for it
+            && self.descr.converted_type() != ConvertedType::INTERVAL
+        {
             match (min, max) {
                 (Some(min), Some(max)) => {
                     update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
@@ -1093,7 +1096,6 @@ fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
 ///
 /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
 /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
-
 fn update_stat<T: ParquetValueType, F>(
     descr: &ColumnDescriptor,
     val: &T,
@@ -3066,6 +3068,30 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_interval_stats_should_not_have_min_max() {
+        let input = [
+            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
+            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
+            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
+        ]
+        .into_iter()
+        .map(|s| ByteArray::from(s).into())
+        .collect::<Vec<_>>();
+
+        let page_writer = get_test_page_writer();
+        let mut writer = get_test_interval_column_writer(page_writer);
+        writer.write_batch(&input, None, None).unwrap();
+
+        let metadata = writer.close().unwrap().metadata;
+        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
+            stats.clone()
+        } else {
+            panic!("metadata missing statistics");
+        };
+        assert!(!stats.has_min_max_set());
+    }
+
     fn write_multiple_pages<T: DataType>(
         column_descr: &Arc<ColumnDescriptor>,
         pages: &[&[Option<T::T>]],
@@ -3395,8 +3421,7 @@
         values: &[FixedLenByteArray],
     ) -> ValueStatistics<FixedLenByteArray> {
         let page_writer = get_test_page_writer();
-        let props = Default::default();
-        let mut writer = get_test_float16_column_writer(page_writer, 0, 0, props);
+        let mut writer = get_test_float16_column_writer(page_writer);
         writer.write_batch(values, None, None).unwrap();
 
         let metadata = writer.close().unwrap().metadata;
@@ -3409,12 +3434,9 @@
 
     fn get_test_float16_column_writer(
         page_writer: Box<dyn PageWriter>,
-        max_def_level: i16,
-        max_rep_level: i16,
-        props: WriterPropertiesPtr,
     ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
-        let descr = Arc::new(get_test_float16_column_descr(max_def_level, max_rep_level));
-        let column_writer = get_column_writer(descr, props, page_writer);
+        let descr = Arc::new(get_test_float16_column_descr(0, 0));
+        let column_writer = get_column_writer(descr, Default::default(), page_writer);
         get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
     }
 
@@ -3429,6 +3451,25 @@
         ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
     }
 
+    fn get_test_interval_column_writer(
+        page_writer: Box<dyn PageWriter>,
+    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
+        let descr = Arc::new(get_test_interval_column_descr());
+        let column_writer = get_column_writer(descr, Default::default(), page_writer);
+        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
+    }
+
+    fn get_test_interval_column_descr() -> ColumnDescriptor {
+        let path = ColumnPath::from("col");
+        let tpe =
+            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
+                .with_length(12)
+                .with_converted_type(ConvertedType::INTERVAL)
+                .build()
+                .unwrap();
+        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
+    }
+
     /// Returns column writer for UINT32 Column provided as ConvertedType only
     fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
         page_writer: Box<dyn PageWriter + 'a>,
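
A note on the "undefined sort order" comments in the diff above: the Parquet format defines INTERVAL as a 12-byte value holding three little-endian unsigned 32-bit integers (months, days, milliseconds). Because a value such as 1 month cannot be ordered against 30 days, the specification leaves the column sort order undefined, so byte-wise min/max statistics would be misleading for readers doing predicate pushdown. A small standalone sketch of that layout (not part of the PR, purely illustrative):

// Decode a Parquet INTERVAL value: 12 bytes holding three little-endian
// unsigned 32-bit integers (months, days, milliseconds).
fn decode_interval(bytes: &[u8; 12]) -> (u32, u32, u32) {
    let months = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
    let days = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
    let millis = u32::from_le_bytes(bytes[8..12].try_into().unwrap());
    (months, days, millis)
}

fn main() {
    // "1 month" vs "30 days": neither value is unambiguously smaller, which is
    // why the format defines no sort order and min/max stats are omitted.
    let mut one_month = [0u8; 12];
    one_month[0..4].copy_from_slice(&1u32.to_le_bytes());

    let mut thirty_days = [0u8; 12];
    thirty_days[4..8].copy_from_slice(&30u32.to_le_bytes());

    println!(
        "{:?} vs {:?}",
        decode_interval(&one_month),
        decode_interval(&thirty_days)
    );
}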