diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_request.go b/pkg/kv/kvserver/batcheval/cmd_lease_request.go
index 13040c882ddd..b70f363ea327 100644
--- a/pkg/kv/kvserver/batcheval/cmd_lease_request.go
+++ b/pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -12,6 +12,7 @@ package batcheval
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -68,6 +69,15 @@ func RequestLease(
 		Requested: args.Lease,
 	}
 
+	// However, we verify that the current lease's sequence number and proposed
+	// timestamp match the provided PrevLease. This ensures that the validation
+	// here is consistent with the validation that was performed when the lease
+	// request was constructed.
+	if prevLease.Sequence != args.PrevLease.Sequence || !prevLease.ProposedTS.Equal(args.PrevLease.ProposedTS) {
+		rErr.Message = fmt.Sprintf("expected previous lease %s, found %s", args.PrevLease, prevLease)
+		return newFailedLeaseTrigger(false /* isTransfer */), rErr
+	}
+
 	// MIGRATION(tschottdorf): needed to apply Raft commands which got proposed
 	// before the StartStasis field was introduced.
 	newLease := args.Lease
diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index ecbb6230c3c2..27246bca6f01 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -1636,3 +1637,84 @@ func TestLeaseRequestBumpsEpoch(t *testing.T) {
 		require.Greater(t, liveness.Epoch, prevLease.Epoch)
 	})
 }
+
+// TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration tests that a
+// promotion from an expiration-based lease to an epoch-based lease does not
+// permit the expiration time of the lease to regress. This is enforced by
+// detecting cases where the leaseholder's liveness record's expiration trails
+// its expiration-based lease's expiration and synchronously heartbeating the
+// leaseholder's liveness record before promoting the lease.
+func TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	st := cluster.MakeTestingClusterSettings()
+	kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true) // override metamorphism
+
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Settings: st,
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	// Create scratch range.
+	key := tc.ScratchRange(t)
+	desc := tc.LookupRangeOrFatal(t, key)
+
+	// Pause n1's node liveness heartbeats, to allow its liveness expiration to
+	// fall behind.
+	l0 := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness)
+	l0.PauseHeartbeatLoopForTest()
+	l, ok := l0.GetLiveness(tc.Server(0).NodeID())
+	require.True(t, ok)
+
+	// Make sure n1 has an expiration-based lease.
+	s0 := tc.GetFirstStoreFromServer(t, 0)
+	repl := s0.LookupReplica(desc.StartKey)
+	require.NotNil(t, repl)
+	expLease := repl.CurrentLeaseStatus(ctx)
+	require.True(t, expLease.IsValid())
+	require.Equal(t, roachpb.LeaseExpiration, expLease.Lease.Type())
+
+	// Wait for the expiration-based lease to have a later expiration than the
+	// expiration timestamp in n1's liveness record.
+	testutils.SucceedsSoon(t, func() error {
+		expLease = repl.CurrentLeaseStatus(ctx)
+		if expLease.Expiration().Less(l.Expiration.ToTimestamp()) {
+			return errors.Errorf("lease %v not extended beyond liveness %v", expLease, l)
+		}
+		return nil
+	})
+
+	// Enable epoch-based leases. This will cause automatic lease renewal to try
+	// to promote the expiration-based lease to an epoch-based lease.
+	//
+	// Since we have disabled the background node liveness heartbeat loop, it is
+	// critical that this lease promotion synchronously heartbeats node liveness
+	// before acquiring the epoch-based lease.
+	kvserver.ExpirationLeasesOnly.Override(ctx, &s0.ClusterSettings().SV, false)
+
+	// Wait for that lease promotion to occur.
+	var epochLease kvserverpb.LeaseStatus
+	testutils.SucceedsSoon(t, func() error {
+		epochLease = repl.CurrentLeaseStatus(ctx)
+		if epochLease.Lease.Type() != roachpb.LeaseEpoch {
+			return errors.Errorf("lease %v not upgraded to epoch-based", epochLease)
+		}
+		return nil
+	})
+
+	// Once the lease has been promoted to an epoch-based lease, the effective
+	// expiration (maintained indirectly in the liveness record) must be greater
+	// than that in the preceding expiration-based lease. If this were to regress,
+	// a non-cooperative lease failover to a third lease held by a different node
+	// could overlap in MVCC time with the first lease (i.e. its start time could
+	// precede expLease.Expiration), violating the lease disjointness property.
+	//
+	// If we disable the `expToEpochPromo` branch in replica_range_lease.go, this
+	// assertion fails.
+	require.True(t, expLease.Expiration().Less(epochLease.Expiration()))
+}
diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go
index cfbde6814d5c..7f131927d143 100644
--- a/pkg/kv/kvserver/client_split_test.go
+++ b/pkg/kv/kvserver/client_split_test.go
@@ -2828,6 +2828,12 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
 		ReplicationMode: base.ReplicationManual,
 		ServerArgs: base.TestServerArgs{
 			Settings: st,
+			RaftConfig: base.RaftConfig{
+				// We plan to increment the manual clock by MinStatsDuration a few
+				// times below and would like for leases to not expire. Configure a
+				// longer lease duration to achieve this.
+				RangeLeaseDuration: 10 * replicastats.MinStatsDuration,
+			},
 			Knobs: base.TestingKnobs{
 				Server: &server.TestingKnobs{
 					WallClock: manualClock,
@@ -2845,8 +2851,6 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
 	key := tc.ScratchRange(t)
 	desc := tc.AddVotersOrFatal(t, key, tc.Target(1))
 	tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
-
-	tc.IncrClockForLeaseUpgrade(t, manualClock)
 	tc.WaitForLeaseUpgrade(ctx, t, desc)
 
 	cap, err := s.Capacity(ctx, false /* useCached */)
diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go
index ca70c8c90df9..1923a75e8dc4 100644
--- a/pkg/kv/kvserver/liveness/liveness.go
+++ b/pkg/kv/kvserver/liveness/liveness.go
@@ -709,7 +709,9 @@ var errNodeAlreadyLive = errors.New("node already live")
 //
 // If this method returns nil, the node's liveness has been extended,
 // relative to the previous value. It may or may not still be alive
-// when this method returns.
+// when this method returns. It may also not have been extended as far
+// as the livenessThreshold, because the caller may have raced with
+// another heartbeater.
 //
 // On failure, this method returns ErrEpochIncremented, although this
 // may not necessarily mean that the epoch was actually incremented.
@@ -847,17 +849,6 @@ func (nl *NodeLiveness) heartbeatInternal(
 		// expired while in flight, so maybe we don't have to care about
 		// that and only need to distinguish between same and different
 		// epochs in our return value.
-		//
-		// TODO(nvanbenschoten): Unlike the early return above, this doesn't
-		// guarantee that the resulting expiration is past minExpiration,
-		// only that it's different than our oldLiveness. Is that ok? It
-		// hasn't caused issues so far, but we might want to detect this
-		// case and retry, at least in the case of the liveness heartbeat
-		// loop. The downside of this is that a heartbeat that's intending
-		// to bump the expiration of a record out 9s into the future may
-		// return a success even if the expiration is only 5 seconds in the
-		// future. The next heartbeat will then start with only 0.5 seconds
-		// before expiration.
 		if actual.IsLive(nl.clock.Now()) && !incrementEpoch {
 			return errNodeAlreadyLive
 		}
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index f2924ec801be..1b49ee84d322 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -233,7 +233,6 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
 	nextLeaseHolder roachpb.ReplicaDescriptor,
 	status kvserverpb.LeaseStatus,
 	startKey roachpb.Key,
-	transfer bool,
 	bypassSafetyChecks bool,
 	limiter *quotapool.IntPool,
 ) *leaseRequestHandle {
@@ -252,10 +251,20 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
 			nextLeaseHolder.ReplicaID, nextLease.Replica.ReplicaID))
 	}
 
-	acquisition := !status.Lease.OwnedBy(p.repl.store.StoreID())
-	extension := !transfer && !acquisition
+	// Who owns the previous and next lease?
+	prevLocal := status.Lease.OwnedBy(p.repl.store.StoreID())
+	nextLocal := nextLeaseHolder.StoreID == p.repl.store.StoreID()
+
+	// Assert that the lease acquisition, extension, or transfer is valid.
+	acquisition := !prevLocal && nextLocal
+	extension := prevLocal && nextLocal
+	transfer := prevLocal && !nextLocal
+	remote := !prevLocal && !nextLocal
 	_ = extension // not used, just documentation
+	if remote {
+		log.Fatalf(ctx, "cannot acquire/extend lease for remote replica: %v -> %v", status, nextLeaseHolder)
+	}
 	if acquisition {
 		// If this is a non-cooperative lease change (i.e. an acquisition), it
 		// is up to us to ensure that Lease.Start is greater than the end time
@@ -294,6 +303,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
 		ProposedTS: &status.Now,
 	}
 
+	var reqLeaseLiveness livenesspb.Liveness
 	if p.repl.shouldUseExpirationLeaseRLocked() ||
 		(transfer &&
 			TransferExpirationLeasesFirstEnabled.Get(&p.repl.store.ClusterSettings().SV)) {
@@ -324,6 +334,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
 			return llHandle
 		}
 		reqLease.Epoch = l.Epoch
+		reqLeaseLiveness = l.Liveness
 	}
 
 	var leaseReq kvpb.Request
@@ -354,7 +365,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
 		}
 	}
 
-	err := p.requestLeaseAsync(ctx, nextLeaseHolder, status, leaseReq, limiter)
+	err := p.requestLeaseAsync(ctx, status, reqLease, reqLeaseLiveness, leaseReq, limiter)
 	if err != nil {
 		if errors.Is(err, stop.ErrThrottled) {
 			llHandle.resolve(kvpb.NewError(err))
@@ -385,10 +396,14 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
 //
 // The status argument is used as the expected value for liveness operations.
 // leaseReq must be consistent with the LeaseStatus.
+//
+// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
+// lease.
 func (p *pendingLeaseRequest) requestLeaseAsync(
 	parentCtx context.Context,
-	nextLeaseHolder roachpb.ReplicaDescriptor,
 	status kvserverpb.LeaseStatus,
+	reqLease roachpb.Lease,
+	reqLeaseLiveness livenesspb.Liveness,
 	leaseReq kvpb.Request,
 	limiter *quotapool.IntPool,
 ) error {
@@ -424,7 +439,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 			// RPC, but here we submit the request directly to the local replica.
 			growstack.Grow()
 
-			err := p.requestLease(ctx, nextLeaseHolder, status, leaseReq)
+			err := p.requestLease(ctx, status, reqLease, reqLeaseLiveness, leaseReq)
 			// Error will be handled below.
 
 			// We reset our state below regardless of whether we've gotten an error or
@@ -464,10 +479,14 @@ var logFailedHeartbeatOwnLiveness = log.Every(10 * time.Second)
 // requestLease sends a synchronous transfer lease or lease request to the
 // specified replica. It is only meant to be called from requestLeaseAsync,
 // since it does not coordinate with other in-flight lease requests.
+//
+// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
+// lease.
 func (p *pendingLeaseRequest) requestLease(
 	ctx context.Context,
-	nextLeaseHolder roachpb.ReplicaDescriptor,
 	status kvserverpb.LeaseStatus,
+	reqLease roachpb.Lease,
+	reqLeaseLiveness livenesspb.Liveness,
 	leaseReq kvpb.Request,
 ) error {
 	started := timeutil.Now()
@@ -475,13 +494,53 @@ func (p *pendingLeaseRequest) requestLease(
 		p.repl.store.metrics.LeaseRequestLatency.RecordValue(timeutil.Since(started).Nanoseconds())
 	}()
 
+	nextLeaseHolder := reqLease.Replica
+	extension := status.OwnedBy(nextLeaseHolder.StoreID)
+
+	// If we are promoting an expiration-based lease to an epoch-based lease, we
+	// must make sure the expiration does not regress. We do this here because the
+	// expiration is stored directly in the lease for expiration-based leases but
+	// indirectly in liveness record for epoch-based leases. To ensure this, we
+	// manually heartbeat our liveness record if necessary. This is expected to
+	// work because the liveness record interval and the expiration-based lease
+	// interval are the same.
+	expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch
+	if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
+		curLiveness := reqLeaseLiveness
+		for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
+			err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, curLiveness)
+			if err != nil {
+				if logFailedHeartbeatOwnLiveness.ShouldLog() {
+					log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
+				}
+				return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
+					fmt.Sprintf("failed to manipulate liveness record: %s", err))
+			}
+			// Check whether the liveness record expiration is now greater than the
+			// expiration of the lease we're promoting. If not, we may have raced with
+			// another liveness heartbeat which did not extend the liveness expiration
+			// far enough and we should try again.
+			l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
+			if !ok {
+				return errors.NewAssertionErrorWithWrappedErrf(liveness.ErrRecordCacheMiss, "after heartbeat")
+			}
+			if l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
+				log.Infof(ctx, "expiration of liveness record %s is not greater than "+
+					"expiration of the previous lease %s after liveness heartbeat, retrying...", l, status.Lease)
+				curLiveness = l.Liveness
+				continue
+			}
+			break
+		}
+	}
+
 	// If we're replacing an expired epoch-based lease, we must increment the
 	// epoch of the prior owner to invalidate its leases. If we were the owner,
 	// then we instead heartbeat to become live.
 	if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED {
 		var err error
 		// If this replica is previous & next lease holder, manually heartbeat to become live.
-		if status.OwnedBy(nextLeaseHolder.StoreID) && p.repl.store.StoreID() == nextLeaseHolder.StoreID {
+		if extension {
 			if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil &&
 				logFailedHeartbeatOwnLiveness.ShouldLog() {
 				log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
 			}
@@ -882,7 +941,7 @@ func (r *Replica) requestLeaseLocked(
 	}
 	return r.mu.pendingLeaseRequest.InitOrJoinRequest(
 		ctx, repDesc, status, r.mu.state.Desc.StartKey.AsRawKey(),
-		false /* transfer */, false /* bypassSafetyChecks */, limiter)
+		false /* bypassSafetyChecks */, limiter)
 }
 
 // AdminTransferLease transfers the LeaderLease to another replica. Only the
@@ -976,7 +1035,7 @@ func (r *Replica) AdminTransferLease(
 		}
 
 		transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest(ctx, nextLeaseHolder, status,
-			desc.StartKey.AsRawKey(), true /* transfer */, bypassSafetyChecks, nil /* limiter */)
+			desc.StartKey.AsRawKey(), bypassSafetyChecks, nil /* limiter */)
 
 		return nil, transfer, nil
 	}
@@ -1523,17 +1582,24 @@ func (r *Replica) shouldRequestLeaseRLocked(
 // maybeSwitchLeaseType will synchronously renew a lease using the appropriate
 // type if it is (or was) owned by this replica and has an incorrect type. This
 // typically happens when changing kv.expiration_leases_only.enabled.
-func (r *Replica) maybeSwitchLeaseType(ctx context.Context, st kvserverpb.LeaseStatus) *kvpb.Error {
-	if !st.OwnedBy(r.store.StoreID()) {
-		return nil
-	}
+func (r *Replica) maybeSwitchLeaseType(ctx context.Context) *kvpb.Error {
+	llHandle := func() *leaseRequestHandle {
+		now := r.store.Clock().NowAsClockTimestamp()
+		// The lease status needs to be checked and requested under the same lock,
+		// to avoid an interleaving lease request changing the lease between the
+		// two.
+		r.mu.Lock()
+		defer r.mu.Unlock()
 
-	var llHandle *leaseRequestHandle
-	r.mu.Lock()
-	if !r.hasCorrectLeaseTypeRLocked(st.Lease) {
-		llHandle = r.requestLeaseLocked(ctx, st, nil /* limiter */)
-	}
-	r.mu.Unlock()
+		st := r.leaseStatusAtRLocked(ctx, now)
+		if !st.OwnedBy(r.store.StoreID()) {
+			return nil
+		}
+		if r.hasCorrectLeaseTypeRLocked(st.Lease) {
+			return nil
+		}
+		return r.requestLeaseLocked(ctx, st, nil /* limiter */)
+	}()
 
 	if llHandle != nil {
 		select {
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index 1632b50bf132..b40089ca16ac 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -1507,13 +1507,24 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) {
 	// Expire the lease. Given that the Raft leadership is on n2, only n2 will be
 	// eligible to acquire a new lease.
 	log.Infof(ctx, "test expiring lease")
-	nl := n2.NodeLiveness().(*liveness.NodeLiveness)
-	resumeHeartbeats := nl.PauseAllHeartbeatsForTest()
-	n2Liveness, ok := nl.Self()
+	nl2 := n2.NodeLiveness().(*liveness.NodeLiveness)
+	resumeHeartbeats := nl2.PauseAllHeartbeatsForTest()
+	n2Liveness, ok := nl2.Self()
 	require.True(t, ok)
 	manualClock.Increment(n2Liveness.Expiration.ToTimestamp().Add(1, 0).WallTime - manualClock.UnixNano())
 	atomic.StoreInt64(&rejectExtraneousRequests, 1)
-	// Ask another node to increment n2's liveness record.
+
+	// Ask another node to increment n2's liveness record, but first, wait until
+	// n1's liveness state is the same as n2's. Otherwise, the epoch below might
+	// get rejected because of mismatching liveness records.
+	testutils.SucceedsSoon(t, func() error {
+		nl1 := n1.NodeLiveness().(*liveness.NodeLiveness)
+		n2LivenessFromN1, _ := nl1.GetLiveness(n2.NodeID())
+		if n2Liveness != n2LivenessFromN1.Liveness {
+			return errors.Errorf("waiting for node 2 liveness to converge on both nodes 1 and 2")
+		}
+		return nil
+	})
 	require.NoError(t, n1.NodeLiveness().(*liveness.NodeLiveness).IncrementEpoch(ctx, n2Liveness))
 	resumeHeartbeats()
 
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 347fbff95e35..ca38fd88e307 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -966,7 +966,8 @@ func TestReplicaLease(t *testing.T) {
 				ctx, tc.repl, allSpans(), false, /* requiresClosedTSOlderThanStorageSnap */
 			),
 			Args: &kvpb.RequestLeaseRequest{
-				Lease: lease,
+				Lease:     lease,
+				PrevLease: tc.repl.CurrentLeaseStatus(ctx).Lease,
 			},
 		}, &kvpb.RequestLeaseResponse{}); !testutils.IsError(err, "replica not found") {
 			t.Fatalf("unexpected error: %+v", err)
@@ -1311,7 +1312,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
 	st := tc.repl.CurrentLeaseStatus(ctx)
 	ba := &kvpb.BatchRequest{}
 	ba.Timestamp = tc.repl.store.Clock().Now()
-	ba.Add(&kvpb.RequestLeaseRequest{Lease: *lease})
+	ba.Add(&kvpb.RequestLeaseRequest{Lease: *lease, PrevLease: st.Lease})
 	_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
 	ch, _, _, _, pErr := tc.repl.evalAndPropose(ctx, ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
 	if pErr == nil {
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 80df32dcd32f..3d8b49858757 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -960,11 +960,11 @@ func (rq *replicateQueue) preProcessCheck(ctx context.Context, repl *Replica) er
 	// TODO(erikgrinaker): This is also done more eagerly during Raft ticks, but
 	// that doesn't work for quiesced epoch-based ranges, so we have a fallback
 	// here that usually runs within 10 minutes.
-	leaseStatus, pErr := repl.redirectOnOrAcquireLease(ctx)
+	_, pErr := repl.redirectOnOrAcquireLease(ctx)
 	if pErr != nil {
 		return pErr.GoError()
 	}
-	pErr = repl.maybeSwitchLeaseType(ctx, leaseStatus)
+	pErr = repl.maybeSwitchLeaseType(ctx)
 	if pErr != nil {
 		return pErr.GoError()
 	}
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index 4af87d750121..33f654d2c670 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -1104,17 +1104,6 @@ func (tc *TestCluster) TransferRangeLeaseOrFatal(
 	}
 }
 
-// IncrClockForLeaseUpgrade run up the clock to force a lease renewal (and thus
-// the change in lease types).
-func (tc *TestCluster) IncrClockForLeaseUpgrade(
-	t serverutils.TestFataler, clock *hlc.HybridManualClock,
-) {
-	clock.Increment(
-		tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
-			time.Second.Nanoseconds(),
-	)
-}
-
 // MaybeWaitForLeaseUpgrade waits until the lease held for the given range
 // descriptor is upgraded to an epoch-based one, but only if we expect the lease
 // to be upgraded.
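The new check in RequestLease above rejects a lease request whose PrevLease no longer matches the lease the replica currently holds, keeping evaluation consistent with the state the request was constructed against. The following is only a minimal standalone sketch of that guard, using simplified hypothetical types rather than the kvpb definitions:

package leaseguard

import "fmt"

// leaseID captures just the fields the new guard compares: the sequence number
// and the proposed timestamp of a lease. These are simplified stand-ins.
type leaseID struct {
	Sequence   int64
	ProposedTS int64
}

// checkPrevLease returns an error when the lease currently held by the replica
// no longer matches the PrevLease recorded in the request, mirroring the
// rejection added to RequestLease in the patch above.
func checkPrevLease(current, reqPrev leaseID) error {
	if current.Sequence != reqPrev.Sequence || current.ProposedTS != reqPrev.ProposedTS {
		return fmt.Errorf("expected previous lease %+v, found %+v", reqPrev, current)
	}
	return nil
}

This is also why the replica_test.go changes above start populating PrevLease in their RequestLeaseRequest fixtures.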
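The InitOrJoinRequest change replaces the explicit transfer flag with a classification derived from who owns the previous lease and who is named as the next leaseholder. The sketch below spells out that truth table in isolation; it is illustrative only (plain int32 store IDs, not the real descriptors):

package main

import "fmt"

// classifyLeaseChange mirrors the prevLocal/nextLocal reasoning in
// InitOrJoinRequest: the kind of lease change follows entirely from which
// store owns the current lease and which store is named as the next holder.
func classifyLeaseChange(prevOwner, nextHolder, localStore int32) string {
	prevLocal := prevOwner == localStore
	nextLocal := nextHolder == localStore
	switch {
	case !prevLocal && nextLocal:
		return "acquisition" // we don't hold the lease, but want it
	case prevLocal && nextLocal:
		return "extension" // we hold the lease and are renewing or upgrading it
	case prevLocal && !nextLocal:
		return "transfer" // we hold the lease and are giving it away
	default:
		return "remote" // neither side is local; the patched code log.Fatalf's here
	}
}

func main() {
	const local = int32(1)
	fmt.Println(classifyLeaseChange(2, 1, local)) // acquisition
	fmt.Println(classifyLeaseChange(1, 1, local)) // extension
	fmt.Println(classifyLeaseChange(1, 3, local)) // transfer
	fmt.Println(classifyLeaseChange(2, 3, local)) // remote
}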
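The core invariant enforced in requestLease is that promoting an expiration-based lease to an epoch-based lease must not regress the lease's effective expiration, which after the promotion is carried by the liveness record; otherwise two leaseholders could overlap in MVCC time. The sketch below restates that retry loop under simplified assumptions: hlcTimestamp, livenessExp, and heartbeat are hypothetical stand-ins injected as parameters, not the NodeLiveness API used by the patch.

package leasepromo

import (
	"context"
	"errors"
)

// hlcTimestamp is a simplified stand-in for an HLC timestamp; Less provides
// the ordering used in the expiration comparison.
type hlcTimestamp struct{ wallNanos int64 }

func (t hlcTimestamp) Less(o hlcTimestamp) bool { return t.wallNanos < o.wallNanos }

// ensureLivenessCoversLease heartbeats the node's liveness record until its
// expiration is no earlier than the expiration of the expiration-based lease
// being promoted. The real code retries with backoff and re-reads the record
// because a racing heartbeat may not have extended the expiration far enough.
func ensureLivenessCoversLease(
	ctx context.Context,
	prevLeaseExp hlcTimestamp,
	livenessExp func() hlcTimestamp,
	heartbeat func(context.Context) error,
) error {
	for attempts := 0; attempts < 10; attempts++ {
		if !livenessExp().Less(prevLeaseExp) {
			return nil // liveness now covers the old lease; safe to promote
		}
		// The liveness expiration trails the lease expiration: extend it before
		// switching lease types, or the promotion would regress the expiration.
		if err := heartbeat(ctx); err != nil {
			return err
		}
	}
	return errors.New("liveness expiration still trails lease expiration after retries")
}

The new TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration exercises exactly this path by pausing the liveness heartbeat loop so the liveness record is guaranteed to trail the expiration-based lease before the promotion is triggered.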