Skip to content

Commit

Permalink
WIP on extending CPut with FailOnTombstones option
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzefovich committed Dec 20, 2024
1 parent 721f467 commit cb96693
Show file tree
Hide file tree
Showing 13 changed files with 95 additions and 24 deletions.
22 changes: 15 additions & 7 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,20 @@ func (b *Batch) PutInline(key, value interface{}) {
// expValue needs to correspond to a Value.TagAndDataBytes() - i.e. a key's
// value without the checksum (as the checksum includes the key too).
func (b *Batch) CPut(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, false, false)
b.cputInternal(key, value, expValue, false, false, false)
}

// CPutAllowingIfNotExists is like CPut except it also allows the Put when the
// existing entry does not exist -- i.e. it succeeds if there is no existing
// entry or the existing entry has the expected value.
func (b *Batch) CPutAllowingIfNotExists(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, true, false)
b.cputInternal(key, value, expValue, true, false, false)
}

// CPutFailOnTombstones is like CPut except it fails with a
// ConditionedFailedError if it encounters a tombstone.
func (b *Batch) CPutFailOnTombstones(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, false, false, true)
}

// CPutWithOriginTimestamp is like CPut except that it also sets the
Expand All @@ -577,7 +583,7 @@ func (b *Batch) CPutWithOriginTimestamp(
b.initResult(0, 1, notRaw, err)
return
}
r := kvpb.NewConditionalPut(k, v, expValue, false)
r := kvpb.NewConditionalPut(k, v, expValue, false, false)
r.(*kvpb.ConditionalPutRequest).OriginTimestamp = ts
r.(*kvpb.ConditionalPutRequest).ShouldWinOriginTimestampTie = shouldWinTie
b.appendReqs(r)
Expand All @@ -600,11 +606,11 @@ func (b *Batch) CPutWithOriginTimestamp(
// A nil value can be used to delete the respective key, since there is no
// DelInline(). This is different from CPut().
func (b *Batch) CPutInline(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, false, true)
b.cputInternal(key, value, expValue, false, true, false)
}

