Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return err if wildcard is not expanded before type coercion #14130

Merged
merged 5 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -995,16 +995,19 @@ fn project_with_column_index(
.enumerate()
.map(|(i, e)| match e {
Expr::Alias(Alias { ref name, .. }) if name != schema.field(i).name() => {
e.unalias().alias(schema.field(i).name())
Ok(e.unalias().alias(schema.field(i).name()))
}
Expr::Column(Column {
relation: _,
ref name,
}) if name != schema.field(i).name() => e.alias(schema.field(i).name()),
Expr::Alias { .. } | Expr::Column { .. } => e,
_ => e.alias(schema.field(i).name()),
}) if name != schema.field(i).name() => Ok(e.alias(schema.field(i).name())),
Expr::Alias { .. } | Expr::Column { .. } => Ok(e),
Expr::Wildcard { .. } => {
plan_err!("Wildcard should be expanded before type coercion")
}
_ => Ok(e.alias(schema.field(i).name())),
})
.collect::<Vec<_>>();
.collect::<Result<Vec<_>>>()?;

Projection::try_new_with_schema(alias_expr, input, schema)
.map(LogicalPlan::Projection)
Expand All @@ -1018,11 +1021,17 @@ mod test {
use arrow::datatypes::DataType::Utf8;
use arrow::datatypes::{DataType, Field, TimeUnit};

use crate::analyzer::type_coercion::{
coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
};
use crate::test::{assert_analyzed_plan_eq, assert_analyzed_plan_with_config_eq};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue};
use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction};
use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort};
use datafusion_expr::sqlparser::dialect::PostgreSqlDialect;
use datafusion_expr::sqlparser::parser::Parser;
use datafusion_expr::test::function_stub::avg_udaf;
use datafusion_expr::{
cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, AggregateUDF,
Expand All @@ -1031,11 +1040,7 @@ mod test {
Volatility,
};
use datafusion_functions_aggregate::average::AvgAccumulator;

use crate::analyzer::type_coercion::{
coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
};
use crate::test::{assert_analyzed_plan_eq, assert_analyzed_plan_with_config_eq};
use datafusion_sql::planner::SqlToRel;

fn empty() -> Arc<LogicalPlan> {
Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
Expand Down
28 changes: 27 additions & 1 deletion datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};

use datafusion_common::config::ConfigOptions;
use datafusion_common::{plan_err, Result};
use datafusion_common::{assert_contains, plan_err, Result};
use datafusion_expr::sqlparser::dialect::PostgreSqlDialect;
use datafusion_expr::test::function_stub::sum_udaf;
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter;
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
Expand Down Expand Up @@ -387,6 +389,30 @@ fn select_correlated_predicate_subquery_with_uppercase_ident() {
assert_eq!(expected, format!("{plan}"));
}

// The test should return an error
// because the wildcard didn't be expanded before type coercion
#[test]
fn test_union_coercion_with_wildcard() -> Result<()> {
let dialect = PostgreSqlDialect {};
let context_provider = MyContextProvider::default();
let sql = "select * from (SELECT col_int32, col_uint32 FROM test) union all select * from(SELECT col_uint32, col_int32 FROM test)";
let statements = Parser::parse_sql(&dialect, sql)?;
let sql_to_rel = SqlToRel::new(&context_provider);
let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?;

if let LogicalPlan::Union(union) = logical_plan {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to ensure that logical_plan is definitely a union. This could happen if sql_statement_to_plan changes in the future, and then this test would become invalid. We can do it like this:

    let LogicalPlan::Union(union) = logical_plan else {
        panic!("Expected Union plan");
    };
    let err = TypeCoercionRewriter::coerce_union(union)
        .err()
        .unwrap()
        .to_string();
    assert_contains!(
        err,
        "Error during planning: Wildcard should be expanded before type coercion"
    );

let err = TypeCoercionRewriter::coerce_union(union)
.err()
.unwrap()
.to_string();
assert_contains!(
err,
"Error during planning: Wildcard should be expanded before type coercion"
);
}
Ok(())
}

fn test_sql(sql: &str) -> Result<LogicalPlan> {
// parse the SQL
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
Expand Down
Loading