diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala index 943d5d32caad7..6de5ad37d7769 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala @@ -366,7 +366,7 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite { val sortExec = df.queryExecution.executedPlan.collect { case sortExec: SortExecTransformer => sortExec } - assert(sortExec.size == 1) + assert(sortExec.size == 2) val result = df.collect() val expectedResult = Seq(Row(0), Row(1), Row(2), Row(3), Row(4)) TestUtils.compareAnswers(result, expectedResult) diff --git a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark32/18.txt b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark32/18.txt index b14dd9af62978..d158d859aed91 100644 --- a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark32/18.txt +++ b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark32/18.txt @@ -1,86 +1,91 @@ == Physical Plan == -AdaptiveSparkPlan (86) +AdaptiveSparkPlan (92) +- == Final Plan == - TakeOrderedAndProject (53) - +- VeloxColumnarToRowExec (52) - +- ^ RegularHashAggregateExecTransformer (50) - +- ^ InputIteratorTransformer (49) - +- ^ InputAdapter (48) - +- ^ ShuffleQueryStage (47) - +- ColumnarExchange (46) - +- ^ ProjectExecTransformer (44) - +- ^ FlushableHashAggregateExecTransformer (43) - +- ^ ProjectExecTransformer (42) - +- ^ BroadcastHashJoinExecTransformer Inner (41) - :- ^ ProjectExecTransformer (28) - : +- ^ BroadcastHashJoinExecTransformer Inner (27) - : :- ^ InputIteratorTransformer (7) - : : +- ^ InputAdapter (6) - : : +- ^ BroadcastQueryStage (5) - : : +- ColumnarBroadcastExchange (4) - : : +- ^ NoopFilter (2) - : : +- ^ Scan parquet (1) - : +- ^ BroadcastHashJoinExecTransformer LeftSemi (26) - : :- ^ NoopFilter (9) - : : +- ^ Scan parquet (8) - : +- ^ InputIteratorTransformer (25) - : +- ^ InputAdapter (24) - : +- ^ BroadcastQueryStage (23) - : +- ColumnarBroadcastExchange (22) - : +- ^ ProjectExecTransformer (20) - : +- ^ FilterExecTransformer (19) - : +- ^ RegularHashAggregateExecTransformer (18) - : +- ^ InputIteratorTransformer (17) - : +- ^ InputAdapter (16) - : +- ^ ShuffleQueryStage (15) - : +- ColumnarExchange (14) - : +- ^ ProjectExecTransformer (12) - : +- ^ FlushableHashAggregateExecTransformer (11) - : +- ^ Scan parquet (10) - +- ^ InputIteratorTransformer (40) - +- ^ InputAdapter (39) - +- ^ BroadcastQueryStage (38) - +- ColumnarBroadcastExchange (37) - +- ^ BroadcastHashJoinExecTransformer LeftSemi (35) - :- ^ NoopFilter (30) - : +- ^ Scan parquet (29) - +- ^ InputIteratorTransformer (34) - +- ^ InputAdapter (33) - +- ^ BroadcastQueryStage (32) - +- ReusedExchange (31) + VeloxColumnarToRowExec (59) + +- ^ ProjectExecTransformer (57) + +- ^ TopNTransformer (56) + +- ^ InputIteratorTransformer (55) + +- ^ InputAdapter (54) + +- ^ ColumnarExchange (53) + +- ^ TopNTransformer (51) + +- ^ RegularHashAggregateExecTransformer (50) + +- ^ InputIteratorTransformer (49) + +- ^ InputAdapter (48) + +- ^ ShuffleQueryStage (47) + +- ColumnarExchange (46) + +- ^ ProjectExecTransformer (44) + +- ^ FlushableHashAggregateExecTransformer (43) + +- ^ ProjectExecTransformer (42) + +- ^ BroadcastHashJoinExecTransformer Inner (41) + :- ^ ProjectExecTransformer (28) + : +- ^ BroadcastHashJoinExecTransformer Inner (27) + : :- ^ InputIteratorTransformer (7) + : : +- ^ InputAdapter (6) + : : +- ^ BroadcastQueryStage (5) + : : +- ColumnarBroadcastExchange (4) + : : +- ^ NoopFilter (2) + : : +- ^ Scan parquet (1) + : +- ^ BroadcastHashJoinExecTransformer LeftSemi (26) + : :- ^ NoopFilter (9) + : : +- ^ Scan parquet (8) + : +- ^ InputIteratorTransformer (25) + : +- ^ InputAdapter (24) + : +- ^ BroadcastQueryStage (23) + : +- ColumnarBroadcastExchange (22) + : +- ^ ProjectExecTransformer (20) + : +- ^ FilterExecTransformer (19) + : +- ^ RegularHashAggregateExecTransformer (18) + : +- ^ InputIteratorTransformer (17) + : +- ^ InputAdapter (16) + : +- ^ ShuffleQueryStage (15) + : +- ColumnarExchange (14) + : +- ^ ProjectExecTransformer (12) + : +- ^ FlushableHashAggregateExecTransformer (11) + : +- ^ Scan parquet (10) + +- ^ InputIteratorTransformer (40) + +- ^ InputAdapter (39) + +- ^ BroadcastQueryStage (38) + +- ColumnarBroadcastExchange (37) + +- ^ BroadcastHashJoinExecTransformer LeftSemi (35) + :- ^ NoopFilter (30) + : +- ^ Scan parquet (29) + +- ^ InputIteratorTransformer (34) + +- ^ InputAdapter (33) + +- ^ BroadcastQueryStage (32) + +- ReusedExchange (31) +- == Initial Plan == - TakeOrderedAndProject (85) - +- HashAggregate (84) - +- Exchange (83) - +- HashAggregate (82) - +- Project (81) - +- BroadcastHashJoin Inner BuildRight (80) - :- Project (68) - : +- BroadcastHashJoin Inner BuildLeft (67) - : :- BroadcastExchange (56) - : : +- Filter (55) - : : +- Scan parquet (54) - : +- BroadcastHashJoin LeftSemi BuildRight (66) - : :- Filter (58) - : : +- Scan parquet (57) - : +- BroadcastExchange (65) - : +- Project (64) - : +- Filter (63) - : +- HashAggregate (62) - : +- Exchange (61) - : +- HashAggregate (60) - : +- Scan parquet (59) - +- BroadcastExchange (79) - +- BroadcastHashJoin LeftSemi BuildRight (78) - :- Filter (70) - : +- Scan parquet (69) - +- BroadcastExchange (77) - +- Project (76) - +- Filter (75) - +- HashAggregate (74) - +- Exchange (73) - +- HashAggregate (72) - +- Scan parquet (71) + TakeOrderedAndProject (91) + +- HashAggregate (90) + +- Exchange (89) + +- HashAggregate (88) + +- Project (87) + +- BroadcastHashJoin Inner BuildRight (86) + :- Project (74) + : +- BroadcastHashJoin Inner BuildLeft (73) + : :- BroadcastExchange (62) + : : +- Filter (61) + : : +- Scan parquet (60) + : +- BroadcastHashJoin LeftSemi BuildRight (72) + : :- Filter (64) + : : +- Scan parquet (63) + : +- BroadcastExchange (71) + : +- Project (70) + : +- Filter (69) + : +- HashAggregate (68) + : +- Exchange (67) + : +- HashAggregate (66) + : +- Scan parquet (65) + +- BroadcastExchange (85) + +- BroadcastHashJoin LeftSemi BuildRight (84) + :- Filter (76) + : +- Scan parquet (75) + +- BroadcastExchange (83) + +- Project (82) + +- Filter (81) + +- HashAggregate (80) + +- Exchange (79) + +- HashAggregate (78) + +- Scan parquet (77) (1) Scan parquet @@ -297,180 +302,202 @@ Functions [1]: [sum(l_quantity#X)] Aggregate Attributes [1]: [sum(l_quantity#X)#X] Results [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity#X)#X AS sum(l_quantity)#X] -(51) WholeStageCodegenTransformer (X) +(51) TopNTransformer +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Arguments: X, [o_totalprice#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], false + +(52) WholeStageCodegenTransformer (X) Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] Arguments: false -(52) VeloxColumnarToRowExec +(53) ColumnarExchange Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X], [plan_id=X], [id=#X] -(53) TakeOrderedAndProject +(54) InputAdapter +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] + +(55) InputIteratorTransformer +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] + +(56) TopNTransformer +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Arguments: X, [o_totalprice#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], true + +(57) ProjectExecTransformer +Output [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] + +(58) WholeStageCodegenTransformer (X) +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Arguments: false + +(59) VeloxColumnarToRowExec Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] -Arguments: X, [o_totalprice#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] -(54) Scan parquet +(60) Scan parquet Output [2]: [c_custkey#X, c_name#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(c_custkey)] ReadSchema: struct -(55) Filter +(61) Filter Input [2]: [c_custkey#X, c_name#X] Condition : isnotnull(c_custkey#X) -(56) BroadcastExchange +(62) BroadcastExchange Input [2]: [c_custkey#X, c_name#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=X] -(57) Scan parquet +(63) Scan parquet Output [4]: [o_orderkey#X, o_custkey#X, o_totalprice#X, o_orderdate#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(o_custkey), IsNotNull(o_orderkey)] ReadSchema: struct -(58) Filter +(64) Filter Input [4]: [o_orderkey#X, o_custkey#X, o_totalprice#X, o_orderdate#X] Condition : (isnotnull(o_custkey#X) AND isnotnull(o_orderkey#X)) -(59) Scan parquet +(65) Scan parquet Output [2]: [l_orderkey#X, l_quantity#X] Batched: true Location: InMemoryFileIndex [*] ReadSchema: struct -(60) HashAggregate +(66) HashAggregate Input [2]: [l_orderkey#X, l_quantity#X] Keys [1]: [l_orderkey#X] Functions [1]: [partial_sum(l_quantity#X)] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [3]: [l_orderkey#X, sum#X, isEmpty#X] -(61) Exchange +(67) Exchange Input [3]: [l_orderkey#X, sum#X, isEmpty#X] Arguments: hashpartitioning(l_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(62) HashAggregate +(68) HashAggregate Input [3]: [l_orderkey#X, sum#X, isEmpty#X] Keys [1]: [l_orderkey#X] Functions [1]: [sum(l_quantity#X)] Aggregate Attributes [1]: [sum(l_quantity#X)#X] Results [2]: [l_orderkey#X, sum(l_quantity#X)#X AS sum(l_quantity#X)#X] -(63) Filter +(69) Filter Input [2]: [l_orderkey#X, sum(l_quantity#X)#X] Condition : (isnotnull(sum(l_quantity#X)#X) AND (sum(l_quantity#X)#X > 300.00)) -(64) Project +(70) Project Output [1]: [l_orderkey#X] Input [2]: [l_orderkey#X, sum(l_quantity#X)#X] -(65) BroadcastExchange +(71) BroadcastExchange Input [1]: [l_orderkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(66) BroadcastHashJoin +(72) BroadcastHashJoin Left keys [1]: [o_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: None -(67) BroadcastHashJoin +(73) BroadcastHashJoin Left keys [1]: [c_custkey#X] Right keys [1]: [o_custkey#X] Join condition: None -(68) Project +(74) Project Output [5]: [c_custkey#X, c_name#X, o_orderkey#X, o_totalprice#X, o_orderdate#X] Input [6]: [c_custkey#X, c_name#X, o_orderkey#X, o_custkey#X, o_totalprice#X, o_orderdate#X] -(69) Scan parquet +(75) Scan parquet Output [2]: [l_orderkey#X, l_quantity#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(l_orderkey)] ReadSchema: struct -(70) Filter +(76) Filter Input [2]: [l_orderkey#X, l_quantity#X] Condition : isnotnull(l_orderkey#X) -(71) Scan parquet +(77) Scan parquet Output [2]: [l_orderkey#X, l_quantity#X] Batched: true Location: InMemoryFileIndex [*] ReadSchema: struct -(72) HashAggregate +(78) HashAggregate Input [2]: [l_orderkey#X, l_quantity#X] Keys [1]: [l_orderkey#X] Functions [1]: [partial_sum(l_quantity#X)] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [3]: [l_orderkey#X, sum#X, isEmpty#X] -(73) Exchange +(79) Exchange Input [3]: [l_orderkey#X, sum#X, isEmpty#X] Arguments: hashpartitioning(l_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(74) HashAggregate +(80) HashAggregate Input [3]: [l_orderkey#X, sum#X, isEmpty#X] Keys [1]: [l_orderkey#X] Functions [1]: [sum(l_quantity#X)] Aggregate Attributes [1]: [sum(l_quantity#X)#X] Results [2]: [l_orderkey#X, sum(l_quantity#X)#X AS sum(l_quantity#X)#X] -(75) Filter +(81) Filter Input [2]: [l_orderkey#X, sum(l_quantity#X)#X] Condition : (isnotnull(sum(l_quantity#X)#X) AND (sum(l_quantity#X)#X > 300.00)) -(76) Project +(82) Project Output [1]: [l_orderkey#X] Input [2]: [l_orderkey#X, sum(l_quantity#X)#X] -(77) BroadcastExchange +(83) BroadcastExchange Input [1]: [l_orderkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(78) BroadcastHashJoin +(84) BroadcastHashJoin Left keys [1]: [l_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: None -(79) BroadcastExchange +(85) BroadcastExchange Input [2]: [l_orderkey#X, l_quantity#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=X] -(80) BroadcastHashJoin +(86) BroadcastHashJoin Left keys [1]: [o_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: None -(81) Project +(87) Project Output [6]: [c_custkey#X, c_name#X, o_orderkey#X, o_totalprice#X, o_orderdate#X, l_quantity#X] Input [7]: [c_custkey#X, c_name#X, o_orderkey#X, o_totalprice#X, o_orderdate#X, l_orderkey#X, l_quantity#X] -(82) HashAggregate +(88) HashAggregate Input [6]: [c_custkey#X, c_name#X, o_orderkey#X, o_totalprice#X, o_orderdate#X, l_quantity#X] Keys [5]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X] Functions [1]: [partial_sum(l_quantity#X)] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [7]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum#X, isEmpty#X] -(83) Exchange +(89) Exchange Input [7]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum#X, isEmpty#X] Arguments: hashpartitioning(c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(84) HashAggregate +(90) HashAggregate Input [7]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum#X, isEmpty#X] Keys [5]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X] Functions [1]: [sum(l_quantity#X)] Aggregate Attributes [1]: [sum(l_quantity#X)#X] Results [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity#X)#X AS sum(l_quantity)#X] -(85) TakeOrderedAndProject +(91) TakeOrderedAndProject Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] Arguments: X, [o_totalprice#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] -(86) AdaptiveSparkPlan +(92) AdaptiveSparkPlan Output [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] Arguments: isFinalPlan=true \ No newline at end of file diff --git a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark32/21.txt b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark32/21.txt index 2b0a43ea6febc..f2b9c2ac144dc 100644 --- a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark32/21.txt +++ b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark32/21.txt @@ -1,91 +1,96 @@ == Physical Plan == -AdaptiveSparkPlan (92) +AdaptiveSparkPlan (98) +- == Final Plan == - TakeOrderedAndProject (59) - +- VeloxColumnarToRowExec (58) - +- ^ RegularHashAggregateExecTransformer (56) - +- ^ InputIteratorTransformer (55) - +- ^ InputAdapter (54) - +- ^ ShuffleQueryStage (53) - +- ColumnarExchange (52) - +- ^ ProjectExecTransformer (50) - +- ^ FlushableHashAggregateExecTransformer (49) - +- ^ ProjectExecTransformer (48) - +- ^ BroadcastHashJoinExecTransformer Inner (47) - :- ^ ProjectExecTransformer (38) - : +- ^ BroadcastHashJoinExecTransformer Inner (37) - : :- ^ ProjectExecTransformer (28) - : : +- ^ BroadcastHashJoinExecTransformer Inner (27) - : : :- ^ InputIteratorTransformer (7) - : : : +- ^ InputAdapter (6) - : : : +- ^ BroadcastQueryStage (5) - : : : +- ColumnarBroadcastExchange (4) - : : : +- ^ NoopFilter (2) - : : : +- ^ Scan parquet (1) - : : +- ^ BroadcastHashJoinExecTransformer LeftAnti (26) - : : :- ^ BroadcastHashJoinExecTransformer LeftSemi (17) - : : : :- ^ ProjectExecTransformer (10) - : : : : +- ^ NoopFilter (9) - : : : : +- ^ Scan parquet (8) - : : : +- ^ InputIteratorTransformer (16) - : : : +- ^ InputAdapter (15) - : : : +- ^ BroadcastQueryStage (14) - : : : +- ColumnarBroadcastExchange (13) - : : : +- ^ Scan parquet (11) - : : +- ^ InputIteratorTransformer (25) - : : +- ^ InputAdapter (24) - : : +- ^ BroadcastQueryStage (23) - : : +- ColumnarBroadcastExchange (22) - : : +- ^ ProjectExecTransformer (20) - : : +- ^ NoopFilter (19) - : : +- ^ Scan parquet (18) - : +- ^ InputIteratorTransformer (36) - : +- ^ InputAdapter (35) - : +- ^ BroadcastQueryStage (34) - : +- ColumnarBroadcastExchange (33) - : +- ^ ProjectExecTransformer (31) - : +- ^ NoopFilter (30) - : +- ^ Scan parquet (29) - +- ^ InputIteratorTransformer (46) - +- ^ InputAdapter (45) - +- ^ BroadcastQueryStage (44) - +- ColumnarBroadcastExchange (43) - +- ^ ProjectExecTransformer (41) - +- ^ NoopFilter (40) - +- ^ Scan parquet (39) + VeloxColumnarToRowExec (65) + +- ^ ProjectExecTransformer (63) + +- ^ TopNTransformer (62) + +- ^ InputIteratorTransformer (61) + +- ^ InputAdapter (60) + +- ^ ColumnarExchange (59) + +- ^ TopNTransformer (57) + +- ^ RegularHashAggregateExecTransformer (56) + +- ^ InputIteratorTransformer (55) + +- ^ InputAdapter (54) + +- ^ ShuffleQueryStage (53) + +- ColumnarExchange (52) + +- ^ ProjectExecTransformer (50) + +- ^ FlushableHashAggregateExecTransformer (49) + +- ^ ProjectExecTransformer (48) + +- ^ BroadcastHashJoinExecTransformer Inner (47) + :- ^ ProjectExecTransformer (38) + : +- ^ BroadcastHashJoinExecTransformer Inner (37) + : :- ^ ProjectExecTransformer (28) + : : +- ^ BroadcastHashJoinExecTransformer Inner (27) + : : :- ^ InputIteratorTransformer (7) + : : : +- ^ InputAdapter (6) + : : : +- ^ BroadcastQueryStage (5) + : : : +- ColumnarBroadcastExchange (4) + : : : +- ^ NoopFilter (2) + : : : +- ^ Scan parquet (1) + : : +- ^ BroadcastHashJoinExecTransformer LeftAnti (26) + : : :- ^ BroadcastHashJoinExecTransformer LeftSemi (17) + : : : :- ^ ProjectExecTransformer (10) + : : : : +- ^ NoopFilter (9) + : : : : +- ^ Scan parquet (8) + : : : +- ^ InputIteratorTransformer (16) + : : : +- ^ InputAdapter (15) + : : : +- ^ BroadcastQueryStage (14) + : : : +- ColumnarBroadcastExchange (13) + : : : +- ^ Scan parquet (11) + : : +- ^ InputIteratorTransformer (25) + : : +- ^ InputAdapter (24) + : : +- ^ BroadcastQueryStage (23) + : : +- ColumnarBroadcastExchange (22) + : : +- ^ ProjectExecTransformer (20) + : : +- ^ NoopFilter (19) + : : +- ^ Scan parquet (18) + : +- ^ InputIteratorTransformer (36) + : +- ^ InputAdapter (35) + : +- ^ BroadcastQueryStage (34) + : +- ColumnarBroadcastExchange (33) + : +- ^ ProjectExecTransformer (31) + : +- ^ NoopFilter (30) + : +- ^ Scan parquet (29) + +- ^ InputIteratorTransformer (46) + +- ^ InputAdapter (45) + +- ^ BroadcastQueryStage (44) + +- ColumnarBroadcastExchange (43) + +- ^ ProjectExecTransformer (41) + +- ^ NoopFilter (40) + +- ^ Scan parquet (39) +- == Initial Plan == - TakeOrderedAndProject (91) - +- HashAggregate (90) - +- Exchange (89) - +- HashAggregate (88) - +- Project (87) - +- BroadcastHashJoin Inner BuildRight (86) - :- Project (81) - : +- BroadcastHashJoin Inner BuildRight (80) - : :- Project (75) - : : +- BroadcastHashJoin Inner BuildLeft (74) - : : :- BroadcastExchange (62) - : : : +- Filter (61) - : : : +- Scan parquet (60) - : : +- BroadcastHashJoin LeftAnti BuildRight (73) - : : :- BroadcastHashJoin LeftSemi BuildRight (68) - : : : :- Project (65) - : : : : +- Filter (64) - : : : : +- Scan parquet (63) - : : : +- BroadcastExchange (67) - : : : +- Scan parquet (66) - : : +- BroadcastExchange (72) - : : +- Project (71) - : : +- Filter (70) - : : +- Scan parquet (69) - : +- BroadcastExchange (79) - : +- Project (78) - : +- Filter (77) - : +- Scan parquet (76) - +- BroadcastExchange (85) - +- Project (84) - +- Filter (83) - +- Scan parquet (82) + TakeOrderedAndProject (97) + +- HashAggregate (96) + +- Exchange (95) + +- HashAggregate (94) + +- Project (93) + +- BroadcastHashJoin Inner BuildRight (92) + :- Project (87) + : +- BroadcastHashJoin Inner BuildRight (86) + : :- Project (81) + : : +- BroadcastHashJoin Inner BuildLeft (80) + : : :- BroadcastExchange (68) + : : : +- Filter (67) + : : : +- Scan parquet (66) + : : +- BroadcastHashJoin LeftAnti BuildRight (79) + : : :- BroadcastHashJoin LeftSemi BuildRight (74) + : : : :- Project (71) + : : : : +- Filter (70) + : : : : +- Scan parquet (69) + : : : +- BroadcastExchange (73) + : : : +- Scan parquet (72) + : : +- BroadcastExchange (78) + : : +- Project (77) + : : +- Filter (76) + : : +- Scan parquet (75) + : +- BroadcastExchange (85) + : +- Project (84) + : +- Filter (83) + : +- Scan parquet (82) + +- BroadcastExchange (91) + +- Project (90) + +- Filter (89) + +- Scan parquet (88) (1) Scan parquet @@ -328,173 +333,195 @@ Functions [1]: [count(1)] Aggregate Attributes [1]: [count(1)#X] Results [2]: [s_name#X, count(1)#X AS numwait#X] -(57) WholeStageCodegenTransformer (X) +(57) TopNTransformer +Input [2]: [s_name#X, numwait#X] +Arguments: X, [numwait#X DESC NULLS LAST, s_name#X ASC NULLS FIRST], false + +(58) WholeStageCodegenTransformer (X) Input [2]: [s_name#X, numwait#X] Arguments: false -(58) VeloxColumnarToRowExec +(59) ColumnarExchange Input [2]: [s_name#X, numwait#X] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [s_name#X, numwait#X], [plan_id=X], [id=#X] -(59) TakeOrderedAndProject +(60) InputAdapter Input [2]: [s_name#X, numwait#X] -Arguments: X, [numwait#X DESC NULLS LAST, s_name#X ASC NULLS FIRST], [s_name#X, numwait#X] -(60) Scan parquet +(61) InputIteratorTransformer +Input [2]: [s_name#X, numwait#X] + +(62) TopNTransformer +Input [2]: [s_name#X, numwait#X] +Arguments: X, [numwait#X DESC NULLS LAST, s_name#X ASC NULLS FIRST], true + +(63) ProjectExecTransformer +Output [2]: [s_name#X, numwait#X] +Input [2]: [s_name#X, numwait#X] + +(64) WholeStageCodegenTransformer (X) +Input [2]: [s_name#X, numwait#X] +Arguments: false + +(65) VeloxColumnarToRowExec +Input [2]: [s_name#X, numwait#X] + +(66) Scan parquet Output [3]: [s_suppkey#X, s_name#X, s_nationkey#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(s_suppkey), IsNotNull(s_nationkey)] ReadSchema: struct -(61) Filter +(67) Filter Input [3]: [s_suppkey#X, s_name#X, s_nationkey#X] Condition : (isnotnull(s_suppkey#X) AND isnotnull(s_nationkey#X)) -(62) BroadcastExchange +(68) BroadcastExchange Input [3]: [s_suppkey#X, s_name#X, s_nationkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=X] -(63) Scan parquet +(69) Scan parquet Output [4]: [l_orderkey#X, l_suppkey#X, l_commitdate#X, l_receiptdate#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(l_receiptdate), IsNotNull(l_commitdate), IsNotNull(l_suppkey), IsNotNull(l_orderkey)] ReadSchema: struct -(64) Filter +(70) Filter Input [4]: [l_orderkey#X, l_suppkey#X, l_commitdate#X, l_receiptdate#X] Condition : ((((isnotnull(l_receiptdate#X) AND isnotnull(l_commitdate#X)) AND (l_receiptdate#X > l_commitdate#X)) AND isnotnull(l_suppkey#X)) AND isnotnull(l_orderkey#X)) -(65) Project +(71) Project Output [2]: [l_orderkey#X, l_suppkey#X] Input [4]: [l_orderkey#X, l_suppkey#X, l_commitdate#X, l_receiptdate#X] -(66) Scan parquet +(72) Scan parquet Output [2]: [l_orderkey#X, l_suppkey#X] Batched: true Location: InMemoryFileIndex [*] ReadSchema: struct -(67) BroadcastExchange +(73) BroadcastExchange Input [2]: [l_orderkey#X, l_suppkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(68) BroadcastHashJoin +(74) BroadcastHashJoin Left keys [1]: [l_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: NOT (l_suppkey#X = l_suppkey#X) -(69) Scan parquet +(75) Scan parquet Output [4]: [l_orderkey#X, l_suppkey#X, l_commitdate#X, l_receiptdate#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(l_receiptdate), IsNotNull(l_commitdate)] ReadSchema: struct -(70) Filter +(76) Filter Input [4]: [l_orderkey#X, l_suppkey#X, l_commitdate#X, l_receiptdate#X] Condition : ((isnotnull(l_receiptdate#X) AND isnotnull(l_commitdate#X)) AND (l_receiptdate#X > l_commitdate#X)) -(71) Project +(77) Project Output [2]: [l_orderkey#X, l_suppkey#X] Input [4]: [l_orderkey#X, l_suppkey#X, l_commitdate#X, l_receiptdate#X] -(72) BroadcastExchange +(78) BroadcastExchange Input [2]: [l_orderkey#X, l_suppkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(73) BroadcastHashJoin +(79) BroadcastHashJoin Left keys [1]: [l_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: NOT (l_suppkey#X = l_suppkey#X) -(74) BroadcastHashJoin +(80) BroadcastHashJoin Left keys [1]: [s_suppkey#X] Right keys [1]: [l_suppkey#X] Join condition: None -(75) Project +(81) Project Output [3]: [s_name#X, s_nationkey#X, l_orderkey#X] Input [5]: [s_suppkey#X, s_name#X, s_nationkey#X, l_orderkey#X, l_suppkey#X] -(76) Scan parquet +(82) Scan parquet Output [2]: [o_orderkey#X, o_orderstatus#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(o_orderstatus), EqualTo(o_orderstatus,F), IsNotNull(o_orderkey)] ReadSchema: struct -(77) Filter +(83) Filter Input [2]: [o_orderkey#X, o_orderstatus#X] Condition : ((isnotnull(o_orderstatus#X) AND (o_orderstatus#X = F)) AND isnotnull(o_orderkey#X)) -(78) Project +(84) Project Output [1]: [o_orderkey#X] Input [2]: [o_orderkey#X, o_orderstatus#X] -(79) BroadcastExchange +(85) BroadcastExchange Input [1]: [o_orderkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(80) BroadcastHashJoin +(86) BroadcastHashJoin Left keys [1]: [l_orderkey#X] Right keys [1]: [o_orderkey#X] Join condition: None -(81) Project +(87) Project Output [2]: [s_name#X, s_nationkey#X] Input [4]: [s_name#X, s_nationkey#X, l_orderkey#X, o_orderkey#X] -(82) Scan parquet +(88) Scan parquet Output [2]: [n_nationkey#X, n_name#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(n_name), EqualTo(n_name,SAUDI ARABIA), IsNotNull(n_nationkey)] ReadSchema: struct -(83) Filter +(89) Filter Input [2]: [n_nationkey#X, n_name#X] Condition : ((isnotnull(n_name#X) AND (n_name#X = SAUDI ARABIA)) AND isnotnull(n_nationkey#X)) -(84) Project +(90) Project Output [1]: [n_nationkey#X] Input [2]: [n_nationkey#X, n_name#X] -(85) BroadcastExchange +(91) BroadcastExchange Input [1]: [n_nationkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(86) BroadcastHashJoin +(92) BroadcastHashJoin Left keys [1]: [s_nationkey#X] Right keys [1]: [n_nationkey#X] Join condition: None -(87) Project +(93) Project Output [1]: [s_name#X] Input [3]: [s_name#X, s_nationkey#X, n_nationkey#X] -(88) HashAggregate +(94) HashAggregate Input [1]: [s_name#X] Keys [1]: [s_name#X] Functions [1]: [partial_count(1)] Aggregate Attributes [1]: [count#X] Results [2]: [s_name#X, count#X] -(89) Exchange +(95) Exchange Input [2]: [s_name#X, count#X] Arguments: hashpartitioning(s_name#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(90) HashAggregate +(96) HashAggregate Input [2]: [s_name#X, count#X] Keys [1]: [s_name#X] Functions [1]: [count(1)] Aggregate Attributes [1]: [count(1)#X] Results [2]: [s_name#X, count(1)#X AS numwait#X] -(91) TakeOrderedAndProject +(97) TakeOrderedAndProject Input [2]: [s_name#X, numwait#X] Arguments: X, [numwait#X DESC NULLS LAST, s_name#X ASC NULLS FIRST], [s_name#X, numwait#X] -(92) AdaptiveSparkPlan +(98) AdaptiveSparkPlan Output [2]: [s_name#X, numwait#X] Arguments: isFinalPlan=true \ No newline at end of file diff --git a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark33/18.txt b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark33/18.txt index 387110df9f4c2..96ad77829a05b 100644 --- a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark33/18.txt +++ b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark33/18.txt @@ -1,86 +1,91 @@ == Physical Plan == -AdaptiveSparkPlan (86) +AdaptiveSparkPlan (92) +- == Final Plan == - TakeOrderedAndProject (53) - +- VeloxColumnarToRowExec (52) - +- ^ RegularHashAggregateExecTransformer (50) - +- ^ InputIteratorTransformer (49) - +- ^ InputAdapter (48) - +- ^ ShuffleQueryStage (47), Statistics(X) - +- ColumnarExchange (46) - +- ^ ProjectExecTransformer (44) - +- ^ FlushableHashAggregateExecTransformer (43) - +- ^ ProjectExecTransformer (42) - +- ^ BroadcastHashJoinExecTransformer Inner (41) - :- ^ ProjectExecTransformer (28) - : +- ^ BroadcastHashJoinExecTransformer Inner (27) - : :- ^ InputIteratorTransformer (7) - : : +- ^ InputAdapter (6) - : : +- ^ BroadcastQueryStage (5), Statistics(X) - : : +- ColumnarBroadcastExchange (4) - : : +- ^ NoopFilter (2) - : : +- ^ Scan parquet (1) - : +- ^ BroadcastHashJoinExecTransformer LeftSemi (26) - : :- ^ NoopFilter (9) - : : +- ^ Scan parquet (8) - : +- ^ InputIteratorTransformer (25) - : +- ^ InputAdapter (24) - : +- ^ BroadcastQueryStage (23), Statistics(X) - : +- ColumnarBroadcastExchange (22) - : +- ^ ProjectExecTransformer (20) - : +- ^ FilterExecTransformer (19) - : +- ^ RegularHashAggregateExecTransformer (18) - : +- ^ InputIteratorTransformer (17) - : +- ^ InputAdapter (16) - : +- ^ ShuffleQueryStage (15), Statistics(X) - : +- ColumnarExchange (14) - : +- ^ ProjectExecTransformer (12) - : +- ^ FlushableHashAggregateExecTransformer (11) - : +- ^ Scan parquet (10) - +- ^ InputIteratorTransformer (40) - +- ^ InputAdapter (39) - +- ^ BroadcastQueryStage (38), Statistics(X) - +- ColumnarBroadcastExchange (37) - +- ^ BroadcastHashJoinExecTransformer LeftSemi (35) - :- ^ NoopFilter (30) - : +- ^ Scan parquet (29) - +- ^ InputIteratorTransformer (34) - +- ^ InputAdapter (33) - +- ^ BroadcastQueryStage (32), Statistics(X) - +- ReusedExchange (31) + VeloxColumnarToRowExec (59) + +- ^ ProjectExecTransformer (57) + +- ^ TopNTransformer (56) + +- ^ InputIteratorTransformer (55) + +- ^ InputAdapter (54) + +- ^ ColumnarExchange (53) + +- ^ TopNTransformer (51) + +- ^ RegularHashAggregateExecTransformer (50) + +- ^ InputIteratorTransformer (49) + +- ^ InputAdapter (48) + +- ^ ShuffleQueryStage (47), Statistics(X) + +- ColumnarExchange (46) + +- ^ ProjectExecTransformer (44) + +- ^ FlushableHashAggregateExecTransformer (43) + +- ^ ProjectExecTransformer (42) + +- ^ BroadcastHashJoinExecTransformer Inner (41) + :- ^ ProjectExecTransformer (28) + : +- ^ BroadcastHashJoinExecTransformer Inner (27) + : :- ^ InputIteratorTransformer (7) + : : +- ^ InputAdapter (6) + : : +- ^ BroadcastQueryStage (5), Statistics(X) + : : +- ColumnarBroadcastExchange (4) + : : +- ^ NoopFilter (2) + : : +- ^ Scan parquet (1) + : +- ^ BroadcastHashJoinExecTransformer LeftSemi (26) + : :- ^ NoopFilter (9) + : : +- ^ Scan parquet (8) + : +- ^ InputIteratorTransformer (25) + : +- ^ InputAdapter (24) + : +- ^ BroadcastQueryStage (23), Statistics(X) + : +- ColumnarBroadcastExchange (22) + : +- ^ ProjectExecTransformer (20) + : +- ^ FilterExecTransformer (19) + : +- ^ RegularHashAggregateExecTransformer (18) + : +- ^ InputIteratorTransformer (17) + : +- ^ InputAdapter (16) + : +- ^ ShuffleQueryStage (15), Statistics(X) + : +- ColumnarExchange (14) + : +- ^ ProjectExecTransformer (12) + : +- ^ FlushableHashAggregateExecTransformer (11) + : +- ^ Scan parquet (10) + +- ^ InputIteratorTransformer (40) + +- ^ InputAdapter (39) + +- ^ BroadcastQueryStage (38), Statistics(X) + +- ColumnarBroadcastExchange (37) + +- ^ BroadcastHashJoinExecTransformer LeftSemi (35) + :- ^ NoopFilter (30) + : +- ^ Scan parquet (29) + +- ^ InputIteratorTransformer (34) + +- ^ InputAdapter (33) + +- ^ BroadcastQueryStage (32), Statistics(X) + +- ReusedExchange (31) +- == Initial Plan == - TakeOrderedAndProject (85) - +- HashAggregate (84) - +- Exchange (83) - +- HashAggregate (82) - +- Project (81) - +- BroadcastHashJoin Inner BuildRight (80) - :- Project (68) - : +- BroadcastHashJoin Inner BuildLeft (67) - : :- BroadcastExchange (56) - : : +- Filter (55) - : : +- Scan parquet (54) - : +- BroadcastHashJoin LeftSemi BuildRight (66) - : :- Filter (58) - : : +- Scan parquet (57) - : +- BroadcastExchange (65) - : +- Project (64) - : +- Filter (63) - : +- HashAggregate (62) - : +- Exchange (61) - : +- HashAggregate (60) - : +- Scan parquet (59) - +- BroadcastExchange (79) - +- BroadcastHashJoin LeftSemi BuildRight (78) - :- Filter (70) - : +- Scan parquet (69) - +- BroadcastExchange (77) - +- Project (76) - +- Filter (75) - +- HashAggregate (74) - +- Exchange (73) - +- HashAggregate (72) - +- Scan parquet (71) + TakeOrderedAndProject (91) + +- HashAggregate (90) + +- Exchange (89) + +- HashAggregate (88) + +- Project (87) + +- BroadcastHashJoin Inner BuildRight (86) + :- Project (74) + : +- BroadcastHashJoin Inner BuildLeft (73) + : :- BroadcastExchange (62) + : : +- Filter (61) + : : +- Scan parquet (60) + : +- BroadcastHashJoin LeftSemi BuildRight (72) + : :- Filter (64) + : : +- Scan parquet (63) + : +- BroadcastExchange (71) + : +- Project (70) + : +- Filter (69) + : +- HashAggregate (68) + : +- Exchange (67) + : +- HashAggregate (66) + : +- Scan parquet (65) + +- BroadcastExchange (85) + +- BroadcastHashJoin LeftSemi BuildRight (84) + :- Filter (76) + : +- Scan parquet (75) + +- BroadcastExchange (83) + +- Project (82) + +- Filter (81) + +- HashAggregate (80) + +- Exchange (79) + +- HashAggregate (78) + +- Scan parquet (77) (1) Scan parquet @@ -297,180 +302,202 @@ Functions [1]: [sum(l_quantity#X)] Aggregate Attributes [1]: [sum(l_quantity#X)#X] Results [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity#X)#X AS sum(l_quantity)#X] -(51) WholeStageCodegenTransformer (X) +(51) TopNTransformer +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Arguments: X, [o_totalprice#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], false + +(52) WholeStageCodegenTransformer (X) Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] Arguments: false -(52) VeloxColumnarToRowExec +(53) ColumnarExchange Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X], [plan_id=X], [id=#X] -(53) TakeOrderedAndProject +(54) InputAdapter +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] + +(55) InputIteratorTransformer +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] + +(56) TopNTransformer +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Arguments: X, [o_totalprice#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], true + +(57) ProjectExecTransformer +Output [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] + +(58) WholeStageCodegenTransformer (X) +Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] +Arguments: false + +(59) VeloxColumnarToRowExec Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] -Arguments: X, [o_totalprice#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] -(54) Scan parquet +(60) Scan parquet Output [2]: [c_custkey#X, c_name#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(c_custkey)] ReadSchema: struct -(55) Filter +(61) Filter Input [2]: [c_custkey#X, c_name#X] Condition : isnotnull(c_custkey#X) -(56) BroadcastExchange +(62) BroadcastExchange Input [2]: [c_custkey#X, c_name#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=X] -(57) Scan parquet +(63) Scan parquet Output [4]: [o_orderkey#X, o_custkey#X, o_totalprice#X, o_orderdate#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(o_custkey), IsNotNull(o_orderkey)] ReadSchema: struct -(58) Filter +(64) Filter Input [4]: [o_orderkey#X, o_custkey#X, o_totalprice#X, o_orderdate#X] Condition : (isnotnull(o_custkey#X) AND isnotnull(o_orderkey#X)) -(59) Scan parquet +(65) Scan parquet Output [2]: [l_orderkey#X, l_quantity#X] Batched: true Location: InMemoryFileIndex [*] ReadSchema: struct -(60) HashAggregate +(66) HashAggregate Input [2]: [l_orderkey#X, l_quantity#X] Keys [1]: [l_orderkey#X] Functions [1]: [partial_sum(l_quantity#X)] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [3]: [l_orderkey#X, sum#X, isEmpty#X] -(61) Exchange +(67) Exchange Input [3]: [l_orderkey#X, sum#X, isEmpty#X] Arguments: hashpartitioning(l_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(62) HashAggregate +(68) HashAggregate Input [3]: [l_orderkey#X, sum#X, isEmpty#X] Keys [1]: [l_orderkey#X] Functions [1]: [sum(l_quantity#X)] Aggregate Attributes [1]: [sum(l_quantity#X)#X] Results [2]: [l_orderkey#X, sum(l_quantity#X)#X AS sum(l_quantity#X)#X] -(63) Filter +(69) Filter Input [2]: [l_orderkey#X, sum(l_quantity#X)#X] Condition : (isnotnull(sum(l_quantity#X)#X) AND (sum(l_quantity#X)#X > 300.00)) -(64) Project +(70) Project Output [1]: [l_orderkey#X] Input [2]: [l_orderkey#X, sum(l_quantity#X)#X] -(65) BroadcastExchange +(71) BroadcastExchange Input [1]: [l_orderkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(66) BroadcastHashJoin +(72) BroadcastHashJoin Left keys [1]: [o_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: None -(67) BroadcastHashJoin +(73) BroadcastHashJoin Left keys [1]: [c_custkey#X] Right keys [1]: [o_custkey#X] Join condition: None -(68) Project +(74) Project Output [5]: [c_custkey#X, c_name#X, o_orderkey#X, o_totalprice#X, o_orderdate#X] Input [6]: [c_custkey#X, c_name#X, o_orderkey#X, o_custkey#X, o_totalprice#X, o_orderdate#X] -(69) Scan parquet +(75) Scan parquet Output [2]: [l_orderkey#X, l_quantity#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(l_orderkey)] ReadSchema: struct -(70) Filter +(76) Filter Input [2]: [l_orderkey#X, l_quantity#X] Condition : isnotnull(l_orderkey#X) -(71) Scan parquet +(77) Scan parquet Output [2]: [l_orderkey#X, l_quantity#X] Batched: true Location: InMemoryFileIndex [*] ReadSchema: struct -(72) HashAggregate +(78) HashAggregate Input [2]: [l_orderkey#X, l_quantity#X] Keys [1]: [l_orderkey#X] Functions [1]: [partial_sum(l_quantity#X)] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [3]: [l_orderkey#X, sum#X, isEmpty#X] -(73) Exchange +(79) Exchange Input [3]: [l_orderkey#X, sum#X, isEmpty#X] Arguments: hashpartitioning(l_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(74) HashAggregate +(80) HashAggregate Input [3]: [l_orderkey#X, sum#X, isEmpty#X] Keys [1]: [l_orderkey#X] Functions [1]: [sum(l_quantity#X)] Aggregate Attributes [1]: [sum(l_quantity#X)#X] Results [2]: [l_orderkey#X, sum(l_quantity#X)#X AS sum(l_quantity#X)#X] -(75) Filter +(81) Filter Input [2]: [l_orderkey#X, sum(l_quantity#X)#X] Condition : (isnotnull(sum(l_quantity#X)#X) AND (sum(l_quantity#X)#X > 300.00)) -(76) Project +(82) Project Output [1]: [l_orderkey#X] Input [2]: [l_orderkey#X, sum(l_quantity#X)#X] -(77) BroadcastExchange +(83) BroadcastExchange Input [1]: [l_orderkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(78) BroadcastHashJoin +(84) BroadcastHashJoin Left keys [1]: [l_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: None -(79) BroadcastExchange +(85) BroadcastExchange Input [2]: [l_orderkey#X, l_quantity#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=X] -(80) BroadcastHashJoin +(86) BroadcastHashJoin Left keys [1]: [o_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: None -(81) Project +(87) Project Output [6]: [c_custkey#X, c_name#X, o_orderkey#X, o_totalprice#X, o_orderdate#X, l_quantity#X] Input [7]: [c_custkey#X, c_name#X, o_orderkey#X, o_totalprice#X, o_orderdate#X, l_orderkey#X, l_quantity#X] -(82) HashAggregate +(88) HashAggregate Input [6]: [c_custkey#X, c_name#X, o_orderkey#X, o_totalprice#X, o_orderdate#X, l_quantity#X] Keys [5]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X] Functions [1]: [partial_sum(l_quantity#X)] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [7]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum#X, isEmpty#X] -(83) Exchange +(89) Exchange Input [7]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum#X, isEmpty#X] Arguments: hashpartitioning(c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(84) HashAggregate +(90) HashAggregate Input [7]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum#X, isEmpty#X] Keys [5]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X] Functions [1]: [sum(l_quantity#X)] Aggregate Attributes [1]: [sum(l_quantity#X)#X] Results [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity#X)#X AS sum(l_quantity)#X] -(85) TakeOrderedAndProject +(91) TakeOrderedAndProject Input [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] Arguments: X, [o_totalprice#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] -(86) AdaptiveSparkPlan +(92) AdaptiveSparkPlan Output [6]: [c_name#X, c_custkey#X, o_orderkey#X, o_orderdate#X, o_totalprice#X, sum(l_quantity)#X] Arguments: isFinalPlan=true \ No newline at end of file diff --git a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark33/3.txt b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark33/3.txt index b99911f2b9feb..0c535cca37ba8 100644 --- a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark33/3.txt +++ b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark33/3.txt @@ -1,55 +1,60 @@ == Physical Plan == -AdaptiveSparkPlan (53) +AdaptiveSparkPlan (59) +- == Final Plan == - TakeOrderedAndProject (34) - +- VeloxColumnarToRowExec (33) - +- ^ ProjectExecTransformer (31) - +- ^ RegularHashAggregateExecTransformer (30) - +- ^ InputIteratorTransformer (29) - +- ^ InputAdapter (28) - +- ^ ShuffleQueryStage (27), Statistics(X) - +- ColumnarExchange (26) - +- ^ ProjectExecTransformer (24) - +- ^ FlushableHashAggregateExecTransformer (23) - +- ^ ProjectExecTransformer (22) - +- ^ BroadcastHashJoinExecTransformer Inner (21) - :- ^ ProjectExecTransformer (12) - : +- ^ BroadcastHashJoinExecTransformer Inner (11) - : :- ^ InputIteratorTransformer (8) - : : +- ^ InputAdapter (7) - : : +- ^ BroadcastQueryStage (6), Statistics(X) - : : +- ColumnarBroadcastExchange (5) - : : +- ^ ProjectExecTransformer (3) - : : +- ^ NoopFilter (2) - : : +- ^ Scan parquet (1) - : +- ^ NoopFilter (10) - : +- ^ Scan parquet (9) - +- ^ InputIteratorTransformer (20) - +- ^ InputAdapter (19) - +- ^ BroadcastQueryStage (18), Statistics(X) - +- ColumnarBroadcastExchange (17) - +- ^ ProjectExecTransformer (15) - +- ^ NoopFilter (14) - +- ^ Scan parquet (13) + VeloxColumnarToRowExec (40) + +- ^ ProjectExecTransformer (38) + +- ^ TopNTransformer (37) + +- ^ InputIteratorTransformer (36) + +- ^ InputAdapter (35) + +- ^ ColumnarExchange (34) + +- ^ TopNTransformer (32) + +- ^ ProjectExecTransformer (31) + +- ^ RegularHashAggregateExecTransformer (30) + +- ^ InputIteratorTransformer (29) + +- ^ InputAdapter (28) + +- ^ ShuffleQueryStage (27), Statistics(X) + +- ColumnarExchange (26) + +- ^ ProjectExecTransformer (24) + +- ^ FlushableHashAggregateExecTransformer (23) + +- ^ ProjectExecTransformer (22) + +- ^ BroadcastHashJoinExecTransformer Inner (21) + :- ^ ProjectExecTransformer (12) + : +- ^ BroadcastHashJoinExecTransformer Inner (11) + : :- ^ InputIteratorTransformer (8) + : : +- ^ InputAdapter (7) + : : +- ^ BroadcastQueryStage (6), Statistics(X) + : : +- ColumnarBroadcastExchange (5) + : : +- ^ ProjectExecTransformer (3) + : : +- ^ NoopFilter (2) + : : +- ^ Scan parquet (1) + : +- ^ NoopFilter (10) + : +- ^ Scan parquet (9) + +- ^ InputIteratorTransformer (20) + +- ^ InputAdapter (19) + +- ^ BroadcastQueryStage (18), Statistics(X) + +- ColumnarBroadcastExchange (17) + +- ^ ProjectExecTransformer (15) + +- ^ NoopFilter (14) + +- ^ Scan parquet (13) +- == Initial Plan == - TakeOrderedAndProject (52) - +- HashAggregate (51) - +- Exchange (50) - +- HashAggregate (49) - +- Project (48) - +- BroadcastHashJoin Inner BuildRight (47) - :- Project (42) - : +- BroadcastHashJoin Inner BuildLeft (41) - : :- BroadcastExchange (38) - : : +- Project (37) - : : +- Filter (36) - : : +- Scan parquet (35) - : +- Filter (40) - : +- Scan parquet (39) - +- BroadcastExchange (46) - +- Project (45) - +- Filter (44) - +- Scan parquet (43) + TakeOrderedAndProject (58) + +- HashAggregate (57) + +- Exchange (56) + +- HashAggregate (55) + +- Project (54) + +- BroadcastHashJoin Inner BuildRight (53) + :- Project (48) + : +- BroadcastHashJoin Inner BuildLeft (47) + : :- BroadcastExchange (44) + : : +- Project (43) + : : +- Filter (42) + : : +- Scan parquet (41) + : +- Filter (46) + : +- Scan parquet (45) + +- BroadcastExchange (52) + +- Project (51) + +- Filter (50) + +- Scan parquet (49) (1) Scan parquet @@ -187,106 +192,128 @@ Results [4]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum(CheckOverflow(( Output [4]: [l_orderkey#X, sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))#X AS revenue#X, o_orderdate#X, o_shippriority#X] Input [4]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))#X] -(32) WholeStageCodegenTransformer (X) +(32) TopNTransformer +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], false + +(33) WholeStageCodegenTransformer (X) Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Arguments: false -(33) VeloxColumnarToRowExec +(34) ColumnarExchange Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X], [plan_id=X], [id=#X] -(34) TakeOrderedAndProject +(35) InputAdapter +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] + +(36) InputIteratorTransformer +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] + +(37) TopNTransformer +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], true + +(38) ProjectExecTransformer +Output [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] -Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] -(35) Scan parquet +(39) WholeStageCodegenTransformer (X) +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: false + +(40) VeloxColumnarToRowExec +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] + +(41) Scan parquet Output [2]: [c_custkey#X, c_mktsegment#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(c_mktsegment), EqualTo(c_mktsegment,BUILDING), IsNotNull(c_custkey)] ReadSchema: struct -(36) Filter +(42) Filter Input [2]: [c_custkey#X, c_mktsegment#X] Condition : ((isnotnull(c_mktsegment#X) AND (c_mktsegment#X = BUILDING)) AND isnotnull(c_custkey#X)) -(37) Project +(43) Project Output [1]: [c_custkey#X] Input [2]: [c_custkey#X, c_mktsegment#X] -(38) BroadcastExchange +(44) BroadcastExchange Input [1]: [c_custkey#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(39) Scan parquet +(45) Scan parquet Output [4]: [o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(o_orderdate), LessThan(o_orderdate,1995-03-15), IsNotNull(o_custkey), IsNotNull(o_orderkey)] ReadSchema: struct -(40) Filter +(46) Filter Input [4]: [o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] Condition : (((isnotnull(o_orderdate#X) AND (o_orderdate#X < 1995-03-15)) AND isnotnull(o_custkey#X)) AND isnotnull(o_orderkey#X)) -(41) BroadcastHashJoin +(47) BroadcastHashJoin Left keys [1]: [c_custkey#X] Right keys [1]: [o_custkey#X] Join condition: None -(42) Project +(48) Project Output [3]: [o_orderkey#X, o_orderdate#X, o_shippriority#X] Input [5]: [c_custkey#X, o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] -(43) Scan parquet +(49) Scan parquet Output [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_shipdate#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(l_shipdate), GreaterThan(l_shipdate,1995-03-15), IsNotNull(l_orderkey)] ReadSchema: struct -(44) Filter +(50) Filter Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_shipdate#X] Condition : ((isnotnull(l_shipdate#X) AND (l_shipdate#X > 1995-03-15)) AND isnotnull(l_orderkey#X)) -(45) Project +(51) Project Output [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_shipdate#X] -(46) BroadcastExchange +(52) BroadcastExchange Input [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(47) BroadcastHashJoin +(53) BroadcastHashJoin Left keys [1]: [o_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: None -(48) Project +(54) Project Output [5]: [o_orderdate#X, o_shippriority#X, l_orderkey#X, l_extendedprice#X, l_discount#X] Input [6]: [o_orderkey#X, o_orderdate#X, o_shippriority#X, l_orderkey#X, l_extendedprice#X, l_discount#X] -(49) HashAggregate +(55) HashAggregate Input [5]: [o_orderdate#X, o_shippriority#X, l_orderkey#X, l_extendedprice#X, l_discount#X] Keys [3]: [l_orderkey#X, o_orderdate#X, o_shippriority#X] Functions [1]: [partial_sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [5]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum#X, isEmpty#X] -(50) Exchange +(56) Exchange Input [5]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum#X, isEmpty#X] Arguments: hashpartitioning(l_orderkey#X, o_orderdate#X, o_shippriority#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(51) HashAggregate +(57) HashAggregate Input [5]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum#X, isEmpty#X] Keys [3]: [l_orderkey#X, o_orderdate#X, o_shippriority#X] Functions [1]: [sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))] Aggregate Attributes [1]: [sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))#X] Results [4]: [l_orderkey#X, sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))#X AS revenue#X, o_orderdate#X, o_shippriority#X] -(52) TakeOrderedAndProject +(58) TakeOrderedAndProject Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] -(53) AdaptiveSparkPlan +(59) AdaptiveSparkPlan Output [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Arguments: isFinalPlan=true \ No newline at end of file diff --git a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark34/10.txt b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark34/10.txt index ab3d73dde692f..9cf7ca78644aa 100644 --- a/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark34/10.txt +++ b/backends-velox/src/test/resources/tpch-approved-plan/v1-bhj-ras/spark34/10.txt @@ -1,68 +1,73 @@ == Physical Plan == -AdaptiveSparkPlan (67) +AdaptiveSparkPlan (73) +- == Final Plan == - TakeOrderedAndProject (43) - +- VeloxColumnarToRowExec (42) - +- ^ ProjectExecTransformer (40) - +- ^ RegularHashAggregateExecTransformer (39) - +- ^ InputIteratorTransformer (38) - +- ^ InputAdapter (37) - +- ^ ShuffleQueryStage (36), Statistics(X) - +- ColumnarExchange (35) - +- ^ ProjectExecTransformer (33) - +- ^ FlushableHashAggregateExecTransformer (32) - +- ^ ProjectExecTransformer (31) - +- ^ BroadcastHashJoinExecTransformer Inner (30) - :- ^ ProjectExecTransformer (22) - : +- ^ BroadcastHashJoinExecTransformer Inner (21) - : :- ^ ProjectExecTransformer (12) - : : +- ^ BroadcastHashJoinExecTransformer Inner (11) - : : :- ^ NoopFilter (2) - : : : +- ^ Scan parquet (1) - : : +- ^ InputIteratorTransformer (10) - : : +- ^ InputAdapter (9) - : : +- ^ BroadcastQueryStage (8), Statistics(X) - : : +- ColumnarBroadcastExchange (7) - : : +- ^ ProjectExecTransformer (5) - : : +- ^ NoopFilter (4) - : : +- ^ Scan parquet (3) - : +- ^ InputIteratorTransformer (20) - : +- ^ InputAdapter (19) - : +- ^ BroadcastQueryStage (18), Statistics(X) - : +- ColumnarBroadcastExchange (17) - : +- ^ ProjectExecTransformer (15) - : +- ^ NoopFilter (14) - : +- ^ Scan parquet (13) - +- ^ InputIteratorTransformer (29) - +- ^ InputAdapter (28) - +- ^ BroadcastQueryStage (27), Statistics(X) - +- ColumnarBroadcastExchange (26) - +- ^ NoopFilter (24) - +- ^ Scan parquet (23) + VeloxColumnarToRowExec (49) + +- ^ ProjectExecTransformer (47) + +- ^ TopNTransformer (46) + +- ^ InputIteratorTransformer (45) + +- ^ InputAdapter (44) + +- ^ ColumnarExchange (43) + +- ^ TopNTransformer (41) + +- ^ ProjectExecTransformer (40) + +- ^ RegularHashAggregateExecTransformer (39) + +- ^ InputIteratorTransformer (38) + +- ^ InputAdapter (37) + +- ^ ShuffleQueryStage (36), Statistics(X) + +- ColumnarExchange (35) + +- ^ ProjectExecTransformer (33) + +- ^ FlushableHashAggregateExecTransformer (32) + +- ^ ProjectExecTransformer (31) + +- ^ BroadcastHashJoinExecTransformer Inner (30) + :- ^ ProjectExecTransformer (22) + : +- ^ BroadcastHashJoinExecTransformer Inner (21) + : :- ^ ProjectExecTransformer (12) + : : +- ^ BroadcastHashJoinExecTransformer Inner (11) + : : :- ^ NoopFilter (2) + : : : +- ^ Scan parquet (1) + : : +- ^ InputIteratorTransformer (10) + : : +- ^ InputAdapter (9) + : : +- ^ BroadcastQueryStage (8), Statistics(X) + : : +- ColumnarBroadcastExchange (7) + : : +- ^ ProjectExecTransformer (5) + : : +- ^ NoopFilter (4) + : : +- ^ Scan parquet (3) + : +- ^ InputIteratorTransformer (20) + : +- ^ InputAdapter (19) + : +- ^ BroadcastQueryStage (18), Statistics(X) + : +- ColumnarBroadcastExchange (17) + : +- ^ ProjectExecTransformer (15) + : +- ^ NoopFilter (14) + : +- ^ Scan parquet (13) + +- ^ InputIteratorTransformer (29) + +- ^ InputAdapter (28) + +- ^ BroadcastQueryStage (27), Statistics(X) + +- ColumnarBroadcastExchange (26) + +- ^ NoopFilter (24) + +- ^ Scan parquet (23) +- == Initial Plan == - TakeOrderedAndProject (66) - +- HashAggregate (65) - +- Exchange (64) - +- HashAggregate (63) - +- Project (62) - +- BroadcastHashJoin Inner BuildRight (61) - :- Project (57) - : +- BroadcastHashJoin Inner BuildRight (56) - : :- Project (51) - : : +- BroadcastHashJoin Inner BuildRight (50) - : : :- Filter (45) - : : : +- Scan parquet (44) - : : +- BroadcastExchange (49) - : : +- Project (48) - : : +- Filter (47) - : : +- Scan parquet (46) - : +- BroadcastExchange (55) - : +- Project (54) - : +- Filter (53) - : +- Scan parquet (52) - +- BroadcastExchange (60) - +- Filter (59) - +- Scan parquet (58) + TakeOrderedAndProject (72) + +- HashAggregate (71) + +- Exchange (70) + +- HashAggregate (69) + +- Project (68) + +- BroadcastHashJoin Inner BuildRight (67) + :- Project (63) + : +- BroadcastHashJoin Inner BuildRight (62) + : :- Project (57) + : : +- BroadcastHashJoin Inner BuildRight (56) + : : :- Filter (51) + : : : +- Scan parquet (50) + : : +- BroadcastExchange (55) + : : +- Project (54) + : : +- Filter (53) + : : +- Scan parquet (52) + : +- BroadcastExchange (61) + : +- Project (60) + : +- Filter (59) + : +- Scan parquet (58) + +- BroadcastExchange (66) + +- Filter (65) + +- Scan parquet (64) (1) Scan parquet @@ -241,133 +246,155 @@ Results [8]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address Output [8]: [c_custkey#X, c_name#X, sum((l_extendedprice#X * (1 - l_discount#X)))#X AS revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] Input [8]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, sum((l_extendedprice#X * (1 - l_discount#X)))#X] -(41) WholeStageCodegenTransformer (X) +(41) TopNTransformer +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Arguments: X, [revenue#X DESC NULLS LAST], false + +(42) WholeStageCodegenTransformer (X) Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] Arguments: false -(42) VeloxColumnarToRowExec +(43) ColumnarExchange Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X], [plan_id=X], [id=#X] -(43) TakeOrderedAndProject +(44) InputAdapter +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] + +(45) InputIteratorTransformer +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] + +(46) TopNTransformer +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Arguments: X, [revenue#X DESC NULLS LAST], true + +(47) ProjectExecTransformer +Output [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] + +(48) WholeStageCodegenTransformer (X) +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Arguments: false + +(49) VeloxColumnarToRowExec Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] -Arguments: X, [revenue#X DESC NULLS LAST], [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] -(44) Scan parquet +(50) Scan parquet Output [7]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)] ReadSchema: struct -(45) Filter +(51) Filter Input [7]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X] Condition : (isnotnull(c_custkey#X) AND isnotnull(c_nationkey#X)) -(46) Scan parquet +(52) Scan parquet Output [3]: [o_orderkey#X, o_custkey#X, o_orderdate#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(o_orderdate), GreaterThanOrEqual(o_orderdate,1993-10-01), LessThan(o_orderdate,1994-01-01), IsNotNull(o_custkey), IsNotNull(o_orderkey)] ReadSchema: struct -(47) Filter +(53) Filter Input [3]: [o_orderkey#X, o_custkey#X, o_orderdate#X] Condition : ((((isnotnull(o_orderdate#X) AND (o_orderdate#X >= 1993-10-01)) AND (o_orderdate#X < 1994-01-01)) AND isnotnull(o_custkey#X)) AND isnotnull(o_orderkey#X)) -(48) Project +(54) Project Output [2]: [o_orderkey#X, o_custkey#X] Input [3]: [o_orderkey#X, o_custkey#X, o_orderdate#X] -(49) BroadcastExchange +(55) BroadcastExchange Input [2]: [o_orderkey#X, o_custkey#X] Arguments: HashedRelationBroadcastMode(List(input[1, bigint, true]),false), [plan_id=X] -(50) BroadcastHashJoin +(56) BroadcastHashJoin Left keys [1]: [c_custkey#X] Right keys [1]: [o_custkey#X] Join type: Inner Join condition: None -(51) Project +(57) Project Output [8]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, o_orderkey#X] Input [9]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, o_orderkey#X, o_custkey#X] -(52) Scan parquet +(58) Scan parquet Output [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_returnflag#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(l_returnflag), EqualTo(l_returnflag,R), IsNotNull(l_orderkey)] ReadSchema: struct -(53) Filter +(59) Filter Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_returnflag#X] Condition : ((isnotnull(l_returnflag#X) AND (l_returnflag#X = R)) AND isnotnull(l_orderkey#X)) -(54) Project +(60) Project Output [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_returnflag#X] -(55) BroadcastExchange +(61) BroadcastExchange Input [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X] -(56) BroadcastHashJoin +(62) BroadcastHashJoin Left keys [1]: [o_orderkey#X] Right keys [1]: [l_orderkey#X] Join type: Inner Join condition: None -(57) Project +(63) Project Output [9]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, l_extendedprice#X, l_discount#X] Input [11]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, o_orderkey#X, l_orderkey#X, l_extendedprice#X, l_discount#X] -(58) Scan parquet +(64) Scan parquet Output [2]: [n_nationkey#X, n_name#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(n_nationkey)] ReadSchema: struct -(59) Filter +(65) Filter Input [2]: [n_nationkey#X, n_name#X] Condition : isnotnull(n_nationkey#X) -(60) BroadcastExchange +(66) BroadcastExchange Input [2]: [n_nationkey#X, n_name#X] Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=X] -(61) BroadcastHashJoin +(67) BroadcastHashJoin Left keys [1]: [c_nationkey#X] Right keys [1]: [n_nationkey#X] Join type: Inner Join condition: None -(62) Project +(68) Project Output [9]: [c_custkey#X, c_name#X, c_address#X, c_phone#X, c_acctbal#X, c_comment#X, l_extendedprice#X, l_discount#X, n_name#X] Input [11]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, l_extendedprice#X, l_discount#X, n_nationkey#X, n_name#X] -(63) HashAggregate +(69) HashAggregate Input [9]: [c_custkey#X, c_name#X, c_address#X, c_phone#X, c_acctbal#X, c_comment#X, l_extendedprice#X, l_discount#X, n_name#X] Keys [7]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X] Functions [1]: [partial_sum((l_extendedprice#X * (1 - l_discount#X)))] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [9]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, sum#X, isEmpty#X] -(64) Exchange +(70) Exchange Input [9]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, sum#X, isEmpty#X] Arguments: hashpartitioning(c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(65) HashAggregate +(71) HashAggregate Input [9]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, sum#X, isEmpty#X] Keys [7]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X] Functions [1]: [sum((l_extendedprice#X * (1 - l_discount#X)))] Aggregate Attributes [1]: [sum((l_extendedprice#X * (1 - l_discount#X)))#X] Results [8]: [c_custkey#X, c_name#X, sum((l_extendedprice#X * (1 - l_discount#X)))#X AS revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] -(66) TakeOrderedAndProject +(72) TakeOrderedAndProject Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] Arguments: X, [revenue#X DESC NULLS LAST], [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] -(67) AdaptiveSparkPlan +(73) AdaptiveSparkPlan Output [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] Arguments: isFinalPlan=true \ No newline at end of file diff --git a/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark32/3.txt b/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark32/3.txt index 8927e80a6cd15..ac63158b24b7e 100644 --- a/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark32/3.txt +++ b/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark32/3.txt @@ -1,60 +1,65 @@ == Physical Plan == -AdaptiveSparkPlan (59) +AdaptiveSparkPlan (65) +- == Final Plan == - TakeOrderedAndProject (39) - +- VeloxColumnarToRowExec (38) - +- ^ ProjectExecTransformer (36) - +- ^ RegularHashAggregateExecTransformer (35) - +- ^ RegularHashAggregateExecTransformer (34) - +- ^ ProjectExecTransformer (33) - +- ^ ShuffledHashJoinExecTransformer Inner (32) - :- ^ InputIteratorTransformer (23) - : +- ^ InputAdapter (22) - : +- ^ ShuffleQueryStage (21) - : +- ColumnarExchange (20) - : +- ^ ProjectExecTransformer (18) - : +- ^ ShuffledHashJoinExecTransformer Inner (17) - : :- ^ InputIteratorTransformer (8) - : : +- ^ InputAdapter (7) - : : +- ^ ShuffleQueryStage (6) - : : +- ColumnarExchange (5) - : : +- ^ ProjectExecTransformer (3) - : : +- ^ NoopFilter (2) - : : +- ^ Scan parquet (1) - : +- ^ InputIteratorTransformer (16) - : +- ^ InputAdapter (15) - : +- ^ ShuffleQueryStage (14) - : +- ColumnarExchange (13) - : +- ^ ProjectExecTransformer (11) - : +- ^ NoopFilter (10) - : +- ^ Scan parquet (9) - +- ^ InputIteratorTransformer (31) - +- ^ InputAdapter (30) - +- ^ ShuffleQueryStage (29) - +- ColumnarExchange (28) - +- ^ ProjectExecTransformer (26) - +- ^ NoopFilter (25) - +- ^ Scan parquet (24) + VeloxColumnarToRowExec (45) + +- ^ ProjectExecTransformer (43) + +- ^ TopNTransformer (42) + +- ^ InputIteratorTransformer (41) + +- ^ InputAdapter (40) + +- ^ ColumnarExchange (39) + +- ^ TopNTransformer (37) + +- ^ ProjectExecTransformer (36) + +- ^ RegularHashAggregateExecTransformer (35) + +- ^ RegularHashAggregateExecTransformer (34) + +- ^ ProjectExecTransformer (33) + +- ^ ShuffledHashJoinExecTransformer Inner (32) + :- ^ InputIteratorTransformer (23) + : +- ^ InputAdapter (22) + : +- ^ ShuffleQueryStage (21) + : +- ColumnarExchange (20) + : +- ^ ProjectExecTransformer (18) + : +- ^ ShuffledHashJoinExecTransformer Inner (17) + : :- ^ InputIteratorTransformer (8) + : : +- ^ InputAdapter (7) + : : +- ^ ShuffleQueryStage (6) + : : +- ColumnarExchange (5) + : : +- ^ ProjectExecTransformer (3) + : : +- ^ NoopFilter (2) + : : +- ^ Scan parquet (1) + : +- ^ InputIteratorTransformer (16) + : +- ^ InputAdapter (15) + : +- ^ ShuffleQueryStage (14) + : +- ColumnarExchange (13) + : +- ^ ProjectExecTransformer (11) + : +- ^ NoopFilter (10) + : +- ^ Scan parquet (9) + +- ^ InputIteratorTransformer (31) + +- ^ InputAdapter (30) + +- ^ ShuffleQueryStage (29) + +- ColumnarExchange (28) + +- ^ ProjectExecTransformer (26) + +- ^ NoopFilter (25) + +- ^ Scan parquet (24) +- == Initial Plan == - TakeOrderedAndProject (58) - +- HashAggregate (57) - +- HashAggregate (56) - +- Project (55) - +- ShuffledHashJoin Inner BuildRight (54) - :- Exchange (49) - : +- Project (48) - : +- ShuffledHashJoin Inner BuildLeft (47) - : :- Exchange (43) - : : +- Project (42) - : : +- Filter (41) - : : +- Scan parquet (40) - : +- Exchange (46) - : +- Filter (45) - : +- Scan parquet (44) - +- Exchange (53) - +- Project (52) - +- Filter (51) - +- Scan parquet (50) + TakeOrderedAndProject (64) + +- HashAggregate (63) + +- HashAggregate (62) + +- Project (61) + +- ShuffledHashJoin Inner BuildRight (60) + :- Exchange (55) + : +- Project (54) + : +- ShuffledHashJoin Inner BuildLeft (53) + : :- Exchange (49) + : : +- Project (48) + : : +- Filter (47) + : : +- Scan parquet (46) + : +- Exchange (52) + : +- Filter (51) + : +- Scan parquet (50) + +- Exchange (59) + +- Project (58) + +- Filter (57) + +- Scan parquet (56) (1) Scan parquet @@ -210,110 +215,132 @@ Results [4]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum(CheckOverflow(( Output [4]: [l_orderkey#X, sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2), true))), DecimalType(26,4), true))#X AS revenue#X, o_orderdate#X, o_shippriority#X] Input [4]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2), true))), DecimalType(26,4), true))#X] -(37) WholeStageCodegenTransformer (X) +(37) TopNTransformer +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], false + +(38) WholeStageCodegenTransformer (X) Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Arguments: false -(38) VeloxColumnarToRowExec +(39) ColumnarExchange Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X], [plan_id=X], [id=#X] -(39) TakeOrderedAndProject +(40) InputAdapter +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] + +(41) InputIteratorTransformer +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] + +(42) TopNTransformer +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], true + +(43) ProjectExecTransformer +Output [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] -Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] -(40) Scan parquet +(44) WholeStageCodegenTransformer (X) +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: false + +(45) VeloxColumnarToRowExec +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] + +(46) Scan parquet Output [2]: [c_custkey#X, c_mktsegment#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(c_mktsegment), EqualTo(c_mktsegment,BUILDING), IsNotNull(c_custkey)] ReadSchema: struct -(41) Filter +(47) Filter Input [2]: [c_custkey#X, c_mktsegment#X] Condition : ((isnotnull(c_mktsegment#X) AND (c_mktsegment#X = BUILDING)) AND isnotnull(c_custkey#X)) -(42) Project +(48) Project Output [1]: [c_custkey#X] Input [2]: [c_custkey#X, c_mktsegment#X] -(43) Exchange +(49) Exchange Input [1]: [c_custkey#X] Arguments: hashpartitioning(c_custkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(44) Scan parquet +(50) Scan parquet Output [4]: [o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(o_orderdate), LessThan(o_orderdate,1995-03-15), IsNotNull(o_custkey), IsNotNull(o_orderkey)] ReadSchema: struct -(45) Filter +(51) Filter Input [4]: [o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] Condition : (((isnotnull(o_orderdate#X) AND (o_orderdate#X < 1995-03-15)) AND isnotnull(o_custkey#X)) AND isnotnull(o_orderkey#X)) -(46) Exchange +(52) Exchange Input [4]: [o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] Arguments: hashpartitioning(o_custkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(47) ShuffledHashJoin +(53) ShuffledHashJoin Left keys [1]: [c_custkey#X] Right keys [1]: [o_custkey#X] Join condition: None -(48) Project +(54) Project Output [3]: [o_orderkey#X, o_orderdate#X, o_shippriority#X] Input [5]: [c_custkey#X, o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] -(49) Exchange +(55) Exchange Input [3]: [o_orderkey#X, o_orderdate#X, o_shippriority#X] Arguments: hashpartitioning(o_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(50) Scan parquet +(56) Scan parquet Output [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_shipdate#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(l_shipdate), GreaterThan(l_shipdate,1995-03-15), IsNotNull(l_orderkey)] ReadSchema: struct -(51) Filter +(57) Filter Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_shipdate#X] Condition : ((isnotnull(l_shipdate#X) AND (l_shipdate#X > 1995-03-15)) AND isnotnull(l_orderkey#X)) -(52) Project +(58) Project Output [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_shipdate#X] -(53) Exchange +(59) Exchange Input [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Arguments: hashpartitioning(l_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(54) ShuffledHashJoin +(60) ShuffledHashJoin Left keys [1]: [o_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: None -(55) Project +(61) Project Output [5]: [o_orderdate#X, o_shippriority#X, l_orderkey#X, l_extendedprice#X, l_discount#X] Input [6]: [o_orderkey#X, o_orderdate#X, o_shippriority#X, l_orderkey#X, l_extendedprice#X, l_discount#X] -(56) HashAggregate +(62) HashAggregate Input [5]: [o_orderdate#X, o_shippriority#X, l_orderkey#X, l_extendedprice#X, l_discount#X] Keys [3]: [l_orderkey#X, o_orderdate#X, o_shippriority#X] Functions [1]: [partial_sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2), true))), DecimalType(26,4), true))] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [5]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum#X, isEmpty#X] -(57) HashAggregate +(63) HashAggregate Input [5]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum#X, isEmpty#X] Keys [3]: [l_orderkey#X, o_orderdate#X, o_shippriority#X] Functions [1]: [sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2), true))), DecimalType(26,4), true))] Aggregate Attributes [1]: [sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2), true))), DecimalType(26,4), true))#X] Results [4]: [l_orderkey#X, sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2), true))), DecimalType(26,4), true))#X AS revenue#X, o_orderdate#X, o_shippriority#X] -(58) TakeOrderedAndProject +(64) TakeOrderedAndProject Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] -(59) AdaptiveSparkPlan +(65) AdaptiveSparkPlan Output [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Arguments: isFinalPlan=true \ No newline at end of file diff --git a/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark33/10.txt b/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark33/10.txt index 2b450c890975e..aef12f7859e4e 100644 --- a/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark33/10.txt +++ b/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark33/10.txt @@ -1,85 +1,90 @@ == Physical Plan == -AdaptiveSparkPlan (87) +AdaptiveSparkPlan (93) +- == Final Plan == - TakeOrderedAndProject (60) - +- VeloxColumnarToRowExec (59) - +- ^ ProjectExecTransformer (57) - +- ^ RegularHashAggregateExecTransformer (56) - +- ^ InputIteratorTransformer (55) - +- ^ InputAdapter (54) - +- ^ ShuffleQueryStage (53), Statistics(X) - +- ColumnarExchange (52) - +- ^ ProjectExecTransformer (50) - +- ^ FlushableHashAggregateExecTransformer (49) - +- ^ ProjectExecTransformer (48) - +- ^ ShuffledHashJoinExecTransformer Inner (47) - :- ^ InputIteratorTransformer (38) - : +- ^ InputAdapter (37) - : +- ^ ShuffleQueryStage (36), Statistics(X) - : +- ColumnarExchange (35) - : +- ^ ProjectExecTransformer (33) - : +- ^ ShuffledHashJoinExecTransformer Inner (32) - : :- ^ InputIteratorTransformer (23) - : : +- ^ InputAdapter (22) - : : +- ^ ShuffleQueryStage (21), Statistics(X) - : : +- ColumnarExchange (20) - : : +- ^ ProjectExecTransformer (18) - : : +- ^ ShuffledHashJoinExecTransformer Inner (17) - : : :- ^ InputIteratorTransformer (8) - : : : +- ^ InputAdapter (7) - : : : +- ^ ShuffleQueryStage (6), Statistics(X) - : : : +- ColumnarExchange (5) - : : : +- ^ ProjectExecTransformer (3) - : : : +- ^ NoopFilter (2) - : : : +- ^ Scan parquet (1) - : : +- ^ InputIteratorTransformer (16) - : : +- ^ InputAdapter (15) - : : +- ^ ShuffleQueryStage (14), Statistics(X) - : : +- ColumnarExchange (13) - : : +- ^ ProjectExecTransformer (11) - : : +- ^ NoopFilter (10) - : : +- ^ Scan parquet (9) - : +- ^ InputIteratorTransformer (31) - : +- ^ InputAdapter (30) - : +- ^ ShuffleQueryStage (29), Statistics(X) - : +- ColumnarExchange (28) - : +- ^ ProjectExecTransformer (26) - : +- ^ NoopFilter (25) - : +- ^ Scan parquet (24) - +- ^ InputIteratorTransformer (46) - +- ^ InputAdapter (45) - +- ^ ShuffleQueryStage (44), Statistics(X) - +- ColumnarExchange (43) - +- ^ ProjectExecTransformer (41) - +- ^ NoopFilter (40) - +- ^ Scan parquet (39) + VeloxColumnarToRowExec (66) + +- ^ ProjectExecTransformer (64) + +- ^ TopNTransformer (63) + +- ^ InputIteratorTransformer (62) + +- ^ InputAdapter (61) + +- ^ ColumnarExchange (60) + +- ^ TopNTransformer (58) + +- ^ ProjectExecTransformer (57) + +- ^ RegularHashAggregateExecTransformer (56) + +- ^ InputIteratorTransformer (55) + +- ^ InputAdapter (54) + +- ^ ShuffleQueryStage (53), Statistics(X) + +- ColumnarExchange (52) + +- ^ ProjectExecTransformer (50) + +- ^ FlushableHashAggregateExecTransformer (49) + +- ^ ProjectExecTransformer (48) + +- ^ ShuffledHashJoinExecTransformer Inner (47) + :- ^ InputIteratorTransformer (38) + : +- ^ InputAdapter (37) + : +- ^ ShuffleQueryStage (36), Statistics(X) + : +- ColumnarExchange (35) + : +- ^ ProjectExecTransformer (33) + : +- ^ ShuffledHashJoinExecTransformer Inner (32) + : :- ^ InputIteratorTransformer (23) + : : +- ^ InputAdapter (22) + : : +- ^ ShuffleQueryStage (21), Statistics(X) + : : +- ColumnarExchange (20) + : : +- ^ ProjectExecTransformer (18) + : : +- ^ ShuffledHashJoinExecTransformer Inner (17) + : : :- ^ InputIteratorTransformer (8) + : : : +- ^ InputAdapter (7) + : : : +- ^ ShuffleQueryStage (6), Statistics(X) + : : : +- ColumnarExchange (5) + : : : +- ^ ProjectExecTransformer (3) + : : : +- ^ NoopFilter (2) + : : : +- ^ Scan parquet (1) + : : +- ^ InputIteratorTransformer (16) + : : +- ^ InputAdapter (15) + : : +- ^ ShuffleQueryStage (14), Statistics(X) + : : +- ColumnarExchange (13) + : : +- ^ ProjectExecTransformer (11) + : : +- ^ NoopFilter (10) + : : +- ^ Scan parquet (9) + : +- ^ InputIteratorTransformer (31) + : +- ^ InputAdapter (30) + : +- ^ ShuffleQueryStage (29), Statistics(X) + : +- ColumnarExchange (28) + : +- ^ ProjectExecTransformer (26) + : +- ^ NoopFilter (25) + : +- ^ Scan parquet (24) + +- ^ InputIteratorTransformer (46) + +- ^ InputAdapter (45) + +- ^ ShuffleQueryStage (44), Statistics(X) + +- ColumnarExchange (43) + +- ^ ProjectExecTransformer (41) + +- ^ NoopFilter (40) + +- ^ Scan parquet (39) +- == Initial Plan == - TakeOrderedAndProject (86) - +- HashAggregate (85) - +- Exchange (84) - +- HashAggregate (83) - +- Project (82) - +- ShuffledHashJoin Inner BuildRight (81) - :- Exchange (77) - : +- Project (76) - : +- ShuffledHashJoin Inner BuildRight (75) - : :- Exchange (70) - : : +- Project (69) - : : +- ShuffledHashJoin Inner BuildRight (68) - : : :- Exchange (63) - : : : +- Filter (62) - : : : +- Scan parquet (61) - : : +- Exchange (67) - : : +- Project (66) - : : +- Filter (65) - : : +- Scan parquet (64) - : +- Exchange (74) - : +- Project (73) - : +- Filter (72) - : +- Scan parquet (71) - +- Exchange (80) - +- Filter (79) - +- Scan parquet (78) + TakeOrderedAndProject (92) + +- HashAggregate (91) + +- Exchange (90) + +- HashAggregate (89) + +- Project (88) + +- ShuffledHashJoin Inner BuildRight (87) + :- Exchange (83) + : +- Project (82) + : +- ShuffledHashJoin Inner BuildRight (81) + : :- Exchange (76) + : : +- Project (75) + : : +- ShuffledHashJoin Inner BuildRight (74) + : : :- Exchange (69) + : : : +- Filter (68) + : : : +- Scan parquet (67) + : : +- Exchange (73) + : : +- Project (72) + : : +- Filter (71) + : : +- Scan parquet (70) + : +- Exchange (80) + : +- Project (79) + : +- Filter (78) + : +- Scan parquet (77) + +- Exchange (86) + +- Filter (85) + +- Scan parquet (84) (1) Scan parquet @@ -317,142 +322,164 @@ Results [8]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address Output [8]: [c_custkey#X, c_name#X, sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))#X AS revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] Input [8]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))#X] -(58) WholeStageCodegenTransformer (X) +(58) TopNTransformer +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Arguments: X, [revenue#X DESC NULLS LAST], false + +(59) WholeStageCodegenTransformer (X) Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] Arguments: false -(59) VeloxColumnarToRowExec +(60) ColumnarExchange Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X], [plan_id=X], [id=#X] -(60) TakeOrderedAndProject +(61) InputAdapter Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] -Arguments: X, [revenue#X DESC NULLS LAST], [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] -(61) Scan parquet +(62) InputIteratorTransformer +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] + +(63) TopNTransformer +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Arguments: X, [revenue#X DESC NULLS LAST], true + +(64) ProjectExecTransformer +Output [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] + +(65) WholeStageCodegenTransformer (X) +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] +Arguments: false + +(66) VeloxColumnarToRowExec +Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] + +(67) Scan parquet Output [7]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)] ReadSchema: struct -(62) Filter +(68) Filter Input [7]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X] Condition : (isnotnull(c_custkey#X) AND isnotnull(c_nationkey#X)) -(63) Exchange +(69) Exchange Input [7]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X] Arguments: hashpartitioning(c_custkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(64) Scan parquet +(70) Scan parquet Output [3]: [o_orderkey#X, o_custkey#X, o_orderdate#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(o_orderdate), GreaterThanOrEqual(o_orderdate,1993-10-01), LessThan(o_orderdate,1994-01-01), IsNotNull(o_custkey), IsNotNull(o_orderkey)] ReadSchema: struct -(65) Filter +(71) Filter Input [3]: [o_orderkey#X, o_custkey#X, o_orderdate#X] Condition : ((((isnotnull(o_orderdate#X) AND (o_orderdate#X >= 1993-10-01)) AND (o_orderdate#X < 1994-01-01)) AND isnotnull(o_custkey#X)) AND isnotnull(o_orderkey#X)) -(66) Project +(72) Project Output [2]: [o_orderkey#X, o_custkey#X] Input [3]: [o_orderkey#X, o_custkey#X, o_orderdate#X] -(67) Exchange +(73) Exchange Input [2]: [o_orderkey#X, o_custkey#X] Arguments: hashpartitioning(o_custkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(68) ShuffledHashJoin +(74) ShuffledHashJoin Left keys [1]: [c_custkey#X] Right keys [1]: [o_custkey#X] Join condition: None -(69) Project +(75) Project Output [8]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, o_orderkey#X] Input [9]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, o_orderkey#X, o_custkey#X] -(70) Exchange +(76) Exchange Input [8]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, o_orderkey#X] Arguments: hashpartitioning(o_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(71) Scan parquet +(77) Scan parquet Output [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_returnflag#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(l_returnflag), EqualTo(l_returnflag,R), IsNotNull(l_orderkey)] ReadSchema: struct -(72) Filter +(78) Filter Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_returnflag#X] Condition : ((isnotnull(l_returnflag#X) AND (l_returnflag#X = R)) AND isnotnull(l_orderkey#X)) -(73) Project +(79) Project Output [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_returnflag#X] -(74) Exchange +(80) Exchange Input [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Arguments: hashpartitioning(l_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(75) ShuffledHashJoin +(81) ShuffledHashJoin Left keys [1]: [o_orderkey#X] Right keys [1]: [l_orderkey#X] Join condition: None -(76) Project +(82) Project Output [9]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, l_extendedprice#X, l_discount#X] Input [11]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, o_orderkey#X, l_orderkey#X, l_extendedprice#X, l_discount#X] -(77) Exchange +(83) Exchange Input [9]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, l_extendedprice#X, l_discount#X] Arguments: hashpartitioning(c_nationkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(78) Scan parquet +(84) Scan parquet Output [2]: [n_nationkey#X, n_name#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(n_nationkey)] ReadSchema: struct -(79) Filter +(85) Filter Input [2]: [n_nationkey#X, n_name#X] Condition : isnotnull(n_nationkey#X) -(80) Exchange +(86) Exchange Input [2]: [n_nationkey#X, n_name#X] Arguments: hashpartitioning(n_nationkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(81) ShuffledHashJoin +(87) ShuffledHashJoin Left keys [1]: [c_nationkey#X] Right keys [1]: [n_nationkey#X] Join condition: None -(82) Project +(88) Project Output [9]: [c_custkey#X, c_name#X, c_address#X, c_phone#X, c_acctbal#X, c_comment#X, l_extendedprice#X, l_discount#X, n_name#X] Input [11]: [c_custkey#X, c_name#X, c_address#X, c_nationkey#X, c_phone#X, c_acctbal#X, c_comment#X, l_extendedprice#X, l_discount#X, n_nationkey#X, n_name#X] -(83) HashAggregate +(89) HashAggregate Input [9]: [c_custkey#X, c_name#X, c_address#X, c_phone#X, c_acctbal#X, c_comment#X, l_extendedprice#X, l_discount#X, n_name#X] Keys [7]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X] Functions [1]: [partial_sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [9]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, sum#X, isEmpty#X] -(84) Exchange +(90) Exchange Input [9]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, sum#X, isEmpty#X] Arguments: hashpartitioning(c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(85) HashAggregate +(91) HashAggregate Input [9]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X, sum#X, isEmpty#X] Keys [7]: [c_custkey#X, c_name#X, c_acctbal#X, c_phone#X, n_name#X, c_address#X, c_comment#X] Functions [1]: [sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))] Aggregate Attributes [1]: [sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))#X] Results [8]: [c_custkey#X, c_name#X, sum(CheckOverflow((promote_precision(cast(l_extendedprice#X as decimal(13,2))) * promote_precision(CheckOverflow((1.00 - promote_precision(cast(l_discount#X as decimal(13,2)))), DecimalType(13,2)))), DecimalType(26,4)))#X AS revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] -(86) TakeOrderedAndProject +(92) TakeOrderedAndProject Input [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] Arguments: X, [revenue#X DESC NULLS LAST], [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] -(87) AdaptiveSparkPlan +(93) AdaptiveSparkPlan Output [8]: [c_custkey#X, c_name#X, revenue#X, c_acctbal#X, n_name#X, c_address#X, c_phone#X, c_comment#X] Arguments: isFinalPlan=true \ No newline at end of file diff --git a/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark34/3.txt b/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark34/3.txt index 5c9e3093b6f41..4f172b8e99ed1 100644 --- a/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark34/3.txt +++ b/backends-velox/src/test/resources/tpch-approved-plan/v1-ras/spark34/3.txt @@ -1,60 +1,65 @@ == Physical Plan == -AdaptiveSparkPlan (59) +AdaptiveSparkPlan (65) +- == Final Plan == - TakeOrderedAndProject (39) - +- VeloxColumnarToRowExec (38) - +- ^ ProjectExecTransformer (36) - +- ^ RegularHashAggregateExecTransformer (35) - +- ^ RegularHashAggregateExecTransformer (34) - +- ^ ProjectExecTransformer (33) - +- ^ ShuffledHashJoinExecTransformer Inner (32) - :- ^ InputIteratorTransformer (23) - : +- ^ InputAdapter (22) - : +- ^ ShuffleQueryStage (21), Statistics(X) - : +- ColumnarExchange (20) - : +- ^ ProjectExecTransformer (18) - : +- ^ ShuffledHashJoinExecTransformer Inner (17) - : :- ^ InputIteratorTransformer (8) - : : +- ^ InputAdapter (7) - : : +- ^ ShuffleQueryStage (6), Statistics(X) - : : +- ColumnarExchange (5) - : : +- ^ ProjectExecTransformer (3) - : : +- ^ NoopFilter (2) - : : +- ^ Scan parquet (1) - : +- ^ InputIteratorTransformer (16) - : +- ^ InputAdapter (15) - : +- ^ ShuffleQueryStage (14), Statistics(X) - : +- ColumnarExchange (13) - : +- ^ ProjectExecTransformer (11) - : +- ^ NoopFilter (10) - : +- ^ Scan parquet (9) - +- ^ InputIteratorTransformer (31) - +- ^ InputAdapter (30) - +- ^ ShuffleQueryStage (29), Statistics(X) - +- ColumnarExchange (28) - +- ^ ProjectExecTransformer (26) - +- ^ NoopFilter (25) - +- ^ Scan parquet (24) + VeloxColumnarToRowExec (45) + +- ^ ProjectExecTransformer (43) + +- ^ TopNTransformer (42) + +- ^ InputIteratorTransformer (41) + +- ^ InputAdapter (40) + +- ^ ColumnarExchange (39) + +- ^ TopNTransformer (37) + +- ^ ProjectExecTransformer (36) + +- ^ RegularHashAggregateExecTransformer (35) + +- ^ RegularHashAggregateExecTransformer (34) + +- ^ ProjectExecTransformer (33) + +- ^ ShuffledHashJoinExecTransformer Inner (32) + :- ^ InputIteratorTransformer (23) + : +- ^ InputAdapter (22) + : +- ^ ShuffleQueryStage (21), Statistics(X) + : +- ColumnarExchange (20) + : +- ^ ProjectExecTransformer (18) + : +- ^ ShuffledHashJoinExecTransformer Inner (17) + : :- ^ InputIteratorTransformer (8) + : : +- ^ InputAdapter (7) + : : +- ^ ShuffleQueryStage (6), Statistics(X) + : : +- ColumnarExchange (5) + : : +- ^ ProjectExecTransformer (3) + : : +- ^ NoopFilter (2) + : : +- ^ Scan parquet (1) + : +- ^ InputIteratorTransformer (16) + : +- ^ InputAdapter (15) + : +- ^ ShuffleQueryStage (14), Statistics(X) + : +- ColumnarExchange (13) + : +- ^ ProjectExecTransformer (11) + : +- ^ NoopFilter (10) + : +- ^ Scan parquet (9) + +- ^ InputIteratorTransformer (31) + +- ^ InputAdapter (30) + +- ^ ShuffleQueryStage (29), Statistics(X) + +- ColumnarExchange (28) + +- ^ ProjectExecTransformer (26) + +- ^ NoopFilter (25) + +- ^ Scan parquet (24) +- == Initial Plan == - TakeOrderedAndProject (58) - +- HashAggregate (57) - +- HashAggregate (56) - +- Project (55) - +- ShuffledHashJoin Inner BuildRight (54) - :- Exchange (49) - : +- Project (48) - : +- ShuffledHashJoin Inner BuildLeft (47) - : :- Exchange (43) - : : +- Project (42) - : : +- Filter (41) - : : +- Scan parquet (40) - : +- Exchange (46) - : +- Filter (45) - : +- Scan parquet (44) - +- Exchange (53) - +- Project (52) - +- Filter (51) - +- Scan parquet (50) + TakeOrderedAndProject (64) + +- HashAggregate (63) + +- HashAggregate (62) + +- Project (61) + +- ShuffledHashJoin Inner BuildRight (60) + :- Exchange (55) + : +- Project (54) + : +- ShuffledHashJoin Inner BuildLeft (53) + : :- Exchange (49) + : : +- Project (48) + : : +- Filter (47) + : : +- Scan parquet (46) + : +- Exchange (52) + : +- Filter (51) + : +- Scan parquet (50) + +- Exchange (59) + +- Project (58) + +- Filter (57) + +- Scan parquet (56) (1) Scan parquet @@ -212,112 +217,134 @@ Results [4]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum((l_extendedpric Output [4]: [l_orderkey#X, sum((l_extendedprice#X * (1 - l_discount#X)))#X AS revenue#X, o_orderdate#X, o_shippriority#X] Input [4]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum((l_extendedprice#X * (1 - l_discount#X)))#X] -(37) WholeStageCodegenTransformer (X) +(37) TopNTransformer +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], false + +(38) WholeStageCodegenTransformer (X) Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Arguments: false -(38) VeloxColumnarToRowExec +(39) ColumnarExchange Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X], [plan_id=X], [id=#X] -(39) TakeOrderedAndProject +(40) InputAdapter +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] + +(41) InputIteratorTransformer +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] + +(42) TopNTransformer +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], true + +(43) ProjectExecTransformer +Output [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] -Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] -(40) Scan parquet +(44) WholeStageCodegenTransformer (X) +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] +Arguments: false + +(45) VeloxColumnarToRowExec +Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] + +(46) Scan parquet Output [2]: [c_custkey#X, c_mktsegment#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(c_mktsegment), EqualTo(c_mktsegment,BUILDING), IsNotNull(c_custkey)] ReadSchema: struct -(41) Filter +(47) Filter Input [2]: [c_custkey#X, c_mktsegment#X] Condition : ((isnotnull(c_mktsegment#X) AND (c_mktsegment#X = BUILDING)) AND isnotnull(c_custkey#X)) -(42) Project +(48) Project Output [1]: [c_custkey#X] Input [2]: [c_custkey#X, c_mktsegment#X] -(43) Exchange +(49) Exchange Input [1]: [c_custkey#X] Arguments: hashpartitioning(c_custkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(44) Scan parquet +(50) Scan parquet Output [4]: [o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(o_orderdate), LessThan(o_orderdate,1995-03-15), IsNotNull(o_custkey), IsNotNull(o_orderkey)] ReadSchema: struct -(45) Filter +(51) Filter Input [4]: [o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] Condition : (((isnotnull(o_orderdate#X) AND (o_orderdate#X < 1995-03-15)) AND isnotnull(o_custkey#X)) AND isnotnull(o_orderkey#X)) -(46) Exchange +(52) Exchange Input [4]: [o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] Arguments: hashpartitioning(o_custkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(47) ShuffledHashJoin +(53) ShuffledHashJoin Left keys [1]: [c_custkey#X] Right keys [1]: [o_custkey#X] Join type: Inner Join condition: None -(48) Project +(54) Project Output [3]: [o_orderkey#X, o_orderdate#X, o_shippriority#X] Input [5]: [c_custkey#X, o_orderkey#X, o_custkey#X, o_orderdate#X, o_shippriority#X] -(49) Exchange +(55) Exchange Input [3]: [o_orderkey#X, o_orderdate#X, o_shippriority#X] Arguments: hashpartitioning(o_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(50) Scan parquet +(56) Scan parquet Output [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_shipdate#X] Batched: true Location: InMemoryFileIndex [*] PushedFilters: [IsNotNull(l_shipdate), GreaterThan(l_shipdate,1995-03-15), IsNotNull(l_orderkey)] ReadSchema: struct -(51) Filter +(57) Filter Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_shipdate#X] Condition : ((isnotnull(l_shipdate#X) AND (l_shipdate#X > 1995-03-15)) AND isnotnull(l_orderkey#X)) -(52) Project +(58) Project Output [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Input [4]: [l_orderkey#X, l_extendedprice#X, l_discount#X, l_shipdate#X] -(53) Exchange +(59) Exchange Input [3]: [l_orderkey#X, l_extendedprice#X, l_discount#X] Arguments: hashpartitioning(l_orderkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X] -(54) ShuffledHashJoin +(60) ShuffledHashJoin Left keys [1]: [o_orderkey#X] Right keys [1]: [l_orderkey#X] Join type: Inner Join condition: None -(55) Project +(61) Project Output [5]: [o_orderdate#X, o_shippriority#X, l_orderkey#X, l_extendedprice#X, l_discount#X] Input [6]: [o_orderkey#X, o_orderdate#X, o_shippriority#X, l_orderkey#X, l_extendedprice#X, l_discount#X] -(56) HashAggregate +(62) HashAggregate Input [5]: [o_orderdate#X, o_shippriority#X, l_orderkey#X, l_extendedprice#X, l_discount#X] Keys [3]: [l_orderkey#X, o_orderdate#X, o_shippriority#X] Functions [1]: [partial_sum((l_extendedprice#X * (1 - l_discount#X)))] Aggregate Attributes [2]: [sum#X, isEmpty#X] Results [5]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum#X, isEmpty#X] -(57) HashAggregate +(63) HashAggregate Input [5]: [l_orderkey#X, o_orderdate#X, o_shippriority#X, sum#X, isEmpty#X] Keys [3]: [l_orderkey#X, o_orderdate#X, o_shippriority#X] Functions [1]: [sum((l_extendedprice#X * (1 - l_discount#X)))] Aggregate Attributes [1]: [sum((l_extendedprice#X * (1 - l_discount#X)))#X] Results [4]: [l_orderkey#X, sum((l_extendedprice#X * (1 - l_discount#X)))#X AS revenue#X, o_orderdate#X, o_shippriority#X] -(58) TakeOrderedAndProject +(64) TakeOrderedAndProject Input [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Arguments: X, [revenue#X DESC NULLS LAST, o_orderdate#X ASC NULLS FIRST], [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] -(59) AdaptiveSparkPlan +(65) AdaptiveSparkPlan Output [4]: [l_orderkey#X, revenue#X, o_orderdate#X, o_shippriority#X] Arguments: isFinalPlan=true \ No newline at end of file diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala index d53b0010fb87a..04d377bab22d0 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala @@ -272,7 +272,7 @@ class VeloxTPCHMiscSuite extends VeloxTPCHTableSupport { |""".stripMargin ) val topN = df.queryExecution.executedPlan.collect { case sortExec: TopNTransformer => sortExec } - assert(topN.size == 1) + assert(topN.size == 2) val result = df.collect() df.explain(true) val expectedResult = Seq(Row(0), Row(1), Row(2), Row(3), Row(4)) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala index 6af89dc057aa9..036a20dd14625 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala @@ -105,20 +105,20 @@ object RasOffload { validator.validate(from) match { case Validator.Passed => val offloaded = base.offload(from) - offloaded match { - case t: GlutenPlan if !t.doValidate().isValid => - // 4. If native validation fails on the offloaded node, return the - // original one. - from - case other => - other + val offloadedNodes = offloaded.collect[GlutenPlan] { case t: GlutenPlan => t } + if (offloadedNodes.exists(!_.doValidate().isValid)) { + // 4. If native validation fails on the offloaded node, return the + // original one. + from + } else { + offloaded } case Validator.Failed(reason) => from } } - // 5. If rewritten plan is not offload-able, discard it. + // 5. If the entire rewritten plan is not offload-able, discard it. if (offloaded.fastEquals(rewritten)) { return List.empty } diff --git a/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala b/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala index c45314a9f58f9..ca8833b66b0ba 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala @@ -25,7 +25,7 @@ import org.apache.gluten.ras.{Cost, CostModel} import org.apache.gluten.utils.PlanUtil import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan} +import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan, TakeOrderedAndProjectExec} import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec import org.apache.spark.sql.utils.ReflectionUtil @@ -88,6 +88,9 @@ object GlutenCostModel extends Logging { case RowToColumnarLike(child) => 3L case p if PlanUtil.isGlutenColumnarOp(p) => 2L case p if PlanUtil.isVanillaColumnarOp(p) => 3L + case _: TakeOrderedAndProjectExec => + // Make vanilla c2r -> top-n larger than offloaded project -> top-n -> exchange -> top-n + 6L // Other row ops. Usually a vanilla row op. case _ => 5L }