Skip to content

Commit

Permalink
kv: use expiration-based lease for right-hand side of range split
Browse files Browse the repository at this point in the history
Fixes cockroachdb#130112.

This commit converts leader leases into expiration-based leases during
the state transfer during a range split. A leader lease is tied to a
specific raft leadership term within a specific raft group. During a
range split, we initialize a new raft group on the right-hand side, so a
leader lease term from the left-hand side is unusable. Once the
right-hand side elects a leader and collocates the lease and leader, it
can promote the expiration-based lease back to a leader lease.

Release note: None
  • Loading branch information
nvanbenschoten committed Sep 11, 2024
1 parent 6d09d13 commit 9c942a8
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ go_test(
"//pkg/kv/kvserver/readsummary",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/spanset",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand Down
26 changes: 22 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,15 +1280,33 @@ func splitTriggerHelper(
log.Fatalf(ctx, "LHS of split has no lease")
}

replica, found := split.RightDesc.GetReplicaDescriptor(leftLease.Replica.StoreID)
if !found {
// Copy the lease from the left-hand side of the split over to the
// right-hand side so that it can immediately start serving requests.
// When doing so, we need to make a few modifications.
rightLease := leftLease
// Rebind the lease to the existing leaseholder store's replica from the
// right-hand side's descriptor.
var ok bool
rightLease.Replica, ok = split.RightDesc.GetReplicaDescriptor(leftLease.Replica.StoreID)
if !ok {
return enginepb.MVCCStats{}, result.Result{}, errors.Errorf(
"pre-split lease holder %+v not found in post-split descriptor %+v",
leftLease.Replica, split.RightDesc,
)
}
rightLease := leftLease
rightLease.Replica = replica
// Convert leader leases into expiration-based leases. A leader lease is
// tied to a specific raft leadership term within a specific raft group.
// During a range split, we initialize a new raft group on the right-hand
// side, so a leader lease term from the left-hand side is unusable. Once
// the right-hand side elects a leader and collocates the lease and leader,
// it can promote the expiration-based lease back to a leader lease.
if rightLease.Type() == roachpb.LeaseLeader {
exp := rec.Clock().Now().Add(int64(rec.GetRangeLeaseDuration()), 0)
rightLease.Expiration = &exp
rightLease.Term = 0
rightLease.MinExpiration = hlc.Timestamp{}
}

gcThreshold, err := sl.LoadGCThreshold(ctx, batch)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load GCThreshold")
Expand Down
129 changes: 129 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,27 @@ import (
"context"
"fmt"
"regexp"
"slices"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1678,3 +1684,126 @@ func TestResolveLocalLocks(t *testing.T) {
})
}
}

