Skip to content

Commit

Permalink
Merge #137091
Browse files Browse the repository at this point in the history
137091: raft: add counters for flow state changes r=kvoli a=pav-kv

Resolves #136533

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Dec 12, 2024
2 parents e8ee6f5 + 4c67ac5 commit f4b4368
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 20 deletions.
3 changes: 3 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,9 @@
<tr><td>STORAGE</td><td>raft.entrycache.hits</td><td>Number of successful cache lookups in the Raft entry cache</td><td>Hits</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.entrycache.read_bytes</td><td>Counter of bytes in entries returned from the Raft entry cache</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.entrycache.size</td><td>Number of Raft entries in the Raft entry cache</td><td>Entry Count</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>raft.flows.entered.state_probe</td><td>The number of leader-&gt;peer flows transitioned to StateProbe</td><td>Flows</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.flows.entered.state_replicate</td><td>The number of leader-&gt;peer flows transitioned to StateReplicate</td><td>Flows</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.flows.entered.state_snapshot</td><td>The number of leader-&gt;peer flows transitioned to StateSnapshot</td><td>Flows</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.fortification.skipped_no_support</td><td>The number of fortification requests that were skipped (not sent) due to lack of store liveness support</td><td>Skipped Fortifications</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.fortification_resp.accepted</td><td>The number of accepted fortification responses. Calculated on the raft leader</td><td>Accepted Fortification Responses</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.fortification_resp.rejected</td><td>The number of rejected fortification responses. Calculated on the raft leader</td><td>Rejected Fortification Responses</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
31 changes: 28 additions & 3 deletions pkg/raft/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ package raft

import "github.com/cockroachdb/cockroach/pkg/util/metric"

// Metrics contains all the metrics reported in Raft. Every field is a
// counter; NewMetrics constructs them from the metadata declared in this file.
type Metrics struct {
// AcceptedFortificationResponses counts accepted fortification responses.
AcceptedFortificationResponses *metric.Counter
// RejectedFortificationResponses counts rejected fortification responses.
RejectedFortificationResponses *metric.Counter
// SkippedFortificationDueToLackOfSupport counts fortification requests that
// were skipped (not sent) due to lack of store liveness support.
SkippedFortificationDueToLackOfSupport *metric.Counter

// FlowsEnteredStateProbe counts leader->peer flows transitioned to
// StateProbe.
FlowsEnteredStateProbe *metric.Counter
// FlowsEnteredStateReplicate counts leader->peer flows transitioned to
// StateReplicate.
FlowsEnteredStateReplicate *metric.Counter
// FlowsEnteredStateSnapshot counts leader->peer flows transitioned to
// StateSnapshot.
FlowsEnteredStateSnapshot *metric.Counter
}

