Skip to content

Commit

Permalink
opt: add rule to merge GroupBy and Window
Browse files Browse the repository at this point in the history
This commit adds a new norm rule, `FoldGroupByAndWindow`, which can
merge a Window operator with a parent GroupBy operator when the grouping
columns are the same as the partition columns. See the rule comment for
the complete list of conditions. In addition to removing a potentially
expensive Window operator, this transformation makes way for other rules
to match.

Fixes #113292

Release note: None
  • Loading branch information
DrewKimball committed Dec 12, 2024
1 parent 6a3221f commit 147054d
Show file tree
Hide file tree
Showing 8 changed files with 713 additions and 73 deletions.
2 changes: 1 addition & 1 deletion pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ exp,benchmark
3,Jobs/show_job
3-5,Jobs/show_jobs
3,ORMQueries/activerecord_type_introspection_query
0,ORMQueries/asyncpg_types
4,ORMQueries/asyncpg_types
6,ORMQueries/column_descriptions_json_agg
4,ORMQueries/django_column_introspection_1_table
4,ORMQueries/django_column_introspection_4_tables
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -4490,6 +4490,7 @@ FROM (
WHERE c.relname = 'indexes_table'
) s2
GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions
ORDER BY indexname
----
indexname array_agg indisunique indisprimary array_agg amname exprdef attoptions
indexes_include_idx {a,c,d} false false {ASC,ASC,ASC} prefix NULL NULL
Expand Down
65 changes: 34 additions & 31 deletions pkg/sql/opt/exec/execbuilder/testdata/explain
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,8 @@ vectorized: true
└── • group (hash)
│ group by: column_name, ordinal_position, column_default, is_nullable, generation_expression, is_hidden, crdb_sql_type
└── • window
└── • sort
│ order: +index_name
└── • hash join (left outer)
│ equality: (column_name) = (column_name)
Expand Down Expand Up @@ -734,39 +735,41 @@ vectorized: true
│ estimated row count: 3
│ order: +"role"
└── • hash join (left outer)
└── • merge join (right outer)
│ estimated row count: 3
│ equality: (username) = (member)
left cols are key
│ equality: (member) = (username)
right cols are key
├── • group (hash)
│ │ estimated row count: 3
│ │ group by: username
│ │
│ └── • window
│ │ estimated row count: 3
│ │
│ └── • render
│ │
│ └── • hash join (left outer)
│ │ estimated row count: 3
│ │ equality: (username) = (username)
│ │ left cols are key
│ │
│ ├── • scan
│ │ estimated row count: 3 (100% of the table; stats collected <hidden> ago)
│ │ table: users@users_user_id_idx
│ │ spans: FULL SCAN
│ │
│ └── • scan
│ estimated row count: 1 (100% of the table; stats collected <hidden> ago)
│ table: role_options@primary
│ spans: FULL SCAN
├── • scan
│ estimated row count: 1 (100% of the table; stats collected <hidden> ago)
│ table: role_members@role_members_member_idx
│ spans: FULL SCAN
└── • scan
estimated row count: 1 (100% of the table; stats collected <hidden> ago)
table: role_members@role_members_role_idx
spans: FULL SCAN
└── • group (streaming)
│ estimated row count: 3
│ group by: username
│ ordered: +username
└── • sort
│ estimated row count: 3
│ order: +username,+option
└── • render
└── • hash join (left outer)
│ estimated row count: 3
│ equality: (username) = (username)
│ left cols are key
├── • scan
│ estimated row count: 3 (100% of the table; stats collected <hidden> ago)
│ table: users@users_user_id_idx
│ spans: FULL SCAN
└── • scan
estimated row count: 1 (100% of the table; stats collected <hidden> ago)
table: role_options@primary
spans: FULL SCAN

# EXPLAIN selecting from a sequence.
statement ok
Expand Down
76 changes: 76 additions & 0 deletions pkg/sql/opt/norm/groupby_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,82 @@ func (c *CustomFuncs) MergeAggs(
return newAggs
}

// CanMergeAggsAndWindow returns true if all the given aggregations satisfy one
// of the following conditions:
// 1. Reference only columns from the input of the Window operator.
// 2. Is a ConstAgg (or similar) that references an input aggregate window
// function.
//
// CanMergeAggsAndWindow expects that all the window functions have been
// verified to be aggregate functions.
func (c *CustomFuncs) CanMergeAggsAndWindow(
aggs memo.AggregationsExpr, windows memo.WindowsExpr, inputCols opt.ColSet,
) bool {
// Collect the columns produced by the window functions.
var windowCols opt.ColSet
for i := range windows {
windowCols.Add(windows[i].Col)
}
for i := range aggs {
if memo.ExtractAggInputColumns(aggs[i].Agg).SubsetOf(inputCols) {
// Condition 1: the aggregate function only references columns from the
// input of the Window operator. It will not be affected by a merge.
// In this case, it doesn't matter what the aggregate is, since it won't
// be modified in any way.
//
// Note that unlike for CanMergeAggs, it is not necessary to check for
// duplicate sensitivity. This is because window operators do not group or
// duplicate rows.
continue
}
// Condition 2: the aggregate function must be a AnyNotNullAgg, ConstAgg,
// ConstNotNullAgg, or FirstAggOp that references a window function.
switch aggs[i].Agg.Op() {
case opt.AnyNotNullAggOp, opt.ConstAggOp, opt.ConstNotNullAggOp, opt.FirstAggOp:
// Ensure that the input to the aggregation is a direct reference to a
// window function, with no intervening logic.
ref, ok := aggs[i].Agg.Child(0).(*memo.VariableExpr)
if !ok {
return false
}
if !windowCols.Contains(ref.Col) {
return false
}
default:
return false
}
}
return true
}

