From e9dfdad587f3c7584c770aa8f70d11070748f721 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 1 May 2024 21:38:50 +0000 Subject: [PATCH 1/6] kv: assert against remote lease transfers This commit adds a check that a replica does not perform a lease transfer if it does not own the previous lease. This allows us to make a stronger assumption a layer down. Epic: None Release note: None --- pkg/kv/kvserver/replica_range_lease.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index f2924ec801be..d472ad5f41a4 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -233,7 +233,6 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( nextLeaseHolder roachpb.ReplicaDescriptor, status kvserverpb.LeaseStatus, startKey roachpb.Key, - transfer bool, bypassSafetyChecks bool, limiter *quotapool.IntPool, ) *leaseRequestHandle { @@ -252,10 +251,20 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( nextLeaseHolder.ReplicaID, nextLease.Replica.ReplicaID)) } - acquisition := !status.Lease.OwnedBy(p.repl.store.StoreID()) - extension := !transfer && !acquisition + // Who owns the previous and next lease? + prevLocal := status.Lease.OwnedBy(p.repl.store.StoreID()) + nextLocal := nextLeaseHolder.StoreID == p.repl.store.StoreID() + + // Assert that the lease acquisition, extension, or transfer is valid. + acquisition := !prevLocal && nextLocal + extension := prevLocal && nextLocal + transfer := prevLocal && !nextLocal + remote := !prevLocal && !nextLocal _ = extension // not used, just documentation + if remote { + log.Fatalf(ctx, "cannot acquire/extend lease for remote replica: %v -> %v", status, nextLeaseHolder) + } if acquisition { // If this is a non-cooperative lease change (i.e. an acquisition), it // is up to us to ensure that Lease.Start is greater than the end time @@ -481,7 +490,7 @@ func (p *pendingLeaseRequest) requestLease( if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED { var err error // If this replica is previous & next lease holder, manually heartbeat to become live. - if status.OwnedBy(nextLeaseHolder.StoreID) && p.repl.store.StoreID() == nextLeaseHolder.StoreID { + if status.OwnedBy(nextLeaseHolder.StoreID) { if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil && logFailedHeartbeatOwnLiveness.ShouldLog() { log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err) } @@ -882,7 +891,7 @@ func (r *Replica) requestLeaseLocked( } return r.mu.pendingLeaseRequest.InitOrJoinRequest( ctx, repDesc, status, r.mu.state.Desc.StartKey.AsRawKey(), - false /* transfer */, false /* bypassSafetyChecks */, limiter) + false /* bypassSafetyChecks */, limiter) } // AdminTransferLease transfers the LeaderLease to another replica. Only the @@ -976,7 +985,7 @@ func (r *Replica) AdminTransferLease( } transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest(ctx, nextLeaseHolder, status, - desc.StartKey.AsRawKey(), true /* transfer */, bypassSafetyChecks, nil /* limiter */) + desc.StartKey.AsRawKey(), bypassSafetyChecks, nil /* limiter */) return nil, transfer, nil } From c3ae4ecfbbcd93eeef92d1bc0cc8836fc3a60366 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 2 May 2024 04:13:31 +0000 Subject: [PATCH 2/6] kv: prevent lease interval regression during expiration-to-epoch promotion Fixes #121480. Fixes #122016. 
This commit resolves a bug in the expiration-based to epoch-based lease promotion transition, where the lease's effective expiration could be allowed to regress. To prevent this, we detect when such cases are about to occur and synchronously heartbeat the leaseholder's liveness record. This works because the liveness record interval and the expiration-based lease interval are the same, so a synchronous heartbeat ensures that the liveness record has a later expiration than the prior lease by the time the lease promotion goes into effect. The code structure here leaves a lot to be desired, but since we're going to be cleaning up and/or removing a lot of this code soon anyway, I'm prioritizing backportability. This is therefore more targeted and less general than it could be. The resolution here also leaves something to be desired. A nicer fix would be to introduce a minimum_lease_expiration field on epoch-based leases so that we can locally ensure that the expiration does not regress. This is what we plan to do for leader leases in the upcoming release. We don't make this change because it would require a version gate to avoid replica divergence, so it would not be backportable. Release note (bug fix): Fixed a rare bug where a lease transfer could lead to a `side-transport update saw closed timestamp regression` panic. The bug could occur when a node was overloaded and failing to heartbeat its node liveness record. --- pkg/kv/kvserver/client_lease_test.go | 82 ++++++++++++++++++++++++ pkg/kv/kvserver/client_split_test.go | 8 ++- pkg/kv/kvserver/replica_range_lease.go | 49 ++++++++++++-- pkg/testutils/testcluster/testcluster.go | 11 ---- 4 files changed, 132 insertions(+), 18 deletions(-) diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index ecbb6230c3c2..27246bca6f01 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -1636,3 +1637,84 @@ func TestLeaseRequestBumpsEpoch(t *testing.T) { require.Greater(t, liveness.Epoch, prevLease.Epoch) }) } + +// TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration tests that a +// promotion from an expiration-based lease to an epoch-based lease does not +// permit the expiration time of the lease to regress. This is enforced by +// detecting cases where the leaseholder's liveness record's expiration trails +// its expiration-based lease's expiration and synchronously heartbeating the +// leaseholder's liveness record before promoting the lease. +func TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true) // override metamorphism + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + }, + }) + defer tc.Stopper().Stop(ctx) + + // Create scratch range.
+ key := tc.ScratchRange(t) + desc := tc.LookupRangeOrFatal(t, key) + + // Pause n1's node liveness heartbeats, to allow its liveness expiration to + // fall behind. + l0 := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness) + l0.PauseHeartbeatLoopForTest() + l, ok := l0.GetLiveness(tc.Server(0).NodeID()) + require.True(t, ok) + + // Make sure n1 has an expiration-based lease. + s0 := tc.GetFirstStoreFromServer(t, 0) + repl := s0.LookupReplica(desc.StartKey) + require.NotNil(t, repl) + expLease := repl.CurrentLeaseStatus(ctx) + require.True(t, expLease.IsValid()) + require.Equal(t, roachpb.LeaseExpiration, expLease.Lease.Type()) + + // Wait for the expiration-based lease to have a later expiration than the + // expiration timestamp in n1's liveness record. + testutils.SucceedsSoon(t, func() error { + expLease = repl.CurrentLeaseStatus(ctx) + if expLease.Expiration().Less(l.Expiration.ToTimestamp()) { + return errors.Errorf("lease %v not extended beyond liveness %v", expLease, l) + } + return nil + }) + + // Enable epoch-based leases. This will cause automatic lease renewal to try + // to promote the expiration-based lease to an epoch-based lease. + // + // Since we have disabled the background node liveness heartbeat loop, it is + // critical that this lease promotion synchronously heartbeats node liveness + // before acquiring the epoch-based lease. + kvserver.ExpirationLeasesOnly.Override(ctx, &s0.ClusterSettings().SV, false) + + // Wait for that lease promotion to occur. + var epochLease kvserverpb.LeaseStatus + testutils.SucceedsSoon(t, func() error { + epochLease = repl.CurrentLeaseStatus(ctx) + if epochLease.Lease.Type() != roachpb.LeaseEpoch { + return errors.Errorf("lease %v not upgraded to epoch-based", epochLease) + } + return nil + }) + + // Once the lease has been promoted to an epoch-based lease, the effective + // expiration (maintained indirectly in the liveness record) must be greater + // than that in the preceding expiration-based lease. If this were to regress, + // a non-cooperative lease failover to a third lease held by a different node + // could overlap in MVCC time with the first lease (i.e. its start time could + // precede expLease.Expiration), violating the lease disjointness property. + // + // If we disable the `expToEpochPromo` branch in replica_range_lease.go, this + // assertion fails. + require.True(t, expLease.Expiration().Less(epochLease.Expiration())) +} diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index cfbde6814d5c..7f131927d143 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -2828,6 +2828,12 @@ func TestStoreCapacityAfterSplit(t *testing.T) { ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ Settings: st, + RaftConfig: base.RaftConfig{ + // We plan to increment the manual clock by MinStatsDuration a few + // times below and would like for leases to not expire. Configure a + // longer lease duration to achieve this. 
+ RangeLeaseDuration: 10 * replicastats.MinStatsDuration, + }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ WallClock: manualClock, @@ -2845,8 +2851,6 @@ func TestStoreCapacityAfterSplit(t *testing.T) { key := tc.ScratchRange(t) desc := tc.AddVotersOrFatal(t, key, tc.Target(1)) tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) - - tc.IncrClockForLeaseUpgrade(t, manualClock) tc.WaitForLeaseUpgrade(ctx, t, desc) cap, err := s.Capacity(ctx, false /* useCached */) diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index d472ad5f41a4..38de8bc485a1 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -303,6 +303,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( ProposedTS: &status.Now, } + var reqLeaseLiveness livenesspb.Liveness if p.repl.shouldUseExpirationLeaseRLocked() || (transfer && TransferExpirationLeasesFirstEnabled.Get(&p.repl.store.ClusterSettings().SV)) { @@ -333,6 +334,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( return llHandle } reqLease.Epoch = l.Epoch + reqLeaseLiveness = l.Liveness } var leaseReq kvpb.Request @@ -363,7 +365,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( } } - err := p.requestLeaseAsync(ctx, nextLeaseHolder, status, leaseReq, limiter) + err := p.requestLeaseAsync(ctx, status, reqLease, reqLeaseLiveness, leaseReq, limiter) if err != nil { if errors.Is(err, stop.ErrThrottled) { llHandle.resolve(kvpb.NewError(err)) @@ -394,10 +396,14 @@ func (p *pendingLeaseRequest) InitOrJoinRequest( // // The status argument is used as the expected value for liveness operations. // leaseReq must be consistent with the LeaseStatus. +// +// The reqLeaseLiveness argument is provided when reqLease is an epoch-based +// lease. func (p *pendingLeaseRequest) requestLeaseAsync( parentCtx context.Context, - nextLeaseHolder roachpb.ReplicaDescriptor, status kvserverpb.LeaseStatus, + reqLease roachpb.Lease, + reqLeaseLiveness livenesspb.Liveness, leaseReq kvpb.Request, limiter *quotapool.IntPool, ) error { @@ -433,7 +439,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // RPC, but here we submit the request directly to the local replica. growstack.Grow() - err := p.requestLease(ctx, nextLeaseHolder, status, leaseReq) + err := p.requestLease(ctx, status, reqLease, reqLeaseLiveness, leaseReq) // Error will be handled below. // We reset our state below regardless of whether we've gotten an error or @@ -473,10 +479,14 @@ var logFailedHeartbeatOwnLiveness = log.Every(10 * time.Second) // requestLease sends a synchronous transfer lease or lease request to the // specified replica. It is only meant to be called from requestLeaseAsync, // since it does not coordinate with other in-flight lease requests. +// +// The reqLeaseLiveness argument is provided when reqLease is an epoch-based +// lease. func (p *pendingLeaseRequest) requestLease( ctx context.Context, - nextLeaseHolder roachpb.ReplicaDescriptor, status kvserverpb.LeaseStatus, + reqLease roachpb.Lease, + reqLeaseLiveness livenesspb.Liveness, leaseReq kvpb.Request, ) error { started := timeutil.Now() @@ -484,13 +494,42 @@ func (p *pendingLeaseRequest) requestLease( p.repl.store.metrics.LeaseRequestLatency.RecordValue(timeutil.Since(started).Nanoseconds()) }() + nextLeaseHolder := reqLease.Replica + extension := status.OwnedBy(nextLeaseHolder.StoreID) + + // If we are promoting an expiration-based lease to an epoch-based lease, we + // must make sure the expiration does not regress. 
We do this here because the + // expiration is stored directly in the lease for expiration-based leases but + // indirectly in the liveness record for epoch-based leases. To ensure this, we + // manually heartbeat our liveness record if necessary. This is expected to + // work because the liveness record interval and the expiration-based lease + // interval are the same. + expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch + if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) { + err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, reqLeaseLiveness) + if err != nil { + if logFailedHeartbeatOwnLiveness.ShouldLog() { + log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err) + } + return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(), + fmt.Sprintf("failed to manipulate liveness record: %s", err)) + } + // Assert that the liveness record expiration is now greater than the + // expiration of the lease we're promoting. + l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID) + if !ok || l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) { + return errors.AssertionFailedf("expiration of liveness record %s is not greater than "+ + "expiration of the previous lease %s after liveness heartbeat", l, status.Lease) + } + } + // If we're replacing an expired epoch-based lease, we must increment the // epoch of the prior owner to invalidate its leases. If we were the owner, // then we instead heartbeat to become live. if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED { var err error // If this replica is previous & next lease holder, manually heartbeat to become live. - if status.OwnedBy(nextLeaseHolder.StoreID) { + if extension { if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil && logFailedHeartbeatOwnLiveness.ShouldLog() { log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err) } diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 4af87d750121..33f654d2c670 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -1104,17 +1104,6 @@ func (tc *TestCluster) TransferRangeLeaseOrFatal( } } -// IncrClockForLeaseUpgrade run up the clock to force a lease renewal (and thus -// the change in lease types). -func (tc *TestCluster) IncrClockForLeaseUpgrade( - t serverutils.TestFataler, clock *hlc.HybridManualClock, -) { - clock.Increment( - tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() + - time.Second.Nanoseconds(), - ) -} - // MaybeWaitForLeaseUpgrade waits until the lease held for the given range // descriptor is upgraded to an epoch-based one, but only if we expect the lease // to be upgraded. From 293a1e9f391a9be4efad952d1186887cca155157 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 2 May 2024 18:40:08 +0000 Subject: [PATCH 3/6] kv: add PrevLease check in RequestLease This commit adds a check that `args.PrevLease` is equivalent to `cArgs.EvalCtx.GetLease()` to RequestLease. This ensures that the validation here is consistent with the validation that was performed when the lease request was constructed.
Release note: None Epic: None --- pkg/kv/kvserver/batcheval/cmd_lease_request.go | 10 ++++++++++ pkg/kv/kvserver/replica_test.go | 5 +++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_request.go b/pkg/kv/kvserver/batcheval/cmd_lease_request.go index 13040c882ddd..b70f363ea327 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_request.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_request.go @@ -12,6 +12,7 @@ package batcheval import ( "context" + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -68,6 +69,15 @@ func RequestLease( Requested: args.Lease, } + // However, we verify that the current lease's sequence number and proposed + // timestamp match the provided PrevLease. This ensures that the validation + // here is consistent with the validation that was performed when the lease + // request was constructed. + if prevLease.Sequence != args.PrevLease.Sequence || !prevLease.ProposedTS.Equal(args.PrevLease.ProposedTS) { + rErr.Message = fmt.Sprintf("expected previous lease %s, found %s", args.PrevLease, prevLease) + return newFailedLeaseTrigger(false /* isTransfer */), rErr + } + // MIGRATION(tschottdorf): needed to apply Raft commands which got proposed // before the StartStasis field was introduced. newLease := args.Lease diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 347fbff95e35..ca38fd88e307 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -966,7 +966,8 @@ func TestReplicaLease(t *testing.T) { ctx, tc.repl, allSpans(), false, /* requiresClosedTSOlderThanStorageSnap */ ), Args: &kvpb.RequestLeaseRequest{ - Lease: lease, + Lease: lease, + PrevLease: tc.repl.CurrentLeaseStatus(ctx).Lease, }, }, &kvpb.RequestLeaseResponse{}); !testutils.IsError(err, "replica not found") { t.Fatalf("unexpected error: %+v", err) @@ -1311,7 +1312,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { st := tc.repl.CurrentLeaseStatus(ctx) ba := &kvpb.BatchRequest{} ba.Timestamp = tc.repl.store.Clock().Now() - ba.Add(&kvpb.RequestLeaseRequest{Lease: *lease}) + ba.Add(&kvpb.RequestLeaseRequest{Lease: *lease, PrevLease: st.Lease}) _, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp) ch, _, _, _, pErr := tc.repl.evalAndPropose(ctx, ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx)) if pErr == nil { From e7ed406eb8a5089955ac4c7533dd0994c4d2ce00 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 30 May 2024 22:13:24 +0000 Subject: [PATCH 4/6] kv: retry liveness heartbeat on race with insufficient expiration Fixes #124693. Fixes #125287. This commit adds logic to retry the synchronous liveness heartbeat, added in #123442, which ensures that the liveness record has a later expiration than the prior lease by the time the lease promotion goes into effect. This heartbeat may need to be retried because it may have raced with another liveness heartbeat which did not extend the liveness expiration far enough. We opt to allow this race and retry across it instead of detecting it and returning an error from `NodeLiveness.Heartbeat` because: 1. returning an error would have a larger blast radius and could cause other issues. 2. returning an error wouldn't actually fix the tests that are failing, because they would still get an error, just a different one. Before this commit, `TestLeaseQueueProactiveEnqueueOnPreferences` would hit this case and fail under deadlock and stress every ~150 iterations.
After this commit, the test passes continuously under deadlock stress for over 2000 runs. This makes #123442 even uglier. The nicer solution is #125235, but that is not backportable. Still, we're planning to address that issue as part of the leader leases work, so this is a temporary fix. This also removes a TODO added in 1dc18dfd. As mentioned above, we don't address it, but instead document the behavior. Release note (bug fix): resolves a concerning log message that says "expiration of liveness record ... is not greater than expiration of the previous lease ... after liveness heartbeat". This message is no longer possible. --- pkg/kv/kvserver/liveness/liveness.go | 15 +++-------- pkg/kv/kvserver/replica_range_lease.go | 37 +++++++++++++++++--------- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index ca70c8c90df9..1923a75e8dc4 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -709,7 +709,9 @@ var errNodeAlreadyLive = errors.New("node already live") // // If this method returns nil, the node's liveness has been extended, // relative to the previous value. It may or may not still be alive -// when this method returns. +// when this method returns. It may also not have been extended as far +// as the livenessThreshold, because the caller may have raced with +// another heartbeater. // // On failure, this method returns ErrEpochIncremented, although this // may not necessarily mean that the epoch was actually incremented. @@ -847,17 +849,6 @@ func (nl *NodeLiveness) heartbeatInternal( // expired while in flight, so maybe we don't have to care about // that and only need to distinguish between same and different // epochs in our return value. - // - // TODO(nvanbenschoten): Unlike the early return above, this doesn't - // guarantee that the resulting expiration is past minExpiration, - // only that it's different than our oldLiveness. Is that ok? It - // hasn't caused issues so far, but we might want to detect this - // case and retry, at least in the case of the liveness heartbeat - // loop. The downside of this is that a heartbeat that's intending - // to bump the expiration of a record out 9s into the future may - // return a success even if the expiration is only 5 seconds in the - // future. The next heartbeat will then start with only 0.5 seconds - // before expiration. if actual.IsLive(nl.clock.Now()) && !incrementEpoch { return errNodeAlreadyLive } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 38de8bc485a1..a1890ec5ae41 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -506,20 +506,31 @@ func (p *pendingLeaseRequest) requestLease( // interval are the same. 
expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) { - err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, reqLeaseLiveness) - if err != nil { - if logFailedHeartbeatOwnLiveness.ShouldLog() { - log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err) + curLiveness := reqLeaseLiveness + for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { + err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, curLiveness) + if err != nil { + if logFailedHeartbeatOwnLiveness.ShouldLog() { + log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err) + } + return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(), + fmt.Sprintf("failed to manipulate liveness record: %s", err)) } - return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(), - fmt.Sprintf("failed to manipulate liveness record: %s", err)) - } - // Assert that the liveness record expiration is now greater than the - // expiration of the lease we're promoting. - l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID) - if !ok || l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) { - return errors.AssertionFailedf("expiration of liveness record %s is not greater than "+ - "expiration of the previous lease %s after liveness heartbeat", l, status.Lease) + // Check whether the liveness record expiration is now greater than the + // expiration of the lease we're promoting. If not, we may have raced with + // another liveness heartbeat which did not extend the liveness expiration + // far enough and we should try again. + l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID) + if !ok { + return errors.NewAssertionErrorWithWrappedErrf(liveness.ErrRecordCacheMiss, "after heartbeat") + } + if l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) { + log.Infof(ctx, "expiration of liveness record %s is not greater than "+ + "expiration of the previous lease %s after liveness heartbeat, retrying...", l, status.Lease) + curLiveness = l.Liveness + continue + } + break } } From 823a106e286f8888f9669cabb43c090fa17ba99e Mon Sep 17 00:00:00 2001 From: Ibrahim Kettaneh Date: Mon, 19 Aug 2024 14:56:04 +0000 Subject: [PATCH 5/6] kvserver: deflake TestRangefeedCheckpointsRecoverFromLeaseExpiration This commit deflakes the test by waiting for N1's view of N2's lease expiration to match N2's view. This is important in the rare case where N1 tries to increase N2's epoch, but it has a stale view of the lease expiration time. Epic: None Release note: None --- pkg/kv/kvserver/replica_rangefeed_test.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 1632b50bf132..b40089ca16ac 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -1507,13 +1507,24 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) { // Expire the lease. Given that the Raft leadership is on n2, only n2 will be // eligible to acquire a new lease. 
log.Infof(ctx, "test expiring lease") - nl := n2.NodeLiveness().(*liveness.NodeLiveness) - resumeHeartbeats := nl.PauseAllHeartbeatsForTest() - n2Liveness, ok := nl.Self() + nl2 := n2.NodeLiveness().(*liveness.NodeLiveness) + resumeHeartbeats := nl2.PauseAllHeartbeatsForTest() + n2Liveness, ok := nl2.Self() require.True(t, ok) manualClock.Increment(n2Liveness.Expiration.ToTimestamp().Add(1, 0).WallTime - manualClock.UnixNano()) atomic.StoreInt64(&rejectExtraneousRequests, 1) - // Ask another node to increment n2's liveness record. + + // Ask another node to increment n2's liveness record, but first, wait until + // n1's liveness state is the same as n2's. Otherwise, the epoch increment below + // might get rejected because of mismatching liveness records. + testutils.SucceedsSoon(t, func() error { + nl1 := n1.NodeLiveness().(*liveness.NodeLiveness) + n2LivenessFromN1, _ := nl1.GetLiveness(n2.NodeID()) + if n2Liveness != n2LivenessFromN1.Liveness { + return errors.Errorf("waiting for node 2 liveness to converge on both nodes 1 and 2") + } + return nil + }) require.NoError(t, n1.NodeLiveness().(*liveness.NodeLiveness).IncrementEpoch(ctx, n2Liveness)) resumeHeartbeats() From 662bbcb667866e81b5a75f64ae5ab339d3a0c1dc Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 13 May 2024 22:00:28 +0000 Subject: [PATCH 6/6] kvserver: read lease under mutex when switching lease type A race could occur when a replica queue and post-lease-application processing both attempted to switch the lease type. This race would cause the queue to not process the replica because the lease type had already changed. As a result, lease preference violations might not have been quickly resolved by the lease queue. Read the lease under the same mutex used for requesting the lease, when possibly switching the lease type. Resolves: #123998 Release note: None --- pkg/kv/kvserver/replica_range_lease.go | 27 ++++++++++++++++---------- pkg/kv/kvserver/replicate_queue.go | 4 ++-- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index a1890ec5ae41..1b49ee84d322 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -1582,17 +1582,24 @@ func (r *Replica) shouldRequestLeaseRLocked( // maybeSwitchLeaseType will synchronously renew a lease using the appropriate // type if it is (or was) owned by this replica and has an incorrect type. This // typically happens when changing kv.expiration_leases_only.enabled. -func (r *Replica) maybeSwitchLeaseType(ctx context.Context, st kvserverpb.LeaseStatus) *kvpb.Error { - if !st.OwnedBy(r.store.StoreID()) { - return nil - } +func (r *Replica) maybeSwitchLeaseType(ctx context.Context) *kvpb.Error { + llHandle := func() *leaseRequestHandle { + now := r.store.Clock().NowAsClockTimestamp() + // The lease status needs to be checked and requested under the same lock, + // to avoid an interleaving lease request changing the lease between the + // two.
+ r.mu.Lock() + defer r.mu.Unlock() - var llHandle *leaseRequestHandle - r.mu.Lock() - if !r.hasCorrectLeaseTypeRLocked(st.Lease) { - llHandle = r.requestLeaseLocked(ctx, st, nil /* limiter */) - } - r.mu.Unlock() + st := r.leaseStatusAtRLocked(ctx, now) + if !st.OwnedBy(r.store.StoreID()) { + return nil + } + if r.hasCorrectLeaseTypeRLocked(st.Lease) { + return nil + } + return r.requestLeaseLocked(ctx, st, nil /* limiter */) + }() if llHandle != nil { select { diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 80df32dcd32f..3d8b49858757 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -960,11 +960,11 @@ func (rq *replicateQueue) preProcessCheck(ctx context.Context, repl *Replica) er // TODO(erikgrinaker): This is also done more eagerly during Raft ticks, but // that doesn't work for quiesced epoch-based ranges, so we have a fallback // here that usually runs within 10 minutes. - leaseStatus, pErr := repl.redirectOnOrAcquireLease(ctx) + _, pErr := repl.redirectOnOrAcquireLease(ctx) if pErr != nil { return pErr.GoError() } - pErr = repl.maybeSwitchLeaseType(ctx, leaseStatus) + pErr = repl.maybeSwitchLeaseType(ctx) if pErr != nil { return pErr.GoError() }