// TestSplitTriggerWritesInitialReplicaState tests that a split trigger sets up
// the split's right-hand side range by writing the initial replica state into
// the evaluation write batch.
func TestSplitTriggerWritesInitialReplicaState(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
version := st.Version.LatestVersion()
manual := timeutil.NewManualTime(timeutil.Unix(0, 10))
clock := hlc.NewClockForTesting(manual)

db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()

rangeLeaseDuration := 99 * time.Nanosecond
startKey := roachpb.Key("0000")
endKey := roachpb.Key("9999")
desc := roachpb.RangeDescriptor{
RangeID: 99,
StartKey: roachpb.RKey(startKey),
EndKey: roachpb.RKey(endKey),
}
desc.AddReplica(1, 1, roachpb.VOTER_FULL)
lease := roachpb.Lease{
Replica: desc.InternalReplicas[0],
// The range was using a leader lease. The split will need to swap this to
// an expiration-based lease.
Term: 10,
MinExpiration: hlc.Timestamp{WallTime: 100},
}
gcThreshold := hlc.Timestamp{WallTime: 4}
lastGCTimestamp := hlc.Timestamp{WallTime: 5}
gcHint := roachpb.GCHint{GCTimestamp: gcThreshold}
abortSpanTxnID := uuid.MakeV4()
as := abortspan.New(desc.RangeID)
sl := stateloader.Make(desc.RangeID)
rec := (&MockEvalCtx{
ClusterSettings: st,
Desc: &desc,
Clock: clock,
AbortSpan: as,
LastReplicaGCTimestamp: lastGCTimestamp,
RangeLeaseDuration: rangeLeaseDuration,
}).EvalContext()

splitKey := roachpb.RKey("5555")
leftDesc, rightDesc := desc, desc
leftDesc.EndKey = splitKey
rightDesc.RangeID++
rightDesc.StartKey = splitKey
rightDesc.InternalReplicas = slices.Clone(leftDesc.InternalReplicas)
rightDesc.InternalReplicas[0].ReplicaID++
split := &roachpb.SplitTrigger{
LeftDesc: leftDesc,
RightDesc: rightDesc,
}

// Write the range state that will be consulted and copied during the split.
err := as.Put(ctx, batch, nil, abortSpanTxnID, &roachpb.AbortSpanEntry{})
require.NoError(t, err)
err = sl.SetLease(ctx, batch, nil, lease)
require.NoError(t, err)
err = sl.SetGCThreshold(ctx, batch, nil, &gcThreshold)
require.NoError(t, err)
err = sl.SetGCHint(ctx, batch, nil, &gcHint)
require.NoError(t, err)
err = sl.SetVersion(ctx, batch, nil, &version)
require.NoError(t, err)

// Run the split trigger, which is normally run as a subset of EndTxn request
// evaluation.
_, _, err = splitTrigger(ctx, rec, batch, enginepb.MVCCStats{}, split, hlc.Timestamp{})
require.NoError(t, err)

// Verify that range state was migrated to the right-hand side properly.
asRight := abortspan.New(rightDesc.RangeID)
slRight := stateloader.Make(rightDesc.RangeID)
// The abort span should have been transferred over.
ok, err := asRight.Get(ctx, batch, abortSpanTxnID, &roachpb.AbortSpanEntry{})
require.NoError(t, err)
require.True(t, ok)
// The lease should be present, pointing at the replica in the right-hand side
// range, and switched to an expiration-based lease.
expLease := roachpb.Lease{
Replica: rightDesc.InternalReplicas[0],
Expiration: &hlc.Timestamp{WallTime: manual.Now().Add(rangeLeaseDuration).UnixNano()},
}
loadedLease, err := slRight.LoadLease(ctx, batch)
require.NoError(t, err)
require.Equal(t, expLease, loadedLease)
loadedGCThreshold, err := slRight.LoadGCThreshold(ctx, batch)
require.NoError(t, err)
require.NotNil(t, loadedGCThreshold)
require.Equal(t, gcThreshold, *loadedGCThreshold)
loadedGCHint, err := slRight.LoadGCHint(ctx, batch)
require.NoError(t, err)
require.NotNil(t, loadedGCHint)
require.Equal(t, gcHint, *loadedGCHint)
expTruncState := kvserverpb.RaftTruncatedState{
Term: stateloader.RaftInitialLogTerm,
Index: stateloader.RaftInitialLogIndex,
}
loadedTruncState, err := slRight.LoadRaftTruncatedState(ctx, batch)
require.NoError(t, err)
require.Equal(t, expTruncState, loadedTruncState)
loadedVersion, err := slRight.LoadVersion(ctx, batch)
require.NoError(t, err)
require.Equal(t, version, loadedVersion)
expAppliedState := kvserverpb.RangeAppliedState{
RaftAppliedIndexTerm: stateloader.RaftInitialLogTerm,
RaftAppliedIndex: stateloader.RaftInitialLogIndex,
}
loadedAppliedState, err := slRight.LoadRangeAppliedState(ctx, batch)
require.NoError(t, err)
require.NotNil(t, loadedAppliedState)
loadedAppliedState.RangeStats = kvserverpb.MVCCPersistentStats{} // ignore
require.Equal(t, &expAppliedState, loadedAppliedState)
}
51 changes: 29 additions & 22 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
Expand Down Expand Up @@ -106,6 +107,7 @@ type EvalContext interface {
ExcludeDataFromBackup(context.Context, roachpb.Span) (bool, error)
GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error)
GetLease() (roachpb.Lease, roachpb.Lease)
GetRangeLeaseDuration() time.Duration
GetRangeInfo(context.Context) roachpb.RangeInfo

