Skip to content

Commit

Permalink
[VL] RAS: Include aggregate transformation into enumerated transform,…
Browse files Browse the repository at this point in the history
… and add TPC-H golden checks for RAS
  • Loading branch information
zhztheplayer authored Apr 10, 2024
1 parent 8e5f63c commit 8d23a30
Show file tree
Hide file tree
Showing 139 changed files with 49,570 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ object BackendSettings extends BackendSettingsApi {
!mayNeedOffload(plan)
}

override def fallbackAggregateWithChild(): Boolean = true
override def fallbackAggregateWithEmptyOutputChild(): Boolean = true

override def recreateJoinExecOnFallback(): Boolean = true
override def rescaleDecimalLiteral(): Boolean = true
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
== Physical Plan ==
AdaptiveSparkPlan (58)
+- == Final Plan ==
VeloxColumnarToRowExec (38)
+- ^ SortExecTransformer (36)
+- ^ InputIteratorTransformer (35)
+- ^ InputAdapter (34)
+- ^ ShuffleQueryStage (33)
+- ColumnarExchange (32)
+- ^ FilterExecTransformer (30)
+- ^ RegularHashAggregateExecTransformer (29)
+- ^ InputIteratorTransformer (28)
+- ^ InputAdapter (27)
+- ^ ShuffleQueryStage (26)
+- ColumnarExchange (25)
+- ^ ProjectExecTransformer (23)
+- ^ FlushableHashAggregateExecTransformer (22)
+- ^ ProjectExecTransformer (21)
+- ^ BroadcastHashJoinExecTransformer Inner (20)
:- ^ ProjectExecTransformer (11)
: +- ^ BroadcastHashJoinExecTransformer Inner (10)
: :- ^ FilterExecTransformer (2)
: : +- ^ Scan parquet (1)
: +- ^ InputIteratorTransformer (9)
: +- ^ InputAdapter (8)
: +- ^ BroadcastQueryStage (7)
: +- ColumnarBroadcastExchange (6)
: +- ^ FilterExecTransformer (4)
: +- ^ Scan parquet (3)
+- ^ InputIteratorTransformer (19)
+- ^ InputAdapter (18)
+- ^ BroadcastQueryStage (17)
+- ColumnarBroadcastExchange (16)
+- ^ ProjectExecTransformer (14)
+- ^ FilterExecTransformer (13)
+- ^ Scan parquet (12)
+- == Initial Plan ==
Sort (57)
+- Exchange (56)
+- Filter (55)
+- HashAggregate (54)
+- Exchange (53)
+- HashAggregate (52)
+- Project (51)
+- BroadcastHashJoin Inner BuildRight (50)
:- Project (45)
: +- BroadcastHashJoin Inner BuildRight (44)
: :- Filter (40)
: : +- Scan parquet (39)
: +- BroadcastExchange (43)
: +- Filter (42)
: +- Scan parquet (41)
+- BroadcastExchange (49)
+- Project (48)
+- Filter (47)
+- Scan parquet (46)


