Skip to content

Commit

Permalink
opt: add implicit FOR UPDATE locks for generic query plans
Browse files Browse the repository at this point in the history
Implicit `FOR UPDATE` locks are now added to the reads in
`INSERT .. ON CONFLICT`, `UPSERT`, `UPDATE`, and `DELETE` queries when
those mutations use generic query plans.

Fixes #137352

Release note: None
  • Loading branch information
mgartner committed Dec 13, 2024
1 parent 3878bf6 commit b02c286
Show file tree
Hide file tree
Showing 6 changed files with 572 additions and 7 deletions.
34 changes: 27 additions & 7 deletions pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,13 +1144,13 @@ func (b *Builder) shouldApplyImplicitLockingToMutationInput(
return 0, nil

case *memo.UpdateExpr:
return shouldApplyImplicitLockingToUpdateOrDeleteInput(t), nil
return shouldApplyImplicitLockingToUpdateOrDeleteInput(t.Input, t.Table), nil

case *memo.UpsertExpr:
return shouldApplyImplicitLockingToUpsertInput(t), nil

case *memo.DeleteExpr:
return shouldApplyImplicitLockingToUpdateOrDeleteInput(t), nil
return shouldApplyImplicitLockingToUpdateOrDeleteInput(t.Input, t.Table), nil

default:
return 0, errors.AssertionFailedf("unexpected mutation expression %T", t)
Expand Down Expand Up @@ -1183,18 +1183,31 @@ func (b *Builder) shouldApplyImplicitLockingToMutationInput(
//
// UPDATEs and DELETEs happen to have exactly the same matching pattern, so we
// reuse this function for both.
func shouldApplyImplicitLockingToUpdateOrDeleteInput(mutExpr memo.RelExpr) opt.TableID {
func shouldApplyImplicitLockingToUpdateOrDeleteInput(
input memo.RelExpr, tabID opt.TableID,
) opt.TableID {
// Try to match the mutation's input expression against the pattern:
//
// [Project]* [IndexJoin] Scan
//
input := mutExpr.Child(0).(memo.RelExpr)
// input := mutExpr.Child(0).(memo.RelExpr)
input = unwrapProjectExprs(input)
if idxJoin, ok := input.(*memo.IndexJoinExpr); ok {
input = idxJoin.Input
}
if scan, ok := input.(*memo.ScanExpr); ok {
return scan.Table
switch t := input.(type) {
case *memo.ScanExpr:
return t.Table
case *memo.LookupJoinExpr:
md := input.Memo().Metadata()
mutStableID := md.Table(tabID).ID()
lookupStableID := md.Table(t.Table).ID()
// Only lock rows read in the lookup join if the lookup table is the
// same as the table being updated. Also, don't lock rows if there is an
// ON condition so that we don't lock rows that won't be updated.
if mutStableID == lookupStableID && t.On.IsTrue() {
return t.Table
}
}
return 0
}
Expand All @@ -1206,7 +1219,7 @@ func shouldApplyImplicitLockingToUpdateOrDeleteInput(mutExpr memo.RelExpr) opt.T
func shouldApplyImplicitLockingToUpsertInput(ups *memo.UpsertExpr) opt.TableID {
// Try to match the Upsert's input expression against the pattern:
//
// [Project]* (LeftJoin Scan | LookupJoin) [Project]* Values
// [Project]* (LeftJoin Scan | LookupJoin [LookupJoin] ) [Project]* Values
//
input := ups.Input
input = unwrapProjectExprs(input)
Expand All @@ -1222,6 +1235,13 @@ func shouldApplyImplicitLockingToUpsertInput(ups *memo.UpsertExpr) opt.TableID {

case *memo.LookupJoinExpr:
input = join.Input
if inner, ok := input.(*memo.LookupJoinExpr); ok && inner.Table == join.Table {
// When a generic query plan reads from a secondary index first,
// then performs a lookup into the primary index, the plan has a
// double lookup join pattern. We add implicit locks in this case
// where both lookup joins have the same table.
input = inner.Input
}
toLock = join.Table

default:
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/cascade
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,7 @@ vectorized: true
│ └── • lookup join
│ │ table: loop_a@loop_a_cascade_delete_idx
│ │ equality: (id) = (cascade_delete)
│ │ locking strength: for update
│ │
│ └── • distinct
│ │ estimated row count: 10
Expand Down Expand Up @@ -1168,6 +1169,7 @@ quality of service: regular
│ │ estimated max memory allocated: 0 B
│ │ table: loop_a@loop_a_cascade_delete_idx
│ │ equality: (id) = (cascade_delete)
│ │ locking strength: for update
│ │
│ └── • distinct
│ │ sql nodes: <hidden>
Expand Down
125 changes: 125 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/delete
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,128 @@ vectorized: true
estimated row count: 333 (missing stats)
table: b@b_pkey
spans: /2-

# Test that FOR UPDATE locks are applied to reads in generic query plans.

statement ok
CREATE TABLE t137352 (
k INT PRIMARY KEY,
a INT,
b INT,
INDEX (a),
INDEX (b)
)

statement ok
SET plan_cache_mode = force_generic_plan

statement ok
PREPARE p AS DELETE FROM t137352 WHERE k = $1

query T
EXPLAIN ANALYZE EXECUTE p(1)
----
planning time: 10µs
execution time: 100µs
distribution: <hidden>
vectorized: <hidden>
plan type: generic, re-optimized
maximum memory usage: <hidden>
network usage: <hidden>
regions: <hidden>
isolation level: serializable
priority: normal
quality of service: regular
·
• delete
│ sql nodes: <hidden>
│ regions: <hidden>
│ actual row count: 1
│ from: t137352
│ auto commit
└── • lookup join
│ sql nodes: <hidden>
│ kv nodes: <hidden>
│ regions: <hidden>
│ actual row count: 0
│ KV time: 0µs
│ KV contention time: 0µs
│ KV rows decoded: 0
│ KV bytes read: 0 B
│ KV gRPC calls: 0
│ estimated max memory allocated: 0 B
│ table: t137352@t137352_pkey
│ equality: ($1) = (k)
│ equality cols are key
│ locking strength: for update
└── • values
sql nodes: <hidden>
regions: <hidden>
actual row count: 1
size: 1 column, 1 row

statement ok
DEALLOCATE p

statement ok
PREPARE p AS DELETE FROM t137352 WHERE a = $1

query T
EXPLAIN ANALYZE EXECUTE p(10)
----
planning time: 10µs
execution time: 100µs
distribution: <hidden>
vectorized: <hidden>
plan type: generic, re-optimized
maximum memory usage: <hidden>
network usage: <hidden>
regions: <hidden>
isolation level: serializable
priority: normal
quality of service: regular
·
• delete
│ sql nodes: <hidden>
│ regions: <hidden>
│ actual row count: 1
│ from: t137352
│ auto commit
└── • lookup join
│ sql nodes: <hidden>
│ regions: <hidden>
│ actual row count: 0
│ KV time: 0µs
│ KV contention time: 0µs
│ KV rows decoded: 0
│ KV bytes read: 0 B
│ KV gRPC calls: 0
│ estimated max memory allocated: 0 B
│ table: t137352@t137352_pkey
│ equality: (k) = (k)
│ equality cols are key
│ locking strength: for update
└── • lookup join
│ sql nodes: <hidden>
│ kv nodes: <hidden>
│ regions: <hidden>
│ actual row count: 0
│ KV time: 0µs
│ KV contention time: 0µs
│ KV rows decoded: 0
│ KV bytes read: 0 B
│ KV gRPC calls: 0
│ estimated max memory allocated: 0 B
│ table: t137352@t137352_a_idx
│ equality: ($1) = (a)
│ locking strength: for update
└── • values
sql nodes: <hidden>
regions: <hidden>
actual row count: 1
size: 1 column, 1 row
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/not_visible_index
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,7 @@ vectorized: true
│ └── • lookup join
│ │ table: child_delete@c_delete_idx_invisible
│ │ equality: (p) = (p)
│ │ locking strength: for update
│ │
│ └── • distinct
│ │ estimated row count: 10
Expand Down
Loading

0 comments on commit b02c286

Please sign in to comment.