From f43782acce158e2203dbedfe65acefaa9297247e Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Sat, 28 Dec 2024 16:49:59 +0800 Subject: [PATCH] Find a way to communicate the ordering of a file back with the existing listing table implementation --- datafusion/common/src/stats.rs | 13 +- datafusion/core/src/dataframe/mod.rs | 170 ++++++++++++++++++ datafusion/core/src/dataframe/parquet.rs | 24 ++- .../src/datasource/file_format/parquet.rs | 5 + .../core/src/datasource/listing/table.rs | 49 ++++- .../physical_plan/file_scan_config.rs | 3 + datafusion/core/src/datasource/statistics.rs | 8 + .../src/physical_optimizer/join_selection.rs | 8 + .../core/tests/custom_sources_cases/mod.rs | 2 + .../tests/custom_sources_cases/statistics.rs | 9 +- datafusion/core/tests/sql/explain_analyze.rs | 2 +- .../physical-plan/src/aggregates/mod.rs | 3 + datafusion/physical-plan/src/common.rs | 4 + datafusion/physical-plan/src/filter.rs | 11 ++ .../physical-plan/src/joins/cross_join.rs | 11 +- datafusion/physical-plan/src/joins/utils.rs | 20 ++- datafusion/physical-plan/src/projection.rs | 3 + datafusion/physical-plan/src/union.rs | 4 + datafusion/physical-plan/src/values.rs | 2 + .../src/windows/bounded_window_agg_exec.rs | 1 + .../src/windows/window_agg_exec.rs | 2 + .../proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/pbjson.rs | 9 + .../proto-common/src/generated/prost.rs | 5 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 5 + .../tests/cases/roundtrip_physical_plan.rs | 3 + 28 files changed, 364 insertions(+), 15 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index d2ce965c5c493..2826bc6afb0ba 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -17,6 +17,7 @@ //! 
This module provides data structures to represent statistics +use std::collections::HashMap; use std::fmt::{self, Debug, Display}; use crate::{Result, ScalarValue}; @@ -223,6 +224,12 @@ pub struct Statistics { /// Statistics on a column level. It contains a [`ColumnStatistics`] for /// each field in the schema of the table to which the [`Statistics`] refer. pub column_statistics: Vec, + /// Additional metadata about the statistics. + /// For example: + /// key: "sort_by" + /// value: "timestamp ASC NULLS LAST" + /// It will be used to optimize the query execution, when we know the data is sorted. + pub metadata: HashMap, } impl Statistics { @@ -233,6 +240,7 @@ impl Statistics { num_rows: Precision::Absent, total_byte_size: Precision::Absent, column_statistics: Statistics::unknown_column(schema), + metadata: HashMap::new(), } } @@ -419,8 +427,8 @@ impl Display for Statistics { write!( f, - "Rows={}, Bytes={}, [{}]", - self.num_rows, self.total_byte_size, column_stats + "Rows={}, Bytes={}, [{}], metadata: {:?}", + self.num_rows, self.total_byte_size, column_stats, self.metadata )?; Ok(()) @@ -638,6 +646,7 @@ mod tests { num_rows: Precision::Exact(42), total_byte_size: Precision::Exact(500), column_statistics: counts.into_iter().map(col_stats_i64).collect(), + metadata: HashMap::new(), } } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 60a09301ae0fd..47446fa83976f 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1977,6 +1977,10 @@ mod tests { use crate::prelude::{CsvReadOptions, NdJsonReadOptions, ParquetReadOptions}; use arrow::array::Int32Array; + use arrow_array::TimestampNanosecondArray; + use arrow_schema::TimeUnit; + use datafusion_common::config::TableParquetOptions; + use datafusion_common::instant::Instant; use datafusion_common::{assert_batches_eq, Constraint, Constraints, ScalarValue}; use datafusion_common_runtime::SpawnedTask; use 
datafusion_expr::expr::WindowFunction; @@ -1989,6 +1993,7 @@ mod tests { use datafusion_functions_window::nth_value::first_value_udwf; use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; + use rand::Rng; use sqlparser::ast::NullTreatment; use tempfile::TempDir; @@ -4327,4 +4332,169 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn write_data_gen() -> Result<()> { + // 定义数据模式 + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + ])); + + let ctx = SessionContext::new_with_config( + SessionConfig::default() + .set_bool("datafusion.execution.collect_statistics", true), + ); + + println!("ctx catalog: {:?}", ctx.catalog_names()); + + // 创建随机数据 + let num_rows = 5_000_000; // 1500万行 + let mut rng = rand::thread_rng(); + let ids: Vec = (0..num_rows).collect(); + let timestamps: Vec = (0..num_rows) + .map(|_| rng.gen_range(1_700_000_000_000..1_800_000_000_000)) + .collect(); + + let id_array = Arc::new(Int64Array::from(ids)); + let timestamp_array = Arc::new(TimestampNanosecondArray::from(timestamps)); + + let batch = + RecordBatch::try_new(schema.clone(), vec![id_array, timestamp_array])?; + // + // 写入 Parquet 文件 + let file = std::path::Path::new("/tmp/testSorted.parquet"); + let write_df = ctx.read_batch(batch)?; + + write_df + .write_parquet( + file.to_str().unwrap(), + DataFrameWriteOptions::new().with_sort_by(vec![ + col("timestamp").sort(true, false), + col("id").sort(false, false), + ]), + Some(TableParquetOptions::new()), + ) + .await?; + + println!("Parquet 文件已写入完成"); + + let location = std::path::Path::new("/tmp/testSorted.parquet"); + let sql_str = + "create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location'" + .to_owned() + + location.to_str().unwrap() + + "'"; + + ctx.sql(sql_str.as_str()).await?.collect().await?; + + 
println!("创建排序表成功"); + // // 第一次查询: + // let start = Instant::now(); + // let df = ctx.sql("SELECT * FROM sortData").await?; + // let results = df.collect().await?; + // println!("第一次查询排序表耗时: {:?}", start.elapsed()); + + let start = Instant::now(); + let df = ctx + .read_parquet( + location.to_str().unwrap().to_string(), + ParquetReadOptions::default() + .file_sort_order(vec![vec![col("timestamp").sort(true, false)]]), + ) + .await?; + + let df_1 = df + .sort(vec![col("timestamp").sort(true, false)])? + .limit(0, Some(5)); + df_1?.show().await?; + println!("第一次查询排序 加排序表耗时: {:?}", start.elapsed()); + + // 第一次查询:ORDER BY + let start = Instant::now(); + let df = ctx + .sql("SELECT * FROM sortData order by timestamp limit 5") + .await?; + + let formatted = df.logical_plan().display_indent_schema().to_string(); + + println!("logic result {}", formatted); + let _ = df.collect().await?; + + println!( + "explain: {:?}", + ctx.sql("explain SELECT * FROM sortData order by timestamp limit 5") + .await? + .collect() + .await? + ); + + println!("第一次查询排序 加排序表耗时: {:?}", start.elapsed()); + + Ok(()) + } + + // #[tokio::test] + // async fn write_data_query() -> Result<()> { + // let ctx = SessionContext::new_with_config(SessionConfig::default(). 
+ // set_bool("datafusion.execution.collect_statistics", true)); + // let location = std::path::Path::new("/tmp/test.parquet"); + // + // let sql_str = + // "create external table data(id INT, timestamp TIMESTAMP) stored as parquet location '" + // .to_owned() + // + location.to_str().unwrap() + // + "'"; + // + // ctx.sql(sql_str.as_str()).await?.collect().await?; + // + // println!("创建表成功"); + // + // // 第一次查询:ORDER BY + // let start = Instant::now(); + // let df = ctx.sql("SELECT * FROM data limit 5").await?; + // let results = df.collect().await?; + // println!("第一次查询耗时: {:?}", start.elapsed()); + // + // // // 第二次查询:ORDER BY + // // let start = Instant::now(); + // // let df = ctx.sql("SELECT * FROM data").await?; + // // let results = df.collect().await?; + // // println!("第二次查询耗时: {:?}", start.elapsed()); + // + // + // + // let location = std::path::Path::new("/tmp/testSorted.parquet"); + // let sql_str = + // "create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location'" + // .to_owned() + // + location.to_str().unwrap() + // + "'"; + // + // ctx.sql(sql_str.as_str()).await?.collect().await?; + // + // println!("创建排序表成功"); + // // // 第一次查询: + // // let start = Instant::now(); + // // let df = ctx.sql("SELECT * FROM sortData").await?; + // // let results = df.collect().await?; + // // println!("第一次查询排序表耗时: {:?}", start.elapsed()); + // + // + // // 第一次查询:ORDER BY + // let start = Instant::now(); + // let df = ctx.sql("SELECT * FROM sortData order by timestamp limit 5").await?; + // + // let formatted = df.logical_plan().display_indent_schema().to_string(); + // + // println!("logic result {}", formatted); + // let results = df.collect().await?; + // + // println!("第一次查询排序 加排序表耗时: {:?}", start.elapsed()); + // Ok(()) + // } } diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 1dd4d68fca6b3..89745ac0b8cbd 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ 
b/datafusion/core/src/dataframe/parquet.rs @@ -66,10 +66,26 @@ impl DataFrame { ); } - let format = if let Some(parquet_opts) = writer_options { - Arc::new(ParquetFormatFactory::new_with_options(parquet_opts)) - } else { - Arc::new(ParquetFormatFactory::new()) + let format = match writer_options { + Some(mut parquet_opts) => { + if !options.sort_by.clone().is_empty() { + parquet_opts.key_value_metadata.insert( + "sort_by".to_string(), + Some( + options + .sort_by + .iter() + .map(|sort| sort.to_string()) + .collect::>() + .join(", "), + ), + ); + } + println!("parquet_opts: {:?}", parquet_opts); + + Arc::new(ParquetFormatFactory::new_with_options(parquet_opts)) + } + None => Arc::new(ParquetFormatFactory::new()), }; let file_type = format_as_file_type(format); diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 383fd65752349..d89d3696078f0 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -573,6 +573,11 @@ pub fn statistics_from_parquet_meta_calc( file_metadata.schema_descr(), file_metadata.key_value_metadata(), )?; + + statistics.metadata = file_schema.clone().metadata; + + println!("file_schema: {:?}", file_schema); + if let Some(merged) = coerce_file_schema_to_string_type(&table_schema, &file_schema) { file_schema = merged; } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 791b15704d097..69f6e40869017 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -35,7 +35,7 @@ use crate::execution::context::SessionState; use datafusion_catalog::TableProvider; use datafusion_common::{config_err, DataFusionError, Result}; use datafusion_expr::dml::InsertOp; -use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}; +use datafusion_expr::{col, utils::conjunction, Expr, 
TableProviderFilterPushDown}; use datafusion_expr::{SortExpr, TableType}; use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}; @@ -54,6 +54,7 @@ use datafusion_physical_expr::{ use async_trait::async_trait; use datafusion_catalog::Session; +use datafusion_expr::expr::Sort; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{future, stream, StreamExt, TryStreamExt}; use itertools::Itertools; @@ -860,7 +861,51 @@ impl TableProvider for ListingTable { return Ok(Arc::new(EmptyExec::new(projected_schema))); } - let output_ordering = self.try_create_output_ordering()?; + let output_ordering; + if let Some(sort_by_value) = statistics.metadata.get("sort_by") { + if !sort_by_value.is_empty() { + // Split the input into individual sort expressions separated by commas + let sort_expressions: Vec<&str> = + sort_by_value.split(',').map(str::trim).collect(); + + let mut sort_order = vec![]; + + for sort_expr in sort_expressions { + // Split each expression into components (e.g., "timestamp ASC NULLS LAST") + let tokens: Vec<&str> = sort_expr.split_whitespace().collect(); + if tokens.is_empty() { + continue; // Skip empty tokens + } + // Parse the expression, direction, and nulls ordering + let expr = tokens[0].to_string(); + let asc = tokens + .get(1) + .map_or(true, |&t| t.eq_ignore_ascii_case("ASC")); // Default to ASC + let nulls_first = tokens + .get(2) + .map_or(false, |&t| t.eq_ignore_ascii_case("NULLS FIRST")); // Default to NULLS LAST + + // Create a Sort object + let sort = Sort::new(col(expr), asc, nulls_first); + sort_order.push(sort); + } + + if sort_order.is_empty() { + output_ordering = self.try_create_output_ordering()?; + } else { + output_ordering = create_ordering(&self.table_schema, &[sort_order])?; + } + } else { + output_ordering = self.try_create_output_ordering()?; + } + } else { + output_ordering = self.try_create_output_ordering()?; + } + + println!("output_ordering: {:?}", output_ordering); + + 
println!("Statistics : {:?}", statistics); + match state .config_options() .execution diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index a5f2bd1760b33..c65205ddfa990 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -247,6 +247,7 @@ impl FileScanConfig { // TODO correct byte size? total_byte_size: Precision::Absent, column_statistics: table_cols_stats, + metadata: HashMap::new(), }; let projected_schema = Arc::new(Schema::new_with_metadata( @@ -701,6 +702,7 @@ mod tests { }) .collect(), total_byte_size: Precision::Absent, + metadata: HashMap::new(), }, to_partition_cols(vec![( "date".to_owned(), @@ -1190,6 +1192,7 @@ mod tests { statistics: Some(Statistics { num_rows: Precision::Absent, total_byte_size: Precision::Absent, + metadata: HashMap::new(), column_statistics: file .statistics .into_iter() diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index 201bbfd5c0076..690816d54be0d 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::mem; use std::sync::Arc; @@ -62,10 +63,16 @@ pub async fn get_statistics_with_limit( // Fusing the stream allows us to call next safely even once it is finished. let mut all_files = Box::pin(all_files.fuse()); + let mut metadata = HashMap::new(); + if let Some(first_file) = all_files.next().await { let (mut file, file_stats) = first_file?; file.statistics = Some(file_stats.as_ref().clone()); result_files.push(file); + // Now only need to handle first file metadata, because we assume + // the same order for all files. 
+ // todo we also need to support metadata for different files with different order? + metadata = file_stats.metadata.clone(); // First file, we set them directly from the file statistics. num_rows = file_stats.num_rows; @@ -138,6 +145,7 @@ pub async fn get_statistics_with_limit( num_rows, total_byte_size, column_statistics: col_stats_set, + metadata, }; if all_files.next().await.is_some() { // If we still have files in the stream, it means that the limit kicked diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index d7a2f17401419..745c355067add 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -594,6 +594,7 @@ mod tests_statistical { physical_plan::{displayable, ColumnStatistics, Statistics}, test::StatisticsExec, }; + use std::collections::HashMap; use arrow::datatypes::{DataType, Field}; use arrow_schema::Schema; @@ -613,6 +614,7 @@ mod tests_statistical { num_rows: Precision::Absent, total_byte_size: Precision::Absent, column_statistics: vec![ColumnStatistics::new_unknown()], + metadata: HashMap::new(), } } @@ -632,6 +634,7 @@ mod tests_statistical { num_rows: Precision::Inexact(threshold_num_rows / 128), total_byte_size: Precision::Inexact(threshold_byte_size / 128), column_statistics: vec![ColumnStatistics::new_unknown()], + metadata: HashMap::new(), } } @@ -642,6 +645,7 @@ mod tests_statistical { num_rows: Precision::Inexact(threshold_num_rows * 2), total_byte_size: Precision::Inexact(threshold_byte_size * 2), column_statistics: vec![ColumnStatistics::new_unknown()], + metadata: HashMap::new(), } } @@ -652,6 +656,7 @@ mod tests_statistical { num_rows: Precision::Inexact(threshold_num_rows * 4), total_byte_size: Precision::Inexact(threshold_byte_size * 4), column_statistics: vec![ColumnStatistics::new_unknown()], + metadata: HashMap::new(), } } @@ -739,6 +744,7 @@ mod tests_statistical { 
Some(50_000), ), total_byte_size: Precision::Absent, + metadata: HashMap::new(), }, Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), )); @@ -752,6 +758,7 @@ mod tests_statistical { Some(1000), ), total_byte_size: Precision::Absent, + metadata: HashMap::new(), }, Schema::new(vec![Field::new("medium_col", DataType::Int32, false)]), )); @@ -765,6 +772,7 @@ mod tests_statistical { Some(1000), ), total_byte_size: Precision::Absent, + metadata: HashMap::new(), }, Schema::new(vec![Field::new("small_col", DataType::Int32, false)]), )); diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index aafefac04e321..52481bd5d7d84 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -16,6 +16,7 @@ // under the License. use std::any::Any; +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -179,6 +180,7 @@ impl ExecutionPlan for CustomExecutionPlan { Ok(Statistics { num_rows: Precision::Exact(batch.num_rows()), total_byte_size: Precision::Absent, + metadata: HashMap::new(), column_statistics: self .projection .clone() diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 9d3bd594a9299..1f4ce41f5d5e1 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -17,8 +17,6 @@ //! 
This module contains end to end tests of statistics propagation -use std::{any::Any, sync::Arc}; - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::execution::context::TaskContext; use datafusion::{ @@ -36,6 +34,8 @@ use datafusion_catalog::Session; use datafusion_common::{project_schema, stats::Precision}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; +use std::collections::HashMap; +use std::{any::Any, sync::Arc}; use async_trait::async_trait; @@ -120,6 +120,7 @@ impl TableProvider for StatisticsValidation { column_statistics: proj_col_stats, // TODO stats: knowing the type of the new columns we can guess the output size total_byte_size: Precision::Absent, + metadata: HashMap::new(), }, projected_schema, ))) @@ -209,6 +210,7 @@ fn fully_defined() -> (Statistics, Schema) { null_count: Precision::Exact(5), }, ], + metadata: HashMap::new(), }, Schema::new(vec![ Field::new("c1", DataType::Int32, false), @@ -262,7 +264,8 @@ async fn sql_limit() -> Result<()> { Statistics { num_rows: Precision::Exact(5), column_statistics: col_stats, - total_byte_size: Precision::Absent + total_byte_size: Precision::Absent, + metadata: HashMap::new(), }, physical_plan.statistics()? 
); diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 5fb0b9852641b..e790977cfd722 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -830,6 +830,6 @@ async fn csv_explain_analyze_with_statistics() { // should contain scan statistics assert_contains!( &formatted, - ", statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]" + ", statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)], metadata: {}]" ); } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c04211d679caf..cb0dd0dab88eb 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -18,6 +18,7 @@ //! Aggregates functionalities use std::any::Any; +use std::collections::HashMap; use std::sync::Arc; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; @@ -855,6 +856,7 @@ impl ExecutionPlan for AggregateExec { num_rows: Precision::Exact(1), column_statistics, total_byte_size: Precision::Absent, + metadata: HashMap::new(), }) } _ => { @@ -882,6 +884,7 @@ impl ExecutionPlan for AggregateExec { num_rows, column_statistics, total_byte_size: Precision::Absent, + metadata: HashMap::new(), }) } } diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index aefb90d1d1b71..596137df93dd3 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -17,6 +17,7 @@ //! 
Defines common code used in execution plans +use std::collections::HashMap; use std::fs; use std::fs::{metadata, File}; use std::path::{Path, PathBuf}; @@ -177,6 +178,7 @@ pub fn compute_record_batch_statistics( num_rows: Precision::Exact(nb_rows), total_byte_size: Precision::Exact(total_byte_size), column_statistics, + metadata: HashMap::new(), } } @@ -342,6 +344,7 @@ mod tests { null_count: Precision::Exact(0), }, ], + metadata: HashMap::new(), }; assert_eq!(actual, expected); @@ -373,6 +376,7 @@ mod tests { min_value: Precision::Absent, null_count: Precision::Exact(3), }], + metadata: HashMap::new(), }; assert_eq!(actual, expected); diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8e7c14f0baed9..a26258b766af4 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -16,6 +16,7 @@ // under the License. use std::any::Any; +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -203,6 +204,7 @@ impl FilterExec { num_rows, total_byte_size, column_statistics, + metadata: HashMap::new(), }) } @@ -642,6 +644,7 @@ mod tests { max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), ..Default::default() }], + metadata: HashMap::new(), }, schema.clone(), )); @@ -686,6 +689,7 @@ mod tests { ..Default::default() }], total_byte_size: Precision::Absent, + metadata: HashMap::new(), }, schema.clone(), )); @@ -743,6 +747,7 @@ mod tests { }, ], total_byte_size: Precision::Absent, + metadata: HashMap::new(), }, schema.clone(), )); @@ -847,6 +852,7 @@ mod tests { ..Default::default() }, ], + metadata: HashMap::new(), }, schema, )); @@ -960,6 +966,7 @@ mod tests { ..Default::default() }, ], + metadata: HashMap::new(), }, schema, )); @@ -1015,6 +1022,7 @@ mod tests { ..Default::default() }, ], + metadata: HashMap::new(), }, schema, )); @@ -1079,6 +1087,7 @@ mod tests { ..Default::default() }, ], + metadata: HashMap::new(), }, 
schema, )); @@ -1151,6 +1160,7 @@ mod tests { max_value: Precision::Inexact(ScalarValue::Int32(Some(10))), distinct_count: Precision::Absent, }], + metadata: HashMap::new(), }; assert_eq!(filter_statistics, expected_filter_statistics); @@ -1210,6 +1220,7 @@ mod tests { column_statistics: vec![ColumnStatistics { ..Default::default() }], + metadata: HashMap::new(), }, schema, )); diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 69300fce77454..3eb2c0c0c57ec 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -32,8 +32,6 @@ use crate::{ SendableRecordBatchStream, Statistics, }; use arrow::compute::concat_batches; -use std::{any::Any, sync::Arc, task::Poll}; - use arrow::datatypes::{Fields, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::RecordBatchOptions; @@ -42,6 +40,8 @@ use datafusion_common::{internal_err, JoinType, Result, ScalarValue}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; +use std::collections::HashMap; +use std::{any::Any, sync::Arc, task::Poll}; use async_trait::async_trait; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -378,6 +378,7 @@ fn stats_cartesian_product( num_rows, total_byte_size, column_statistics: cross_join_stats, + metadata: HashMap::new(), } } @@ -613,6 +614,7 @@ mod tests { null_count: Precision::Exact(3), }, ], + metadata: HashMap::new(), }; let right = Statistics { @@ -624,6 +626,7 @@ mod tests { min_value: Precision::Exact(ScalarValue::Int64(Some(0))), null_count: Precision::Exact(2), }], + metadata: HashMap::new(), }; let result = stats_cartesian_product(left, right); @@ -651,6 +654,7 @@ mod tests { null_count: Precision::Exact(2 * left_row_count), }, ], + metadata: HashMap::new(), }; assert_eq!(result, expected); @@ 
-677,6 +681,7 @@ mod tests { null_count: Precision::Exact(3), }, ], + metadata: HashMap::new(), }; let right = Statistics { @@ -688,6 +693,7 @@ mod tests { min_value: Precision::Exact(ScalarValue::Int64(Some(0))), null_count: Precision::Exact(2), }], + metadata: HashMap::new(), }; let result = stats_cartesian_product(left, right); @@ -715,6 +721,7 @@ mod tests { null_count: Precision::Exact(2 * left_row_count), }, ], + metadata: HashMap::new(), }; assert_eq!(result, expected); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 371949a32598c..48211b02a7495 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -17,7 +17,7 @@ //! Join related functionality used both on logical and physical plans -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug}; use std::future::Future; use std::iter::once; @@ -738,6 +738,7 @@ pub(crate) fn estimate_join_statistics( num_rows, total_byte_size: Precision::Absent, column_statistics, + metadata: HashMap::new(), }) } @@ -774,11 +775,13 @@ fn estimate_join_cardinality( num_rows: left_stats.num_rows, total_byte_size: Precision::Absent, column_statistics: left_col_stats, + metadata: HashMap::new(), }, Statistics { num_rows: right_stats.num_rows, total_byte_size: Precision::Absent, column_statistics: right_col_stats, + metadata: HashMap::new(), }, )?; @@ -1970,6 +1973,7 @@ mod tests { .unwrap_or(Absent), column_statistics: column_stats, total_byte_size: Absent, + metadata: HashMap::new(), } } @@ -2163,11 +2167,13 @@ mod tests { num_rows: Inexact(left_num_rows), total_byte_size: Absent, column_statistics: left_col_stats.clone(), + metadata: HashMap::new(), }, Statistics { num_rows: Inexact(right_num_rows), total_byte_size: Absent, column_statistics: right_col_stats.clone(), + metadata: HashMap::new(), }, ), expected_cardinality.clone() @@ -2218,11 +2224,13 @@ mod tests { num_rows: 
Inexact(400), total_byte_size: Absent, column_statistics: left_col_stats, + metadata: HashMap::new(), }, Statistics { num_rows: Inexact(400), total_byte_size: Absent, column_statistics: right_col_stats, + metadata: HashMap::new(), }, ), Some(Inexact((400 * 400) / 200)) @@ -2252,11 +2260,13 @@ mod tests { num_rows: Inexact(100), total_byte_size: Absent, column_statistics: left_col_stats, + metadata: HashMap::new(), }, Statistics { num_rows: Inexact(100), total_byte_size: Absent, column_statistics: right_col_stats, + metadata: HashMap::new(), }, ), Some(Inexact(100)) @@ -2510,11 +2520,13 @@ mod tests { num_rows: Inexact(outer_num_rows), total_byte_size: Absent, column_statistics: outer_col_stats, + metadata: HashMap::new(), }, Statistics { num_rows: Inexact(inner_num_rows), total_byte_size: Absent, column_statistics: inner_col_stats, + metadata: HashMap::new(), }, &join_on, ) @@ -2545,11 +2557,13 @@ mod tests { num_rows: Absent, total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + metadata: HashMap::new(), }, Statistics { num_rows: Exact(10), total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + metadata: HashMap::new(), }, &join_on, ); @@ -2564,11 +2578,13 @@ mod tests { num_rows: Inexact(500), total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + metadata: HashMap::new(), }, Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + metadata: HashMap::new(), }, &join_on, ).expect("Expected non-empty PartialJoinStatistics for SemiJoin with absent inner num_rows"); @@ -2581,11 +2597,13 @@ mod tests { num_rows: Absent, total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + metadata: HashMap::new(), }, Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: dummy_column_stats, + metadata: HashMap::new(), }, &join_on, ); diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 
e37a6b0dfb854..7e5f6e3d62de1 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -403,6 +403,7 @@ mod tests { null_count: Precision::Absent, }, ], + metadata: HashMap::new(), } } @@ -441,6 +442,7 @@ mod tests { null_count: Precision::Exact(0), }, ], + metadata: HashMap::new(), }; assert_eq!(result, expected); @@ -475,6 +477,7 @@ mod tests { null_count: Precision::Exact(0), }, ], + metadata: HashMap::new(), }; assert_eq!(result, expected); diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 6e768a3d87bcc..c4e514ec8bdea 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -601,6 +601,7 @@ mod tests { use crate::collect; use crate::memory::MemoryExec; use crate::test; + use std::collections::HashMap; use arrow_schema::{DataType, SortOptions}; use datafusion_common::ScalarValue; @@ -685,6 +686,7 @@ mod tests { null_count: Precision::Absent, }, ], + metadata: HashMap::new(), }; let right = Statistics { @@ -710,6 +712,7 @@ mod tests { null_count: Precision::Absent, }, ], + metadata: HashMap::new(), }; let result = stats_union(left, right); @@ -736,6 +739,7 @@ mod tests { null_count: Precision::Absent, }, ], + metadata: HashMap::new(), }; assert_eq!(result, expected); diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 5089b1e626d48..fc4d0fb3ed145 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -214,6 +214,7 @@ mod tests { use super::*; use crate::expressions::lit; use crate::test::{self, make_partition}; + use std::collections::HashMap; use arrow_schema::{DataType, Field}; use datafusion_common::stats::{ColumnStatistics, Precision}; @@ -292,6 +293,7 @@ mod tests { max_value: Precision::Absent, min_value: Precision::Absent, },], + metadata: HashMap::new(), } ); diff --git 
a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 2ac86da92e50f..329d25f5a9f82 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -329,6 +329,7 @@ impl ExecutionPlan for BoundedWindowAggExec { num_rows: input_stat.num_rows, column_statistics, total_byte_size: Precision::Absent, + metadata: std::collections::HashMap::new(), }) } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index b132c32470729..ac00fb586614b 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -18,6 +18,7 @@ //! Stream and channel implementations for window function expressions. use std::any::Any; +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -258,6 +259,7 @@ impl ExecutionPlan for WindowAggExec { num_rows: input_stat.num_rows, column_statistics, total_byte_size: Precision::Absent, + metadata: HashMap::new(), }) } } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 69626f97fd809..dce06b41bb05b 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -564,6 +564,7 @@ message Statistics { Precision num_rows = 1; Precision total_byte_size = 2; repeated ColumnStats column_stats = 3; + map metadata = 4; } message ColumnStats { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index eb6976aa0c061..77976a854f7bf 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -803,6 +803,7 @@ impl TryFrom<&protobuf::Statistics> for Statistics { }, // 
No column statistic (None) is encoded with empty array column_statistics: s.column_stats.iter().map(|s| s.into()).collect(), + metadata: s.metadata.clone(), }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e88c1497af081..4c6aa78496c5c 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + impl serde::Serialize for ArrowOptions { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -7487,6 +7489,7 @@ impl<'de> serde::Deserialize<'de> for Statistics { NumRows, TotalByteSize, ColumnStats, + MetaData, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7511,6 +7514,7 @@ impl<'de> serde::Deserialize<'de> for Statistics { "numRows" | "num_rows" => Ok(GeneratedField::NumRows), "totalByteSize" | "total_byte_size" => Ok(GeneratedField::TotalByteSize), "columnStats" | "column_stats" => Ok(GeneratedField::ColumnStats), + "metadata" => Ok(GeneratedField::MetaData), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7533,6 +7537,7 @@ impl<'de> serde::Deserialize<'de> for Statistics { let mut num_rows__ = None; let mut total_byte_size__ = None; let mut column_stats__ = None; + let mut metadata__ = HashMap::new(); while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::NumRows => { @@ -7553,12 +7558,16 @@ impl<'de> serde::Deserialize<'de> for Statistics { } column_stats__ = Some(map_.next_value()?); } + GeneratedField::MetaData => { + metadata__ = map_.next_value()?; + } } } Ok(Statistics { num_rows: num_rows__, total_byte_size: total_byte_size__, column_stats: column_stats__.unwrap_or_default(), + metadata: metadata__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 6b8509775847a..36c4ca7860852 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -863,6 +863,11 @@ pub struct Statistics { pub total_byte_size: ::core::option::Option, #[prost(message, repeated, tag = "3")] pub column_stats: ::prost::alloc::vec::Vec, + #[prost(map = "string, string", tag = "4")] + pub metadata: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ColumnStats { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index a7cea607cb6d0..8c11e785c2fb0 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -739,6 +739,7 @@ impl From<&Statistics> for protobuf::Statistics { num_rows: Some(protobuf::Precision::from(&s.num_rows)), total_byte_size: Some(protobuf::Precision::from(&s.total_byte_size)), column_stats, + metadata: s.metadata.clone(), } } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 6b8509775847a..36c4ca7860852 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -863,6 +863,11 @@ pub struct Statistics { pub total_byte_size: ::core::option::Option, #[prost(message, repeated, tag = "3")] pub 
column_stats: ::prost::alloc::vec::Vec<ColumnStats>, + #[prost(map = "string, string", tag = "4")] + pub metadata: ::std::collections::HashMap< ::prost::alloc::string::String, ::prost::alloc::string::String, >, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ColumnStats { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 0a6ea6c7ff853..c11b106bc4e1a 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -16,6 +16,7 @@ // under the License. use std::any::Any; +use std::collections::HashMap; use std::fmt::Display; use std::ops::Deref; use std::sync::Arc; @@ -718,6 +719,7 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![ Field::new("col", DataType::Utf8, false), ]))), + metadata: HashMap::new(), }, projection: None, limit: None, @@ -782,6 +784,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![ Field::new("col", DataType::Utf8, false), ]))), + metadata: HashMap::new(), }, projection: None, limit: None,