diff --git a/pkg/sql/opt/exec/execbuilder/mutation.go b/pkg/sql/opt/exec/execbuilder/mutation.go index f1d8df8d973a..441597e1df45 100644 --- a/pkg/sql/opt/exec/execbuilder/mutation.go +++ b/pkg/sql/opt/exec/execbuilder/mutation.go @@ -1144,13 +1144,13 @@ func (b *Builder) shouldApplyImplicitLockingToMutationInput( return 0, nil case *memo.UpdateExpr: - return shouldApplyImplicitLockingToUpdateOrDeleteInput(t), nil + return shouldApplyImplicitLockingToUpdateOrDeleteInput(t.Input, t.Table), nil case *memo.UpsertExpr: return shouldApplyImplicitLockingToUpsertInput(t), nil case *memo.DeleteExpr: - return shouldApplyImplicitLockingToUpdateOrDeleteInput(t), nil + return shouldApplyImplicitLockingToUpdateOrDeleteInput(t.Input, t.Table), nil default: return 0, errors.AssertionFailedf("unexpected mutation expression %T", t) @@ -1183,18 +1183,31 @@ func (b *Builder) shouldApplyImplicitLockingToMutationInput( // // UPDATEs and DELETEs happen to have exactly the same matching pattern, so we // reuse this function for both. -func shouldApplyImplicitLockingToUpdateOrDeleteInput(mutExpr memo.RelExpr) opt.TableID { +func shouldApplyImplicitLockingToUpdateOrDeleteInput( + input memo.RelExpr, tabID opt.TableID, +) opt.TableID { // Try to match the mutation's input expression against the pattern: // // [Project]* [IndexJoin] Scan // - input := mutExpr.Child(0).(memo.RelExpr) + // input := mutExpr.Child(0).(memo.RelExpr) input = unwrapProjectExprs(input) if idxJoin, ok := input.(*memo.IndexJoinExpr); ok { input = idxJoin.Input } - if scan, ok := input.(*memo.ScanExpr); ok { - return scan.Table + switch t := input.(type) { + case *memo.ScanExpr: + return t.Table + case *memo.LookupJoinExpr: + md := input.Memo().Metadata() + mutStableID := md.Table(tabID).ID() + lookupStableID := md.Table(t.Table).ID() + // Only lock rows read in the lookup join if the lookup table is the + // same as the table being updated. Also, don't lock rows if there is an + // ON condition so that we don't lock rows that won't be updated. + if mutStableID == lookupStableID && t.On.IsTrue() { + return t.Table + } } return 0 } @@ -1206,7 +1219,7 @@ func shouldApplyImplicitLockingToUpdateOrDeleteInput(mutExpr memo.RelExpr) opt.T func shouldApplyImplicitLockingToUpsertInput(ups *memo.UpsertExpr) opt.TableID { // Try to match the Upsert's input expression against the pattern: // - // [Project]* (LeftJoin Scan | LookupJoin) [Project]* Values + // [Project]* (LeftJoin Scan | LookupJoin [LookupJoin] ) [Project]* Values // input := ups.Input input = unwrapProjectExprs(input) @@ -1222,6 +1235,13 @@ func shouldApplyImplicitLockingToUpsertInput(ups *memo.UpsertExpr) opt.TableID { case *memo.LookupJoinExpr: input = join.Input + if inner, ok := input.(*memo.LookupJoinExpr); ok && inner.Table == join.Table { + // When a generic query plan reads from a secondary index first, + // then performs a lookup into the primary index, the plan has a + // double lookup join pattern. We add implicit locks in this case + // where both lookup joins have the same table. + input = inner.Input + } toLock = join.Table default: diff --git a/pkg/sql/opt/exec/execbuilder/testdata/cascade b/pkg/sql/opt/exec/execbuilder/testdata/cascade index 23ca5d3178d8..6344f5824ebb 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/cascade +++ b/pkg/sql/opt/exec/execbuilder/testdata/cascade @@ -1029,6 +1029,7 @@ vectorized: true │ └── • lookup join │ │ table: loop_a@loop_a_cascade_delete_idx │ │ equality: (id) = (cascade_delete) + │ │ locking strength: for update │ │ │ └── • distinct │ │ estimated row count: 10 @@ -1168,6 +1169,7 @@ quality of service: regular │ │ estimated max memory allocated: 0 B │ │ table: loop_a@loop_a_cascade_delete_idx │ │ equality: (id) = (cascade_delete) + │ │ locking strength: for update │ │ │ └── • distinct │ │ sql nodes: diff --git a/pkg/sql/opt/exec/execbuilder/testdata/delete b/pkg/sql/opt/exec/execbuilder/testdata/delete index cdd0ead9b493..c2c6f253dce3 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/delete +++ b/pkg/sql/opt/exec/execbuilder/testdata/delete @@ -388,3 +388,128 @@ vectorized: true estimated row count: 333 (missing stats) table: b@b_pkey spans: /2- + +# Test that FOR UPDATE locks are applied to reads in generic query plans. + +statement ok +CREATE TABLE t137352 ( + k INT PRIMARY KEY, + a INT, + b INT, + INDEX (a), + INDEX (b) +) + +statement ok +SET plan_cache_mode = force_generic_plan + +statement ok +PREPARE p AS DELETE FROM t137352 WHERE k = $1 + +query T +EXPLAIN ANALYZE EXECUTE p(1) +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +plan type: generic, re-optimized +maximum memory usage: +network usage: +regions: +isolation level: serializable +priority: normal +quality of service: regular +· +• delete +│ sql nodes: +│ regions: +│ actual row count: 1 +│ from: t137352 +│ auto commit +│ +└── • lookup join + │ sql nodes: + │ kv nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_pkey + │ equality: ($1) = (k) + │ equality cols are key + │ locking strength: for update + │ + └── • values + sql nodes: + regions: + actual row count: 1 + size: 1 column, 1 row + +statement ok +DEALLOCATE p + +statement ok +PREPARE p AS DELETE FROM t137352 WHERE a = $1 + +query T +EXPLAIN ANALYZE EXECUTE p(10) +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +plan type: generic, re-optimized +maximum memory usage: +network usage: +regions: +isolation level: serializable +priority: normal +quality of service: regular +· +• delete +│ sql nodes: +│ regions: +│ actual row count: 1 +│ from: t137352 +│ auto commit +│ +└── • lookup join + │ sql nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_pkey + │ equality: (k) = (k) + │ equality cols are key + │ locking strength: for update + │ + └── • lookup join + │ sql nodes: + │ kv nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_a_idx + │ equality: ($1) = (a) + │ locking strength: for update + │ + └── • values + sql nodes: + regions: + actual row count: 1 + size: 1 column, 1 row diff --git a/pkg/sql/opt/exec/execbuilder/testdata/not_visible_index b/pkg/sql/opt/exec/execbuilder/testdata/not_visible_index index 429918cd8698..a06535322da8 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/not_visible_index +++ b/pkg/sql/opt/exec/execbuilder/testdata/not_visible_index @@ -1103,6 +1103,7 @@ vectorized: true │ └── • lookup join │ │ table: child_delete@c_delete_idx_invisible │ │ equality: (p) = (p) +│ │ locking strength: for update │ │ │ └── • distinct │ │ estimated row count: 10 diff --git a/pkg/sql/opt/exec/execbuilder/testdata/update b/pkg/sql/opt/exec/execbuilder/testdata/update index 1b80230ce7c7..c8c8ed5ec832 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/update +++ b/pkg/sql/opt/exec/execbuilder/testdata/update @@ -899,3 +899,287 @@ vectorized: true missing stats table: t121322_c@t121322_c_pkey spans: FULL SCAN + +# Test that FOR UPDATE locks are applied to reads in generic query plans. + +statement ok +CREATE TABLE t137352 ( + k INT PRIMARY KEY, + a INT, + b INT, + INDEX (a), + INDEX (b) +) + +statement ok +SET plan_cache_mode = force_generic_plan + +statement ok +PREPARE p AS UPDATE t137352 SET b = $2 WHERE k = $1 + +query T +EXPLAIN ANALYZE EXECUTE p(1, 10) +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +plan type: generic, re-optimized +maximum memory usage: +network usage: +regions: +isolation level: serializable +priority: normal +quality of service: regular +· +• update +│ sql nodes: +│ regions: +│ actual row count: 1 +│ table: t137352 +│ set: b +│ auto commit +│ +└── • render + │ + └── • lookup join + │ sql nodes: + │ kv nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_pkey + │ equality: ($1) = (k) + │ equality cols are key + │ locking strength: for update + │ + └── • values + sql nodes: + regions: + actual row count: 1 + size: 1 column, 1 row + +statement ok +DEALLOCATE p + +statement ok +PREPARE p AS UPDATE t137352 SET b = $2 WHERE a = $1 + +query T +EXPLAIN ANALYZE EXECUTE p(10, 100) +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +plan type: generic, re-optimized +maximum memory usage: +network usage: +regions: +isolation level: serializable +priority: normal +quality of service: regular +· +• update +│ sql nodes: +│ regions: +│ actual row count: 1 +│ table: t137352 +│ set: b +│ auto commit +│ +└── • render + │ + └── • lookup join + │ sql nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_pkey + │ equality: (k) = (k) + │ equality cols are key + │ locking strength: for update + │ + └── • lookup join + │ sql nodes: + │ kv nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_a_idx + │ equality: ($1) = (a) + │ locking strength: for update + │ + └── • values + sql nodes: + regions: + actual row count: 1 + size: 1 column, 1 row + +statement ok +DEALLOCATE p + +statement ok +PREPARE p AS UPDATE t137352 SET b = $2 WHERE k = $1 AND b > 0 + +# Do not apply implicit FOR UPDATE locks if the lookup join has an ON condition. +query T +EXPLAIN ANALYZE EXECUTE p(1, 10) +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +plan type: generic, re-optimized +maximum memory usage: +network usage: +regions: +isolation level: serializable +priority: normal +quality of service: regular +· +• update +│ sql nodes: +│ regions: +│ actual row count: 1 +│ table: t137352 +│ set: b +│ auto commit +│ +└── • render + │ + └── • lookup join + │ sql nodes: + │ kv nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_pkey + │ equality: ($1) = (k) + │ equality cols are key + │ pred: b > 0 + │ + └── • values + sql nodes: + regions: + actual row count: 1 + size: 1 column, 1 row + +statement ok +DEALLOCATE p + +statement ok +PREPARE p AS UPDATE t137352 t1 SET b = t2.b + 1 FROM t137352 t2 WHERE t1.k = t2.a AND t1.k = $1 + +statement ok +ALTER TABLE t137352 INJECT STATISTICS '[ + { + "columns": ["k"], + "created_at": "2018-01-01 1:00:00.00000+00:00", + "row_count": 500000, + "distinct_count": 500000, + "avg_size": 1 + } +]' + +# TODO(mgartner): Should we add FOR UPDATE locks for any of the reads in a +# self-join? +query T +EXPLAIN ANALYZE EXECUTE p(1) +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +plan type: generic, re-optimized +maximum memory usage: +network usage: +regions: +isolation level: serializable +priority: normal +quality of service: regular +· +• update +│ sql nodes: +│ regions: +│ actual row count: 1 +│ table: t137352 +│ set: b +│ auto commit +│ +└── • render + │ + └── • limit + │ count: 1 + │ + └── • lookup join + │ sql nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ estimated row count: 10 + │ table: t137352@t137352_pkey + │ equality: (k) = (k) + │ equality cols are key + │ + └── • lookup join + │ sql nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ estimated row count: 10 + │ table: t137352@t137352_a_idx + │ equality: (k) = (a) + │ pred: a = 1 + │ + └── • lookup join + │ sql nodes: + │ kv nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ estimated row count: 1 + │ table: t137352@t137352_pkey + │ equality: ($1) = (k) + │ equality cols are key + │ + └── • values + sql nodes: + regions: + actual row count: 1 + size: 1 column, 1 row diff --git a/pkg/sql/opt/exec/execbuilder/testdata/upsert b/pkg/sql/opt/exec/execbuilder/testdata/upsert index 3dc1b7e51e5c..8248ac7aecab 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/upsert +++ b/pkg/sql/opt/exec/execbuilder/testdata/upsert @@ -990,3 +990,136 @@ vectorized: true missing stats table: t121322_c@t121322_c_pkey spans: FULL SCAN + +# Test that FOR UPDATE locks are applied to reads in generic query plans. + +statement ok +CREATE TABLE t137352 ( + k INT PRIMARY KEY, + a INT, + b INT, + UNIQUE INDEX (a) +) + +statement ok +SET plan_cache_mode = force_generic_plan + +statement ok +PREPARE p AS UPSERT INTO t137352 VALUES ($1, $2, $3) + +query T +EXPLAIN ANALYZE EXECUTE p(1, 10, 100) +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +plan type: generic, re-optimized +maximum memory usage: +network usage: +regions: +isolation level: serializable +priority: normal +quality of service: regular +· +• upsert +│ sql nodes: +│ regions: +│ actual row count: 1 +│ into: t137352(k, a, b) +│ auto commit +│ arbiter indexes: t137352_pkey +│ +└── • lookup join (left outer) + │ sql nodes: + │ kv nodes: + │ regions: + │ actual row count: 1 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 0 + │ KV bytes read: 0 B + │ KV gRPC calls: 0 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_pkey + │ equality: (column1) = (k) + │ equality cols are key + │ locking strength: for update + │ + └── • values + sql nodes: + regions: + actual row count: 1 + size: 3 columns, 1 row + +statement ok +DEALLOCATE p + +statement ok +PREPARE p AS INSERT INTO t137352 VALUES ($1, $2, $3) ON CONFLICT (a) DO UPDATE SET b = $3 + +query T +EXPLAIN ANALYZE EXECUTE p(1, 10, 100) +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +plan type: generic, re-optimized +rows decoded from KV: 2 (16 B, 4 KVs, 2 gRPC calls) +maximum memory usage: +network usage: +regions: +isolation level: serializable +priority: normal +quality of service: regular +· +• upsert +│ sql nodes: +│ regions: +│ actual row count: 1 +│ into: t137352(k, a, b) +│ auto commit +│ arbiter indexes: t137352_a_key +│ +└── • render + │ + └── • lookup join (left outer) + │ sql nodes: + │ kv nodes: + │ regions: + │ actual row count: 1 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 1 + │ KV pairs read: 2 + │ KV bytes read: 8 B + │ KV gRPC calls: 1 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_pkey + │ equality: (k) = (k) + │ equality cols are key + │ locking strength: for update + │ + └── • lookup join (left outer) + │ sql nodes: + │ kv nodes: + │ regions: + │ actual row count: 1 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows decoded: 1 + │ KV pairs read: 2 + │ KV bytes read: 8 B + │ KV gRPC calls: 1 + │ estimated max memory allocated: 0 B + │ table: t137352@t137352_a_key + │ equality: (column2) = (a) + │ equality cols are key + │ locking strength: for update + │ + └── • values + sql nodes: + regions: + actual row count: 1 + size: 3 columns, 1 row