From cb96693cf3d80cc3dd67f76d2a93e06f1adcd856 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 19 Dec 2024 20:09:28 -0800 Subject: [PATCH] WIP on extending CPut with FailOnTombstones option --- pkg/kv/batch.go | 22 ++++++---- pkg/kv/kvpb/api.go | 12 +++++- pkg/kv/kvpb/api.proto | 10 +++++ .../kvserver/batcheval/cmd_conditional_put.go | 2 + pkg/kv/kvserver/batcheval/cmd_init_put.go | 2 +- pkg/kv/kvserver/kvserverbase/knobs.go | 9 ++-- pkg/kv/kvserver/replica_test.go | 2 +- pkg/server/config.go | 2 +- pkg/sql/logictest/logic.go | 4 +- pkg/sql/sqlliveness/slstorage/BUILD.bazel | 1 + pkg/sql/sqlliveness/slstorage/slstorage.go | 7 +++- pkg/storage/metamorphic/operations.go | 4 +- pkg/storage/mvcc.go | 42 +++++++++++++++++-- 13 files changed, 95 insertions(+), 24 deletions(-) diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 56eefac61726..7c76ee8951cf 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -544,14 +544,20 @@ func (b *Batch) PutInline(key, value interface{}) { // expValue needs to correspond to a Value.TagAndDataBytes() - i.e. a key's // value without the checksum (as the checksum includes the key too). func (b *Batch) CPut(key, value interface{}, expValue []byte) { - b.cputInternal(key, value, expValue, false, false) + b.cputInternal(key, value, expValue, false, false, false) } // CPutAllowingIfNotExists is like CPut except it also allows the Put when the // existing entry does not exist -- i.e. it succeeds if there is no existing // entry or the existing entry has the expected value. func (b *Batch) CPutAllowingIfNotExists(key, value interface{}, expValue []byte) { - b.cputInternal(key, value, expValue, true, false) + b.cputInternal(key, value, expValue, true, false, false) +} + +// CPutFailOnTombstones is like CPut except it fails with a +// ConditionedFailedError if it encounters a tombstone. +func (b *Batch) CPutFailOnTombstones(key, value interface{}, expValue []byte) { + b.cputInternal(key, value, expValue, false, false, true) } // CPutWithOriginTimestamp is like CPut except that it also sets the @@ -577,7 +583,7 @@ func (b *Batch) CPutWithOriginTimestamp( b.initResult(0, 1, notRaw, err) return } - r := kvpb.NewConditionalPut(k, v, expValue, false) + r := kvpb.NewConditionalPut(k, v, expValue, false, false) r.(*kvpb.ConditionalPutRequest).OriginTimestamp = ts r.(*kvpb.ConditionalPutRequest).ShouldWinOriginTimestampTie = shouldWinTie b.appendReqs(r) @@ -600,11 +606,11 @@ func (b *Batch) CPutWithOriginTimestamp( // A nil value can be used to delete the respective key, since there is no // DelInline(). This is different from CPut(). func (b *Batch) CPutInline(key, value interface{}, expValue []byte) { - b.cputInternal(key, value, expValue, false, true) + b.cputInternal(key, value, expValue, false, true, false) } func (b *Batch) cputInternal( - key, value interface{}, expValue []byte, allowNotExist bool, inline bool, + key, value interface{}, expValue []byte, allowNotExist bool, inline bool, failOnTombstones bool, ) { k, err := marshalKey(key) if err != nil { @@ -617,9 +623,9 @@ func (b *Batch) cputInternal( return } if inline { - b.appendReqs(kvpb.NewConditionalPutInline(k, v, expValue, allowNotExist)) + b.appendReqs(kvpb.NewConditionalPutInline(k, v, expValue, allowNotExist, failOnTombstones)) } else { - b.appendReqs(kvpb.NewConditionalPut(k, v, expValue, allowNotExist)) + b.appendReqs(kvpb.NewConditionalPut(k, v, expValue, allowNotExist, failOnTombstones)) } b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) @@ -711,6 +717,8 @@ func (b *Batch) CPutValuesEmpty(bs BulkSource[roachpb.Value]) { // key can be either a byte slice or a string. value can be any key type, a // protoutil.Message or any Go primitive type (bool, int, etc). It is illegal // to set value to nil. +// TODO(yuzefovich): this can be removed after compatibility with 25.1 is no +// longer needed. func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) { k, err := marshalKey(key) if err != nil { diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 228a3dddf1b7..3a4644dadfc0 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -1656,7 +1656,7 @@ func NewPutInline(key roachpb.Key, value roachpb.Value) Request { // them. The caller retains ownership of expVal; NewConditionalPut will copy it // into the request. func NewConditionalPut( - key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool, + key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool, failOnTombstones bool, ) Request { value.InitChecksum(key) return &ConditionalPutRequest{ @@ -1666,6 +1666,7 @@ func NewConditionalPut( Value: value, ExpBytes: expValue, AllowIfDoesNotExist: allowNotExist, + FailOnTombstones: failOnTombstones, } } @@ -1676,7 +1677,7 @@ func NewConditionalPut( // them. The caller retains ownership of expVal; NewConditionalPut will copy it // into the request. func NewConditionalPutInline( - key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool, + key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool, failOnTombstones bool, ) Request { value.InitChecksum(key) return &ConditionalPutRequest{ @@ -1687,6 +1688,7 @@ func NewConditionalPutInline( ExpBytes: expValue, AllowIfDoesNotExist: allowNotExist, Inline: true, + FailOnTombstones: failOnTombstones, } } @@ -2555,10 +2557,16 @@ func (writeOptions *WriteOptions) GetOriginTimestamp() hlc.Timestamp { } func (r *ConditionalPutRequest) Validate() error { + if r.AllowIfDoesNotExist && r.FailOnTombstones { + return errors.AssertionFailedf("invalid ConditionalPutRequest: AllowIfDoesNotExist and FailOnTombstones are incompatible") + } if !r.OriginTimestamp.IsEmpty() { if r.AllowIfDoesNotExist { return errors.AssertionFailedf("invalid ConditionalPutRequest: AllowIfDoesNotExist and non-empty OriginTimestamp are incompatible") } + if r.FailOnTombstones { + return errors.AssertionFailedf("invalid ConditionalPutRequest: FailOnTombstones and non-empty OriginTimestamp are incompatible") + } } return nil } diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 87a225e1cec1..0a20b2caf8e2 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -333,6 +333,8 @@ message ConditionalPutRequest { // exist with that value. Passing this indicates that it is also OK if the key // does not exist. This is useful when a given value is expected but it is // possible it has not yet been written. + // + // Cannot be combined with FailOnTombstones nor with OriginTimestamp. bool allow_if_does_not_exist = 5; // Specify as true to put the value without a corresponding // timestamp. This option should be used with care as it precludes @@ -371,6 +373,11 @@ message ConditionalPutRequest { // // This must only be used in conjunction with OriginTimestamp. bool should_win_origin_timestamp_tie = 9; + + // If true, tombstones cause ConditionFailedErrors. + // + // Cannot be combined with AllowIfDoesNotExit nor with OriginTimestamp. + bool failOnTombstones = 10; } // A ConditionalPutResponse is the return value from the @@ -385,6 +392,9 @@ message ConditionalPutResponse { // - If key exists, returns a ConditionFailedError if value != existing value // If failOnTombstones is set to true, tombstone values count as mismatched // values and will cause a ConditionFailedError. +// TODO(yuzefovich): this can be removed once the compatibility with the version +// in which we stopped issuing InitPuts (some time after 25.1) is no longer +// needed. message InitPutRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; Value value = 2 [(gogoproto.nullable) = false]; diff --git a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go index ad60521030fd..9d806c43b240 100644 --- a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go @@ -66,6 +66,7 @@ func ConditionalPut( return result.Result{}, errors.AssertionFailedf("OriginTimestamp cannot be passed via CPut arg and in request header") } + failOnTombstones := args.FailOnTombstones && !cArgs.EvalCtx.EvalKnobs().DisableInitPutCPutFailOnTombstones opts := storage.ConditionalPutWriteOptions{ MVCCWriteOptions: storage.MVCCWriteOptions{ Txn: h.Txn, @@ -80,6 +81,7 @@ func ConditionalPut( Category: fs.BatchEvalReadCategory, }, AllowIfDoesNotExist: storage.CPutMissingBehavior(args.AllowIfDoesNotExist), + FailOnTombstones: storage.CPutTombstoneBehavior(failOnTombstones), OriginTimestamp: args.OriginTimestamp, ShouldWinOriginTimestampTie: args.ShouldWinOriginTimestampTie, } diff --git a/pkg/kv/kvserver/batcheval/cmd_init_put.go b/pkg/kv/kvserver/batcheval/cmd_init_put.go index 3fbd70236528..a251e29bc5da 100644 --- a/pkg/kv/kvserver/batcheval/cmd_init_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_init_put.go @@ -29,7 +29,7 @@ func InitPut( args := cArgs.Args.(*kvpb.InitPutRequest) h := cArgs.Header - failOnTombstones := args.FailOnTombstones && !cArgs.EvalCtx.EvalKnobs().DisableInitPutFailOnTombstones + failOnTombstones := args.FailOnTombstones && !cArgs.EvalCtx.EvalKnobs().DisableInitPutCPutFailOnTombstones opts := storage.MVCCWriteOptions{ Txn: h.Txn, LocalTimestamp: cArgs.Now, diff --git a/pkg/kv/kvserver/kvserverbase/knobs.go b/pkg/kv/kvserver/kvserverbase/knobs.go index 84536ad425db..566ac20fbaa6 100644 --- a/pkg/kv/kvserver/kvserverbase/knobs.go +++ b/pkg/kv/kvserver/kvserverbase/knobs.go @@ -36,10 +36,11 @@ type BatchEvalTestingKnobs struct { // explanation of why. AllowGCWithNewThresholdAndKeys bool - // DisableInitPutFailOnTombstones disables FailOnTombstones for InitPut. This - // is useful together with e.g. StoreTestingKnobs.GlobalMVCCRangeTombstone, - // where we still want InitPut to succeed on top of the range tombstone. - DisableInitPutFailOnTombstones bool + // DisableInitPutCPutFailOnTombstones disables FailOnTombstones for InitPut + // and CPut. This is useful together with e.g. + // StoreTestingKnobs.GlobalMVCCRangeTombstone, where we still want InitPut + // and CPut to succeed on top of the range tombstone. + DisableInitPutCPutFailOnTombstones bool // UseRangeTombstonesForPointDeletes will use point-sized MVCC range // tombstones when deleting point keys, to increase test coverage. These diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 45f9f6222bf3..8b4ab89e2551 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -1372,7 +1372,7 @@ func cPutArgs(key roachpb.Key, value, expValue []byte) kvpb.ConditionalPutReques if expValue != nil { expValue = roachpb.MakeValueFromBytes(expValue).TagAndDataBytes() } - req := kvpb.NewConditionalPut(key, roachpb.MakeValueFromBytes(value), expValue, false /* allowNotExist */) + req := kvpb.NewConditionalPut(key, roachpb.MakeValueFromBytes(value), expValue, false /* allowNotExist */, false /* failOnTombstones */) return *req.(*kvpb.ConditionalPutRequest) } diff --git a/pkg/server/config.go b/pkg/server/config.go index b0699a061a7e..3b6a58561efa 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -348,7 +348,7 @@ func (cfg *BaseConfig) InitTestingKnobs() { } storeKnobs := cfg.TestingKnobs.Store.(*kvserver.StoreTestingKnobs) storeKnobs.GlobalMVCCRangeTombstone = true - storeKnobs.EvalKnobs.DisableInitPutFailOnTombstones = true + storeKnobs.EvalKnobs.DisableInitPutCPutFailOnTombstones = true cfg.TestingKnobs.RangeFeed.(*rangefeed.TestingKnobs).IgnoreOnDeleteRangeError = true } diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index afc1d1825b10..cb85228bcee3 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -1536,8 +1536,8 @@ func (t *logicTest) newCluster( DisableConsistencyQueue: true, GlobalMVCCRangeTombstone: globalMVCCRangeTombstone, EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ - DisableInitPutFailOnTombstones: ignoreMVCCRangeTombstoneErrors, - UseRangeTombstonesForPointDeletes: shouldUseMVCCRangeTombstonesForPointDeletes, + DisableInitPutCPutFailOnTombstones: ignoreMVCCRangeTombstoneErrors, + UseRangeTombstonesForPointDeletes: shouldUseMVCCRangeTombstonesForPointDeletes, }, }, RangeFeed: &rangefeed.TestingKnobs{ diff --git a/pkg/sql/sqlliveness/slstorage/BUILD.bazel b/pkg/sql/sqlliveness/slstorage/BUILD.bazel index 5122e74e8a8c..526e62d3c2c9 100644 --- a/pkg/sql/sqlliveness/slstorage/BUILD.bazel +++ b/pkg/sql/sqlliveness/slstorage/BUILD.bazel @@ -13,6 +13,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvpb", diff --git a/pkg/sql/sqlliveness/slstorage/slstorage.go b/pkg/sql/sqlliveness/slstorage/slstorage.go index 245860c12575..df121c2e99a9 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage.go @@ -10,6 +10,7 @@ import ( "math/rand" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -529,7 +530,11 @@ func (s *Storage) Insert( } v := encodeValue(expiration) - batch.InitPut(k, &v, true) + if s.settings.Version.IsActive(ctx, clusterversion.V25_1) { + batch.CPutFailOnTombstones(k, &v, nil /* expValue */) + } else { + batch.InitPut(k, &v, true) + } return txn.CommitInBatch(ctx, batch) }); err != nil { diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index c898ce308a5c..2a3dd6728a96 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -235,7 +235,9 @@ func (m mvccCPutOp) run(ctx context.Context) string { _, err := storage.MVCCConditionalPut(ctx, writer, m.key, txn.ReadTimestamp, m.value, m.expVal, storage.ConditionalPutWriteOptions{ - MVCCWriteOptions: storage.MVCCWriteOptions{Txn: txn}, + MVCCWriteOptions: storage.MVCCWriteOptions{Txn: txn}, + // TODO: why we unconditionally set this flag? + // TODO: should we add FailOnTombstones? AllowIfDoesNotExist: true, }) if err != nil { diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 0287966d1f56..a7ac53dfb5cd 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2883,12 +2883,25 @@ const ( CPutFailIfMissing CPutMissingBehavior = false ) +// CPutTombstoneBehavior describes the handling of a tombstone. +type CPutTombstoneBehavior bool + +const ( + // CPutIgnoreTombstones is used to indicate that CPut should ignore the + // tombstones and treat them as key non-existing. + CPutIgnoreTombstones CPutTombstoneBehavior = false + // CPutFailOnTombstones is used to indicate that CPut should fail if it + // finds a tombstone. + CPutFailOnTombstones CPutTombstoneBehavior = true +) + // ConditionalPutWriteOptions bundles options for the // MVCCConditionalPut and MVCCBlindConditionalPut functions. type ConditionalPutWriteOptions struct { MVCCWriteOptions AllowIfDoesNotExist CPutMissingBehavior + FailOnTombstones CPutTombstoneBehavior // OriginTimestamp, if set, indicates that the caller wants to put the // value only if any existing key is older than this timestamp. // @@ -3011,10 +3024,16 @@ func mvccConditionalPutUsingIter( expBytes []byte, opts ConditionalPutWriteOptions, ) (roachpb.LockAcquisition, error) { + if bool(opts.AllowIfDoesNotExist) && bool(opts.FailOnTombstones) { + return roachpb.LockAcquisition{}, errors.AssertionFailedf("AllowIfDoesNotExist and FailOnTombstones are incompatible") + } if !opts.OriginTimestamp.IsEmpty() { if bool(opts.AllowIfDoesNotExist) { return roachpb.LockAcquisition{}, errors.AssertionFailedf("AllowIfDoesNotExist and non-zero OriginTimestamp are incompatible") } + if bool(opts.FailOnTombstones) { + return roachpb.LockAcquisition{}, errors.AssertionFailedf("FailOnTombstones and non-zero OriginTimestamp are incompatible") + } putIsInline := timestamp.IsEmpty() if putIsInline { return roachpb.LockAcquisition{}, errors.AssertionFailedf("inline put and non-zero OriginTimestamp are incompatible") @@ -3023,11 +3042,26 @@ func mvccConditionalPutUsingIter( var valueFn func(existVal optionalValue) (roachpb.Value, error) if opts.OriginTimestamp.IsEmpty() { - valueFn = func(actualValue optionalValue) (roachpb.Value, error) { - if err := maybeConditionFailedError(expBytes, actualValue, bool(opts.AllowIfDoesNotExist)); err != nil { - return roachpb.Value{}, err + if opts.FailOnTombstones { + valueFn = func(existVal optionalValue) (roachpb.Value, error) { + if existVal.IsTombstone() { + // We found a tombstone and FailOnTombstones is true: fail. + return roachpb.Value{}, &kvpb.ConditionFailedError{ + ActualValue: existVal.ToPointer(), + } + } + if err := maybeConditionFailedError(expBytes, existVal, false /* allowNoExisting */); err != nil { + return roachpb.Value{}, err + } + return value, nil + } + } else { + valueFn = func(existVal optionalValue) (roachpb.Value, error) { + if err := maybeConditionFailedError(expBytes, existVal, bool(opts.AllowIfDoesNotExist)); err != nil { + return roachpb.Value{}, err + } + return value, nil } - return value, nil } } else { valueFn = func(existVal optionalValue) (roachpb.Value, error) {