diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 7c247103f6e7..70867b55018d 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -606,6 +606,11 @@ config_namespace! { /// When set to true, the optimizer will not attempt to convert Union to Interleave pub prefer_existing_union: bool, default = false + + /// When set to true, if the returned type is a view type + /// then the output will be coerced to a non-view. + /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. + pub expand_views_at_output: bool, default = false } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index d0c1c3b2b3d6..ea4ea2c55948 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1287,7 +1287,7 @@ impl LogicalPlan { /// referenced expressions into columns. /// /// See also: [`crate::utils::columnize_expr`] - pub(crate) fn columnized_output_exprs(&self) -> Result> { + pub fn columnized_output_exprs(&self) -> Result> { match self { LogicalPlan::Aggregate(aggregate) => Ok(aggregate .output_expressions()? diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 284650c3d64e..722ee604e7d0 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use itertools::izip; -use arrow::datatypes::{DataType, Field, IntervalUnit}; +use arrow::datatypes::{DataType, Field, IntervalUnit, Schema}; use crate::analyzer::AnalyzerRule; use crate::utils::NamePreserver; @@ -29,7 +29,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::{ exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, - DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, + DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference, }; use datafusion_expr::expr::{ self, Alias, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like, @@ -66,19 +66,39 @@ impl TypeCoercion { } } +/// Coerce output schema based upon optimizer config. +fn coerce_output(plan: LogicalPlan, config: &ConfigOptions) -> Result { + if !config.optimizer.expand_views_at_output { + return Ok(plan); + } + + let outer_refs = plan.expressions(); + if outer_refs.is_empty() { + return Ok(plan); + } + + if let Some(dfschema) = transform_schema_to_nonview(plan.schema()) { + coerce_plan_expr_for_schema(plan, &dfschema?) + } else { + Ok(plan) + } +} + impl AnalyzerRule for TypeCoercion { fn name(&self) -> &str { "type_coercion" } - fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { + fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result { let empty_schema = DFSchema::empty(); + // recurse let transformed_plan = plan .transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))? .data; - Ok(transformed_plan) + // finish + coerce_output(transformed_plan, config) } } @@ -514,6 +534,55 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { } } +/// Transform a schema to use non-view types for Utf8View and BinaryView +fn transform_schema_to_nonview(dfschema: &DFSchemaRef) -> Option> { + let metadata = dfschema.as_arrow().metadata.clone(); + let mut transformed = false; + + let (qualifiers, transformed_fields): (Vec>, Vec>) = + dfschema + .iter() + .map(|(qualifier, field)| match field.data_type() { + DataType::Utf8View => { + transformed = true; + ( + qualifier.cloned() as Option, + Arc::new(Field::new( + field.name(), + DataType::LargeUtf8, + field.is_nullable(), + )), + ) + } + DataType::BinaryView => { + transformed = true; + ( + qualifier.cloned() as Option, + Arc::new(Field::new( + field.name(), + DataType::LargeBinary, + field.is_nullable(), + )), + ) + } + _ => ( + qualifier.cloned() as Option, + Arc::clone(field), + ), + }) + .unzip(); + + if !transformed { + return None; + } + + let schema = Schema::new_with_metadata(transformed_fields, metadata); + Some(DFSchema::from_field_specific_qualified_schema( + qualifiers, + &Arc::new(schema), + )) +} + /// Casts the given `value` to `target_type`. Note that this function /// only considers `Null` or `Utf8` values. fn coerce_scalar(target_type: &DataType, value: &ScalarValue) -> Result { @@ -935,10 +1004,11 @@ mod test { use arrow::datatypes::DataType::Utf8; use arrow::datatypes::{DataType, Field, TimeUnit}; + use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{TransformedResult, TreeNode}; use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue}; use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction}; - use datafusion_expr::logical_plan::{EmptyRelation, Projection}; + use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort}; use datafusion_expr::test::function_stub::avg_udaf; use datafusion_expr::{ cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, AggregateUDF, @@ -951,7 +1021,7 @@ mod test { use crate::analyzer::type_coercion::{ coerce_case_expression, TypeCoercion, TypeCoercionRewriter, }; - use crate::test::assert_analyzed_plan_eq; + use crate::test::{assert_analyzed_plan_eq, assert_analyzed_plan_with_config_eq}; fn empty() -> Arc { Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { @@ -982,6 +1052,155 @@ mod test { assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected) } + fn coerce_on_output_if_viewtype(plan: LogicalPlan, expected: &str) -> Result<()> { + let mut options = ConfigOptions::default(); + options.optimizer.expand_views_at_output = true; + + assert_analyzed_plan_with_config_eq( + options, + Arc::new(TypeCoercion::new()), + plan.clone(), + expected, + ) + } + + fn do_not_coerce_on_output(plan: LogicalPlan, expected: &str) -> Result<()> { + assert_analyzed_plan_with_config_eq( + ConfigOptions::default(), + Arc::new(TypeCoercion::new()), + plan.clone(), + expected, + ) + } + + #[test] + fn coerce_utf8view_output() -> Result<()> { + // Plan A + // scenario: outermost utf8view projection + let expr = col("a"); + let empty = empty_with_type(DataType::Utf8View); + let plan = LogicalPlan::Projection(Projection::try_new( + vec![expr.clone()], + Arc::clone(&empty), + )?); + // Plan A: no coerce + let if_not_coerced = "Projection: a\n EmptyRelation"; + do_not_coerce_on_output(plan.clone(), if_not_coerced)?; + // Plan A: coerce requested: Utf8View => LargeUtf8 + let if_coerced = "Projection: CAST(a AS LargeUtf8)\n EmptyRelation"; + coerce_on_output_if_viewtype(plan.clone(), if_coerced)?; + + // Plan B + // scenario: outermost bool projection + let bool_expr = col("a").lt(lit("foo")); + let bool_plan = LogicalPlan::Projection(Projection::try_new( + vec![bool_expr], + Arc::clone(&empty), + )?); + // Plan B: no coerce + let if_not_coerced = + "Projection: a < CAST(Utf8(\"foo\") AS Utf8View)\n EmptyRelation"; + do_not_coerce_on_output(bool_plan.clone(), if_not_coerced)?; + // Plan B: coerce requested: no coercion applied + let if_coerced = if_not_coerced; + coerce_on_output_if_viewtype(bool_plan, if_coerced)?; + + // Plan C + // scenario: with a non-projection root logical plan node + let sort_expr = expr.sort(true, true); + let sort_plan = LogicalPlan::Sort(Sort { + expr: vec![sort_expr], + input: Arc::new(plan), + fetch: None, + }); + // Plan C: no coerce + let if_not_coerced = + "Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation"; + do_not_coerce_on_output(sort_plan.clone(), if_not_coerced)?; + // Plan C: coerce requested: Utf8View => LargeUtf8 + let if_coerced = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation"; + coerce_on_output_if_viewtype(sort_plan.clone(), if_coerced)?; + + // Plan D + // scenario: two layers of projections with view types + let plan = LogicalPlan::Projection(Projection::try_new( + vec![col("a")], + Arc::new(sort_plan), + )?); + // Plan D: no coerce + let if_not_coerced = "Projection: a\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation"; + do_not_coerce_on_output(plan.clone(), if_not_coerced)?; + // Plan B: coerce requested: Utf8View => LargeUtf8 only on outermost + let if_coerced = "Projection: CAST(a AS LargeUtf8)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation"; + coerce_on_output_if_viewtype(plan.clone(), if_coerced)?; + + Ok(()) + } + + #[test] + fn coerce_binaryview_output() -> Result<()> { + // Plan A + // scenario: outermost binaryview projection + let expr = col("a"); + let empty = empty_with_type(DataType::BinaryView); + let plan = LogicalPlan::Projection(Projection::try_new( + vec![expr.clone()], + Arc::clone(&empty), + )?); + // Plan A: no coerce + let if_not_coerced = "Projection: a\n EmptyRelation"; + do_not_coerce_on_output(plan.clone(), if_not_coerced)?; + // Plan A: coerce requested: BinaryView => LargeBinary + let if_coerced = "Projection: CAST(a AS LargeBinary)\n EmptyRelation"; + coerce_on_output_if_viewtype(plan.clone(), if_coerced)?; + + // Plan B + // scenario: outermost bool projection + let bool_expr = col("a").lt(lit(vec![8, 1, 8, 1])); + let bool_plan = LogicalPlan::Projection(Projection::try_new( + vec![bool_expr], + Arc::clone(&empty), + )?); + // Plan B: no coerce + let if_not_coerced = + "Projection: a < CAST(Binary(\"8,1,8,1\") AS BinaryView)\n EmptyRelation"; + do_not_coerce_on_output(bool_plan.clone(), if_not_coerced)?; + // Plan B: coerce requested: no coercion applied + let if_coerced = if_not_coerced; + coerce_on_output_if_viewtype(bool_plan, if_coerced)?; + + // Plan C + // scenario: with a non-projection root logical plan node + let sort_expr = expr.sort(true, true); + let sort_plan = LogicalPlan::Sort(Sort { + expr: vec![sort_expr], + input: Arc::new(plan), + fetch: None, + }); + // Plan C: no coerce + let if_not_coerced = + "Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation"; + do_not_coerce_on_output(sort_plan.clone(), if_not_coerced)?; + // Plan C: coerce requested: BinaryView => LargeBinary + let if_coerced = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation"; + coerce_on_output_if_viewtype(sort_plan.clone(), if_coerced)?; + + // Plan D + // scenario: two layers of projections with view types + let plan = LogicalPlan::Projection(Projection::try_new( + vec![col("a")], + Arc::new(sort_plan), + )?); + // Plan D: no coerce + let if_not_coerced = "Projection: a\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation"; + do_not_coerce_on_output(plan.clone(), if_not_coerced)?; + // Plan B: coerce requested: BinaryView => LargeBinary only on outermost + let if_coerced = "Projection: CAST(a AS LargeBinary)\n Sort: a ASC NULLS FIRST\n Projection: a\n EmptyRelation"; + coerce_on_output_if_viewtype(plan.clone(), if_coerced)?; + + Ok(()) + } + #[test] fn nested_case() -> Result<()> { let expr = col("a").lt(lit(2_u32)); diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 1266b548ab05..cabeafd8e7de 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -114,6 +114,17 @@ pub fn assert_analyzed_plan_eq( expected: &str, ) -> Result<()> { let options = ConfigOptions::default(); + assert_analyzed_plan_with_config_eq(options, rule, plan, expected)?; + + Ok(()) +} + +pub fn assert_analyzed_plan_with_config_eq( + options: ConfigOptions, + rule: Arc, + plan: LogicalPlan, + expected: &str, +) -> Result<()> { let analyzed_plan = Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?; let formatted_plan = format!("{analyzed_plan}"); diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index f797a7a6539d..beefa24ba4c6 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -225,6 +225,7 @@ datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true +datafusion.optimizer.expand_views_at_output false datafusion.optimizer.filter_null_join_keys false datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 @@ -314,6 +315,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible +datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 8514fb1fbd93..f7e25bd55850 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -111,6 +111,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | +| datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |