diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index a03bcec7abec..077356b716b0 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -126,6 +126,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool { | Expr::Sort { .. } | Expr::WindowFunction { .. } | Expr::Wildcard { .. } + | Expr::Unnest { .. } | Expr::Placeholder(_) => { is_applicable = false; Ok(VisitRecursion::Stop) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6aa0c93fad0f..591a3e3131c8 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -124,6 +124,11 @@ fn physical_name(e: &Expr) -> Result { fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { match e { + Expr::Unnest(_) => { + internal_err!( + "Expr::Unnest should have been converted to LogicalPlan::Unnest" + ) + } Expr::Column(c) => { if is_first_expr { Ok(c.name.clone()) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 0000f3df033a..09de4b708de9 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -180,6 +180,13 @@ pub enum Expr { /// A place holder which hold a reference to a qualified field /// in the outer query, used for correlated sub queries. OuterReferenceColumn(DataType, Column), + /// Unnest expression + Unnest(Unnest), +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct Unnest { + pub exprs: Vec, } /// Alias expression @@ -917,6 +924,7 @@ impl Expr { Expr::TryCast { .. } => "TryCast", Expr::WindowFunction { .. } => "WindowFunction", Expr::Wildcard { .. } => "Wildcard", + Expr::Unnest { .. } => "Unnest", } } @@ -1307,6 +1315,7 @@ impl Expr { | Expr::Negative(..) | Expr::OuterReferenceColumn(_, _) | Expr::TryCast(..) + | Expr::Unnest(..) | Expr::Wildcard { .. } | Expr::WindowFunction(..) | Expr::Literal(..) @@ -1561,6 +1570,9 @@ impl fmt::Display for Expr { } }, Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"), + Expr::Unnest(Unnest { exprs }) => { + write!(f, "UNNEST({exprs:?})") + } } } } @@ -1748,6 +1760,7 @@ fn create_name(e: &Expr) -> Result { } } } + Expr::Unnest(Unnest { exprs }) => Ok(format!("UNNEST({exprs:?})")), Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, &fun.args), Expr::WindowFunction(WindowFunction { fun, diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 1f04c80833f0..3f7388c3c3d5 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -17,7 +17,7 @@ //! Expression rewriter -use crate::expr::Alias; +use crate::expr::{Alias, Unnest}; use crate::logical_plan::Projection; use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; @@ -75,6 +75,16 @@ pub fn normalize_col_with_schemas_and_ambiguity_check( schemas: &[&[&DFSchema]], using_columns: &[HashSet], ) -> Result { + // Normalize column inside Unnest + if let Expr::Unnest(Unnest { exprs }) = expr { + let e = normalize_col_with_schemas_and_ambiguity_check( + exprs[0].clone(), + schemas, + using_columns, + )?; + return Ok(Expr::Unnest(Unnest { exprs: vec![e] })); + } + expr.transform(&|expr| { Ok({ if let Expr::Column(c) = expr { diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 865279ab9aaf..d30f304a26b1 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -19,7 +19,7 @@ use super::{Between, Expr, Like}; use crate::expr::{ AggregateFunction, AggregateFunctionDefinition, Alias, BinaryExpr, Cast, GetFieldAccess, GetIndexedField, InList, InSubquery, Placeholder, ScalarFunction, - ScalarFunctionDefinition, Sort, TryCast, WindowFunction, + ScalarFunctionDefinition, Sort, TryCast, Unnest, WindowFunction, }; use crate::field_util::GetFieldAccessSchema; use crate::type_coercion::binary::get_result_type; @@ -82,6 +82,13 @@ impl ExprSchemable for Expr { Expr::Case(case) => case.when_then_expr[0].1.get_type(schema), Expr::Cast(Cast { data_type, .. }) | Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()), + Expr::Unnest(Unnest { exprs }) => { + let arg_data_types = exprs + .iter() + .map(|e| e.get_type(schema)) + .collect::>>()?; + Ok(arg_data_types[0].clone()) + } Expr::ScalarFunction(ScalarFunction { func_def, args }) => { let arg_data_types = args .iter() @@ -250,6 +257,7 @@ impl ExprSchemable for Expr { | Expr::ScalarFunction(..) | Expr::WindowFunction { .. } | Expr::AggregateFunction { .. } + | Expr::Unnest(_) | Expr::Placeholder(_) => Ok(true), Expr::IsNull(_) | Expr::IsNotNull(_) diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs index 8b38d1cf01d6..add15b3d7ad7 100644 --- a/datafusion/expr/src/tree_node/expr.rs +++ b/datafusion/expr/src/tree_node/expr.rs @@ -20,7 +20,7 @@ use crate::expr::{ AggregateFunction, AggregateFunctionDefinition, Alias, Between, BinaryExpr, Case, Cast, GetIndexedField, GroupingSet, InList, InSubquery, Like, Placeholder, - ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, WindowFunction, + ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, Unnest, WindowFunction, }; use crate::{Expr, GetFieldAccess}; @@ -33,7 +33,7 @@ impl TreeNode for Expr { op: &mut F, ) -> Result { let children = match self { - Expr::Alias(Alias{expr, .. }) + Expr::Alias(Alias{expr,..}) | Expr::Not(expr) | Expr::IsNotNull(expr) | Expr::IsTrue(expr) @@ -58,6 +58,7 @@ impl TreeNode for Expr { GetFieldAccess::NamedStructField { .. } => vec![expr], } } + Expr::Unnest(Unnest { exprs }) | Expr::GroupingSet(GroupingSet::Rollup(exprs)) | Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().collect(), Expr::ScalarFunction (ScalarFunction{ args, .. } ) => { @@ -151,6 +152,7 @@ impl TreeNode for Expr { | Expr::Exists { .. } | Expr::ScalarSubquery(_) | Expr::ScalarVariable(_, _) + | Expr::Unnest(_) | Expr::Literal(_) => self, Expr::Alias(Alias { expr, diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 5d011e097f36..e855554f3687 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -270,7 +270,8 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet) -> Result<()> { // Use explicit pattern match instead of a default // implementation, so that in the future if someone adds // new Expr types, they will check here as well - Expr::ScalarVariable(_, _) + Expr::Unnest(_) + | Expr::ScalarVariable(_, _) | Expr::Alias(_) | Expr::Literal(_) | Expr::BinaryExpr { .. } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index d804edb0c52f..662e0fc7c258 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -134,6 +134,9 @@ impl TreeNodeRewriter for TypeCoercionRewriter { fn mutate(&mut self, expr: Expr) -> Result { match expr { + Expr::Unnest(_) => internal_err!( + "Unnest should be rewritten to LogicalPlan::Unnest before type coercion" + ), Expr::ScalarSubquery(Subquery { subquery, outer_ref_columns, diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index fc56cbb86828..acdda6833285 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -227,6 +227,7 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result { | Expr::InSubquery(_) | Expr::ScalarSubquery(_) | Expr::OuterReferenceColumn(_, _) + | Expr::Unnest(_) | Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction { func_def: ScalarFunctionDefinition::UDF(_), .. diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 05a4f18e63ab..fe3679099480 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -483,6 +483,7 @@ impl<'a> ConstEvaluator<'a> { ScalarFunctionDefinition::Name(_) => false, }, Expr::Literal(_) + | Expr::Unnest(_) | Expr::BinaryExpr { .. } | Expr::Not(_) | Expr::IsNotNull(_) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index f2b5c5dd4239..218399694884 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -426,6 +426,8 @@ message LogicalExprNode { PlaceholderNode placeholder = 34; + Unnest unnest = 35; + } } @@ -531,6 +533,10 @@ message NegativeNode { LogicalExprNode expr = 1; } +message Unnest { + repeated LogicalExprNode exprs = 1; +} + message InListNode { LogicalExprNode expr = 1; repeated LogicalExprNode list = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b9a8c5fc0782..450b18dc0982 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -13359,6 +13359,9 @@ impl serde::Serialize for LogicalExprNode { logical_expr_node::ExprType::Placeholder(v) => { struct_ser.serialize_field("placeholder", v)?; } + logical_expr_node::ExprType::Unnest(v) => { + struct_ser.serialize_field("unnest", v)?; + } } } struct_ser.end() @@ -13426,6 +13429,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode { "similar_to", "similarTo", "placeholder", + "unnest", ]; #[allow(clippy::enum_variant_names)] @@ -13464,6 +13468,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode { Ilike, SimilarTo, Placeholder, + Unnest, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -13519,6 +13524,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode { "ilike" => Ok(GeneratedField::Ilike), "similarTo" | "similar_to" => Ok(GeneratedField::SimilarTo), "placeholder" => Ok(GeneratedField::Placeholder), + "unnest" => Ok(GeneratedField::Unnest), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -13777,6 +13783,13 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode { return Err(serde::de::Error::duplicate_field("placeholder")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::Placeholder) +; + } + GeneratedField::Unnest => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("unnest")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::Unnest) ; } } @@ -26752,6 +26765,97 @@ impl<'de> serde::Deserialize<'de> for UniqueConstraint { deserializer.deserialize_struct("datafusion.UniqueConstraint", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for Unnest { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.exprs.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.Unnest", len)?; + if !self.exprs.is_empty() { + struct_ser.serialize_field("exprs", &self.exprs)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for Unnest { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "exprs", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Exprs, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "exprs" => Ok(GeneratedField::Exprs), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = Unnest; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.Unnest") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut exprs__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Exprs => { + if exprs__.is_some() { + return Err(serde::de::Error::duplicate_field("exprs")); + } + exprs__ = Some(map_.next_value()?); + } + } + } + Ok(Unnest { + exprs: exprs__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.Unnest", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for ValuesNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 758ef2dcb5f3..7894285129f6 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -585,7 +585,7 @@ pub struct SubqueryAliasNode { pub struct LogicalExprNode { #[prost( oneof = "logical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35" )] pub expr_type: ::core::option::Option, } @@ -670,6 +670,8 @@ pub mod logical_expr_node { SimilarTo(::prost::alloc::boxed::Box), #[prost(message, tag = "34")] Placeholder(super::PlaceholderNode), + #[prost(message, tag = "35")] + Unnest(super::Unnest), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -836,6 +838,12 @@ pub struct NegativeNode { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct Unnest { + #[prost(message, repeated, tag = "1")] + pub exprs: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct InListNode { #[prost(message, optional, boxed, tag = "1")] pub expr: ::core::option::Option<::prost::alloc::boxed::Box>, diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index decf3b18745f..8ef7271ff2a5 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -44,6 +44,7 @@ use datafusion_common::{ Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue, }; +use datafusion_expr::expr::Unnest; use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by}; use datafusion_expr::{ abs, acos, acosh, array, array_append, array_concat, array_dims, array_distinct, @@ -1339,6 +1340,14 @@ pub fn parse_expr( ExprType::Negative(negative) => Ok(Expr::Negative(Box::new( parse_required_expr(negative.expr.as_deref(), registry, "expr")?, ))), + ExprType::Unnest(unnest) => { + let exprs = unnest + .exprs + .iter() + .map(|e| parse_expr(e, registry)) + .collect::, _>>()?; + Ok(Expr::Unnest(Unnest { exprs })) + } ExprType::InList(in_list) => Ok(Expr::InList(InList::new( Box::new(parse_required_expr( in_list.expr.as_deref(), diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index e094994840b2..e5948de40a23 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -48,7 +48,7 @@ use datafusion_common::{ use datafusion_expr::expr::{ self, AggregateFunctionDefinition, Alias, Between, BinaryExpr, Cast, GetFieldAccess, GetIndexedField, GroupingSet, InList, Like, Placeholder, ScalarFunction, - ScalarFunctionDefinition, Sort, + ScalarFunctionDefinition, Sort, Unnest, }; use datafusion_expr::{ logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction, @@ -987,6 +987,18 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode { expr_type: Some(ExprType::Negative(expr)), } } + Expr::Unnest(Unnest { exprs }) => { + let expr = protobuf::Unnest { + exprs: exprs.iter().map(|expr| expr.try_into()).collect::, + Error, + >>( + )?, + }; + Self { + expr_type: Some(ExprType::Unnest(expr)), + } + } Expr::InList(InList { expr, list, diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 0db086419a79..1bcdffe89236 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -47,7 +47,7 @@ use datafusion_common::{FileType, Result}; use datafusion_expr::dml::{CopyOptions, CopyTo}; use datafusion_expr::expr::{ self, Between, BinaryExpr, Case, Cast, GroupingSet, InList, Like, ScalarFunction, - Sort, + Sort, Unnest, }; use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore}; use datafusion_expr::{ @@ -1463,6 +1463,16 @@ fn roundtrip_inlist() { roundtrip_expr_test(test_expr, ctx); } +#[test] +fn roundtrip_unnest() { + let test_expr = Expr::Unnest(Unnest { + exprs: vec![lit(1), lit(2), lit(3)], + }); + + let ctx = SessionContext::new(); + roundtrip_expr_test(test_expr, ctx); +} + #[test] fn roundtrip_wildcard() { let test_expr = Expr::Wildcard { qualifier: None }; diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 30f8605c3954..3187f26dcc5d 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -17,15 +17,15 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{ - not_impl_err, plan_datafusion_err, plan_err, DFSchema, DataFusionError, Dependency, - Result, + exec_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema, DataFusionError, + Dependency, Result, }; -use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::expr::{ScalarFunction, Unnest}; use datafusion_expr::function::suggest_valid_function; use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by}; use datafusion_expr::{ - expr, AggregateFunction, BuiltinScalarFunction, Expr, WindowFrame, - WindowFunctionDefinition, + expr, AggregateFunction, BuiltinScalarFunction, Expr, ScalarFunctionDefinition, + WindowFrame, WindowFunctionDefinition, }; use sqlparser::ast::{ Expr as SQLExpr, Function as SQLFunction, FunctionArg, FunctionArgExpr, WindowType, @@ -70,6 +70,50 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { return Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fm, args))); } + // Build Unnest expression + if name.eq("unnest") { + let exprs = + self.function_args_to_expr(args.clone(), schema, planner_context)?; + + match exprs.len() { + 0 => { + return exec_err!("unnest() requires at least one argument"); + } + 1 => { + if let Expr::ScalarFunction(ScalarFunction { + func_def: + ScalarFunctionDefinition::BuiltIn( + BuiltinScalarFunction::MakeArray, + ), + .. + }) = exprs[0] + { + // valid + } else if let Expr::Column(_) = exprs[0] { + // valid + } else if let Expr::ScalarFunction(ScalarFunction { + func_def: + ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::Struct), + .. + }) = exprs[0] + { + return not_impl_err!("unnest() does not support struct yet"); + } else { + return plan_err!( + "unnest() can only be applied to array and structs and null" + ); + } + } + _ => { + return not_impl_err!( + "unnest() does not support multiple arguments yet" + ); + } + } + + return Ok(Expr::Unnest(Unnest { exprs })); + } + // next, scalar built-in if let Ok(fun) = BuiltinScalarFunction::from_str(&name) { let args = self.function_args_to_expr(args, schema, planner_context)?; diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index a0819e4aaf8e..7862715e5f1d 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -26,11 +26,10 @@ use crate::utils::{ use datafusion_common::Column; use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; -use datafusion_expr::expr::Alias; +use datafusion_expr::expr::{Alias, Unnest}; use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, }; -use datafusion_expr::logical_plan::builder::project; use datafusion_expr::utils::{ expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, @@ -221,8 +220,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan }; - // final projection - let plan = project(plan, select_exprs_post_aggr)?; + // try process unnest expression or do the final projection + let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?; // process distinct clause let plan = match select.distinct { @@ -275,6 +274,40 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Ok(plan) } + // Try converting Expr::Unnest to LogicalPlan::Unnest if possible, otherwise do the final projection + fn try_process_unnest( + &self, + input: LogicalPlan, + select_exprs: Vec, + ) -> Result { + let mut exprs_to_unnest = vec![]; + + for expr in select_exprs.iter() { + if let Expr::Unnest(Unnest { exprs }) = expr { + exprs_to_unnest.push(exprs[0].clone()); + } + } + + // Do the final projection + if exprs_to_unnest.is_empty() { + LogicalPlanBuilder::from(input) + .project(select_exprs)? + .build() + } else { + if exprs_to_unnest.len() > 1 { + return not_impl_err!("Only support single unnest expression for now"); + } + + let expr = exprs_to_unnest[0].clone(); + let column = expr.display_name()?; + + LogicalPlanBuilder::from(input) + .project(vec![expr])? + .unnest_column(column)? + .build() + } + } + fn plan_selection( &self, selection: Option, diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt new file mode 100644 index 000000000000..7e4ce06be203 --- /dev/null +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +############################ +# Unnest Expressions Tests # +############################ + +statement ok +CREATE TABLE unnest_table +AS VALUES + ([1,2,3], [7], 1), + ([4,5], [8,9,10], 2), + ([6], [11,12], 3), + ([12], [null, 42, null], null) +; + +## Basic unnest expression +query I +select unnest([1,2,3]); +---- +1 +2 +3 + +query error DataFusion error: Error during planning: unnest\(\) can only be applied to array and structs and null +select unnest(null); + +## Unnest empty array +query ? +select unnest([]); +---- + +## Unnest column non-null +query I +select unnest(column1) from unnest_table; +---- +1 +2 +3 +4 +5 +6 +12 + +## Unnest column with null +query I +select unnest(column2) from unnest_table; +---- +7 +8 +9 +10 +11 +12 +NULL +42 +NULL + +## Unnest column with scalars +# TODO: This should be an error, but unnest is able to process scalar values now. +query I +select unnest(column3) from unnest_table; +---- +1 +2 +3 +NULL + +## Unnest multiple columns +query error DataFusion error: This feature is not implemented: Only support single unnest expression for now +select unnest(column1), unnest(column2) from unnest_table; + +## Unnest scalar +query error DataFusion error: Error during planning: unnest\(\) can only be applied to array and structs and null +select unnest(1); + + +## Unnest empty expression +query error DataFusion error: Execution error: unnest\(\) requires at least one argument +select unnest(); + +statement ok +drop table unnest_table;