From 5bc005122ff0adb40a30c1d1d40adab698dac8b7 Mon Sep 17 00:00:00 2001 From: mingmwang Date: Sat, 1 Apr 2023 17:24:26 +0800 Subject: [PATCH] improve filter pushdown to join (#5770) --- benchmarks/expected-plans/q17.txt | 104 +++++------ benchmarks/expected-plans/q19.txt | 73 ++++---- benchmarks/expected-plans/q20.txt | 165 ++++++++--------- benchmarks/expected-plans/q7.txt | 181 +++++++++---------- datafusion/core/src/physical_plan/planner.rs | 9 +- datafusion/core/tests/sql/joins.rs | 20 +- datafusion/core/tests/sql/predicates.rs | 12 +- datafusion/core/tests/sql/subqueries.rs | 28 ++- datafusion/optimizer/src/push_down_filter.rs | 151 ++++++++++++---- 9 files changed, 399 insertions(+), 344 deletions(-) diff --git a/benchmarks/expected-plans/q17.txt b/benchmarks/expected-plans/q17.txt index 998450328cfa..be3e81084af3 100644 --- a/benchmarks/expected-plans/q17.txt +++ b/benchmarks/expected-plans/q17.txt @@ -1,55 +1,49 @@ -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly | -| | Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]] | -| | Projection: lineitem.l_extendedprice | -| | Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND __scalar_sq_1.l_partkey = lineitem.l_partkey | -| | Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, __scalar_sq_1.l_partkey, __scalar_sq_1.__value | -| | Inner Join: part.p_partkey = __scalar_sq_1.l_partkey | -| | Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey | -| | Inner Join: lineitem.l_partkey = part.p_partkey | -| | TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice] | -| | Projection: part.p_partkey | -| | Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX") | -| | TableScan: part projection=[p_partkey, p_brand, p_container] | -| | SubqueryAlias: __scalar_sq_1 | -| | Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value | -| | Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]] | -| | TableScan: lineitem projection=[l_partkey, l_quantity] | -| physical_plan | ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly] | -| | AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)] | -| | CoalescePartitionsExec | -| | AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice)] | -| | ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: CAST(l_quantity@1 AS Decimal128(30, 15)) < CAST(__value@4 AS Decimal128(30, 15)) AND l_partkey@3 = l_partkey@0 | -| | ProjectionExec: expr=[l_partkey@0 as l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_partkey@4 as l_partkey, __value@5 as __value] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 3 }, Column { name: "l_partkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2), input_partitions=2 | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: p_partkey@3 = l_partkey@0 AND l_partkey@0 = p_partkey@3 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=0 | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | ProjectionExec: expr=[p_partkey@0 as p_partkey] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | ProjectionExec: expr=[l_partkey@0 as l_partkey, 0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) as __value] | -| | AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)] | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ \ No newline at end of file ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly | +| | Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]] | +| | Projection: lineitem.l_extendedprice | +| | Inner Join: part.p_partkey = __scalar_sq_1.l_partkey Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) | +| | Projection: lineitem.l_quantity, lineitem.l_extendedprice, part.p_partkey | +| | Inner Join: lineitem.l_partkey = part.p_partkey | +| | TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice] | +| | Projection: part.p_partkey | +| | Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX") | +| | TableScan: part projection=[p_partkey, p_brand, p_container] | +| | SubqueryAlias: __scalar_sq_1 | +| | Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value | +| | Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]] | +| | TableScan: lineitem projection=[l_partkey, l_quantity] | +| physical_plan | ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly] | +| | AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice)] | +| | ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 2 }, Column { name: "l_partkey", index: 0 })], filter=BinaryExpr { left: CastExpr { expr: Column { name: "l_quantity", index: 0 }, cast_type: Decimal128(30, 15), cast_options: CastOptions { safe: false } }, op: Lt, right: CastExpr { expr: Column { name: "__value", index: 1 }, cast_type: Decimal128(30, 15), cast_options: CastOptions { safe: false } } } | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 2 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 | +| | ProjectionExec: expr=[l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, p_partkey@3 as p_partkey] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=0 | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | ProjectionExec: expr=[p_partkey@0 as p_partkey] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | ProjectionExec: expr=[l_partkey@0 as l_partkey, 0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) as __value] | +| | AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)] | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ \ No newline at end of file diff --git a/benchmarks/expected-plans/q19.txt b/benchmarks/expected-plans/q19.txt index 610238203282..2e3ccbde94e7 100644 --- a/benchmarks/expected-plans/q19.txt +++ b/benchmarks/expected-plans/q19.txt @@ -1,39 +1,34 @@ -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue | -| | Aggregate: groupBy=[[]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] | -| | Projection: lineitem.l_extendedprice, lineitem.l_discount | -| | Filter: part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND part.p_size <= Int32(15) | -| | Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, part.p_brand, part.p_size, part.p_container | -| | Inner Join: lineitem.l_partkey = part.p_partkey | -| | Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount | -| | Filter: (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8("AIR REG") OR lineitem.l_shipmode = Utf8("AIR")) AND lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") | -| | TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode] | -| | Filter: (part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1) | -| | TableScan: part projection=[p_partkey, p_brand, p_size, p_container] | -| physical_plan | ProjectionExec: expr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@0 as revenue] | -| | AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] | -| | CoalescePartitionsExec | -| | AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] | -| | ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: p_brand@3 = Brand#12 AND Use p_container@5 IN (SET) ([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND l_quantity@0 >= Some(100),15,2 AND l_quantity@0 <= Some(1100),15,2 AND p_size@4 <= 5 OR p_brand@3 = Brand#23 AND Use p_container@5 IN (SET) ([Literal { value: Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { value: Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND l_quantity@0 >= Some(1000),15,2 AND l_quantity@0 <= Some(2000),15,2 AND p_size@4 <= 10 OR p_brand@3 = Brand#34 AND Use p_container@5 IN (SET) ([Literal { value: Utf8("LG CASE") }, Literal { value: Utf8("LG BOX") }, Literal { value: Utf8("LG PACK") }, Literal { value: Utf8("LG PKG") }]) AND l_quantity@0 >= Some(2000),15,2 AND l_quantity@0 <= Some(3000),15,2 AND p_size@4 <= 15 | -| | ProjectionExec: expr=[l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, p_brand@5 as p_brand, p_size@6 as p_size, p_container@7 as p_container] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | ProjectionExec: expr=[l_partkey@0 as l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: (l_quantity@1 >= Some(100),15,2 AND l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR REG OR l_shipmode@5 = AIR) AND l_shipinstruct@4 = DELIVER IN PERSON | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: (p_brand@1 = Brand#12 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { value: Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("LG CASE") }, Literal { value: Utf8("LG BOX") }, Literal { value: Utf8("LG PACK") }, Literal { value: Utf8("LG PKG") }]) AND p_size@2 <= 15) AND p_size@2 >= 1 | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ \ No newline at end of file ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue | +| | Aggregate: groupBy=[[]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] | +| | Projection: lineitem.l_extendedprice, lineitem.l_discount | +| | Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND part.p_size <= Int32(15) | +| | Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount | +| | Filter: (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8("AIR REG") OR lineitem.l_shipmode = Utf8("AIR")) AND lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") | +| | TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode] | +| | Filter: (part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1) | +| | TableScan: part projection=[p_partkey, p_brand, p_size, p_container] | +| physical_plan | ProjectionExec: expr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@0 as revenue] | +| | AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] | +| | ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })], filter=BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: Column { name: "p_brand", index: 1 }, op: Eq, right: Literal { value: Utf8("Brand#12") } }, op: And, right: InListExpr { expr: Column { name: "p_container", index: 3 }, list: [Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }], negated: false } }, op: And, right: BinaryExpr { left: Column { name: "l_quantity", index: 0 }, op: GtEq, right: Literal { value: Decimal128(Some(100),15,2) } } }, op: And, right: BinaryExpr { left: Column { name: "l_quantity", index: 0 }, op: LtEq, right: Literal { value: Decimal128(Some(1100),15,2) } } }, op: And, right: BinaryExpr { left: Column { name: "p_size", index: 2 }, op: LtEq, right: Literal { value: Int32(5) } } }, op: Or, right: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: Column { name: "p_brand", index: 1 }, op: Eq, right: Literal { value: Utf8("Brand#23") } }, op: And, right: InListExpr { expr: Column { name: "p_container", index: 3 }, list: [Literal { value: Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { value: Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }], negated: false } }, op: And, right: BinaryExpr { left: Column { name: "l_quantity", index: 0 }, op: GtEq, right: Literal { value: Decimal128(Some(1000),15,2) } } }, op: And, right: BinaryExpr { left: Column { name: "l_quantity", index: 0 }, op: LtEq, right: Literal { value: Decimal128(Some(2000),15,2) } } }, op: And, right: BinaryExpr { left: Column { name: "p_size", index: 2 }, op: LtEq, right: Literal { value: Int32(10) } } } }, op: Or, right: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: Column { name: "p_brand", index: 1 }, op: Eq, right: Literal { value: Utf8("Brand#34") } }, op: And, right: InListExpr { expr: Column { name: "p_container", index: 3 }, list: [Literal { value: Utf8("LG CASE") }, Literal { value: Utf8("LG BOX") }, Literal { value: Utf8("LG PACK") }, Literal { value: Utf8("LG PKG") }], negated: false } }, op: And, right: BinaryExpr { left: Column { name: "l_quantity", index: 0 }, op: GtEq, right: Literal { value: Decimal128(Some(2000),15,2) } } }, op: And, right: BinaryExpr { left: Column { name: "l_quantity", index: 0 }, op: LtEq, right: Literal { value: Decimal128(Some(3000),15,2) } } }, op: And, right: BinaryExpr { left: Column { name: "p_size", index: 2 }, op: LtEq, right: Literal { value: Int32(15) } } } } | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | ProjectionExec: expr=[l_partkey@0 as l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: (l_quantity@1 >= Some(100),15,2 AND l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR REG OR l_shipmode@5 = AIR) AND l_shipinstruct@4 = DELIVER IN PERSON | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: (p_brand@1 = Brand#12 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { value: Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("LG CASE") }, Literal { value: Utf8("LG BOX") }, Literal { value: Utf8("LG PACK") }, Literal { value: Utf8("LG PKG") }]) AND p_size@2 <= 15) AND p_size@2 >= 1 | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ \ No newline at end of file diff --git a/benchmarks/expected-plans/q20.txt b/benchmarks/expected-plans/q20.txt index a3209f3e2d7d..03ad420c58df 100644 --- a/benchmarks/expected-plans/q20.txt +++ b/benchmarks/expected-plans/q20.txt @@ -1,85 +1,80 @@ -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Sort: supplier.s_name ASC NULLS LAST | -| | Projection: supplier.s_name, supplier.s_address | -| | LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey | -| | Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address | -| | Inner Join: supplier.s_nationkey = nation.n_nationkey | -| | TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey] | -| | Projection: nation.n_nationkey | -| | Filter: nation.n_name = Utf8("CANADA") | -| | TableScan: nation projection=[n_nationkey, n_name] | -| | SubqueryAlias: __correlated_sq_1 | -| | Projection: partsupp.ps_suppkey AS ps_suppkey | -| | Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value | -| | Projection: partsupp.ps_suppkey, partsupp.ps_availqty, __scalar_sq_1.__value | -| | Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey | -| | LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey | -| | TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] | -| | SubqueryAlias: __correlated_sq_2 | -| | Projection: part.p_partkey AS p_partkey | -| | Filter: part.p_name LIKE Utf8("forest%") | -| | TableScan: part projection=[p_partkey, p_name] | -| | SubqueryAlias: __scalar_sq_1 | -| | Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value | -| | Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]] | -| | Projection: lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity | -| | Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131") | -| | TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate] | -| physical_plan | SortPreservingMergeExec: [s_name@0 ASC NULLS LAST] | -| | SortExec: expr=[s_name@0 ASC NULLS LAST] | -| | ProjectionExec: expr=[s_name@1 as s_name, s_address@2 as s_address] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "s_suppkey", index: 0 }, Column { name: "ps_suppkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 | -| | ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 3 }], 2), input_partitions=0 | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | ProjectionExec: expr=[n_nationkey@0 as n_nationkey] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: n_name@1 = CANADA | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "ps_suppkey", index: 0 }], 2), input_partitions=2 | -| | ProjectionExec: expr=[ps_suppkey@0 as ps_suppkey] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: CAST(ps_availqty@1 AS Float64) > __value@2 | -| | ProjectionExec: expr=[ps_suppkey@1 as ps_suppkey, ps_availqty@2 as ps_availqty, __value@5 as __value] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "ps_partkey", index: 0 }, Column { name: "l_partkey", index: 0 }), (Column { name: "ps_suppkey", index: 1 }, Column { name: "l_suppkey", index: 1 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "ps_partkey", index: 0 }, Column { name: "ps_suppkey", index: 1 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "ps_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2), input_partitions=0 | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | ProjectionExec: expr=[p_partkey@0 as p_partkey] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: p_name@1 LIKE forest% | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | ProjectionExec: expr=[l_partkey@0 as l_partkey, l_suppkey@1 as l_suppkey, 0.5 * CAST(SUM(lineitem.l_quantity)@2 AS Float64) as __value] | -| | AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey, l_suppkey@1 as l_suppkey], aggr=[SUM(lineitem.l_quantity)] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }, Column { name: "l_suppkey", index: 1 }], 2), input_partitions=2 | -| | AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey, l_suppkey@1 as l_suppkey], aggr=[SUM(lineitem.l_quantity)] | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | ProjectionExec: expr=[l_partkey@0 as l_partkey, l_suppkey@1 as l_suppkey, l_quantity@2 as l_quantity] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: l_shipdate@3 >= 8766 AND l_shipdate@3 < 9131 | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ \ No newline at end of file ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Sort: supplier.s_name ASC NULLS LAST | +| | Projection: supplier.s_name, supplier.s_address | +| | LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey | +| | Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address | +| | Inner Join: supplier.s_nationkey = nation.n_nationkey | +| | TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey] | +| | Projection: nation.n_nationkey | +| | Filter: nation.n_name = Utf8("CANADA") | +| | TableScan: nation projection=[n_nationkey, n_name] | +| | SubqueryAlias: __correlated_sq_1 | +| | Projection: partsupp.ps_suppkey AS ps_suppkey | +| | Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value | +| | LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey | +| | TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] | +| | SubqueryAlias: __correlated_sq_2 | +| | Projection: part.p_partkey AS p_partkey | +| | Filter: part.p_name LIKE Utf8("forest%") | +| | TableScan: part projection=[p_partkey, p_name] | +| | SubqueryAlias: __scalar_sq_1 | +| | Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value | +| | Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]] | +| | Projection: lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity | +| | Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131") | +| | TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate] | +| physical_plan | SortPreservingMergeExec: [s_name@0 ASC NULLS LAST] | +| | SortExec: expr=[s_name@0 ASC NULLS LAST] | +| | ProjectionExec: expr=[s_name@1 as s_name, s_address@2 as s_address] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "s_suppkey", index: 0 }, Column { name: "ps_suppkey", index: 0 })] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 | +| | ProjectionExec: expr=[s_suppkey@0 as s_suppkey, s_name@1 as s_name, s_address@2 as s_address] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 3 }], 2), input_partitions=0 | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | ProjectionExec: expr=[n_nationkey@0 as n_nationkey] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: n_name@1 = CANADA | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "ps_suppkey", index: 0 }], 2), input_partitions=2 | +| | ProjectionExec: expr=[ps_suppkey@1 as ps_suppkey] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "ps_partkey", index: 0 }, Column { name: "l_partkey", index: 0 }), (Column { name: "ps_suppkey", index: 1 }, Column { name: "l_suppkey", index: 1 })], filter=BinaryExpr { left: CastExpr { expr: Column { name: "ps_availqty", index: 0 }, cast_type: Float64, cast_options: CastOptions { safe: false } }, op: Gt, right: Column { name: "__value", index: 1 } } | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "ps_partkey", index: 0 }, Column { name: "ps_suppkey", index: 1 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "ps_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2), input_partitions=0 | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | ProjectionExec: expr=[p_partkey@0 as p_partkey] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: p_name@1 LIKE forest% | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | ProjectionExec: expr=[l_partkey@0 as l_partkey, l_suppkey@1 as l_suppkey, 0.5 * CAST(SUM(lineitem.l_quantity)@2 AS Float64) as __value] | +| | AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey, l_suppkey@1 as l_suppkey], aggr=[SUM(lineitem.l_quantity)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }, Column { name: "l_suppkey", index: 1 }], 2), input_partitions=2 | +| | AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey, l_suppkey@1 as l_suppkey], aggr=[SUM(lineitem.l_quantity)] | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | ProjectionExec: expr=[l_partkey@0 as l_partkey, l_suppkey@1 as l_suppkey, l_quantity@2 as l_quantity] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: l_shipdate@3 >= 8766 AND l_shipdate@3 < 9131 | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ \ No newline at end of file diff --git a/benchmarks/expected-plans/q7.txt b/benchmarks/expected-plans/q7.txt index 2f5d7fc663e9..619ac28d0a03 100644 --- a/benchmarks/expected-plans/q7.txt +++ b/benchmarks/expected-plans/q7.txt @@ -1,93 +1,88 @@ -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST, shipping.l_year ASC NULLS LAST | -| | Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, SUM(shipping.volume) AS revenue | -| | Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[SUM(shipping.volume)]] | -| | SubqueryAlias: shipping | -| | Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume | -| | Filter: n1.n_name = Utf8("FRANCE") AND n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY") AND n2.n_name = Utf8("FRANCE") | -| | Projection: lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, n1.n_name, n2.n_name | -| | Inner Join: customer.c_nationkey = n2.n_nationkey | -| | Projection: lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey, n1.n_name | -| | Inner Join: supplier.s_nationkey = n1.n_nationkey | -| | Projection: supplier.s_nationkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey | -| | Inner Join: orders.o_custkey = customer.c_custkey | -| | Projection: supplier.s_nationkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, orders.o_custkey | -| | Inner Join: lineitem.l_orderkey = orders.o_orderkey | -| | Projection: supplier.s_nationkey, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate | -| | Inner Join: supplier.s_suppkey = lineitem.l_suppkey | -| | TableScan: supplier projection=[s_suppkey, s_nationkey] | -| | Filter: lineitem.l_shipdate >= Date32("9131") AND lineitem.l_shipdate <= Date32("9861") | -| | TableScan: lineitem projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount, l_shipdate] | -| | TableScan: orders projection=[o_orderkey, o_custkey] | -| | TableScan: customer projection=[c_custkey, c_nationkey] | -| | SubqueryAlias: n1 | -| | Filter: nation.n_name = Utf8("FRANCE") OR nation.n_name = Utf8("GERMANY") | -| | TableScan: nation projection=[n_nationkey, n_name] | -| | SubqueryAlias: n2 | -| | Filter: nation.n_name = Utf8("GERMANY") OR nation.n_name = Utf8("FRANCE") | -| | TableScan: nation projection=[n_nationkey, n_name] | -| physical_plan | SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS LAST,l_year@2 ASC NULLS LAST] | -| | SortExec: expr=[supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS LAST,l_year@2 ASC NULLS LAST] | -| | ProjectionExec: expr=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as l_year, SUM(shipping.volume)@3 as revenue] | -| | AggregateExec: mode=FinalPartitioned, gby=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[SUM(shipping.volume)] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { name: "l_year", index: 2 }], 2), input_partitions=2 | -| | AggregateExec: mode=Partial, gby=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[SUM(shipping.volume)] | -| | ProjectionExec: expr=[n_name@3 as supp_nation, n_name@4 as cust_nation, datepart(YEAR, l_shipdate@2) as l_year, CAST(l_extendedprice@0 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@1 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as volume] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: n_name@3 = FRANCE AND n_name@4 = GERMANY OR n_name@3 = GERMANY AND n_name@4 = FRANCE | -| | ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, l_shipdate@2 as l_shipdate, n_name@4 as n_name, n_name@6 as n_name] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "c_nationkey", index: 3 }], 2), input_partitions=2 | -| | ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_shipdate@3 as l_shipdate, c_nationkey@4 as c_nationkey, n_name@6 as n_name] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 0 }, Column { name: "n_nationkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 0 }], 2), input_partitions=2 | -| | ProjectionExec: expr=[s_nationkey@0 as s_nationkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_shipdate@3 as l_shipdate, c_nationkey@6 as c_nationkey] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_custkey", index: 4 }, Column { name: "c_custkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 4 }], 2), input_partitions=2 | -| | ProjectionExec: expr=[s_nationkey@0 as s_nationkey, l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, l_shipdate@4 as l_shipdate, o_custkey@6 as o_custkey] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 1 }, Column { name: "o_orderkey", index: 0 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 1 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 | -| | ProjectionExec: expr=[s_nationkey@1 as s_nationkey, l_orderkey@2 as l_orderkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, l_shipdate@6 as l_shipdate] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_suppkey", index: 0 }, Column { name: "l_suppkey", index: 1 })] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0 | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 1 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: l_shipdate@4 >= 9131 AND l_shipdate@4 <= 9861 | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=0 | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0 | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: n_name@1 = FRANCE OR n_name@1 = GERMANY | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=2 | -| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | -| | CoalesceBatchesExec: target_batch_size=8192 | -| | FilterExec: n_name@1 = GERMANY OR n_name@1 = FRANCE | -| | MemoryExec: partitions=0, partition_sizes=[] | -| | | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ \ No newline at end of file ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST, shipping.l_year ASC NULLS LAST | +| | Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, SUM(shipping.volume) AS revenue | +| | Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[SUM(shipping.volume)]] | +| | SubqueryAlias: shipping | +| | Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume | +| | Inner Join: customer.c_nationkey = n2.n_nationkey Filter: n1.n_name = Utf8("FRANCE") AND n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY") AND n2.n_name = Utf8("FRANCE") | +| | Projection: lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey, n1.n_name | +| | Inner Join: supplier.s_nationkey = n1.n_nationkey | +| | Projection: supplier.s_nationkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey | +| | Inner Join: orders.o_custkey = customer.c_custkey | +| | Projection: supplier.s_nationkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, orders.o_custkey | +| | Inner Join: lineitem.l_orderkey = orders.o_orderkey | +| | Projection: supplier.s_nationkey, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate | +| | Inner Join: supplier.s_suppkey = lineitem.l_suppkey | +| | TableScan: supplier projection=[s_suppkey, s_nationkey] | +| | Filter: lineitem.l_shipdate >= Date32("9131") AND lineitem.l_shipdate <= Date32("9861") | +| | TableScan: lineitem projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount, l_shipdate] | +| | TableScan: orders projection=[o_orderkey, o_custkey] | +| | TableScan: customer projection=[c_custkey, c_nationkey] | +| | SubqueryAlias: n1 | +| | Filter: nation.n_name = Utf8("FRANCE") OR nation.n_name = Utf8("GERMANY") | +| | TableScan: nation projection=[n_nationkey, n_name] | +| | SubqueryAlias: n2 | +| | Filter: nation.n_name = Utf8("GERMANY") OR nation.n_name = Utf8("FRANCE") | +| | TableScan: nation projection=[n_nationkey, n_name] | +| physical_plan | SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS LAST,l_year@2 ASC NULLS LAST] | +| | SortExec: expr=[supp_nation@0 ASC NULLS LAST,cust_nation@1 ASC NULLS LAST,l_year@2 ASC NULLS LAST] | +| | ProjectionExec: expr=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as l_year, SUM(shipping.volume)@3 as revenue] | +| | AggregateExec: mode=FinalPartitioned, gby=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[SUM(shipping.volume)] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { name: "l_year", index: 2 }], 2), input_partitions=2 | +| | AggregateExec: mode=Partial, gby=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as l_year], aggr=[SUM(shipping.volume)] | +| | ProjectionExec: expr=[n_name@4 as supp_nation, n_name@6 as cust_nation, datepart(YEAR, l_shipdate@2) as l_year, CAST(l_extendedprice@0 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@1 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as volume] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_nationkey", index: 3 }, Column { name: "n_nationkey", index: 0 })], filter=BinaryExpr { left: BinaryExpr { left: BinaryExpr { left: Column { name: "n_name", index: 0 }, op: Eq, right: Literal { value: Utf8("FRANCE") } }, op: And, right: BinaryExpr { left: Column { name: "n_name", index: 1 }, op: Eq, right: Literal { value: Utf8("GERMANY") } } }, op: Or, right: BinaryExpr { left: BinaryExpr { left: Column { name: "n_name", index: 0 }, op: Eq, right: Literal { value: Utf8("GERMANY") } }, op: And, right: BinaryExpr { left: Column { name: "n_name", index: 1 }, op: Eq, right: Literal { value: Utf8("FRANCE") } } } } | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "c_nationkey", index: 3 }], 2), input_partitions=2 | +| | ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_shipdate@3 as l_shipdate, c_nationkey@4 as c_nationkey, n_name@6 as n_name] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_nationkey", index: 0 }, Column { name: "n_nationkey", index: 0 })] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "s_nationkey", index: 0 }], 2), input_partitions=2 | +| | ProjectionExec: expr=[s_nationkey@0 as s_nationkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_shipdate@3 as l_shipdate, c_nationkey@6 as c_nationkey] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_custkey", index: 4 }, Column { name: "c_custkey", index: 0 })] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 4 }], 2), input_partitions=2 | +| | ProjectionExec: expr=[s_nationkey@0 as s_nationkey, l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, l_shipdate@4 as l_shipdate, o_custkey@6 as o_custkey] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 1 }, Column { name: "o_orderkey", index: 0 })] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 1 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 | +| | ProjectionExec: expr=[s_nationkey@1 as s_nationkey, l_orderkey@2 as l_orderkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount, l_shipdate@6 as l_shipdate] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "s_suppkey", index: 0 }, Column { name: "l_suppkey", index: 1 })] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2), input_partitions=0 | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "l_suppkey", index: 1 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: l_shipdate@4 >= 9131 AND l_shipdate@4 <= 9861 | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2), input_partitions=0 | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 2), input_partitions=0 | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: n_name@1 = FRANCE OR n_name@1 = GERMANY | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2), input_partitions=2 | +| | RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: n_name@1 = GERMANY OR n_name@1 = FRANCE | +| | MemoryExec: partitions=0, partition_sizes=[] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ \ No newline at end of file diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 4b21a9bd735e..81a9ebceab27 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -923,21 +923,21 @@ impl DefaultPhysicalPlanner { let join_filter = match filter { Some(expr) => { - // Extract columns from filter expression + // Extract columns from filter expression and saved in a HashSet let cols = expr.to_columns()?; - // Collect left & right field indices + // Collect left & right field indices, the field indices are sorted in ascending order let left_field_indices = cols.iter() .filter_map(|c| match left_df_schema.index_of_column(c) { Ok(idx) => Some(idx), _ => None, - }) + }).sorted() .collect::>(); let right_field_indices = cols.iter() .filter_map(|c| match right_df_schema.index_of_column(c) { Ok(idx) => Some(idx), _ => None, - }) + }).sorted() .collect::>(); // Collect DFFields and Fields required for intermediate schemas @@ -957,7 +957,6 @@ impl DefaultPhysicalPlanner { ) .unzip(); - // Construct intermediate schemas used for filtering data and // convert logical expression to physical according to filter schema let filter_df_schema = DFSchema::new_with_metadata(filter_df_fields, HashMap::new())?; diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index dc6f741c5b47..245f257adbd2 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -1103,12 +1103,11 @@ async fn reduce_left_join_2() -> Result<()> { // the right part `(t2.t2_name != 'w' or t2.t2_int < 10)` could be push down left join side and remove in filter. let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Filter: t2.t2_int < UInt32(10) OR t1.t1_int > UInt32(2) AND t2.t2_name != Utf8(\"w\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", - " Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", - " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " Filter: t2.t2_int < UInt32(10) OR t2.t2_name != Utf8(\"w\") [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", - " TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", + "Explain [plan_type:Utf8, plan:Utf8]", + " Inner Join: t1.t1_id = t2.t2_id Filter: t2.t2_int < UInt32(10) OR t1.t1_int > UInt32(2) AND t2.t2_name != Utf8(\"w\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", + " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + " Filter: t2.t2_int < UInt32(10) OR t2.t2_name != Utf8(\"w\") [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", + " TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -1188,11 +1187,10 @@ async fn reduce_right_join_2() -> Result<()> { let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); let plan = dataframe.into_optimized_plan()?; let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Filter: t1.t1_int != t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", - " Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", - " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", + "Explain [plan_type:Utf8, plan:Utf8]", + " Inner Join: t1.t1_id = t2.t2_id Filter: t1.t1_int != t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", + " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + " TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); diff --git a/datafusion/core/tests/sql/predicates.rs b/datafusion/core/tests/sql/predicates.rs index cf8bf998e070..498952a808c4 100644 --- a/datafusion/core/tests/sql/predicates.rs +++ b/datafusion/core/tests/sql/predicates.rs @@ -100,13 +100,11 @@ async fn multiple_or_predicates() -> Result<()> { let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", " Projection: lineitem.l_partkey [l_partkey:Int64]", - " Filter: part.p_brand = Utf8(\"Brand#12\") AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8(\"Brand#23\") AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8(\"Brand#34\") AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND part.p_size <= Int32(15) [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_brand:Utf8, p_size:Int32]", - " Projection: lineitem.l_partkey, lineitem.l_quantity, part.p_brand, part.p_size [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_brand:Utf8, p_size:Int32]", - " Inner Join: lineitem.l_partkey = part.p_partkey [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]", - " Filter: lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) [l_partkey:Int64, l_quantity:Decimal128(15, 2)]", - " TableScan: lineitem projection=[l_partkey, l_quantity], partial_filters=[lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)] [l_partkey:Int64, l_quantity:Decimal128(15, 2)]", - " Filter: (part.p_brand = Utf8(\"Brand#12\") AND part.p_size <= Int32(5) OR part.p_brand = Utf8(\"Brand#23\") AND part.p_size <= Int32(10) OR part.p_brand = Utf8(\"Brand#34\") AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1) [p_partkey:Int64, p_brand:Utf8, p_size:Int32]", - " TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8(\"Brand#12\") AND part.p_size <= Int32(5) OR part.p_brand = Utf8(\"Brand#23\") AND part.p_size <= Int32(10) OR part.p_brand = Utf8(\"Brand#34\") AND part.p_size <= Int32(15)] [p_partkey:Int64, p_brand:Utf8, p_size:Int32]", + " Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = Utf8(\"Brand#12\") AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8(\"Brand#23\") AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8(\"Brand#34\") AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND part.p_size <= Int32(15) [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]", + " Filter: lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) [l_partkey:Int64, l_quantity:Decimal128(15, 2)]", + " TableScan: lineitem projection=[l_partkey, l_quantity], partial_filters=[lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)] [l_partkey:Int64, l_quantity:Decimal128(15, 2)]", + " Filter: (part.p_brand = Utf8(\"Brand#12\") AND part.p_size <= Int32(5) OR part.p_brand = Utf8(\"Brand#23\") AND part.p_size <= Int32(10) OR part.p_brand = Utf8(\"Brand#34\") AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1) [p_partkey:Int64, p_brand:Utf8, p_size:Int32]", + " TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8(\"Brand#12\") AND part.p_size <= Int32(5) OR part.p_brand = Utf8(\"Brand#23\") AND part.p_size <= Int32(10) OR part.p_brand = Utf8(\"Brand#34\") AND part.p_size <= Int32(15)] [p_partkey:Int64, p_brand:Utf8, p_size:Int32]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs index 6711384d17b8..6bdfbb7adf83 100644 --- a/datafusion/core/tests/sql/subqueries.rs +++ b/datafusion/core/tests/sql/subqueries.rs @@ -52,22 +52,18 @@ where c_acctbal < ( let actual = format!("{}", plan.display_indent()); let expected = "Sort: customer.c_custkey ASC NULLS LAST\ \n Projection: customer.c_custkey\ - \n Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.__value\ - \n Projection: customer.c_custkey, customer.c_acctbal, __scalar_sq_1.__value\ - \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey\ - \n TableScan: customer projection=[c_custkey, c_acctbal]\ - \n SubqueryAlias: __scalar_sq_1\ - \n Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value\ - \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]\ - \n Projection: orders.o_custkey, orders.o_totalprice\ - \n Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_2.__value\ - \n Projection: orders.o_custkey, orders.o_totalprice, __scalar_sq_2.__value\ - \n Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey\ - \n TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]\ - \n SubqueryAlias: __scalar_sq_2\ - \n Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value\ - \n Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]\ - \n TableScan: lineitem projection=[l_orderkey, l_extendedprice]"; + \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.__value\ + \n TableScan: customer projection=[c_custkey, c_acctbal]\ + \n SubqueryAlias: __scalar_sq_1\ + \n Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value\ + \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]\ + \n Projection: orders.o_custkey, orders.o_totalprice\ + \n Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_2.__value\ + \n TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]\ + \n SubqueryAlias: __scalar_sq_2\ + \n Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value\ + \n Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]\ + \n TableScan: lineitem projection=[l_orderkey, l_extendedprice]"; assert_eq!(actual, expected); Ok(()) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 2a78551ea131..1c0b4b07e57b 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -17,7 +17,7 @@ use crate::optimizer::ApplyOrder; use crate::utils::{conjunction, split_conjunction}; use crate::{utils, OptimizerConfig, OptimizerRule}; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{Column, DFSchema, DataFusionError, Result}; use datafusion_expr::{ and, @@ -29,7 +29,6 @@ use datafusion_expr::{ }; use itertools::Itertools; use std::collections::{HashMap, HashSet}; -use std::iter::once; use std::sync::Arc; /// Push Down Filter optimizer rule pushes filter clauses down the plan @@ -149,6 +148,57 @@ fn can_pushdown_join_predicate(predicate: &Expr, schema: &DFSchema) -> Result Result { + let mut is_evaluate = true; + predicate.apply(&mut |expr| match expr { + Expr::Column(_) + | Expr::Literal(_) + | Expr::Placeholder { .. } + | Expr::ScalarVariable(_, _) => Ok(VisitRecursion::Skip), + Expr::Exists { .. } + | Expr::InSubquery { .. } + | Expr::ScalarSubquery(_) + | Expr::OuterReferenceColumn(_, _) + | Expr::ScalarUDF { .. } => { + is_evaluate = false; + Ok(VisitRecursion::Stop) + } + Expr::Alias(_, _) + | Expr::BinaryExpr(_) + | Expr::Like(_) + | Expr::ILike(_) + | Expr::SimilarTo(_) + | Expr::Not(_) + | Expr::IsNotNull(_) + | Expr::IsNull(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsUnknown(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + | Expr::IsNotUnknown(_) + | Expr::Negative(_) + | Expr::GetIndexedField(_) + | Expr::Between(_) + | Expr::Case(_) + | Expr::Cast(_) + | Expr::TryCast(_) + | Expr::ScalarFunction { .. } + | Expr::InList { .. } => Ok(VisitRecursion::Continue), + Expr::Sort(_) + | Expr::AggregateFunction(_) + | Expr::WindowFunction(_) + | Expr::AggregateUDF { .. } + | Expr::Wildcard + | Expr::QualifiedWildcard { .. } + | Expr::GroupingSet(_) => Err(DataFusionError::Internal( + "Unsupported predicate type".to_string(), + )), + })?; + Ok(is_evaluate) +} + // examine OR clause to see if any useful clauses can be extracted and push down. // extract at least one qual from each sub clauses of OR clause, then form the quals // to new OR clause as predicate. @@ -295,18 +345,25 @@ fn extract_or_clause(expr: &Expr, schema_columns: &HashSet) -> Option, - plan: &LogicalPlan, + infer_predicates: Vec, + join_plan: &LogicalPlan, left: &LogicalPlan, right: &LogicalPlan, on_filter: Vec, + is_inner_join: bool, ) -> Result { let on_filter_empty = on_filter.is_empty(); // Get pushable predicates from current optimizer state - let (left_preserved, right_preserved) = lr_is_preserved(plan)?; + let (left_preserved, right_preserved) = lr_is_preserved(join_plan)?; + + // The predicates can be divided to three categories: + // 1) can push through join to its children(left or right) + // 2) can be converted to join conditions if the join type is Inner + // 3) should be kept as filter conditions let mut left_push = vec![]; let mut right_push = vec![]; - let mut keep_predicates = vec![]; + let mut join_conditions = vec![]; for predicate in predicates { if left_preserved && can_pushdown_join_predicate(&predicate, left.schema())? { left_push.push(predicate); @@ -314,14 +371,28 @@ fn push_down_all_join( && can_pushdown_join_predicate(&predicate, right.schema())? { right_push.push(predicate); + } else if is_inner_join && can_evaluate_as_join_condition(&predicate)? { + // Here we do not differ it is eq or non-eq predicate, ExtractEquijoinPredicate will extract the eq predicate + // and convert to the join on condition + join_conditions.push(predicate); } else { keep_predicates.push(predicate); } } - let mut keep_condition = vec![]; + // For infer predicates, if they can not push through join, just drop them + for predicate in infer_predicates { + if left_preserved && can_pushdown_join_predicate(&predicate, left.schema())? { + left_push.push(predicate); + } else if right_preserved + && can_pushdown_join_predicate(&predicate, right.schema())? + { + right_push.push(predicate); + } + } + if !on_filter.is_empty() { - let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(plan)?; + let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join_plan)?; for on in on_filter { if on_left_preserved && can_pushdown_join_predicate(&on, left.schema())? { left_push.push(on) @@ -330,7 +401,7 @@ fn push_down_all_join( { right_push.push(on) } else { - keep_condition.push(on) + join_conditions.push(on) } } } @@ -348,12 +419,12 @@ fn push_down_all_join( right_preserved, ); let on_or_to_left = extract_or_clauses_for_join( - &keep_condition.iter().collect::>(), + &join_conditions.iter().collect::>(), left.schema(), left_preserved, ); let on_or_to_right = extract_or_clauses_for_join( - &keep_condition.iter().collect::>(), + &join_conditions.iter().collect::>(), right.schema(), right_preserved, ); @@ -383,21 +454,16 @@ fn push_down_all_join( // it always will be the last element, otherwise result // vector will contain only join keys (without additional // element representing filter). - let expr = plan.expressions(); - let expr = if !on_filter_empty && keep_condition.is_empty() { - // New filter expression is None - should remove last element + let expr = join_plan.expressions(); + let mut new_exprs = if !on_filter_empty { expr[..expr.len() - 1].to_vec() - } else if !keep_condition.is_empty() { - // Replace last element with new filter expression - expr[..expr.len() - 1] - .iter() - .cloned() - .chain(once(keep_condition.into_iter().reduce(Expr::and).unwrap())) - .collect() } else { expr }; - let plan = from_plan(plan, &expr, &[left, right])?; + if !join_conditions.is_empty() { + new_exprs.push(join_conditions.into_iter().reduce(Expr::and).unwrap()); + } + let plan = from_plan(join_plan, &new_exprs, &[left, right])?; if keep_predicates.is_empty() { Ok(plan) @@ -418,7 +484,7 @@ fn push_down_join( join: &Join, parent_predicate: Option<&Expr>, ) -> Result> { - let mut predicates = match parent_predicate { + let predicates = match parent_predicate { Some(parent_predicate) => { utils::split_conjunction_owned(parent_predicate.clone()) } @@ -432,7 +498,10 @@ fn push_down_join( .map(|e| utils::split_conjunction_owned(e.clone())) .unwrap_or_else(Vec::new); - if join.join_type == JoinType::Inner { + let mut is_inner_join = false; + let infer_predicates = if join.join_type == JoinType::Inner { + is_inner_join = true; + // TODO refine the logic, introduce EquivalenceProperties to logical plan and infer additional filters to push down // For inner joins, duplicate filters for joined columns so filters can be pushed down // to both sides. Take the following query as an example: // @@ -446,7 +515,7 @@ fn push_down_join( // Join clauses with `Using` constraints also take advantage of this logic to make sure // predicates reference the shared join columns are pushed to both sides. // This logic should also been applied to conditions in JOIN ON clause - let join_side_filters = predicates + predicates .iter() .chain(on_filters.iter()) .filter_map(|predicate| { @@ -492,18 +561,22 @@ fn push_down_join( Some(Ok(join_side_predicate)) }) - .collect::>>()?; - predicates.extend(join_side_filters); - } - if on_filters.is_empty() && predicates.is_empty() { + .collect::>>()? + } else { + vec![] + }; + + if on_filters.is_empty() && predicates.is_empty() && infer_predicates.is_empty() { return Ok(None); } Ok(Some(push_down_all_join( predicates, + infer_predicates, plan, &join.left, &join.right, on_filters, + is_inner_join, )?)) } @@ -693,7 +766,15 @@ impl OptimizerRule for PushDownFilter { } LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { let predicates = utils::split_conjunction_owned(filter.predicate.clone()); - push_down_all_join(predicates, &filter.input, left, right, vec![])? + push_down_all_join( + predicates, + vec![], + &filter.input, + left, + right, + vec![], + false, + )? } LogicalPlan::TableScan(scan) => { let filter_predicates = split_conjunction(&filter.predicate); @@ -1554,7 +1635,7 @@ mod tests { assert_optimized_plan_eq(&plan, expected) } - /// post-join predicates with columns from both sides are not pushed + /// post-join predicates with columns from both sides are converted to join filterss #[test] fn filter_join_on_common_dependent() -> Result<()> { let table_scan = test_table_scan()?; @@ -1572,7 +1653,6 @@ mod tests { (vec![Column::from_name("a")], vec![Column::from_name("a")]), None, )? - // "b" and "c" are not shared by either side: they are only available together after the join .filter(col("c").lt_eq(col("b")))? .build()?; @@ -1587,8 +1667,13 @@ mod tests { \n TableScan: test2" ); - // expected is equal: no push-down - let expected = &format!("{plan:?}"); + // Filter is converted to Join Filter + let expected = "\ + Inner Join: test.a = test2.a Filter: test.c <= test2.b\ + \n Projection: test.a, test.c\ + \n TableScan: test\ + \n Projection: test2.a, test2.b\ + \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) }