batcheval: refactor delete range
This refactors DeleteRange to make it clear that it has three mutually
exclusive implementations. The idempotent implementation is used by
components like the schema changer to delete whole table spans. The
`predicates` implementation is used by import rollback. The transactional
implementation has many users, including SQL and the time series DB.

Release Note: none
Informs: #131844
jeffswenson committed Dec 19, 2024
1 parent 5213196 commit 23eecbc
Showing 1 changed file with 141 additions and 109 deletions: pkg/kv/kvserver/batcheval/cmd_delete_range.go
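
For orientation before the full diff, here is a condensed sketch of the new dispatch, assembled from the hunks below; most validation and all helper bodies are elided.

// Condensed from the hunks below: DeleteRange now picks exactly one of three
// implementations (validation checks and helper bodies elided).
func DeleteRange(
	ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
	args := cArgs.Args.(*kvpb.DeleteRangeRequest)
	reply := resp.(*kvpb.DeleteRangeResponse)

	hasPredicate := args.Predicates != (kvpb.DeleteRangePredicates{})
	switch {
	case args.UseRangeTombstone && !hasPredicate:
		// Non-transactional MVCC range tombstone over the whole span.
		return deleteRangeIdempotent(ctx, readWriter, cArgs)
	case hasPredicate:
		// Range tombstones written only over keys matching the predicates.
		return deleteRangeWithPredicate(ctx, readWriter, cArgs, reply)
	default:
		// Ordinary transactional point deletes.
		return deleteRangeTransactional(ctx, readWriter, cArgs, reply)
	}
}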
@@ -89,29 +89,15 @@ func DeleteRange(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*kvpb.DeleteRangeRequest)
h := cArgs.Header
reply := resp.(*kvpb.DeleteRangeResponse)

if args.Predicates != (kvpb.DeleteRangePredicates{}) && !args.UseRangeTombstone {
// This ensures predicate based DeleteRange piggybacks on the version gate,
// roachpb api flags, and latch declarations used by the UseRangeTombstone.
return result.Result{}, errors.AssertionFailedf(
"UseRangeTombstones must be passed with predicate based Delete Range")
}

if args.UpdateRangeDeleteGCHint && !args.UseRangeTombstone {
// Check for prerequisite for gc hint. If it doesn't hold, this is incorrect
// usage of hint.
return result.Result{}, errors.AssertionFailedf(
"GCRangeHint must only be used together with UseRangeTombstone")
}

if args.Predicates.ImportEpoch > 0 && !args.Predicates.StartTime.IsEmpty() {
return result.Result{}, errors.AssertionFailedf(
"DeleteRangePredicate should not have both non-zero ImportEpoch and non-empty StartTime")
}

// Use MVCC range tombstone if requested.
if args.UseRangeTombstone {
if cArgs.Header.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
@@ -123,112 +109,158 @@ func DeleteRange(
return result.Result{}, errors.AssertionFailedf(
"ReturnKeys can't be used with range tombstones")
}
}

hasPredicate := args.Predicates != (kvpb.DeleteRangePredicates{})
if hasPredicate && args.IdempotentTombstone {
return result.Result{}, errors.AssertionFailedf("IdempotentTombstone not compatible with Predicates")
}

desc := cArgs.EvalCtx.Desc()

maybeUpdateGCHint := func(res *result.Result) error {
if !args.UpdateRangeDeleteGCHint {
return nil
}
sl := MakeStateLoader(cArgs.EvalCtx)
hint, err := sl.LoadGCHint(ctx, readWriter)
if err != nil {
return err
}

updated := hint.ScheduleGCFor(h.Timestamp)
// If the range tombstone covers the whole Range key span, update the
// corresponding timestamp in GCHint to enable ClearRange optimization.
if args.Key.Equal(desc.StartKey.AsRawKey()) && args.EndKey.Equal(desc.EndKey.AsRawKey()) {
// NB: don't swap the order, we want to call the method unconditionally.
updated = hint.ForwardLatestRangeDeleteTimestamp(h.Timestamp) || updated
}
if !updated {
return nil
}

if err := sl.SetGCHint(ctx, readWriter, cArgs.Stats, hint); err != nil {
return err
}
res.Replicated.State = &kvserverpb.ReplicaState{
GCHint: hint,
}
switch {
case args.UseRangeTombstone && !hasPredicate:
return deleteRangeIdempotent(ctx, readWriter, cArgs)
case hasPredicate:
return deleteRangeWithPredicate(ctx, readWriter, cArgs, reply)
default:
return deleteRangeTransactional(ctx, readWriter, cArgs, reply)
}
}

func deleteRangeIdempotent(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
args := cArgs.Args.(*kvpb.DeleteRangeRequest)
h := cArgs.Header
desc := cArgs.EvalCtx.Desc()

maybeUpdateGCHint := func(res *result.Result) error {
if !args.UpdateRangeDeleteGCHint {
return nil
}

leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
targetLockConflictBytes := storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)

// If no predicate parameters are passed, use the fast path. If we're
// deleting the entire Raft range, use an even faster path that avoids a
// point key scan to update MVCC stats.
if args.Predicates == (kvpb.DeleteRangePredicates{}) {
var statsCovered *enginepb.MVCCStats
if args.Key.Equal(desc.StartKey.AsRawKey()) && args.EndKey.Equal(desc.EndKey.AsRawKey()) {
// NB: We take the fast path even if stats are estimates, because the
// slow path will likely end up with similarly poor stats anyway.
s := cArgs.EvalCtx.GetMVCCStats()
statsCovered = &s
}
if err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.IdempotentTombstone, maxLockConflicts, targetLockConflictBytes, statsCovered); err != nil {
return result.Result{}, err
}
var res result.Result
err := maybeUpdateGCHint(&res)
return res, err
sl := MakeStateLoader(cArgs.EvalCtx)
hint, err := sl.LoadGCHint(ctx, readWriter)
if err != nil {
return err
}

if h.MaxSpanRequestKeys == 0 {
// In production, MaxSpanRequestKeys must be greater than zero to ensure
// the DistSender serializes predicate based DeleteRange requests across
// ranges. This ensures that only one resumeSpan gets returned to the
// client.
//
// Also, note that DeleteRangeUsingTombstone requests pass the isAlone
// flag in roachpb.api.proto, ensuring the requests do not intermingle with
// other types of requests, preventing further resume span muddling.
return result.Result{}, errors.AssertionFailedf(
"MaxSpanRequestKeys must be greater than zero when using predicated based DeleteRange")
updated := hint.ScheduleGCFor(h.Timestamp)
// If the range tombstone covers the whole Range key span, update the
// corresponding timestamp in GCHint to enable ClearRange optimization.
if args.Key.Equal(desc.StartKey.AsRawKey()) && args.EndKey.Equal(desc.EndKey.AsRawKey()) {
// NB: don't swap the order, we want to call the method unconditionally.
updated = hint.ForwardLatestRangeDeleteTimestamp(h.Timestamp) || updated
}
if args.IdempotentTombstone {
return result.Result{}, errors.AssertionFailedf(
"IdempotentTombstone not compatible with Predicates")
}
// TODO (msbutler): Tune the threshold once DeleteRange and DeleteRangeUsingTombstone have
// been further optimized.
defaultRangeTombstoneThreshold := int64(64)
resumeSpan, err := storage.MVCCPredicateDeleteRange(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.Predicates, h.MaxSpanRequestKeys, maxDeleteRangeBatchBytes,
defaultRangeTombstoneThreshold, maxLockConflicts, targetLockConflictBytes)
if err != nil {
return result.Result{}, err
if !updated {
return nil
}

if resumeSpan != nil {
reply.ResumeSpan = resumeSpan
reply.ResumeReason = kvpb.RESUME_KEY_LIMIT

// Note: While MVCCPredicateDeleteRange _could_ return reply.NumKeys, as
// the number of keys iterated through, doing so could lead to a
// significant performance drawback. The DistSender would have used
// NumKeys to subtract the number of keys processed by one range from the
// MaxSpanRequestKeys limit given to the next range. In this case, that's
// specifically not what we want, because this request does not use the
// normal meaning of MaxSpanRequestKeys (i.e. number of keys to return).
// Here, MaxSpanRequest keys is used to limit the number of tombstones
// written. Thus, setting NumKeys would just reduce the limit available to
// the next range for no good reason.
if err := sl.SetGCHint(ctx, readWriter, cArgs.Stats, hint); err != nil {
return err
}
// Return result is always empty, since the reply is populated into the
// passed in resp pointer.
return result.Result{}, nil
res.Replicated.State = &kvserverpb.ReplicaState{
GCHint: hint,
}
return nil
}

leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
targetLockConflictBytes := storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)

// If no predicate parameters are passed, use the fast path. If we're
// deleting the entire Raft range, use an even faster path that avoids a
// point key scan to update MVCC stats.
var statsCovered *enginepb.MVCCStats
if args.Key.Equal(desc.StartKey.AsRawKey()) && args.EndKey.Equal(desc.EndKey.AsRawKey()) {
// NB: We take the fast path even if stats are estimates, because the
// slow path will likely end up with similarly poor stats anyway.
s := cArgs.EvalCtx.GetMVCCStats()
statsCovered = &s
}
if err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.IdempotentTombstone, maxLockConflicts, targetLockConflictBytes, statsCovered); err != nil {
return result.Result{}, err
}

var res result.Result
err := maybeUpdateGCHint(&res)
return res, err
}

func deleteRangeWithPredicate(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*kvpb.DeleteRangeRequest)
h := cArgs.Header
reply := resp.(*kvpb.DeleteRangeResponse)

if args.Predicates != (kvpb.DeleteRangePredicates{}) && !args.UseRangeTombstone {
// This ensures predicate based DeleteRange piggybacks on the version gate,
// roachpb api flags, and latch declarations used by the UseRangeTombstone.
return result.Result{}, errors.AssertionFailedf(
"UseRangeTombstones must be passed with predicate based Delete Range")
}

if h.MaxSpanRequestKeys == 0 {
// In production, MaxSpanRequestKeys must be greater than zero to ensure
// the DistSender serializes predicate based DeleteRange requests across
// ranges. This ensures that only one resumeSpan gets returned to the
// client.
//
// Also, note that DeleteRangeUsingTombstone requests pass the isAlone
// flag in roachpb.api.proto, ensuring the requests do not intermingle with
// other types of requests, preventing further resume span muddling.
return result.Result{}, errors.AssertionFailedf(
"MaxSpanRequestKeys must be greater than zero when using predicated based DeleteRange")
}
if args.Predicates.ImportEpoch > 0 && !args.Predicates.StartTime.IsEmpty() {
return result.Result{}, errors.AssertionFailedf(
"DeleteRangePredicate should not have both non-zero ImportEpoch and non-empty StartTime")
}

desc := cArgs.EvalCtx.Desc()

leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
targetLockConflictBytes := storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)

// TODO (msbutler): Tune the threshold once DeleteRange and DeleteRangeUsingTombstone have
// been further optimized.
defaultRangeTombstoneThreshold := int64(64)
resumeSpan, err := storage.MVCCPredicateDeleteRange(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.Predicates, h.MaxSpanRequestKeys, maxDeleteRangeBatchBytes,
defaultRangeTombstoneThreshold, maxLockConflicts, targetLockConflictBytes)
if err != nil {
return result.Result{}, err
}

if resumeSpan != nil {
// Note: While MVCCPredicateDeleteRange _could_ return reply.NumKeys, as
// the number of keys iterated through, doing so could lead to a
// significant performance drawback. The DistSender would have used
// NumKeys to subtract the number of keys processed by one range from the
// MaxSpanRequestKeys limit given to the next range. In this case, that's
// specifically not what we want, because this request does not use the
// normal meaning of MaxSpanRequestKeys (i.e. number of keys to return).
// Here, MaxSpanRequest keys is used to limit the number of tombstones
// written. Thus, setting NumKeys would just reduce the limit available to
// the next range for no good reason.
reply.ResumeSpan = resumeSpan
reply.ResumeReason = kvpb.RESUME_KEY_LIMIT
}
return result.Result{}, nil
}

func deleteRangeTransactional(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*kvpb.DeleteRangeRequest)
h := cArgs.Header
reply := resp.(*kvpb.DeleteRangeResponse)
var timestamp hlc.Timestamp
if !args.Inline {
timestamp = h.Timestamp
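To make the three paths concrete, here is a hedged sketch of the request shapes that would select each implementation. The field names come from the diff above; exampleDeleteRangeRequests, startKey, endKey, and the epoch value are illustrative placeholders, not part of the commit.

// Hypothetical helper, not part of the commit: builds one request of each
// shape, using only fields that appear in the diff above.
func exampleDeleteRangeRequests(
	startKey, endKey roachpb.Key,
) (idempotent, predicated, transactional *kvpb.DeleteRangeRequest) {
	// Idempotent path: MVCC range tombstone over the span, no predicates.
	// Callers like the schema changer use this to delete whole table spans.
	idempotent = &kvpb.DeleteRangeRequest{
		RequestHeader:       kvpb.RequestHeader{Key: startKey, EndKey: endKey},
		UseRangeTombstone:   true,
		IdempotentTombstone: true, // rejected if combined with Predicates
	}

	// Predicate path: import rollback. UseRangeTombstone must be set, and the
	// batch header must carry MaxSpanRequestKeys > 0 so the DistSender
	// serializes the request across ranges.
	predicated = &kvpb.DeleteRangeRequest{
		RequestHeader:     kvpb.RequestHeader{Key: startKey, EndKey: endKey},
		UseRangeTombstone: true,
		// Exactly one of ImportEpoch or StartTime may be set.
		Predicates: kvpb.DeleteRangePredicates{ImportEpoch: 1},
	}

	// Transactional path: neither flag is set; used by SQL and the time
	// series DB, and evaluated with normal MVCC point deletes.
	transactional = &kvpb.DeleteRangeRequest{
		RequestHeader: kvpb.RequestHeader{Key: startKey, EndKey: endKey},
	}
	return idempotent, predicated, transactional
}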