func (b *Batch) cputInternal(
key, value interface{}, expValue []byte, allowNotExist bool, inline bool,
key, value interface{}, expValue []byte, allowNotExist bool, inline bool, failOnTombstones bool,
) {
k, err := marshalKey(key)
if err != nil {
Expand All @@ -617,9 +623,9 @@ func (b *Batch) cputInternal(
return
}
if inline {
b.appendReqs(kvpb.NewConditionalPutInline(k, v, expValue, allowNotExist))
b.appendReqs(kvpb.NewConditionalPutInline(k, v, expValue, allowNotExist, failOnTombstones))
} else {
b.appendReqs(kvpb.NewConditionalPut(k, v, expValue, allowNotExist))
b.appendReqs(kvpb.NewConditionalPut(k, v, expValue, allowNotExist, failOnTombstones))
}
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
Expand Down Expand Up @@ -711,6 +717,8 @@ func (b *Batch) CPutValuesEmpty(bs BulkSource[roachpb.Value]) {
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc). It is illegal
// to set value to nil.
// TODO(yuzefovich): this can be removed after compatibility with 25.1 is no
// longer needed.
func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) {
k, err := marshalKey(key)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1656,7 +1656,7 @@ func NewPutInline(key roachpb.Key, value roachpb.Value) Request {
// them. The caller retains ownership of expVal; NewConditionalPut will copy it
// into the request.
func NewConditionalPut(
key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool,
key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool, failOnTombstones bool,
) Request {
value.InitChecksum(key)
return &ConditionalPutRequest{
Expand All @@ -1666,6 +1666,7 @@ func NewConditionalPut(
Value: value,
ExpBytes: expValue,
AllowIfDoesNotExist: allowNotExist,
FailOnTombstones: failOnTombstones,
}
}

Expand All @@ -1676,7 +1677,7 @@ func NewConditionalPut(
// them. The caller retains ownership of expVal; NewConditionalPut will copy it
// into the request.
func NewConditionalPutInline(
key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool,
key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool, failOnTombstones bool,
) Request {
value.InitChecksum(key)
return &ConditionalPutRequest{
Expand All @@ -1687,6 +1688,7 @@ func NewConditionalPutInline(
ExpBytes: expValue,
AllowIfDoesNotExist: allowNotExist,
Inline: true,
FailOnTombstones: failOnTombstones,
}
}

Expand Down Expand Up @@ -2555,10 +2557,16 @@ func (writeOptions *WriteOptions) GetOriginTimestamp() hlc.Timestamp {
}

func (r *ConditionalPutRequest) Validate() error {
if r.AllowIfDoesNotExist && r.FailOnTombstones {
return errors.AssertionFailedf("invalid ConditionalPutRequest: AllowIfDoesNotExist and FailOnTombstones are incompatible")
}
if !r.OriginTimestamp.IsEmpty() {
if r.AllowIfDoesNotExist {
return errors.AssertionFailedf("invalid ConditionalPutRequest: AllowIfDoesNotExist and non-empty OriginTimestamp are incompatible")
}
if r.FailOnTombstones {
return errors.AssertionFailedf("invalid ConditionalPutRequest: FailOnTombstones and non-empty OriginTimestamp are incompatible")
}
}
return nil
}
10 changes: 10 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ message ConditionalPutRequest {
// exist with that value. Passing this indicates that it is also OK if the key
// does not exist. This is useful when a given value is expected but it is
// possible it has not yet been written.
//
// Cannot be combined with FailOnTombstones nor with OriginTimestamp.
bool allow_if_does_not_exist = 5;
// Specify as true to put the value without a corresponding
// timestamp. This option should be used with care as it precludes
Expand Down Expand Up @@ -371,6 +373,11 @@ message ConditionalPutRequest {
//
// This must only be used in conjunction with OriginTimestamp.
bool should_win_origin_timestamp_tie = 9;

// If true, tombstones cause ConditionFailedErrors.
//
// Cannot be combined with AllowIfDoesNotExit nor with OriginTimestamp.
bool failOnTombstones = 10;
}

// A ConditionalPutResponse is the return value from the
Expand All @@ -385,6 +392,9 @@ message ConditionalPutResponse {
// - If key exists, returns a ConditionFailedError if value != existing value
// If failOnTombstones is set to true, tombstone values count as mismatched
// values and will cause a ConditionFailedError.
// TODO(yuzefovich): this can be removed once the compatibility with the version
// in which we stopped issuing InitPuts (some time after 25.1) is no longer
// needed.
message InitPutRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
Value value = 2 [(gogoproto.nullable) = false];
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func ConditionalPut(
return result.Result{}, errors.AssertionFailedf("OriginTimestamp cannot be passed via CPut arg and in request header")
}

failOnTombstones := args.FailOnTombstones && !cArgs.EvalCtx.EvalKnobs().DisableInitPutCPutFailOnTombstones
opts := storage.ConditionalPutWriteOptions{
MVCCWriteOptions: storage.MVCCWriteOptions{
Txn: h.Txn,
Expand All @@ -80,6 +81,7 @@ func ConditionalPut(
Category: fs.BatchEvalReadCategory,
},
AllowIfDoesNotExist: storage.CPutMissingBehavior(args.AllowIfDoesNotExist),
FailOnTombstones: storage.CPutTombstoneBehavior(failOnTombstones),
OriginTimestamp: args.OriginTimestamp,
ShouldWinOriginTimestampTie: args.ShouldWinOriginTimestampTie,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_init_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func InitPut(
args := cArgs.Args.(*kvpb.InitPutRequest)
h := cArgs.Header

failOnTombstones := args.FailOnTombstones && !cArgs.EvalCtx.EvalKnobs().DisableInitPutFailOnTombstones
failOnTombstones := args.FailOnTombstones && !cArgs.EvalCtx.EvalKnobs().DisableInitPutCPutFailOnTombstones
opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/kvserverbase/knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ type BatchEvalTestingKnobs struct {
// explanation of why.
AllowGCWithNewThresholdAndKeys bool

// DisableInitPutFailOnTombstones disables FailOnTombstones for InitPut. This
// is useful together with e.g. StoreTestingKnobs.GlobalMVCCRangeTombstone,
// where we still want InitPut to succeed on top of the range tombstone.
DisableInitPutFailOnTombstones bool
// DisableInitPutCPutFailOnTombstones disables FailOnTombstones for InitPut
// and CPut. This is useful together with e.g.
// StoreTestingKnobs.GlobalMVCCRangeTombstone, where we still want InitPut
// and CPut to succeed on top of the range tombstone.
DisableInitPutCPutFailOnTombstones bool

// UseRangeTombstonesForPointDeletes will use point-sized MVCC range
// tombstones when deleting point keys, to increase test coverage. These
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,7 +1372,7 @@ func cPutArgs(key roachpb.Key, value, expValue []byte) kvpb.ConditionalPutReques
if expValue != nil {
expValue = roachpb.MakeValueFromBytes(expValue).TagAndDataBytes()
}
req := kvpb.NewConditionalPut(key, roachpb.MakeValueFromBytes(value), expValue, false /* allowNotExist */)
req := kvpb.NewConditionalPut(key, roachpb.MakeValueFromBytes(value), expValue, false /* allowNotExist */, false /* failOnTombstones */)
return *req.(*kvpb.ConditionalPutRequest)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (cfg *BaseConfig) InitTestingKnobs() {
}
storeKnobs := cfg.TestingKnobs.Store.(*kvserver.StoreTestingKnobs)
storeKnobs.GlobalMVCCRangeTombstone = true
storeKnobs.EvalKnobs.DisableInitPutFailOnTombstones = true
storeKnobs.EvalKnobs.DisableInitPutCPutFailOnTombstones = true
cfg.TestingKnobs.RangeFeed.(*rangefeed.TestingKnobs).IgnoreOnDeleteRangeError = true
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,8 +1536,8 @@ func (t *logicTest) newCluster(
DisableConsistencyQueue: true,
GlobalMVCCRangeTombstone: globalMVCCRangeTombstone,
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
DisableInitPutFailOnTombstones: ignoreMVCCRangeTombstoneErrors,
UseRangeTombstonesForPointDeletes: shouldUseMVCCRangeTombstonesForPointDeletes,
DisableInitPutCPutFailOnTombstones: ignoreMVCCRangeTombstoneErrors,
UseRangeTombstonesForPointDeletes: shouldUseMVCCRangeTombstonesForPointDeletes,
},
},
RangeFeed: &rangefeed.TestingKnobs{
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlliveness/slstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvpb",
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/sqlliveness/slstorage/slstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -529,7 +530,11 @@ func (s *Storage) Insert(

}
v := encodeValue(expiration)
batch.InitPut(k, &v, true)
if s.settings.Version.IsActive(ctx, clusterversion.V25_1) {
batch.CPutFailOnTombstones(k, &v, nil /* expValue */)
} else {
batch.InitPut(k, &v, true)
}

return txn.CommitInBatch(ctx, batch)
}); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/metamorphic/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ func (m mvccCPutOp) run(ctx context.Context) string {

_, err := storage.MVCCConditionalPut(ctx, writer, m.key,
txn.ReadTimestamp, m.value, m.expVal, storage.ConditionalPutWriteOptions{
MVCCWriteOptions: storage.MVCCWriteOptions{Txn: txn},
MVCCWriteOptions: storage.MVCCWriteOptions{Txn: txn},
// TODO: why we unconditionally set this flag?
// TODO: should we add FailOnTombstones?
AllowIfDoesNotExist: true,
})
if err != nil {
Expand Down
42 changes: 38 additions & 4 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2883,12 +2883,25 @@ const (
CPutFailIfMissing CPutMissingBehavior = false
)

// CPutTombstoneBehavior describes the handling of a tombstone.
type CPutTombstoneBehavior bool

const (
// CPutIgnoreTombstones is used to indicate that CPut should ignore the
// tombstones and treat them as key non-existing.
CPutIgnoreTombstones CPutTombstoneBehavior = false
// CPutFailOnTombstones is used to indicate that CPut should fail if it
// finds a tombstone.
CPutFailOnTombstones CPutTombstoneBehavior = true
)

// ConditionalPutWriteOptions bundles options for the
// MVCCConditionalPut and MVCCBlindConditionalPut functions.
type ConditionalPutWriteOptions struct {
MVCCWriteOptions

AllowIfDoesNotExist CPutMissingBehavior
FailOnTombstones CPutTombstoneBehavior
// OriginTimestamp, if set, indicates that the caller wants to put the
// value only if any existing key is older than this timestamp.
//
Expand Down Expand Up @@ -3011,10 +3024,16 @@ func mvccConditionalPutUsingIter(
expBytes []byte,
opts ConditionalPutWriteOptions,
) (roachpb.LockAcquisition, error) {
if bool(opts.AllowIfDoesNotExist) && bool(opts.FailOnTombstones) {
return roachpb.LockAcquisition{}, errors.AssertionFailedf("AllowIfDoesNotExist and FailOnTombstones are incompatible")
}
if !opts.OriginTimestamp.IsEmpty() {
if bool(opts.AllowIfDoesNotExist) {
return roachpb.LockAcquisition{}, errors.AssertionFailedf("AllowIfDoesNotExist and non-zero OriginTimestamp are incompatible")
}
if bool(opts.FailOnTombstones) {
return roachpb.LockAcquisition{}, errors.AssertionFailedf("FailOnTombstones and non-zero OriginTimestamp are incompatible")
}
putIsInline := timestamp.IsEmpty()
if putIsInline {
return roachpb.LockAcquisition{}, errors.AssertionFailedf("inline put and non-zero OriginTimestamp are incompatible")
Expand All @@ -3023,11 +3042,26 @@ func mvccConditionalPutUsingIter(

var valueFn func(existVal optionalValue) (roachpb.Value, error)
if opts.OriginTimestamp.IsEmpty() {
valueFn = func(actualValue optionalValue) (roachpb.Value, error) {
if err := maybeConditionFailedError(expBytes, actualValue, bool(opts.AllowIfDoesNotExist)); err != nil {
return roachpb.Value{}, err
if opts.FailOnTombstones {
valueFn = func(existVal optionalValue) (roachpb.Value, error) {
if existVal.IsTombstone() {
// We found a tombstone and FailOnTombstones is true: fail.
return roachpb.Value{}, &kvpb.ConditionFailedError{
ActualValue: existVal.ToPointer(),
}
}
if err := maybeConditionFailedError(expBytes, existVal, false /* allowNoExisting */); err != nil {
return roachpb.Value{}, err
}
return value, nil
}
} else {
valueFn = func(existVal optionalValue) (roachpb.Value, error) {
if err := maybeConditionFailedError(expBytes, existVal, bool(opts.AllowIfDoesNotExist)); err != nil {
return roachpb.Value{}, err
}
return value, nil
}
return value, nil
}
} else {
valueFn = func(existVal optionalValue) (roachpb.Value, error) {
Expand Down

0 comments on commit cb96693

Please sign in to comment.