Skip to content

Commit

Permalink
Merge #131976
Browse files Browse the repository at this point in the history
131976: raft: stop heartbeats r=iskettaneh a=iskettaneh

Since all known heartbeat dependencies has been removed if store liveness is enabled, it should be now possible to stop Raft heartbeats.                                                                                                                                                                                                                                                                          

Fixes: #131749
                                                                                                                                                                                                                                                           
Release note: None

Co-authored-by: Ibrahim Kettaneh <[email protected]>
  • Loading branch information
craig[bot] and iskettaneh committed Oct 8, 2024
2 parents c6807c1 + 6279b77 commit 9999e3a
Show file tree
Hide file tree
Showing 24 changed files with 1,292 additions and 387 deletions.
6 changes: 4 additions & 2 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,8 +1136,10 @@ func (r *raft) tickHeartbeat() {

if r.heartbeatElapsed >= r.heartbeatTimeout {
r.heartbeatElapsed = 0
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {
r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
if !r.fortificationTracker.FortificationEnabled() {
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {
r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
}
}

// Mark fortifying followers as recently active. We disable heartbeats when
Expand Down
9 changes: 4 additions & 5 deletions pkg/raft/raft_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,10 @@ func TestMsgAppFlowControl(t *testing.T) {
r.tick()
}
ms := r.readMessages()
require.Len(t, ms, 3)
require.Equal(t, ms[0].Type, pb.MsgHeartbeat)
require.Equal(t, ms[1].Type, pb.MsgFortifyLeader)
require.Equal(t, ms[2].Type, pb.MsgApp)
require.Empty(t, ms[0].Entries)
require.Len(t, ms, 2)
require.Equal(t, ms[0].Type, pb.MsgFortifyLeader)
require.Equal(t, ms[1].Type, pb.MsgApp)
require.Empty(t, ms[1].Entries)
} else {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
ms := r.readMessages()
Expand Down
6 changes: 2 additions & 4 deletions pkg/raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ func TestStartAsFollower(t *testing.T) {
// TestLeaderBcastBeat tests that if the leader receives a heartbeat tick,
// it will send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries
// as heartbeat to all followers.
// Note that if store liveness is enabled, the leader will also send a MsgApp
// on every heartbeat interval.
// Reference: section 5.2
// Note that if store liveness is enabled, the leader might send a MsgApp on
// every heartbeat interval, but it won't send a MsgHeartbeat.
func TestLeaderBcastBeat(t *testing.T) {
// heartbeat interval
hi := 3
Expand Down Expand Up @@ -141,8 +141,6 @@ func TestLeaderBcastBeat(t *testing.T) {
{From: 1, To: 3, Term: 1, Type: pb.MsgApp, Entries: r.raftLog.allEntries()},
{From: 1, To: 2, Term: 1, Type: pb.MsgFortifyLeader},
{From: 1, To: 3, Term: 1, Type: pb.MsgFortifyLeader},
{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
}, msgs)
} else {
assert.Equal(t, []pb.Message{
Expand Down
53 changes: 25 additions & 28 deletions pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,24 +1231,22 @@ func TestHandleHeatbeatTimeoutStoreLivenessEnabled(t *testing.T) {
}

msgs := sm.readMessages()
require.Len(t, msgs, 2)
assert.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
assert.Equal(t, pb.MsgApp, msgs[1].Type)
require.Len(t, msgs, 1)
assert.Equal(t, pb.MsgApp, msgs[0].Type)

// On another heartbeat timeout, the leader sends a MsgApp.
for ticks := sm.heartbeatTimeout; ticks > 0; ticks-- {
sm.tick()
}
msgs = sm.readMessages()
require.Len(t, msgs, 2)
assert.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
assert.Equal(t, pb.MsgApp, msgs[1].Type)
require.Len(t, msgs, 1)
assert.Equal(t, pb.MsgApp, msgs[0].Type)

// Once the leader receives a MsgAppResp, it doesn't send MsgApp.
sm.Step(pb.Message{
From: 2,
Type: pb.MsgAppResp,
Index: msgs[1].Index + uint64(len(msgs[1].Entries)),
Index: msgs[0].Index + uint64(len(msgs[0].Entries)),
Commit: sm.raftLog.lastIndex(),
})

Expand All @@ -1261,8 +1259,7 @@ func TestHandleHeatbeatTimeoutStoreLivenessEnabled(t *testing.T) {
sm.tick()
}
msgs = sm.readMessages()
require.Len(t, msgs, 1)
assert.Equal(t, pb.MsgHeartbeat, msgs[0].Type)
require.Len(t, msgs, 0)
}

// TestMsgAppRespWaitReset verifies the resume behavior of a leader
Expand Down Expand Up @@ -2036,7 +2033,10 @@ func TestLeaderAppResp(t *testing.T) {
}

// TestBcastBeat is when the leader receives a heartbeat tick, it should
// send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries.
// send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries if
// store liveness is disabled. On the other hand, if store liveness is enabled,
// the leader doesn't send a MsgHeartbeat but sends a MsgApp if the follower
// needs it to catch up.
func TestBcastBeat(t *testing.T) {
testutils.RunTrueAndFalse(t, "store-liveness-enabled",
func(t *testing.T, storeLivenessEnabled bool) {
Expand Down Expand Up @@ -2082,14 +2082,12 @@ func TestBcastBeat(t *testing.T) {
sm.tick()
}
msgs := sm.readMessages()
// If storeliveness is enabled, the heartbeat timeout will also send a
// MsgApp if it needs to. In this case since follower 2 is slow, we will
// send a MsgApp to it.
// If storeliveness is enabled, the heartbeat timeout will send a MsgApp
// if it needs to. In this case since follower 2 is slow, we will send a
// MsgApp to it.
if storeLivenessEnabled {
require.Len(t, msgs, 5)
require.Len(t, msgs, 3)
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 2, Type: pb.MsgHeartbeat, Match: 5},
{From: 1, To: 3, Term: 2, Type: pb.MsgHeartbeat, Match: 1011},
{From: 1, To: 2, Term: 2, Type: pb.MsgFortifyLeader},
{From: 1, To: 3, Term: 2, Type: pb.MsgFortifyLeader},
{From: 1, To: 3, Term: 2, Type: pb.MsgApp, LogTerm: 2, Index: 1011, Commit: 1000,
Expand All @@ -2101,14 +2099,14 @@ func TestBcastBeat(t *testing.T) {
{From: 1, To: 2, Term: 2, Type: pb.MsgHeartbeat, Match: 5},
{From: 1, To: 3, Term: 2, Type: pb.MsgHeartbeat, Match: 1011},
}, msgs)
}

// Make sure that the heartbeat messages contain the expected fields.
for i, m := range msgs[:2] {
require.Equal(t, pb.MsgHeartbeat, m.Type, "#%d", i)
require.Zero(t, m.Index, "#%d", i)
require.Zero(t, m.LogTerm, "#%d", i)
require.Empty(t, m.Entries, "#%d", i)
// Make sure that the heartbeat messages contain the expected fields.
for i, m := range msgs {
require.Equal(t, pb.MsgHeartbeat, m.Type, "#%d", i)
require.Zero(t, m.Index, "#%d", i)
require.Zero(t, m.LogTerm, "#%d", i)
require.Empty(t, m.Entries, "#%d", i)
}
}
})
}
Expand Down Expand Up @@ -2280,11 +2278,10 @@ func TestSendAppendForProgressProbeStoreLivenessEnabled(t *testing.T) {
assert.True(t, r.trk.Progress(2).MsgAppProbesPaused)

msg := r.readMessages()
assert.Len(t, msg, 3)
assert.Equal(t, pb.MsgHeartbeat, msg[0].Type)
assert.Equal(t, pb.MsgFortifyLeader, msg[1].Type)
assert.Equal(t, pb.MsgApp, msg[2].Type)
assert.Equal(t, msg[2].Index, uint64(1))
assert.Len(t, msg, 2)
assert.Equal(t, pb.MsgFortifyLeader, msg[0].Type)
assert.Equal(t, pb.MsgApp, msg[1].Type)
assert.Equal(t, msg[1].Index, uint64(1))
assert.True(t, r.trk.Progress(2).MsgAppProbesPaused)
}
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/raft/rafttest/interaction_env_handler_add_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package rafttest

import (
"context"
"errors"
"fmt"
"reflect"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/raft"
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/datadriven"
Expand Down Expand Up @@ -138,14 +140,19 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er
cfg := cfg // fork the config stub
cfg.ID, cfg.Storage = id, s

cfg.StoreLiveness = newStoreLiveness(env.Fabric, id)

// If the node creating command hasn't specified the CRDBVersion, use the
// latest one.
if cfg.CRDBVersion == nil {
cfg.CRDBVersion = cluster.MakeTestingClusterSettings().Version
}

// Disable store liveness if the CRDB version is less than 24.3.
if cfg.CRDBVersion.IsActive(context.TODO(), clusterversion.V24_3_StoreLivenessEnabled) {
cfg.StoreLiveness = newStoreLiveness(env.Fabric, id)
} else {
cfg.StoreLiveness = raftstoreliveness.Disabled{}
}

if env.Options.OnConfig != nil {
env.Options.OnConfig(&cfg)
if cfg.ID != id {
Expand Down
12 changes: 2 additions & 10 deletions pkg/raft/testdata/async_storage_writes_append_aba_race.txt
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,6 @@ process-ready 4
----
Ready MustSync=false:
Messages:
4->1 MsgHeartbeat Term:3 Log:0/0
4->2 MsgHeartbeat Term:3 Log:0/0
4->3 MsgHeartbeat Term:3 Log:0/0
4->5 MsgHeartbeat Term:3 Log:0/0
4->6 MsgHeartbeat Term:3 Log:0/0
4->7 MsgHeartbeat Term:3 Log:0/0
4->1 MsgFortifyLeader Term:3 Log:0/0
4->2 MsgFortifyLeader Term:3 Log:0/0
4->3 MsgFortifyLeader Term:3 Log:0/0
Expand All @@ -432,10 +426,9 @@ Messages:

deliver-msgs 1
----
4->1 MsgHeartbeat Term:3 Log:0/0
INFO 1 [term: 2] received a MsgHeartbeat message with higher term from 4 [term: 3]
INFO 1 became follower at term 3
4->1 MsgFortifyLeader Term:3 Log:0/0
INFO 1 [term: 2] received a MsgFortifyLeader message with higher term from 4 [term: 3]
INFO 1 became follower at term 3
4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
INFO found conflict at index 12 [existing term: 2, conflicting term: 3]
INFO replace the unstable entries from index 12
Expand All @@ -447,7 +440,6 @@ HardState Term:3 Commit:11 Lead:4 LeadEpoch:1
Entries:
3/12 EntryNormal ""
Messages:
1->4 MsgHeartbeatResp Term:3 Log:0/0
1->AppendThread MsgStorageAppend Term:3 Log:3/12 Commit:11 Lead:4 LeadEpoch:1 Entries:[3/12 EntryNormal ""] Responses:[
1->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1
1->4 MsgAppResp Term:3 Log:0/12 Commit:11
Expand Down
45 changes: 0 additions & 45 deletions pkg/raft/testdata/checkquorum.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,47 +77,27 @@ stabilize
State:StateFollower
HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:2
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
> 2 receiving messages
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgHeartbeat Term:1 Log:0/0
1->2 MsgFortifyLeader Term:1 Log:0/0
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
> 2 handling Ready
Ready MustSync=false:
Expand All @@ -127,20 +107,10 @@ stabilize
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
> 3 handling Ready
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:2
Messages:
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
Expand All @@ -154,21 +124,6 @@ stabilize
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
2->1 MsgAppResp Term:2 Log:0/0
3->1 MsgHeartbeatResp Term:1 Log:0/0
INFO 1 [term: 2] ignored a MsgHeartbeatResp message with lower term from 3 [term: 1]
3->1 MsgHeartbeatResp Term:1 Log:0/0
INFO 1 [term: 2] ignored a MsgHeartbeatResp message with lower term from 3 [term: 1]
3->1 MsgHeartbeatResp Term:1 Log:0/0
INFO 1 [term: 2] ignored a MsgHeartbeatResp message with lower term from 3 [term: 1]
3->1 MsgHeartbeatResp Term:1 Log:0/0
INFO 1 [term: 2] ignored a MsgHeartbeatResp message with lower term from 3 [term: 1]
3->1 MsgHeartbeatResp Term:1 Log:0/0
INFO 1 [term: 2] ignored a MsgHeartbeatResp message with lower term from 3 [term: 1]
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
INFO 1 [term: 2] ignored a MsgFortifyLeaderResp message with lower term from 3 [term: 1]
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:2
Expand Down
Loading

0 comments on commit 9999e3a

Please sign in to comment.