(1) Scan parquet
Output [4]: [ps_partkey#X, ps_suppkey#X, ps_availqty#X, ps_supplycost#X]
Batched: true
Location: InMemoryFileIndex [*]
PushedFilters: [IsNotNull(ps_suppkey)]
ReadSchema: struct<ps_partkey:bigint,ps_suppkey:bigint,ps_availqty:int,ps_supplycost:decimal(12,2)>

(2) FilterExecTransformer
Input [4]: [ps_partkey#X, ps_suppkey#X, ps_availqty#X, ps_supplycost#X]
Arguments: isnotnull(ps_suppkey#X)

(3) Scan parquet
Output [2]: [s_suppkey#X, s_nationkey#X]
Batched: true
Location: InMemoryFileIndex [*]
PushedFilters: [IsNotNull(s_suppkey), IsNotNull(s_nationkey)]
ReadSchema: struct<s_suppkey:bigint,s_nationkey:bigint>

(4) FilterExecTransformer
Input [2]: [s_suppkey#X, s_nationkey#X]
Arguments: (isnotnull(s_suppkey#X) AND isnotnull(s_nationkey#X))

(5) WholeStageCodegenTransformer (X)
Input [2]: [s_suppkey#X, s_nationkey#X]
Arguments: false

(6) ColumnarBroadcastExchange
Input [2]: [s_suppkey#X, s_nationkey#X]
Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=X]

(7) BroadcastQueryStage
Output [2]: [s_suppkey#X, s_nationkey#X]
Arguments: X

(8) InputAdapter
Input [2]: [s_suppkey#X, s_nationkey#X]

(9) InputIteratorTransformer
Input [2]: [s_suppkey#X, s_nationkey#X]

(10) BroadcastHashJoinExecTransformer
Left keys [1]: [ps_suppkey#X]
Right keys [1]: [s_suppkey#X]
Join condition: None

(11) ProjectExecTransformer
Output [4]: [ps_partkey#X, ps_availqty#X, ps_supplycost#X, s_nationkey#X]
Input [6]: [ps_partkey#X, ps_suppkey#X, ps_availqty#X, ps_supplycost#X, s_suppkey#X, s_nationkey#X]

(12) Scan parquet
Output [2]: [n_nationkey#X, n_name#X]
Batched: true
Location: InMemoryFileIndex [*]
PushedFilters: [IsNotNull(n_name), EqualTo(n_name,GERMANY), IsNotNull(n_nationkey)]
ReadSchema: struct<n_nationkey:bigint,n_name:string>

(13) FilterExecTransformer
Input [2]: [n_nationkey#X, n_name#X]
Arguments: ((isnotnull(n_name#X) AND (n_name#X = GERMANY)) AND isnotnull(n_nationkey#X))

(14) ProjectExecTransformer
Output [1]: [n_nationkey#X]
Input [2]: [n_nationkey#X, n_name#X]

(15) WholeStageCodegenTransformer (X)
Input [1]: [n_nationkey#X]
Arguments: false

(16) ColumnarBroadcastExchange
Input [1]: [n_nationkey#X]
Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X]

(17) BroadcastQueryStage
Output [1]: [n_nationkey#X]
Arguments: X

(18) InputAdapter
Input [1]: [n_nationkey#X]

(19) InputIteratorTransformer
Input [1]: [n_nationkey#X]

(20) BroadcastHashJoinExecTransformer
Left keys [1]: [s_nationkey#X]
Right keys [1]: [n_nationkey#X]
Join condition: None

(21) ProjectExecTransformer
Output [4]: [ps_partkey#X, ps_availqty#X, ps_supplycost#X, CheckOverflow((promote_precision(ps_supplycost#X) * promote_precision(cast(cast(ps_availqty#X as decimal(10,0)) as decimal(12,2)))), DecimalType(23,2), true) AS _pre_X#X]
Input [5]: [ps_partkey#X, ps_availqty#X, ps_supplycost#X, s_nationkey#X, n_nationkey#X]

(22) FlushableHashAggregateExecTransformer
Input [4]: [ps_partkey#X, ps_availqty#X, ps_supplycost#X, _pre_X#X]
Keys [1]: [ps_partkey#X]
Functions [1]: [partial_sum(_pre_X#X)]
Aggregate Attributes [2]: [sum#X, isEmpty#X]
Results [3]: [ps_partkey#X, sum#X, isEmpty#X]

(23) ProjectExecTransformer
Output [4]: [hash(ps_partkey#X, 42) AS hash_partition_key#X, ps_partkey#X, sum#X, isEmpty#X]
Input [3]: [ps_partkey#X, sum#X, isEmpty#X]

(24) WholeStageCodegenTransformer (X)
Input [4]: [hash_partition_key#X, ps_partkey#X, sum#X, isEmpty#X]
Arguments: false

(25) ColumnarExchange
Input [4]: [hash_partition_key#X, ps_partkey#X, sum#X, isEmpty#X]
Arguments: hashpartitioning(ps_partkey#X, 1), ENSURE_REQUIREMENTS, [ps_partkey#X, sum#X, isEmpty#X], [plan_id=X], [id=#X]

(26) ShuffleQueryStage
Output [3]: [ps_partkey#X, sum#X, isEmpty#X]
Arguments: X

(27) InputAdapter
Input [3]: [ps_partkey#X, sum#X, isEmpty#X]

(28) InputIteratorTransformer
Input [3]: [ps_partkey#X, sum#X, isEmpty#X]

(29) RegularHashAggregateExecTransformer
Input [3]: [ps_partkey#X, sum#X, isEmpty#X]
Keys [1]: [ps_partkey#X]
Functions [1]: [sum(CheckOverflow((promote_precision(ps_supplycost#X) * promote_precision(cast(cast(ps_availqty#X as decimal(10,0)) as decimal(12,2)))), DecimalType(23,2), true))]
Aggregate Attributes [1]: [sum(CheckOverflow((promote_precision(ps_supplycost#X) * promote_precision(cast(cast(ps_availqty#X as decimal(10,0)) as decimal(12,2)))), DecimalType(23,2), true))#X]
Results [2]: [ps_partkey#X, sum(CheckOverflow((promote_precision(ps_supplycost#X) * promote_precision(cast(cast(ps_availqty#X as decimal(10,0)) as decimal(12,2)))), DecimalType(23,2), true))#X AS value#X]

(30) FilterExecTransformer
Input [2]: [ps_partkey#X, value#X]
Arguments: (isnotnull(value#X) AND (cast(value#X as decimal(38,6)) > Subquery subquery#X, [id=#X]))

(31) WholeStageCodegenTransformer (X)
Input [2]: [ps_partkey#X, value#X]
Arguments: false

(32) ColumnarExchange
Input [2]: [ps_partkey#X, value#X]
Arguments: rangepartitioning(value#X DESC NULLS LAST, 1), ENSURE_REQUIREMENTS, [plan_id=X], [id=#X]

(33) ShuffleQueryStage
Output [2]: [ps_partkey#X, value#X]
Arguments: X

(34) InputAdapter
Input [2]: [ps_partkey#X, value#X]

(35) InputIteratorTransformer
Input [2]: [ps_partkey#X, value#X]

(36) SortExecTransformer
Input [2]: [ps_partkey#X, value#X]
Arguments: [value#X DESC NULLS LAST], true, 0

(37) WholeStageCodegenTransformer (X)
Input [2]: [ps_partkey#X, value#X]
Arguments: false

(38) VeloxColumnarToRowExec
Input [2]: [ps_partkey#X, value#X]

(39) Scan parquet
Output [4]: [ps_partkey#X, ps_suppkey#X, ps_availqty#X, ps_supplycost#X]
Batched: true
Location: InMemoryFileIndex [*]
PushedFilters: [IsNotNull(ps_suppkey)]
ReadSchema: struct<ps_partkey:bigint,ps_suppkey:bigint,ps_availqty:int,ps_supplycost:decimal(12,2)>

(40) Filter
Input [4]: [ps_partkey#X, ps_suppkey#X, ps_availqty#X, ps_supplycost#X]
Condition : isnotnull(ps_suppkey#X)

(41) Scan parquet
Output [2]: [s_suppkey#X, s_nationkey#X]
Batched: true
Location: InMemoryFileIndex [*]
PushedFilters: [IsNotNull(s_suppkey), IsNotNull(s_nationkey)]
ReadSchema: struct<s_suppkey:bigint,s_nationkey:bigint>

(42) Filter
Input [2]: [s_suppkey#X, s_nationkey#X]
Condition : (isnotnull(s_suppkey#X) AND isnotnull(s_nationkey#X))

(43) BroadcastExchange
Input [2]: [s_suppkey#X, s_nationkey#X]
Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=X]

(44) BroadcastHashJoin
Left keys [1]: [ps_suppkey#X]
Right keys [1]: [s_suppkey#X]
Join condition: None

(45) Project
Output [4]: [ps_partkey#X, ps_availqty#X, ps_supplycost#X, s_nationkey#X]
Input [6]: [ps_partkey#X, ps_suppkey#X, ps_availqty#X, ps_supplycost#X, s_suppkey#X, s_nationkey#X]

(46) Scan parquet
Output [2]: [n_nationkey#X, n_name#X]
Batched: true
Location: InMemoryFileIndex [*]
PushedFilters: [IsNotNull(n_name), EqualTo(n_name,GERMANY), IsNotNull(n_nationkey)]
ReadSchema: struct<n_nationkey:bigint,n_name:string>

(47) Filter
Input [2]: [n_nationkey#X, n_name#X]
Condition : ((isnotnull(n_name#X) AND (n_name#X = GERMANY)) AND isnotnull(n_nationkey#X))

(48) Project
Output [1]: [n_nationkey#X]
Input [2]: [n_nationkey#X, n_name#X]

(49) BroadcastExchange
Input [1]: [n_nationkey#X]
Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=X]

(50) BroadcastHashJoin
Left keys [1]: [s_nationkey#X]
Right keys [1]: [n_nationkey#X]
Join condition: None

(51) Project
Output [3]: [ps_partkey#X, ps_availqty#X, ps_supplycost#X]
Input [5]: [ps_partkey#X, ps_availqty#X, ps_supplycost#X, s_nationkey#X, n_nationkey#X]

(52) HashAggregate
Input [3]: [ps_partkey#X, ps_availqty#X, ps_supplycost#X]
Keys [1]: [ps_partkey#X]
Functions [1]: [partial_sum(CheckOverflow((promote_precision(ps_supplycost#X) * promote_precision(cast(cast(ps_availqty#X as decimal(10,0)) as decimal(12,2)))), DecimalType(23,2), true))]
Aggregate Attributes [2]: [sum#X, isEmpty#X]
Results [3]: [ps_partkey#X, sum#X, isEmpty#X]

(53) Exchange
Input [3]: [ps_partkey#X, sum#X, isEmpty#X]
Arguments: hashpartitioning(ps_partkey#X, 1), ENSURE_REQUIREMENTS, [plan_id=X]

(54) HashAggregate
Input [3]: [ps_partkey#X, sum#X, isEmpty#X]
Keys [1]: [ps_partkey#X]
Functions [1]: [sum(CheckOverflow((promote_precision(ps_supplycost#X) * promote_precision(cast(cast(ps_availqty#X as decimal(10,0)) as decimal(12,2)))), DecimalType(23,2), true))]
Aggregate Attributes [1]: [sum(CheckOverflow((promote_precision(ps_supplycost#X) * promote_precision(cast(cast(ps_availqty#X as decimal(10,0)) as decimal(12,2)))), DecimalType(23,2), true))#X]
Results [2]: [ps_partkey#X, sum(CheckOverflow((promote_precision(ps_supplycost#X) * promote_precision(cast(cast(ps_availqty#X as decimal(10,0)) as decimal(12,2)))), DecimalType(23,2), true))#X AS value#X]

(55) Filter
Input [2]: [ps_partkey#X, value#X]
Condition : (isnotnull(value#X) AND (cast(value#X as decimal(38,6)) > Subquery subquery#X, [id=#X]))

(56) Exchange
Input [2]: [ps_partkey#X, value#X]
Arguments: rangepartitioning(value#X DESC NULLS LAST, 1), ENSURE_REQUIREMENTS, [plan_id=X]

(57) Sort
Input [2]: [ps_partkey#X, value#X]
Arguments: [value#X DESC NULLS LAST], true, 0

(58) AdaptiveSparkPlan
Output [2]: [ps_partkey#X, value#X]
Arguments: isFinalPlan=true
Loading

0 comments on commit 8d23a30

Please sign in to comment.