Skip to content

Commit

Permalink
Add HeartbeatTimeout and ElectionTimeout to reloadable config. (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
ncabatoff authored Apr 14, 2022
1 parent f2fdbd6 commit ace424e
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 4 deletions.
16 changes: 16 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ type Raft struct {
// leadershipTransferCh is used to start a leadership transfer from outside of
// the main thread.
leadershipTransferCh chan *leadershipTransferFuture

// leaderNotifyCh is used to tell leader that config has changed
leaderNotifyCh chan struct{}

// followerNotifyCh is used to tell followers that config has changed
followerNotifyCh chan struct{}
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand Down Expand Up @@ -545,6 +551,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
leaderNotifyCh: make(chan struct{}, 1),
followerNotifyCh: make(chan struct{}, 1),
}

r.conf.Store(*conf)
Expand Down Expand Up @@ -696,6 +704,14 @@ func (r *Raft) ReloadConfig(rc ReloadableConfig) error {
return err
}
r.conf.Store(newCfg)

if rc.HeartbeatTimeout < oldCfg.HeartbeatTimeout {
// On leader, ensure replication loops running with a longer
// timeout than what we want now discover the change.
asyncNotifyCh(r.leaderNotifyCh)
// On follower, update current timer to use the shorter new value.
asyncNotifyCh(r.followerNotifyCh)
}
return nil
}

Expand Down
16 changes: 14 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,14 @@ type ReloadableConfig struct {
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
SnapshotThreshold uint64

// HeartbeatTimeout specifies the time in follower state without
// a leader before we attempt an election.
HeartbeatTimeout time.Duration

// ElectionTimeout specifies the time in candidate state without
// a leader before we attempt an election.
ElectionTimeout time.Duration
}

// apply sets the reloadable fields on the passed Config to the values in
Expand All @@ -269,6 +277,8 @@ func (rc *ReloadableConfig) apply(to Config) Config {
to.TrailingLogs = rc.TrailingLogs
to.SnapshotInterval = rc.SnapshotInterval
to.SnapshotThreshold = rc.SnapshotThreshold
to.HeartbeatTimeout = rc.HeartbeatTimeout
to.ElectionTimeout = rc.ElectionTimeout
return to
}

Expand All @@ -277,6 +287,8 @@ func (rc *ReloadableConfig) fromConfig(from Config) {
rc.TrailingLogs = from.TrailingLogs
rc.SnapshotInterval = from.SnapshotInterval
rc.SnapshotThreshold = from.SnapshotThreshold
rc.HeartbeatTimeout = from.HeartbeatTimeout
rc.ElectionTimeout = from.ElectionTimeout
}

// DefaultConfig returns a Config with usable defaults.
Expand Down Expand Up @@ -334,10 +346,10 @@ func ValidateConfig(config *Config) error {
return fmt.Errorf("LeaderLeaseTimeout is too low")
}
if config.LeaderLeaseTimeout > config.HeartbeatTimeout {
return fmt.Errorf("LeaderLeaseTimeout cannot be larger than heartbeat timeout")
return fmt.Errorf("LeaderLeaseTimeout (%s) cannot be larger than heartbeat timeout (%s)", config.LeaderLeaseTimeout, config.HeartbeatTimeout)
}
if config.ElectionTimeout < config.HeartbeatTimeout {
return fmt.Errorf("ElectionTimeout must be equal or greater than Heartbeat Timeout")
return fmt.Errorf("ElectionTimeout (%s) must be equal or greater than Heartbeat Timeout (%s)", config.ElectionTimeout, config.HeartbeatTimeout)
}
return nil
}
133 changes: 133 additions & 0 deletions integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"fmt"
"io/ioutil"
"os"
"sync/atomic"
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)

// CheckInteg will skip a test if integration testing is not enabled.
Expand Down Expand Up @@ -355,3 +357,134 @@ func TestRaft_Integ(t *testing.T) {
e.Release()
}
}

