Skip to content

Commit

Permalink
Unnest with single expression (#9069)
Browse files Browse the repository at this point in the history
* introduce basic unnest

Signed-off-by: jayzhan211 <[email protected]>

* proto

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* rename

Signed-off-by: jayzhan211 <[email protected]>

* change panic to internal error

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* typo

Signed-off-by: jayzhan211 <[email protected]>

* more err handling

Signed-off-by: jayzhan211 <[email protected]>

* fix reimported

Signed-off-by: jayzhan211 <[email protected]>

* comments and fix typo

Signed-off-by: jayzhan211 <[email protected]>

* Add tests for nulls

---------

Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
jayzhan211 and alamb authored Feb 4, 2024
1 parent 9d1502b commit 24197d7
Show file tree
Hide file tree
Showing 19 changed files with 385 additions and 17 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(VisitRecursion::Stop)
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ fn physical_name(e: &Expr) -> Result<String> {

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Unnest(_) => {
internal_err!(
"Expr::Unnest should have been converted to LogicalPlan::Unnest"
)
}
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
Expand Down
13 changes: 13 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ pub enum Expr {
/// A place holder which hold a reference to a qualified field
/// in the outer query, used for correlated sub queries.
OuterReferenceColumn(DataType, Column),
/// Unnest expression
Unnest(Unnest),
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
pub exprs: Vec<Expr>,
}

/// Alias expression
Expand Down Expand Up @@ -917,6 +924,7 @@ impl Expr {
Expr::TryCast { .. } => "TryCast",
Expr::WindowFunction { .. } => "WindowFunction",
Expr::Wildcard { .. } => "Wildcard",
Expr::Unnest { .. } => "Unnest",
}
}

Expand Down Expand Up @@ -1307,6 +1315,7 @@ impl Expr {
| Expr::Negative(..)
| Expr::OuterReferenceColumn(_, _)
| Expr::TryCast(..)
| Expr::Unnest(..)
| Expr::Wildcard { .. }
| Expr::WindowFunction(..)
| Expr::Literal(..)
Expand Down Expand Up @@ -1561,6 +1570,9 @@ impl fmt::Display for Expr {
}
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { exprs }) => {
write!(f, "UNNEST({exprs:?})")
}
}
}
}
Expand Down Expand Up @@ -1748,6 +1760,7 @@ fn create_name(e: &Expr) -> Result<String> {
}
}
}
Expr::Unnest(Unnest { exprs }) => Ok(format!("UNNEST({exprs:?})")),
Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, &fun.args),
Expr::WindowFunction(WindowFunction {
fun,
Expand Down
12 changes: 11 additions & 1 deletion datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Expression rewriter
use crate::expr::Alias;
use crate::expr::{Alias, Unnest};
use crate::logical_plan::Projection;
use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
Expand Down Expand Up @@ -75,6 +75,16 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
schemas: &[&[&DFSchema]],
using_columns: &[HashSet<Column>],
) -> Result<Expr> {
// Normalize column inside Unnest
if let Expr::Unnest(Unnest { exprs }) = expr {
let e = normalize_col_with_schemas_and_ambiguity_check(
exprs[0].clone(),
schemas,
using_columns,
)?;
return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
}

expr.transform(&|expr| {
Ok({
if let Expr::Column(c) = expr {
Expand Down
10 changes: 9 additions & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{Between, Expr, Like};
use crate::expr::{
AggregateFunction, AggregateFunctionDefinition, Alias, BinaryExpr, Cast,
GetFieldAccess, GetIndexedField, InList, InSubquery, Placeholder, ScalarFunction,
ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
ScalarFunctionDefinition, Sort, TryCast, Unnest, WindowFunction,
};
use crate::field_util::GetFieldAccessSchema;
use crate::type_coercion::binary::get_result_type;
Expand Down Expand Up @@ -82,6 +82,13 @@ impl ExprSchemable for Expr {
Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
Expr::Unnest(Unnest { exprs }) => {
let arg_data_types = exprs
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
Ok(arg_data_types[0].clone())
}
Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
let arg_data_types = args
.iter()
Expand Down Expand Up @@ -250,6 +257,7 @@ impl ExprSchemable for Expr {
| Expr::ScalarFunction(..)
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }
| Expr::Unnest(_)
| Expr::Placeholder(_) => Ok(true),
Expr::IsNull(_)
| Expr::IsNotNull(_)
Expand Down
6 changes: 4 additions & 2 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::expr::{
AggregateFunction, AggregateFunctionDefinition, Alias, Between, BinaryExpr, Case,
Cast, GetIndexedField, GroupingSet, InList, InSubquery, Like, Placeholder,
ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, Unnest, WindowFunction,
};
use crate::{Expr, GetFieldAccess};

Expand All @@ -33,7 +33,7 @@ impl TreeNode for Expr {
op: &mut F,
) -> Result<VisitRecursion> {
let children = match self {
Expr::Alias(Alias{expr, .. })
Expr::Alias(Alias{expr,..})
| Expr::Not(expr)
| Expr::IsNotNull(expr)
| Expr::IsTrue(expr)
Expand All @@ -58,6 +58,7 @@ impl TreeNode for Expr {
GetFieldAccess::NamedStructField { .. } => vec![expr],
}
}
Expr::Unnest(Unnest { exprs }) |
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().collect(),
Expr::ScalarFunction (ScalarFunction{ args, .. } ) => {
Expand Down Expand Up @@ -151,6 +152,7 @@ impl TreeNode for Expr {
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::ScalarVariable(_, _)
| Expr::Unnest(_)
| Expr::Literal(_) => self,
Expr::Alias(Alias {
expr,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
// Use explicit pattern match instead of a default
// implementation, so that in the future if someone adds
// new Expr types, they will check here as well
Expr::ScalarVariable(_, _)
Expr::Unnest(_)
| Expr::ScalarVariable(_, _)
| Expr::Alias(_)
| Expr::Literal(_)
| Expr::BinaryExpr { .. }
Expand Down
3 changes: 3 additions & 0 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ impl TreeNodeRewriter for TypeCoercionRewriter {

fn mutate(&mut self, expr: Expr) -> Result<Expr> {
match expr {
Expr::Unnest(_) => internal_err!(
"Unnest should be rewritten to LogicalPlan::Unnest before type coercion"
),
Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns,
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::OuterReferenceColumn(_, _)
| Expr::Unnest(_)
| Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(_),
..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ impl<'a> ConstEvaluator<'a> {
ScalarFunctionDefinition::Name(_) => false,
},
Expr::Literal(_)
| Expr::Unnest(_)
| Expr::BinaryExpr { .. }
| Expr::Not(_)
| Expr::IsNotNull(_)
Expand Down
6 changes: 6 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ message LogicalExprNode {

PlaceholderNode placeholder = 34;

Unnest unnest = 35;

}
}

Expand Down Expand Up @@ -531,6 +533,10 @@ message NegativeNode {
LogicalExprNode expr = 1;
}

message Unnest {
repeated LogicalExprNode exprs = 1;
}

message InListNode {
LogicalExprNode expr = 1;
repeated LogicalExprNode list = 2;
Expand Down
104 changes: 104 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 24197d7

Please sign in to comment.