// GetCurrentReadSummary returns a new ReadSummary reflecting all reads
Expand Down Expand Up @@ -165,27 +167,29 @@ type ImmutableEvalContext interface {
// MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
// For technical reasons, the interface is implemented by a wrapper .EvalContext().
type MockEvalCtx struct {
ClusterSettings *cluster.Settings
Desc *roachpb.RangeDescriptor
StoreID roachpb.StoreID
NodeID roachpb.NodeID
Clock *hlc.Clock
Stats enginepb.MVCCStats
QPS float64
CPU float64
AbortSpan *abortspan.AbortSpan
GCThreshold hlc.Timestamp
Term kvpb.RaftTerm
FirstIndex kvpb.RaftIndex
CanCreateTxnRecordFn func() (bool, kvpb.TransactionAbortedReason)
MinTxnCommitTSFn func() hlc.Timestamp
Lease roachpb.Lease
CurrentReadSummary rspb.ReadSummary
ClosedTimestamp hlc.Timestamp
RevokedLeaseSeq roachpb.LeaseSequence
MaxBytes int64
ApproxDiskBytes uint64
EvalKnobs kvserverbase.BatchEvalTestingKnobs
ClusterSettings *cluster.Settings
Desc *roachpb.RangeDescriptor
StoreID roachpb.StoreID
NodeID roachpb.NodeID
Clock *hlc.Clock
Stats enginepb.MVCCStats
QPS float64
CPU float64
AbortSpan *abortspan.AbortSpan
GCThreshold hlc.Timestamp
Term kvpb.RaftTerm
FirstIndex kvpb.RaftIndex
CanCreateTxnRecordFn func() (bool, kvpb.TransactionAbortedReason)
MinTxnCommitTSFn func() hlc.Timestamp
LastReplicaGCTimestamp hlc.Timestamp
Lease roachpb.Lease
RangeLeaseDuration time.Duration
CurrentReadSummary rspb.ReadSummary
ClosedTimestamp hlc.Timestamp
RevokedLeaseSeq roachpb.LeaseSequence
MaxBytes int64
ApproxDiskBytes uint64
EvalKnobs kvserverbase.BatchEvalTestingKnobs
}

// EvalContext returns the MockEvalCtx as an EvalContext. It will reflect future
Expand Down Expand Up @@ -280,11 +284,14 @@ func (m *mockEvalCtxImpl) ExcludeDataFromBackup(context.Context, roachpb.Span) (
return false, nil
}
func (m *mockEvalCtxImpl) GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error) {
panic("unimplemented")
return m.LastReplicaGCTimestamp, nil
}
func (m *mockEvalCtxImpl) GetLease() (roachpb.Lease, roachpb.Lease) {
return m.Lease, roachpb.Lease{}
}
func (m *mockEvalCtxImpl) GetRangeLeaseDuration() time.Duration {
return m.RangeLeaseDuration
}
func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo {
return roachpb.RangeInfo{Desc: *m.Desc(), Lease: m.Lease}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_eval_context_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -207,6 +208,11 @@ func (rec SpanSetReplicaEvalContext) GetLease() (roachpb.Lease, roachpb.Lease) {
return rec.i.GetLease()
}

// GetRangeLeaseDuration is part of the EvalContext interface.
func (rec SpanSetReplicaEvalContext) GetRangeLeaseDuration() time.Duration {
return rec.i.GetRangeLeaseDuration()
}

// GetRangeInfo is part of the EvalContext interface.
func (rec SpanSetReplicaEvalContext) GetRangeInfo(ctx context.Context) roachpb.RangeInfo {
// Do the latching checks and ignore the results.
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,11 @@ func (r *Replica) leaseSettings(ctx context.Context) leases.Settings {
}
}

// GetRangeLeaseDuration is part of the EvalContext interface.
func (r *Replica) GetRangeLeaseDuration() time.Duration {
return r.store.cfg.RangeLeaseDuration
}

// requiresExpirationLeaseRLocked returns whether this range unconditionally
// uses an expiration-based lease. Ranges located before or including the node
// liveness table must always use expiration leases to avoid circular
Expand Down

0 comments on commit 9c942a8

Please sign in to comment.