From dff22ea180dcdeaa171bd3c813a1f1f9afc13500 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 7 Aug 2024 02:14:43 -0400 Subject: [PATCH] support string view in stats --- .../src/datasource/file_format/parquet.rs | 112 ++++++++++++++---- .../physical_plan/parquet/row_group_filter.rs | 2 + datafusion/core/tests/parquet/page_pruning.rs | 13 +- 3 files changed, 103 insertions(+), 24 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index f233f3842c8c..55295795e24f 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -332,7 +332,7 @@ impl FileFormat for ParquetFormat { async fn infer_stats( &self, - _state: &SessionState, + state: &SessionState, store: &Arc, table_schema: SchemaRef, object: &ObjectMeta, @@ -342,6 +342,11 @@ impl FileFormat for ParquetFormat { table_schema, object, self.metadata_size_hint(), + state + .config_options() + .execution + .parquet + .schema_force_string_view, ) .await?; Ok(stats) @@ -481,9 +486,10 @@ async fn fetch_statistics( table_schema: SchemaRef, file: &ObjectMeta, metadata_size_hint: Option, + force_string_view: bool, ) -> Result { let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?; - statistics_from_parquet_meta_calc(&metadata, table_schema) + statistics_from_parquet_meta_calc(&metadata, table_schema, force_string_view) } /// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using ['StatisticsConverter`] @@ -493,6 +499,7 @@ async fn fetch_statistics( pub fn statistics_from_parquet_meta_calc( metadata: &ParquetMetaData, table_schema: SchemaRef, + force_string_view: bool, ) -> Result { let row_groups_metadata = metadata.row_groups(); @@ -514,10 +521,13 @@ pub fn statistics_from_parquet_meta_calc( statistics.total_byte_size = Precision::Exact(total_byte_size); let file_metadata = metadata.file_metadata(); - let file_schema = parquet_to_arrow_schema( + let mut file_schema = parquet_to_arrow_schema( file_metadata.schema_descr(), file_metadata.key_value_metadata(), )?; + if force_string_view { + file_schema = transform_schema_to_view(&file_schema); + } statistics.column_statistics = if has_statistics { let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); @@ -578,7 +588,7 @@ pub async fn statistics_from_parquet_meta( metadata: &ParquetMetaData, table_schema: SchemaRef, ) -> Result { - statistics_from_parquet_meta_calc(metadata, table_schema) + statistics_from_parquet_meta_calc(metadata, table_schema, false) } fn summarize_min_max_null_counts( @@ -1278,8 +1288,20 @@ mod tests { let format = ParquetFormat::default(); let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); - let stats = - fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?; + let use_string_view = ctx + .config_options() + .execution + .parquet + .schema_force_string_view; + + let stats = fetch_statistics( + store.as_ref(), + schema.clone(), + &meta[0], + None, + use_string_view, + ) + .await?; assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; @@ -1287,7 +1309,9 @@ mod tests { assert_eq!(c1_stats.null_count, Precision::Exact(1)); assert_eq!(c2_stats.null_count, Precision::Exact(3)); - let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?; + let stats = + fetch_statistics(store.as_ref(), schema, &meta[1], None, use_string_view) + .await?; assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; let c2_stats = &stats.column_statistics[1]; @@ -1460,15 +1484,25 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); + let use_string_view = ctx + .config_options() + .execution + .parquet + .schema_force_string_view; let format = ParquetFormat::default().with_metadata_size_hint(Some(9)); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await .unwrap(); - let stats = - fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9)) - .await?; + let stats = fetch_statistics( + store.upcast().as_ref(), + schema.clone(), + &meta[0], + Some(9), + use_string_view, + ) + .await?; assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; @@ -1500,6 +1534,7 @@ mod tests { schema.clone(), &meta[0], Some(size_hint), + use_string_view, ) .await?; @@ -1548,7 +1583,15 @@ mod tests { // Fetch statistics for first file let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?; - let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; + let stats = statistics_from_parquet_meta_calc( + &pq_meta, + schema.clone(), + state + .config_options() + .execution + .parquet + .schema_force_string_view, + )?; assert_eq!(stats.num_rows, Precision::Exact(4)); // column c_dic @@ -1590,25 +1633,49 @@ mod tests { let format = ParquetFormat::default(); let schema = format.infer_schema(&state, &store, &files).await.unwrap(); + let use_string_view = state + .config_options() + .execution + .parquet + .schema_force_string_view; + let null_i64 = ScalarValue::Int64(None); - let null_utf8 = ScalarValue::Utf8(None); + let null_utf8 = if use_string_view { + ScalarValue::Utf8View(None) + } else { + ScalarValue::Utf8(None) + }; // Fetch statistics for first file + let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?; - let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; + let stats = + statistics_from_parquet_meta_calc(&pq_meta, schema.clone(), use_string_view)?; // assert_eq!(stats.num_rows, Precision::Exact(3)); // column c1 let c1_stats = &stats.column_statistics[0]; assert_eq!(c1_stats.null_count, Precision::Exact(1)); - assert_eq!( - c1_stats.max_value, - Precision::Exact(ScalarValue::Utf8(Some("bar".to_string()))) - ); - assert_eq!( - c1_stats.min_value, - Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string()))) - ); + if use_string_view { + assert_eq!( + c1_stats.max_value, + Precision::Exact(ScalarValue::Utf8View(Some("bar".to_string()))) + ); + assert_eq!( + c1_stats.min_value, + Precision::Exact(ScalarValue::Utf8View(Some("Foo".to_string()))) + ); + } else { + assert_eq!( + c1_stats.max_value, + Precision::Exact(ScalarValue::Utf8(Some("bar".to_string()))) + ); + assert_eq!( + c1_stats.min_value, + Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string()))) + ); + } + // column c2: missing from the file so the table treats all 3 rows as null let c2_stats = &stats.column_statistics[1]; assert_eq!(c2_stats.null_count, Precision::Exact(3)); @@ -1617,7 +1684,8 @@ mod tests { // Fetch statistics for second file let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1], None).await?; - let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; + let stats = + statistics_from_parquet_meta_calc(&pq_meta, schema.clone(), use_string_view)?; assert_eq!(stats.num_rows, Precision::Exact(3)); // column c1: missing from the file so the table treats all 3 rows as null let c1_stats = &stats.column_statistics[0]; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs index 6a6910748fc8..249a1c52b312 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs @@ -265,7 +265,9 @@ impl PruningStatistics for BloomFilterStatistics { .map(|value| { match value { ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()), + ScalarValue::Utf8View(Some(v)) => sbbf.check(&v.as_str()), ScalarValue::Binary(Some(v)) => sbbf.check(v), + ScalarValue::BinaryView(Some(v)) => sbbf.check(v), ScalarValue::FixedSizeBinary(_size, Some(v)) => sbbf.check(v), ScalarValue::Boolean(Some(v)) => sbbf.check(v), ScalarValue::Float64(Some(v)) => sbbf.check(v), diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 15efd4bcd9dd..d7813168889b 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -29,7 +29,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::{col, lit, Expr}; +use datafusion_expr::{cast, col, lit, Expr}; use datafusion_physical_expr::create_physical_expr; use futures::StreamExt; @@ -150,7 +150,16 @@ async fn page_index_filter_one_col() { let task_ctx = session_ctx.task_ctx(); // 5.create filter date_string_col == 1; - let filter = col("date_string_col").eq(lit("01/01/09")); + let force_string_view = state + .config_options() + .execution + .parquet + .schema_force_string_view; + let filter = if force_string_view { + col("date_string_col").eq(cast(lit("01/01/09"), arrow_schema::DataType::Utf8View)) + } else { + col("date_string_col").eq(lit("01/01/09")) + }; let parquet_exec = get_parquet_exec(&state, filter).await; let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); let batch = results.next().await.unwrap().unwrap();