var (
Expand All @@ -21,21 +25,38 @@ var (
Measurement: "Accepted Fortification Responses",
Unit: metric.Unit_COUNT,
}

// rejectedFortificationResponsesMeta is the metadata (name, help text,
// measurement and unit) of the counter that NewMetrics stores in
// Metrics.RejectedFortificationResponses.
rejectedFortificationResponsesMeta = metric.Metadata{
Name: "raft.fortification_resp.rejected",
Help: "The number of rejected fortification responses. Calculated on the raft leader",
Measurement: "Rejected Fortification Responses",
Unit: metric.Unit_COUNT,
}

// skippedFortificationDueToLackOfSupportMeta is the metadata of the counter
// that NewMetrics stores in Metrics.SkippedFortificationDueToLackOfSupport.
skippedFortificationDueToLackOfSupportMeta = metric.Metadata{
Name: "raft.fortification.skipped_no_support",
// The help text is split across two literals only to keep the line short;
// the concatenation forms a single sentence.
Help: "The number of fortification requests that were skipped (not sent) due to lack of store" +
" liveness support",
Measurement: "Skipped Fortifications",
Unit: metric.Unit_COUNT,
}

// metaRaftFlowsEnteredProbe is the metadata of the counter that NewMetrics
// stores in Metrics.FlowsEnteredStateProbe.
metaRaftFlowsEnteredProbe = metric.Metadata{
Name: "raft.flows.entered.state_probe",
Help: "The number of leader->peer flows transitioned to StateProbe",
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
// metaRaftFlowsEnteredReplicate is the metadata of the counter that NewMetrics
// stores in Metrics.FlowsEnteredStateReplicate.
metaRaftFlowsEnteredReplicate = metric.Metadata{
Name: "raft.flows.entered.state_replicate",
Help: "The number of leader->peer flows transitioned to StateReplicate",
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
// metaRaftFlowsEnteredSnapshot is the metadata of the counter that NewMetrics
// stores in Metrics.FlowsEnteredStateSnapshot.
metaRaftFlowsEnteredSnapshot = metric.Metadata{
	Name:        "raft.flows.entered.state_snapshot",
	Help:        "The number of leader->peer flows transitioned to StateSnapshot",
	Measurement: "Flows",
	Unit:        metric.Unit_COUNT,
}
)

// NewMetrics creates a new Metrics instance with all related metric fields.
Expand All @@ -45,5 +66,9 @@ func NewMetrics() *Metrics {
RejectedFortificationResponses: metric.NewCounter(rejectedFortificationResponsesMeta),
SkippedFortificationDueToLackOfSupport: metric.NewCounter(
skippedFortificationDueToLackOfSupportMeta),

FlowsEnteredStateProbe: metric.NewCounter(metaRaftFlowsEnteredProbe),
FlowsEnteredStateReplicate: metric.NewCounter(metaRaftFlowsEnteredReplicate),
FlowsEnteredStateSnapshot: metric.NewCounter(metaRaftFlowsEnteredSnapshot),
}
}
60 changes: 43 additions & 17 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ func (r *raft) maybeSendSnapshot(to pb.PeerID, pr *tracker.Progress) bool {
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.becomeSnapshot(pr, sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
Expand Down Expand Up @@ -1333,15 +1333,26 @@ func (r *raft) becomeLeader() {
r.tick = r.tickHeartbeat
r.setLead(r.id)
r.state = pb.StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
pr := r.trk.Progress(r.id)
pr.BecomeReplicate()
// The leader always has RecentActive == true. The checkQuorumActive method
// makes sure to preserve this.
pr.RecentActive = true
// TODO(pav-kv): r.reset already scans the peers. Try avoiding another scan.
r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) {
if id == r.id {
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.becomeReplicate(pr)
// The leader always has RecentActive == true. The checkQuorumActive
// method makes sure to preserve this.
pr.RecentActive = true
return
}
// All peer flows, except the leader's own, are initially in StateProbe.
// Account the probe state entering in metrics here. All subsequent flow
// state changes, while we are the leader, are counted in the corresponding
// methods: becomeProbe, becomeReplicate, becomeSnapshot.
assertTrue(pr.State == tracker.StateProbe, "peers must be in StateProbe on leader step up")
r.metrics.FlowsEnteredStateProbe.Inc(1)
})

// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
Expand All @@ -1362,6 +1373,21 @@ func (r *raft) becomeLeader() {
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

// becomeProbe records one more flow entering StateProbe in the raft metrics,
// then switches the given peer progress to the probe state via
// pr.BecomeProbe.
func (r *raft) becomeProbe(pr *tracker.Progress) {
	entered := r.metrics.FlowsEnteredStateProbe
	entered.Inc(1)
	pr.BecomeProbe()
}

// becomeReplicate records one more flow entering StateReplicate in the raft
// metrics, then switches the given peer progress to the replicate state via
// pr.BecomeReplicate.
func (r *raft) becomeReplicate(pr *tracker.Progress) {
	entered := r.metrics.FlowsEnteredStateReplicate
	entered.Inc(1)
	pr.BecomeReplicate()
}

// becomeSnapshot records one more flow entering StateSnapshot in the raft
// metrics, then switches the given peer progress to the snapshot state via
// pr.BecomeSnapshot. The index argument is forwarded unchanged.
func (r *raft) becomeSnapshot(pr *tracker.Progress, index uint64) {
	entered := r.metrics.FlowsEnteredStateSnapshot
	entered.Inc(1)
	pr.BecomeSnapshot(index)
}

func (r *raft) hup(t CampaignType) {
if r.state == pb.StateLeader {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
Expand Down Expand Up @@ -2013,7 +2039,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
r.becomeProbe(pr)
}
r.maybeSendAppend(m.From)
}
Expand All @@ -2029,7 +2055,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
r.becomeReplicate(pr)
case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex():
// Note that we don't take into account PendingSnapshot to
// enter this branch. No matter at which index a snapshot
Expand All @@ -2043,8 +2069,8 @@ func stepLeader(r *raft, m pb.Message) error {
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
pr.BecomeProbe()
pr.BecomeReplicate()
r.becomeProbe(pr)
r.becomeReplicate(pr)
case pr.State == tracker.StateReplicate:
pr.Inflights.FreeLE(m.Index)
}
Expand Down Expand Up @@ -2089,13 +2115,13 @@ func stepLeader(r *raft, m pb.Message) error {
return nil
}
if !m.Reject {
pr.BecomeProbe()
r.becomeProbe(pr)
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
// NB: the order here matters or we'll be probing erroneously from
// the snapshot index, but the snapshot never applied.
pr.PendingSnapshot = 0
pr.BecomeProbe()
r.becomeProbe(pr)
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
Expand All @@ -2106,7 +2132,7 @@ func stepLeader(r *raft, m pb.Message) error {
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
r.becomeProbe(pr)
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader:
Expand Down

0 comments on commit f4b4368

Please sign in to comment.