Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batcheval: refactor delete range #137803

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 141 additions & 109 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,29 +89,15 @@ func DeleteRange(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*kvpb.DeleteRangeRequest)
h := cArgs.Header
reply := resp.(*kvpb.DeleteRangeResponse)

if args.Predicates != (kvpb.DeleteRangePredicates{}) && !args.UseRangeTombstone {
// This ensures predicate based DeleteRange piggybacks on the version gate,
// roachpb api flags, and latch declarations used by the UseRangeTombstone.
return result.Result{}, errors.AssertionFailedf(
"UseRangeTombstones must be passed with predicate based Delete Range")
}

if args.UpdateRangeDeleteGCHint && !args.UseRangeTombstone {
// Check for prerequisite for gc hint. If it doesn't hold, this is incorrect
// usage of hint.
return result.Result{}, errors.AssertionFailedf(
"GCRangeHint must only be used together with UseRangeTombstone")
}

if args.Predicates.ImportEpoch > 0 && !args.Predicates.StartTime.IsEmpty() {
return result.Result{}, errors.AssertionFailedf(
"DeleteRangePredicate should not have both non-zero ImportEpoch and non-empty StartTime")
}

// Use MVCC range tombstone if requested.
if args.UseRangeTombstone {
if cArgs.Header.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
Expand All @@ -123,112 +109,158 @@ func DeleteRange(
return result.Result{}, errors.AssertionFailedf(
"ReturnKeys can't be used with range tombstones")
}
}

hasPredicate := args.Predicates != (kvpb.DeleteRangePredicates{})
if hasPredicate && args.IdempotentTombstone {
return result.Result{}, errors.AssertionFailedf("IdempotentTombstone not compatible with Predicates")
}

desc := cArgs.EvalCtx.Desc()

maybeUpdateGCHint := func(res *result.Result) error {
if !args.UpdateRangeDeleteGCHint {
return nil
}
sl := MakeStateLoader(cArgs.EvalCtx)
hint, err := sl.LoadGCHint(ctx, readWriter)
if err != nil {
return err
}

updated := hint.ScheduleGCFor(h.Timestamp)
// If the range tombstone covers the whole Range key span, update the
// corresponding timestamp in GCHint to enable ClearRange optimization.
if args.Key.Equal(desc.StartKey.AsRawKey()) && args.EndKey.Equal(desc.EndKey.AsRawKey()) {
// NB: don't swap the order, we want to call the method unconditionally.
updated = hint.ForwardLatestRangeDeleteTimestamp(h.Timestamp) || updated
}
if !updated {
return nil
}

if err := sl.SetGCHint(ctx, readWriter, cArgs.Stats, hint); err != nil {
return err
}
res.Replicated.State = &kvserverpb.ReplicaState{
GCHint: hint,
}
switch {
case hasPredicate:
return deleteRangeWithPredicate(ctx, readWriter, cArgs, reply)
case args.UseRangeTombstone:
return deleteRangeUsingTombstone(ctx, readWriter, cArgs)
default:
return deleteRangeTransactional(ctx, readWriter, cArgs, reply)
}
}