func TestRaft_RestartFollower_LongInitialHeartbeat(t *testing.T) {
CheckInteg(t)
tests := []struct {
name string
restartInitialTimeouts time.Duration
expectNewLeader bool
}{
{"Default", 0, true},
{"InitialHigher", time.Second, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := DefaultConfig()
conf.LocalID = ServerID("first")
conf.HeartbeatTimeout = 50 * time.Millisecond
conf.ElectionTimeout = 50 * time.Millisecond
conf.LeaderLeaseTimeout = 50 * time.Millisecond
conf.CommitTimeout = 5 * time.Millisecond
conf.SnapshotThreshold = 100
conf.TrailingLogs = 10

// Create a single node
env1 := MakeRaft(t, conf, true)
NoErr(WaitFor(env1, Leader), t)

// Join a few nodes!
var envs []*RaftEnv
for i := 0; i < 2; i++ {
conf.LocalID = ServerID(fmt.Sprintf("next-batch-%d", i))
env := MakeRaft(t, conf, false)
addr := env.trans.LocalAddr()
NoErr(WaitFuture(env1.raft.AddVoter(conf.LocalID, addr, 0, 0), t), t)
envs = append(envs, env)
}
allEnvs := append([]*RaftEnv{env1}, envs...)

// Wait for a leader
_, err := WaitForAny(Leader, append([]*RaftEnv{env1}, envs...))
NoErr(err, t)

CheckConsistent(append([]*RaftEnv{env1}, envs...), t)
// TODO without this sleep, the restarted follower doesn't have any stored config
// and aborts the election because it doesn't know of any peers. Shouldn't
// CheckConsistent prevent that?
time.Sleep(time.Second)

// shutdown a follower
disconnected := envs[len(envs)-1]
disconnected.logger.Info("stopping follower")
disconnected.Shutdown()

seeNewLeader := func(o *Observation) bool { _, ok := o.Data.(LeaderObservation); return ok }
leaderCh := make(chan Observation)
// TODO Closing this channel results in panics, even though we're calling Release.
//defer close(leaderCh)
leaderChanges := new(uint32)
go func() {
for range leaderCh {
atomic.AddUint32(leaderChanges, 1)
}
}()

requestVoteCh := make(chan Observation)
seeRequestVote := func(o *Observation) bool { _, ok := o.Data.(RequestVoteRequest); return ok }
requestVotes := new(uint32)
go func() {
for range requestVoteCh {
atomic.AddUint32(requestVotes, 1)
}
}()

for _, env := range allEnvs {
env.raft.RegisterObserver(NewObserver(leaderCh, false, seeNewLeader))
}

// Unfortunately we need to wait for the leader to start backing off RPCs to the down follower
// such that when the follower comes back up it'll run an election before it gets an rpc from
// the leader
time.Sleep(time.Second * 5)

if tt.restartInitialTimeouts != 0 {
disconnected.conf.HeartbeatTimeout = tt.restartInitialTimeouts
disconnected.conf.ElectionTimeout = tt.restartInitialTimeouts
}
disconnected.logger.Info("restarting follower")
disconnected.Restart(t)

time.Sleep(time.Second * 2)

if tt.expectNewLeader {
require.NotEqual(t, 0, atomic.LoadUint32(leaderChanges))
} else {
require.Equal(t, uint32(0), atomic.LoadUint32(leaderChanges))
}

if tt.restartInitialTimeouts != 0 {
for _, env := range envs {
env.raft.RegisterObserver(NewObserver(requestVoteCh, false, seeRequestVote))
NoErr(env.raft.ReloadConfig(ReloadableConfig{
TrailingLogs: conf.TrailingLogs,
SnapshotInterval: conf.SnapshotInterval,
SnapshotThreshold: conf.SnapshotThreshold,
HeartbeatTimeout: 250 * time.Millisecond,
ElectionTimeout: 250 * time.Millisecond,
}), t)
}
// Make sure that reload by itself doesn't trigger a vote
time.Sleep(300 * time.Millisecond)
require.Equal(t, uint32(0), atomic.LoadUint32(requestVotes))

// Stop the leader, ensure that we don't see a request vote within the first 50ms
// (original config of the non-restarted follower), but that we do see one within
// the 250ms both followers should now be using for heartbeat timeout. Well, not
// quite: we wait for two heartbeat intervals (plus a fudge factor), because the
// first time around, last contact will have been recent enough that no vote will
// be triggered.
env1.logger.Info("stopping leader")
env1.Shutdown()
time.Sleep(50 * time.Millisecond)
require.Equal(t, uint32(0), atomic.LoadUint32(requestVotes))
time.Sleep(600 * time.Millisecond)
require.NotEqual(t, uint32(0), atomic.LoadUint32(requestVotes))
}

for _, e := range allEnvs {
e.Release()
}
})
}
}
2 changes: 1 addition & 1 deletion observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Observation struct {
// Raft holds the Raft instance generating the observation.
Raft *Raft
// Data holds observation-specific data. Possible types are
// *RequestVoteRequest
// RequestVoteRequest
// RaftState
// PeerObservation
// LeaderObservation
Expand Down
26 changes: 25 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ func (r *Raft) runFollower() {
case b := <-r.bootstrapCh:
b.respond(r.liveBootstrap(b.configuration))

case <-r.leaderNotifyCh:
// Ignore since we are not the leader

case <-r.followerNotifyCh:
heartbeatTimer = time.After(0)

case <-heartbeatTimer:
// Restart the heartbeat timer
hbTimeout := r.config().HeartbeatTimeout
Expand Down Expand Up @@ -275,7 +281,8 @@ func (r *Raft) runCandidate() {
// otherwise.
defer func() { r.candidateFromLeadershipTransfer = false }()

electionTimer := randomTimeout(r.config().ElectionTimeout)
electionTimeout := r.config().ElectionTimeout
electionTimer := randomTimeout(electionTimeout)

// Tally the votes, need a simple majority
grantedVotes := 0
Expand Down Expand Up @@ -337,6 +344,15 @@ func (r *Raft) runCandidate() {
case b := <-r.bootstrapCh:
b.respond(ErrCantBootstrap)

case <-r.leaderNotifyCh:
// Ignore since we are not the leader

case <-r.followerNotifyCh:
if electionTimeout != r.config().ElectionTimeout {
electionTimeout = r.config().ElectionTimeout
electionTimer = randomTimeout(electionTimeout)
}

case <-electionTimer:
// Election failed! Restart the election. We simply return,
// which will kick us back into runCandidate
Expand Down Expand Up @@ -826,6 +842,14 @@ func (r *Raft) leaderLoop() {
// Renew the lease timer
lease = time.After(checkInterval)

case <-r.leaderNotifyCh:
for _, repl := range r.leaderState.replState {
asyncNotifyCh(repl.notifyCh)
}

case <-r.followerNotifyCh:
// Ignore since we are not a follower

case <-r.shutdownCh:
return
}
Expand Down
39 changes: 39 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2466,6 +2466,7 @@ func TestRaft_CacheLogWithStoreError(t *testing.T) {

func TestRaft_ReloadConfig(t *testing.T) {
conf := inmemConfig(t)
conf.LeaderLeaseTimeout = 40 * time.Millisecond
c := MakeCluster(1, t, conf)
defer c.Close()
raft := c.rafts[0]
Expand All @@ -2480,6 +2481,8 @@ func TestRaft_ReloadConfig(t *testing.T) {
TrailingLogs: 12345,
SnapshotInterval: 234 * time.Second,
SnapshotThreshold: 6789,
HeartbeatTimeout: 45 * time.Millisecond,
ElectionTimeout: 46 * time.Millisecond,
}

require.NoError(t, raft.ReloadConfig(newCfg))
Expand All @@ -2488,6 +2491,8 @@ func TestRaft_ReloadConfig(t *testing.T) {
require.Equal(t, newCfg.TrailingLogs, raft.config().TrailingLogs)
require.Equal(t, newCfg.SnapshotInterval, raft.config().SnapshotInterval)
require.Equal(t, newCfg.SnapshotThreshold, raft.config().SnapshotThreshold)
require.Equal(t, newCfg.HeartbeatTimeout, raft.config().HeartbeatTimeout)
require.Equal(t, newCfg.ElectionTimeout, raft.config().ElectionTimeout)
}

func TestRaft_ReloadConfigValidates(t *testing.T) {
Expand Down Expand Up @@ -2776,3 +2781,37 @@ func TestRaft_runFollower_State_Transition(t *testing.T) {
})
}
}

func TestRaft_runFollower_ReloadTimeoutConfigs(t *testing.T) {
conf := DefaultConfig()
conf.LocalID = ServerID("first")
conf.HeartbeatTimeout = 500 * time.Millisecond
conf.ElectionTimeout = 500 * time.Millisecond
conf.LeaderLeaseTimeout = 50 * time.Millisecond
conf.CommitTimeout = 5 * time.Millisecond
conf.SnapshotThreshold = 100
conf.TrailingLogs = 10
conf.skipStartup = true

env := MakeRaft(t, conf, false)
servers := []Server{{Voter, "first", ""}}
env.raft.setLatestConfiguration(Configuration{Servers: servers}, 1)
env.raft.setState(Follower)

// run the follower loop exclusively
go env.raft.runFollower()

newCfg := ReloadableConfig{
TrailingLogs: conf.TrailingLogs,
SnapshotInterval: conf.SnapshotInterval,
SnapshotThreshold: conf.SnapshotThreshold,
HeartbeatTimeout: 50 * time.Millisecond,
ElectionTimeout: 50 * time.Millisecond,
}
require.NoError(t, env.raft.ReloadConfig(newCfg))
// wait enough time to have HeartbeatTimeout
time.Sleep(3 * newCfg.HeartbeatTimeout)

// Check the follower loop set the right state
require.Equal(t, Candidate, env.raft.getState())
}

0 comments on commit ace424e

Please sign in to comment.