// MergeAggsAndWindow returns an AggregationsExpr that is equivalent to the
// combination of the given (outer) aggregations and (inner) window functions.
// ConstAgg-like outer aggregations that reference a window function are
// replaced with that window function.
//
// MergeAggs will panic if CanMergeAggs is false. It also expects the given
// window functions to all be aggregate functions.
func (c *CustomFuncs) MergeAggsAndWindow(
aggs memo.AggregationsExpr, windows memo.WindowsExpr, inputCols opt.ColSet,
) memo.AggregationsExpr {
// Create a mapping from column IDs to the window functions that produce them.
colsToWindowFuncs := map[opt.ColumnID]opt.ScalarExpr{}
for i := range windows {
colsToWindowFuncs[windows[i].Col] = windows[i].Function
}
newAggs := make(memo.AggregationsExpr, len(aggs))
for i := range aggs {
aggCols := memo.ExtractAggInputColumns(aggs[i].Agg)
if aggCols.SubsetOf(inputCols) {
newAggs[i] = aggs[i]
continue
}
windowFunc := colsToWindowFuncs[aggCols.SingleColumn()]
newAggs[i] = c.f.ConstructAggregationsItem(windowFunc, aggs[i].Col)
}
return newAggs
}

// CanEliminateJoinUnderGroupByLeft returns true if the given join can be
// eliminated and replaced by its left input. It should be called only when the
// join is under a grouping operator that is only using columns from the join's
Expand Down
80 changes: 80 additions & 0 deletions pkg/sql/opt/norm/rules/groupby.opt
Original file line number Diff line number Diff line change
Expand Up @@ -587,3 +587,83 @@
(MergeAggs $innerAggs $outerAggs $innerGroupingCols)
(MakeGrouping $outerGroupingCols (EmptyOrdering))
)

# FoldGroupByAndWindow merges a GroupBy operator with an input Window operator.
# This is possible when the following conditions are satisfied:
#
# 1. The GroupBy is unordered. This may not technically be necessary, but
# avoids complication in determining the correctness of ordering-sensitive
# aggregations.
#
# 2. The window function output cols are functionally determined by the
# partition-by cols. This means that the window function outputs the
# same value for every row in the partition (group).
#
# 3. The Window operator partition-by cols and grouping cols are the same.
# This ensures that an aggregate operator will act on the same set of rows,
# whether it is part of the Window operator or the GroupBy operator.
#
# 4. The window functions are all aggregate functions. This ensures they are
# compatible with GroupBy operators.
#
# 5. Finally, all of the GroupBy's aggregations must satisfy one of two cases:
# a. The aggregate only references cols from the Window operator's input.
# b. The aggregate is a ConstAgg (or ConstNotNull, AnyNotNull, or FirstAgg)
# that passes through the result of a window function.
#
# Assuming all of the above are satisfied, each GroupBy aggregate that only
# references the Window's input can be left alone (5a). Then, each ConstAgg
# referencing a window function can be replaced by that function (5b).
#
# Here's an example with slightly altered SQL syntax:
#
# SELECT max(b), const_agg(foo), const_agg(bar)
# FROM
# (
# SELECT *, count(c) OVER w AS foo, array_agg(d) OVER w AS bar
# FROM abcd
# WINDOW w AS (
# PARTITION BY a ORDER BY d
# RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
# )
# )
# GROUP BY a;
# =>
# SELECT max(b), count(c), array_agg(d ORDER BY d) FROM abcd GROUP BY a;
#
# Note also that the Window's ordering should be preserved by the GroupBy to
# ensure that ordering-sensitive aggregates produce correct results.
[FoldGroupByAndWindow, Normalize]
(GroupBy | ScalarGroupBy
$window:(Window
$input:*
$windows:* & (WindowsAreAggregations $windows)
$windowPrivate:*
) &
(ColsAreDeterminedBy
(WindowFuncOutputCols $windows)
$partitionByCols:(WindowPartition $windowPrivate)
$window
)
$aggs:* &
(CanMergeAggsAndWindow
$aggs
$windows
$inputCols:(OutputCols $input)
)
$groupingPrivate:* &
(IsUnorderedGrouping $groupingPrivate) &
(ColsAreEqual
$groupingCols:(GroupingCols $groupingPrivate)
$partitionByCols
)
)
=>
((OpName)
$input
(MergeAggsAndWindow $aggs $windows $inputCols)
(MakeGrouping
(GroupingCols $groupingPrivate)
(WindowOrdering $windowPrivate)
)
)
Loading

0 comments on commit 147054d

Please sign in to comment.