Skip to content

Commit

Permalink
fix push down logic for unnest (#10991)
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 authored Jun 19, 2024
1 parent 0f80b92 commit 61e2ddb
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
13 changes: 12 additions & 1 deletion datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::{
CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union,
};
use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned};
use datafusion_expr::utils::{
conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
};
use datafusion_expr::{
and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator,
Projection, TableProviderFilterPushDown,
Expand Down Expand Up @@ -707,6 +709,15 @@ impl OptimizerRule for PushDownFilter {
}
}
LogicalPlan::Unnest(mut unnest) => {
// collect all the Expr::Column in predicate recursively
let mut accum: HashSet<Column> = HashSet::new();
expr_to_columns(&filter.predicate, &mut accum)?;

if unnest.exec_columns.iter().any(|c| accum.contains(c)) {
filter.input = Arc::new(LogicalPlan::Unnest(unnest));
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

// Unnest is built above Projection, so we only take Projection into consideration
match unwrap_arc(unnest.input) {
LogicalPlan::Projection(projection) => {
Expand Down
43 changes: 39 additions & 4 deletions datafusion/sqllogictest/test_files/push_down_filter.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,45 @@ logical_plan
04)------Filter: v.column1 = Int64(2)
05)--------TableScan: v projection=[column1, column2]

# TODO: fix the query
query error DataFusion error: External error: Arrow error: Invalid argument error: Invalid comparison operation: List\(Field \{ name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\) > Int64
query I
select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
----
4
5

# test push down filter for unnest with filter on unnest column
# query TT
# explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
query TT
explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
----
logical_plan
01)Projection: unnest(v.column2) AS uc2
02)--Filter: unnest(v.column2) > Int64(3)
03)----Projection: unnest(v.column2)
04)------Unnest: lists[unnest(v.column2)] structs[]
05)--------Projection: v.column2 AS unnest(v.column2), v.column1
06)----------TableScan: v projection=[column1, column2]

statement ok
drop table v;

# test with unnest struct, should not push down filter
statement ok
CREATE TABLE d AS VALUES(1,[named_struct('a', 1, 'b', 2)]),(2,[named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6)]);

query I?
select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
----
1 {a: 1, b: 2}

query TT
explain select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
----
logical_plan
01)Projection: d.column1, unnest(d.column2) AS o
02)--Filter: get_field(unnest(d.column2), Utf8("a")) = Int64(1)
03)----Unnest: lists[unnest(d.column2)] structs[]
04)------Projection: d.column1, d.column2 AS unnest(d.column2)
05)--------TableScan: d projection=[column1, column2]

statement ok
drop table d;

0 comments on commit 61e2ddb

Please sign in to comment.