Merge #131621
131621: raft: update lastUpdateTimes based on store liveness support r=iskettaneh a=iskettaneh

This commit updates the lastUpdateTimes map for all followers that provide store liveness support. Store liveness support indicates that the follower is alive. This is in addition to the normal update that happens when the leader receives any message from the follower.

This is important because with leader leases, followers might not send or receive any Raft messages if they don't need to. Therefore, we need an extra signal to determine whether a follower is alive.
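
A minimal, self-contained sketch of the idea (the types, IDs, and function names here are illustrative, not CockroachDB's actual structures; the real change is in updateLastUpdateTimesUsingStoreLivenessRLocked in the diff below): on each leader tick, stamp the current time for every follower whose store still provides liveness support, so such followers count as active even when no Raft traffic flows.

package main

import (
	"fmt"
	"time"
)

// supportExpiry maps a follower's replica ID to the expiration time of the
// store liveness support its store currently provides to the leader.
type supportExpiry map[int]time.Time

// markSupportedFollowersActive stamps `now` into lastUpdateTimes for every
// follower whose store liveness support has not yet expired, mirroring the
// extra liveness signal described above.
func markSupportedFollowersActive(
	lastUpdateTimes map[int]time.Time, support supportExpiry, now time.Time,
) {
	for id, exp := range support {
		if !now.After(exp) { // support still valid at `now`
			lastUpdateTimes[id] = now
		}
	}
}

func main() {
	now := time.Now()
	lastUpdateTimes := map[int]time.Time{}
	support := supportExpiry{
		2: now.Add(3 * time.Second),  // follower 2 still supports the leader
		3: now.Add(-1 * time.Second), // follower 3's support has expired
	}
	markSupportedFollowersActive(lastUpdateTimes, support, now)
	fmt.Println(lastUpdateTimes) // only follower 2 is marked active
}

In the actual change below, the support expiration comes from SupportFrom and the comparison uses the store's clock timestamp captured during the Raft tick.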

Epic: None

Release note: None

Co-authored-by: Ibrahim Kettaneh <[email protected]>
craig[bot] and iskettaneh committed Oct 7, 2024
2 parents fba8c11 + 4956a40 commit 0ce3817
Showing 3 changed files with 123 additions and 2 deletions.
84 changes: 84 additions & 0 deletions pkg/kv/kvserver/lease_queue_test.go
@@ -338,6 +338,90 @@ func TestLeaseQueueSwitchesLeaseType(t *testing.T) {
waitForLeasesToSwitch("disabled", true /* someEpoch */, false /* someLeader */)
}

// TestUpdateLastUpdateTimesUsingStoreLiveness tests that `lastUpdateTimes` is
// updated when a follower provides store liveness support to the leader, even
// if the map is not updated upon receiving followers' messages.
func TestUpdateLastUpdateTimesUsingStoreLiveness(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t) // too slow under stressrace
skip.UnderDeadlock(t)
skip.UnderShort(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism

manualClock := hlc.NewHybridManualClock()
knobs := base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
// Disable updating the `lastUpdateTimes` map when the leader receives
// messages from followers. This is to simulate the leader not
// sending/receiving any messages because it doesn't have any updates.
DisableUpdateLastUpdateTimesMapOnRaftGroupStep: true,
},
}

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Speed up the lease queue, which switches the lease type.
Settings: st,
ScanMinIdleTime: time.Millisecond,
ScanMaxIdleTime: time.Millisecond,
Knobs: knobs,
},
})
defer tc.Stopper().Stop(ctx)
require.NoError(t, tc.WaitForFullReplication())

db := tc.Server(0).DB()
sqlDB := tc.ServerConn(0)

// Split off a few ranges so we have something to work with.
scratchKey := tc.ScratchRange(t)
for i := 0; i <= 16; i++ {
splitKey := append(scratchKey.Clone(), byte(i))
require.NoError(t, db.AdminSplit(ctx, splitKey, hlc.MaxTimestamp))
}

// Switch 100% of ranges to use leader fortification.
_, err := sqlDB.ExecContext(ctx,
`SET CLUSTER SETTING kv.raft.leader_fortification.fraction_enabled = 1.00`)
require.NoError(t, err)

// Increment the manual clock to ensure that all followers are initially
// considered inactive.
manualClock.Increment(time.Second.Nanoseconds() * 3)

// Make sure that the replicas are considered active.
require.Eventually(t, func() bool {
allActive := true
for i := 0; i < tc.NumServers(); i++ {
store, err := tc.Server(i).GetStores().(*kvserver.Stores).
GetStore(tc.Server(i).GetFirstStoreID())
require.NoError(t, err)

store.VisitReplicas(func(r *kvserver.Replica) (wantMore bool) {
leader := tc.GetRaftLeader(t, r.Desc().StartKey)

// Any replica that is not active will cause this check to repeat.
if !leader.IsFollowerActiveSince(r.ReplicaID(), leader.Clock().PhysicalTime(),
time.Second) {
allActive = false
}

return true
})
}

return allActive
}, 45*time.Second, 1*time.Second) // accommodate stress
}

// TestLeaseQueueRaceReplicateQueue asserts that the replicate/lease queue will
// not process a replica unless it can obtain the allocator token for the
// replica, i.e. changes are processed serially per range leaseholder.
37 changes: 35 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
@@ -627,7 +627,10 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest)
wakeLeader := hasLeader && !fromLeader
r.maybeUnquiesceLocked(wakeLeader, false /* mayCampaign */)
}
r.mu.lastUpdateTimes.update(req.FromReplica.ReplicaID, r.Clock().PhysicalTime())
if r.store.TestingKnobs() == nil ||
!r.store.TestingKnobs().DisableUpdateLastUpdateTimesMapOnRaftGroupStep {
r.mu.lastUpdateTimes.update(req.FromReplica.ReplicaID, r.Clock().PhysicalTime())
}
switch req.Message.Type {
case raftpb.MsgPreVote, raftpb.MsgVote:
// If we receive a (pre)vote request, and we find our leader to be dead or
@@ -1405,7 +1408,8 @@ func (r *Replica) tick(

r.updatePausedFollowersLocked(ctx, ioThresholdMap)

leaseStatus := r.leaseStatusAtRLocked(ctx, r.store.Clock().NowAsClockTimestamp())
storeClockTimestamp := r.store.Clock().NowAsClockTimestamp()
leaseStatus := r.leaseStatusAtRLocked(ctx, storeClockTimestamp)
// TODO(pav-kv): modify the quiescence criterion so that we don't quiesce if
// RACv2 holds some send tokens.
if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, leaseStatus, livenessMap) {
@@ -1446,6 +1450,9 @@ func (r *Replica) tick(
// live even when quiesced.
if r.isRaftLeaderRLocked() {
r.mu.lastUpdateTimes.update(r.replicaID, r.Clock().PhysicalTime())
// We also update lastUpdateTimes for replicas that provide store liveness
// support to the leader.
r.updateLastUpdateTimesUsingStoreLivenessRLocked(storeClockTimestamp)
}

r.mu.ticks++
@@ -3072,6 +3079,32 @@ func (r *Replica) printRaftTail(
return sb.String(), nil
}

// updateLastUpdateTimesUsingStoreLivenessRLocked updates the lastUpdateTimes
// map if the follower's store is providing store liveness support. This is
// useful because typically this map is updated on every message, but that
// assumes that raft will periodically heartbeat. This assumption doesn't hold
// under the raft fortification protocol, where failure detection is subsumed by
// store liveness.
//
// This method assumes that Replica.mu is held in read mode.
func (r *Replica) updateLastUpdateTimesUsingStoreLivenessRLocked(
storeClockTimestamp hlc.ClockTimestamp,
) {
// If store liveness is not enabled, there is nothing to do.
if !(*replicaRLockedStoreLiveness)(r).SupportFromEnabled() {
return
}

for _, desc := range r.descRLocked().Replicas().Descriptors() {
// If the replica's store is providing store liveness support, update
// lastUpdateTimes to indicate that it is alive.
_, curExp := (*replicaRLockedStoreLiveness)(r).SupportFrom(raftpb.PeerID(desc.ReplicaID))
if storeClockTimestamp.ToTimestamp().LessEq(curExp) {
r.mu.lastUpdateTimes.update(desc.ReplicaID, r.Clock().PhysicalTime())
}
}
}

func truncateEntryString(s string, maxChars int) string {
res := s
if len(s) > maxChars {
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
@@ -541,6 +541,10 @@ type StoreTestingKnobs struct {
// true, the report is ignored and ReportUnreachable is not called on the
// raft group for that replica.
RaftReportUnreachableBypass func(roachpb.ReplicaID) bool

// DisableUpdateLastUpdateTimesMapOnRaftGroupStep disables updating the
// lastUpdateTimes map when a raft group is stepped.
DisableUpdateLastUpdateTimesMapOnRaftGroupStep bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
