Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
124225: raft: add match index safety check on followers r=pav-kv a=pav-kv

This PR adds a safety check which ensures that a follower's log state does not contradict the leader's `Match` index.

If the leader has a non-zero `Match` for the follower, it believes that the follower's log a) is consistent with this leader, and b) has all entries up to the `Match` index persisted. If the follower's `Term <= leader.Term` and (b) turns out to not be the case, this means the follower has lost part of the durable state (for example, fsync didn't work properly in the system, and the node got restarted). This is a safety violation, so this follower panics to avoid spreading the harm.

In the future, it would be better to "quarantine" this follower, and surface this information to the operator without crashing the node.

Touches cockroachdb#122690
Epic: none
Release note: none

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed May 20, 2024
2 parents d9dbc2e + fd78de3 commit 420d7da
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 3 deletions.
19 changes: 19 additions & 0 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
LogTerm: prevTerm,
Entries: ents,
Commit: r.raftLog.committed,
Match: pr.Match,
})
pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
pr.SentCommit(r.raftLog.committed)
Expand Down Expand Up @@ -651,6 +652,7 @@ func (r *raft) sendHeartbeat(to uint64) {
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Match: pr.Match,
})
pr.SentCommit(commit)
}
Expand Down Expand Up @@ -1659,6 +1661,8 @@ func logSliceFromMsgApp(m *pb.Message) logSlice {
}

func (r *raft) handleAppendEntries(m pb.Message) {
r.checkMatch(m.Match)

// TODO(pav-kv): construct logSlice up the stack next to receiving the
// message, and validate it before taking any action (e.g. bumping term).
a := logSliceFromMsgApp(&m)
Expand Down Expand Up @@ -1702,7 +1706,22 @@ func (r *raft) handleAppendEntries(m pb.Message) {
})
}

// checkMatch verifies that the match index advertised by the leader does not
// exceed this follower's last log index. If it does, the follower's log is
// shorter than the leader believes it to be, and the follower reports this via
// the logger's Panicf.
func (r *raft) checkMatch(match uint64) {
	// TODO(pav-kv): lastIndex() might be not yet durable. Make this check
	// stronger by comparing `match` with the last durable index.
	//
	// TODO(pav-kv): make this check stronger when the raftLog stores the last
	// accepted term. If `match` is non-zero, this follower's log last accepted
	// term must equal the leader term, and have entries up to `match` durable.
	last := r.raftLog.lastIndex()
	if match <= last {
		// The log covers everything the leader expects to be here.
		return
	}
	r.logger.Panicf("match(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", match, last)
}

// handleHeartbeat processes a heartbeat message: it validates the match index
// carried by the message, advances the commit index to m.Commit, and replies
// to the sender with a MsgHeartbeatResp.
func (r *raft) handleHeartbeat(m pb.Message) {
	// Run the match index safety check before acting on the message.
	r.checkMatch(m.Match)
	r.raftLog.commitTo(m.Commit)

	resp := pb.Message{To: m.From, Type: pb.MsgHeartbeatResp}
	r.send(resp)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ func TestLeaderStartReplication(t *testing.T) {
sort.Sort(messageSlice(msgs))
wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
assert.Equal(t, []pb.Message{
{From: 1, To: 2, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
{From: 1, To: 3, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
{From: 1, To: 2, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li, Match: li},
{From: 1, To: 3, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li, Match: li},
}, msgs)
assert.Equal(t, []pb.Entry{
{Index: li + 1, Term: 1, Data: []byte("some data")},
Expand Down
15 changes: 15 additions & 0 deletions pkg/raft/raftpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ message Message {
// to respond and who to respond to when the work associated with a message
// is complete. Populated for MsgStorageAppend and MsgStorageApply messages.
repeated Message responses = 14 [(gogoproto.nullable) = false];

// match is the log index up to which the follower's log must persistently
// match the leader's. If the follower's persistent log is shorter, it means
// the follower has broken its promise and violated safety of Raft. Typically
// this means the environment (Storage) hasn't provided the required
// durability guarantees.
//
// If a follower sees a match index exceeding its log's last index, it must
// cease its membership (stop voting and acking appends) in the raft group, in
// order to limit the damage. Today it simply panics.
//
// match is only populated by the leader when sending messages to a voting
// follower. This can be 0 if the leader hasn't yet established the follower's
// match index, or for backward compatibility.
optional uint64 match = 15 [(gogoproto.nullable) = false];
}

message HardState {
Expand Down
5 changes: 4 additions & 1 deletion pkg/raft/raftpb/raft_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// This code has been modified from its original form by Cockroach Labs, Inc.
// All modifications are Copyright 2024 Cockroach Labs, Inc.
//
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -45,7 +48,7 @@ func TestProtoMemorySizes(t *testing.T) {
assert(unsafe.Sizeof(s), if64Bit(144, 80), "Snapshot")

var m Message
assert(unsafe.Sizeof(m), if64Bit(160, 112), "Message")
assert(unsafe.Sizeof(m), if64Bit(168, 112), "Message")

var hs HardState
assert(unsafe.Sizeof(hs), 24, "HardState")
Expand Down
52 changes: 52 additions & 0 deletions pkg/raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,58 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
}
}

// TestRawNodePersistenceRegression verifies that a follower panics when a
// message from the leader carries a match index that lies beyond the end of
// the follower's log.
//
// This models a follower that previously acknowledged entries up to some index,
// then crashed and restarted with a shorter log because its storage failed to
// honour its durability guarantees.
func TestRawNodePersistenceRegression(t *testing.T) {
	const nodeID = 1

	// newNode builds a fresh node whose log has entries at indices 1..3 with
	// terms 1, 2, 5, and whose hard state is at term 5 with commit index 3.
	newNode := func() *RawNode {
		storage := newTestMemoryStorage(withPeers(1, 2))
		require.NoError(t, storage.Append(index(1).terms(1, 2, 5)))
		hs := pb.HardState{Term: 5, Vote: 1, Commit: 3}
		require.NoError(t, storage.SetHardState(hs))
		return newTestRawNode(nodeID, 10, 1, storage)
	}

	testCases := []struct {
		name string
		msg  pb.Message
	}{
		{
			name: "MsgApp",
			msg: pb.Message{
				From: 2, To: 1, Type: pb.MsgApp,
				Term: 5, Index: 3, LogTerm: 5, Commit: 3,
			},
		},
		{
			name: "MsgHeartbeat",
			msg: pb.Message{
				From: 2, To: 1, Type: pb.MsgHeartbeat,
				Term: 5, Commit: 3,
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			node := newNode()
			msg := tc.msg
			// A match index within the log bounds must be accepted.
			for _, match := range []uint64{0, 1, 3} {
				msg.Match = match
				require.NoError(t, node.Step(msg))
			}
			// A match index past the last log index must trigger a panic.
			msg.Match = 4
			require.Panics(t, func() { _ = node.Step(msg) })
		})
	}
}

// TestRawNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
// partitioned from a quorum of nodes. It verifies that the leader's log is
// protected from unbounded growth even as new entries continue to be proposed.
Expand Down

0 comments on commit 420d7da

Please sign in to comment.