From 490c080e5ba7a50efc862da9508e6669900549ee Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 7 Dec 2023 15:33:42 +0200 Subject: [PATCH] Parquet: Ensure page statistics are written only when conifgured from the Arrow Writer (#5181) * Issue fix and tests * Cleanup tests --- parquet/src/arrow/arrow_writer/mod.rs | 143 +++++++++++++++++++++++++- parquet/src/column/writer/mod.rs | 29 +++--- 2 files changed, 158 insertions(+), 14 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index ea7b1eee99b8..e6e95d50996a 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -916,8 +916,9 @@ mod tests { use crate::basic::Encoding; use crate::data_type::AsBytes; use crate::file::metadata::ParquetMetaData; + use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::read_pages_locations; - use crate::file::properties::{ReaderProperties, WriterVersion}; + use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion}; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::{ reader::{FileReader, SerializedFileReader}, @@ -2738,4 +2739,144 @@ mod tests { assert_eq!(index[0][0].len(), 1); // 1 page assert_eq!(index[0][1].len(), 1); // 1 page } + + #[test] + fn test_disabled_statistics_with_page() { + let file_schema = Schema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Utf8, true), + ]); + let file_schema = Arc::new(file_schema); + + let batch = RecordBatch::try_new( + file_schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _, + Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _, + ], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::None) + .set_column_statistics_enabled("a".into(), EnabledStatistics::Page) + .build(); + + let mut buf = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + + let metadata = writer.close().unwrap(); + assert_eq!(metadata.row_groups.len(), 1); + let row_group = &metadata.row_groups[0]; + assert_eq!(row_group.columns.len(), 2); + // Column "a" has both offset and column index, as requested + assert!(row_group.columns[0].offset_index_offset.is_some()); + assert!(row_group.columns[0].column_index_offset.is_some()); + // Column "b" should only have offset index + assert!(row_group.columns[1].offset_index_offset.is_some()); + assert!(row_group.columns[1].column_index_offset.is_none()); + + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap(); + + let row_group = reader.get_row_group(0).unwrap(); + let a_col = row_group.metadata().column(0); + let b_col = row_group.metadata().column(1); + + // Column chunk of column "a" should have chunk level statistics + if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() { + let min = byte_array_stats.min(); + let max = byte_array_stats.max(); + + assert_eq!(min.as_bytes(), &[b'a']); + assert_eq!(max.as_bytes(), &[b'd']); + } else { + panic!("expecting Statistics::ByteArray"); + } + + // The column chunk for column "b" shouldn't have statistics + assert!(b_col.statistics().is_none()); + + let offset_index = reader.metadata().offset_index().unwrap(); + assert_eq!(offset_index.len(), 1); // 1 row group + assert_eq!(offset_index[0].len(), 2); // 2 columns + + let column_index = reader.metadata().column_index().unwrap(); + assert_eq!(column_index.len(), 1); // 1 row group + assert_eq!(column_index[0].len(), 2); // 2 columns + + let a_idx = &column_index[0][0]; + assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}"); + let b_idx = &column_index[0][1]; + assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); + } + + #[test] + fn test_disabled_statistics_with_chunk() { + let file_schema = Schema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Utf8, true), + ]); + let file_schema = Arc::new(file_schema); + + let batch = RecordBatch::try_new( + file_schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _, + Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _, + ], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::None) + .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk) + .build(); + + let mut buf = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + + let metadata = writer.close().unwrap(); + assert_eq!(metadata.row_groups.len(), 1); + let row_group = &metadata.row_groups[0]; + assert_eq!(row_group.columns.len(), 2); + // Column "a" should only have offset index + assert!(row_group.columns[0].offset_index_offset.is_some()); + assert!(row_group.columns[0].column_index_offset.is_none()); + // Column "b" should only have offset index + assert!(row_group.columns[1].offset_index_offset.is_some()); + assert!(row_group.columns[1].column_index_offset.is_none()); + + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap(); + + let row_group = reader.get_row_group(0).unwrap(); + let a_col = row_group.metadata().column(0); + let b_col = row_group.metadata().column(1); + + // Column chunk of column "a" should have chunk level statistics + if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() { + let min = byte_array_stats.min(); + let max = byte_array_stats.max(); + + assert_eq!(min.as_bytes(), &[b'a']); + assert_eq!(max.as_bytes(), &[b'd']); + } else { + panic!("expecting Statistics::ByteArray"); + } + + // The column chunk for column "b" shouldn't have statistics + assert!(b_col.statistics().is_none()); + + let column_index = reader.metadata().column_index().unwrap(); + assert_eq!(column_index.len(), 1); // 1 row group + assert_eq!(column_index[0].len(), 2); // 2 columns + + let a_idx = &column_index[0][0]; + assert!(matches!(a_idx, Index::NONE), "{a_idx:?}"); + let b_idx = &column_index[0][1]; + assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); + } } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 5dd7747c6fc2..531af4bd461e 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -764,19 +764,22 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls; - let page_statistics = match (values_data.min_value, values_data.max_value) { - (Some(min), Some(max)) => { - update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); - update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); - Some(ValueStatistics::new( - Some(min), - Some(max), - None, - self.page_metrics.num_page_nulls, - false, - )) - } - _ => None, + let page_statistics = if let (Some(min), Some(max)) = + (values_data.min_value, values_data.max_value) + { + // Update chunk level statistics + update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); + update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); + + (self.statistics_enabled == EnabledStatistics::Page).then_some(ValueStatistics::new( + Some(min), + Some(max), + None, + self.page_metrics.num_page_nulls, + false, + )) + } else { + None }; // update column and offset index