func deleteRangeUsingTombstone(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
args := cArgs.Args.(*kvpb.DeleteRangeRequest)
h := cArgs.Header
desc := cArgs.EvalCtx.Desc()

maybeUpdateGCHint := func(res *result.Result) error {
if !args.UpdateRangeDeleteGCHint {
return nil
}

leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
targetLockConflictBytes := storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)

// If no predicate parameters are passed, use the fast path. If we're
// deleting the entire Raft range, use an even faster path that avoids a
// point key scan to update MVCC stats.
if args.Predicates == (kvpb.DeleteRangePredicates{}) {
var statsCovered *enginepb.MVCCStats
if args.Key.Equal(desc.StartKey.AsRawKey()) && args.EndKey.Equal(desc.EndKey.AsRawKey()) {
// NB: We take the fast path even if stats are estimates, because the
// slow path will likely end up with similarly poor stats anyway.
s := cArgs.EvalCtx.GetMVCCStats()
statsCovered = &s
}
if err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.IdempotentTombstone, maxLockConflicts, targetLockConflictBytes, statsCovered); err != nil {
return result.Result{}, err
}
var res result.Result
err := maybeUpdateGCHint(&res)
return res, err
sl := MakeStateLoader(cArgs.EvalCtx)
hint, err := sl.LoadGCHint(ctx, readWriter)
if err != nil {
return err
}

if h.MaxSpanRequestKeys == 0 {
// In production, MaxSpanRequestKeys must be greater than zero to ensure
// the DistSender serializes predicate based DeleteRange requests across
// ranges. This ensures that only one resumeSpan gets returned to the
// client.
//
// Also, note that DeleteRangeUsingTombstone requests pass the isAlone
// flag in roachpb.api.proto, ensuring the requests do not intermingle with
// other types of requests, preventing further resume span muddling.
return result.Result{}, errors.AssertionFailedf(
"MaxSpanRequestKeys must be greater than zero when using predicated based DeleteRange")
updated := hint.ScheduleGCFor(h.Timestamp)
// If the range tombstone covers the whole Range key span, update the
// corresponding timestamp in GCHint to enable ClearRange optimization.
if args.Key.Equal(desc.StartKey.AsRawKey()) && args.EndKey.Equal(desc.EndKey.AsRawKey()) {
// NB: don't swap the order, we want to call the method unconditionally.
updated = hint.ForwardLatestRangeDeleteTimestamp(h.Timestamp) || updated
}
if args.IdempotentTombstone {
return result.Result{}, errors.AssertionFailedf(
"IdempotentTombstone not compatible with Predicates")
}
// TODO (msbutler): Tune the threshold once DeleteRange and DeleteRangeUsingTombstone have
// been further optimized.
defaultRangeTombstoneThreshold := int64(64)
resumeSpan, err := storage.MVCCPredicateDeleteRange(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.Predicates, h.MaxSpanRequestKeys, maxDeleteRangeBatchBytes,
defaultRangeTombstoneThreshold, maxLockConflicts, targetLockConflictBytes)
if err != nil {
return result.Result{}, err
if !updated {
return nil
}

if resumeSpan != nil {
reply.ResumeSpan = resumeSpan
reply.ResumeReason = kvpb.RESUME_KEY_LIMIT

// Note: While MVCCPredicateDeleteRange _could_ return reply.NumKeys, as
// the number of keys iterated through, doing so could lead to a
// significant performance drawback. The DistSender would have used
// NumKeys to subtract the number of keys processed by one range from the
// MaxSpanRequestKeys limit given to the next range. In this case, that's
// specifically not what we want, because this request does not use the
// normal meaning of MaxSpanRequestKeys (i.e. number of keys to return).
// Here, MaxSpanRequest keys is used to limit the number of tombstones
// written. Thus, setting NumKeys would just reduce the limit available to
// the next range for no good reason.
if err := sl.SetGCHint(ctx, readWriter, cArgs.Stats, hint); err != nil {
return err
}
// Return result is always empty, since the reply is populated into the
// passed in resp pointer.
return result.Result{}, nil
res.Replicated.State = &kvserverpb.ReplicaState{
GCHint: hint,
}
return nil
}

leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
targetLockConflictBytes := storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)

// If no predicate parameters are passed, use the fast path. If we're
// deleting the entire Raft range, use an even faster path that avoids a
// point key scan to update MVCC stats.
var statsCovered *enginepb.MVCCStats
if args.Key.Equal(desc.StartKey.AsRawKey()) && args.EndKey.Equal(desc.EndKey.AsRawKey()) {
// NB: We take the fast path even if stats are estimates, because the
// slow path will likely end up with similarly poor stats anyway.
s := cArgs.EvalCtx.GetMVCCStats()
statsCovered = &s
}
if err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.IdempotentTombstone, maxLockConflicts, targetLockConflictBytes, statsCovered); err != nil {
return result.Result{}, err
}

var res result.Result
err := maybeUpdateGCHint(&res)
return res, err
}

func deleteRangeWithPredicate(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*kvpb.DeleteRangeRequest)
h := cArgs.Header
reply := resp.(*kvpb.DeleteRangeResponse)

if args.Predicates != (kvpb.DeleteRangePredicates{}) && !args.UseRangeTombstone {
// This ensures predicate based DeleteRange piggybacks on the version gate,
// roachpb api flags, and latch declarations used by the UseRangeTombstone.
return result.Result{}, errors.AssertionFailedf(
"UseRangeTombstones must be passed with predicate based Delete Range")
}

if h.MaxSpanRequestKeys == 0 {
// In production, MaxSpanRequestKeys must be greater than zero to ensure
// the DistSender serializes predicate based DeleteRange requests across
// ranges. This ensures that only one resumeSpan gets returned to the
// client.
//
// Also, note that DeleteRangeUsingTombstone requests pass the isAlone
// flag in roachpb.api.proto, ensuring the requests do not intermingle with
// other types of requests, preventing further resume span muddling.
return result.Result{}, errors.AssertionFailedf(
"MaxSpanRequestKeys must be greater than zero when using predicated based DeleteRange")
}
if args.Predicates.ImportEpoch > 0 && !args.Predicates.StartTime.IsEmpty() {
return result.Result{}, errors.AssertionFailedf(
"DeleteRangePredicate should not have both non-zero ImportEpoch and non-empty StartTime")
}

desc := cArgs.EvalCtx.Desc()

leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
targetLockConflictBytes := storage.TargetBytesPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)

// TODO (msbutler): Tune the threshold once DeleteRange and DeleteRangeUsingTombstone have
// been further optimized.
defaultRangeTombstoneThreshold := int64(64)
resumeSpan, err := storage.MVCCPredicateDeleteRange(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.Predicates, h.MaxSpanRequestKeys, maxDeleteRangeBatchBytes,
defaultRangeTombstoneThreshold, maxLockConflicts, targetLockConflictBytes)
if err != nil {
return result.Result{}, err
}

if resumeSpan != nil {
// Note: While MVCCPredicateDeleteRange _could_ return reply.NumKeys, as
// the number of keys iterated through, doing so could lead to a
// significant performance drawback. The DistSender would have used
// NumKeys to subtract the number of keys processed by one range from the
// MaxSpanRequestKeys limit given to the next range. In this case, that's
// specifically not what we want, because this request does not use the
// normal meaning of MaxSpanRequestKeys (i.e. number of keys to return).
// Here, MaxSpanRequest keys is used to limit the number of tombstones
// written. Thus, setting NumKeys would just reduce the limit available to
// the next range for no good reason.
reply.ResumeSpan = resumeSpan
reply.ResumeReason = kvpb.RESUME_KEY_LIMIT
}
return result.Result{}, nil
}

func deleteRangeTransactional(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*kvpb.DeleteRangeRequest)
h := cArgs.Header
reply := resp.(*kvpb.DeleteRangeResponse)
var timestamp hlc.Timestamp
if !args.Inline {
timestamp = h.Timestamp
Expand Down
Loading