From cd6aca1dff619d4670d7eea9d69022cc5a8a5057 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 10 Dec 2024 13:59:42 +0000 Subject: [PATCH 1/2] raft: add counters for flow state changes Epic: none Release note: none --- docs/generated/metrics/metrics.html | 3 +++ pkg/raft/metrics.go | 31 ++++++++++++++++++++++++--- pkg/raft/raft.go | 33 +++++++++++++++++++++-------- 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index 5ba1b7e9f474..e7649f323ab0 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -492,6 +492,9 @@ STORAGEraft.entrycache.hitsNumber of successful cache lookups in the Raft entry cacheHitsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEraft.entrycache.read_bytesCounter of bytes in entries returned from the Raft entry cacheBytesCOUNTERBYTESAVGNON_NEGATIVE_DERIVATIVE STORAGEraft.entrycache.sizeNumber of Raft entries in the Raft entry cacheEntry CountGAUGECOUNTAVGNONE +STORAGEraft.flows.entered.state_probeThe number of leader->peer flows transitioned to StateProbeFlowsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE +STORAGEraft.flows.entered.state_replicateThe number of leader->peer flows transitioned to StateReplicateFlowsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE +STORAGEraft.flows.entered.state_snapshotThe number of leader->peer flows transitioned to StateSnapshotFlowsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEraft.fortification.skipped_no_supportThe number of fortification requests that were skipped (not sent) due to lack of store liveness supportSkipped FortificationsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEraft.fortification_resp.acceptedThe number of accepted fortification responses. Calculated on the raft leaderAccepted Fortification ResponsesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEraft.fortification_resp.rejectedThe number of rejected fortification responses. 
Calculated on the raft leaderRejected Fortification ResponsesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE diff --git a/pkg/raft/metrics.go b/pkg/raft/metrics.go index 354300a09764..cf5a21010859 100644 --- a/pkg/raft/metrics.go +++ b/pkg/raft/metrics.go @@ -7,11 +7,15 @@ package raft import "github.com/cockroachdb/cockroach/pkg/util/metric" -// Metrics all the metrics reported in Raft. +// Metrics contains all the metrics reported in Raft. type Metrics struct { AcceptedFortificationResponses *metric.Counter RejectedFortificationResponses *metric.Counter SkippedFortificationDueToLackOfSupport *metric.Counter + + FlowsEnteredStateProbe *metric.Counter + FlowsEnteredStateReplicate *metric.Counter + FlowsEnteredStateSnapshot *metric.Counter } var ( @@ -21,14 +25,12 @@ var ( Measurement: "Accepted Fortification Responses", Unit: metric.Unit_COUNT, } - rejectedFortificationResponsesMeta = metric.Metadata{ Name: "raft.fortification_resp.rejected", Help: "The number of rejected fortification responses. 
Calculated on the raft leader", Measurement: "Rejected Fortification Responses", Unit: metric.Unit_COUNT, } - skippedFortificationDueToLackOfSupportMeta = metric.Metadata{ Name: "raft.fortification.skipped_no_support", Help: "The number of fortification requests that were skipped (not sent) due to lack of store" + @@ -36,6 +38,25 @@ var ( Measurement: "Skipped Fortifications", Unit: metric.Unit_COUNT, } + + metaRaftFlowsEnteredProbe = metric.Metadata{ + Name: "raft.flows.entered.state_probe", + Help: "The number of leader->peer flows transitioned to StateProbe", + Measurement: "Flows", + Unit: metric.Unit_COUNT, + } + metaRaftFlowsEnteredReplicate = metric.Metadata{ + Name: "raft.flows.entered.state_replicate", + Help: "The number of leader->peer flows transitioned to StateReplicate", + Measurement: "Flows", + Unit: metric.Unit_COUNT, + } + metaRaftFlowsEnteredSnapshot = metric.Metadata{ + Name: "raft.flows.entered.state_snapshot", + Help: "The number of leader->peer flows transitioned to StateSnapshot", + Measurement: "Flows", + Unit: metric.Unit_COUNT, + } ) // NewMetrics creates a new Metrics instance with all related metric fields. 
@@ -45,5 +66,9 @@ func NewMetrics() *Metrics { RejectedFortificationResponses: metric.NewCounter(rejectedFortificationResponsesMeta), SkippedFortificationDueToLackOfSupport: metric.NewCounter( skippedFortificationDueToLackOfSupportMeta), + + FlowsEnteredStateProbe: metric.NewCounter(metaRaftFlowsEnteredProbe), + FlowsEnteredStateReplicate: metric.NewCounter(metaRaftFlowsEnteredReplicate), + FlowsEnteredStateSnapshot: metric.NewCounter(metaRaftFlowsEnteredSnapshot), } } diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 926a49270353..3d9dc030b191 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -774,7 +774,7 @@ func (r *raft) maybeSendSnapshot(to pb.PeerID, pr *tracker.Progress) bool { sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) - pr.BecomeSnapshot(sindex) + r.becomeSnapshot(pr, sindex) r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) @@ -1372,7 +1372,7 @@ func (r *raft) becomeLeader() { // trivially in this state. Note that r.reset() has initialized this // progress with the last index already. pr := r.trk.Progress(r.id) - pr.BecomeReplicate() + r.becomeReplicate(pr) // The leader always has RecentActive == true. The checkQuorumActive method // makes sure to preserve this. 
pr.RecentActive = true @@ -1396,6 +1396,21 @@ func (r *raft) becomeLeader() { r.logger.Infof("%x became leader at term %d", r.id, r.Term) } +func (r *raft) becomeProbe(pr *tracker.Progress) { + r.metrics.FlowsEnteredStateProbe.Inc(1) + pr.BecomeProbe() +} + +func (r *raft) becomeReplicate(pr *tracker.Progress) { + r.metrics.FlowsEnteredStateReplicate.Inc(1) + pr.BecomeReplicate() +} + +func (r *raft) becomeSnapshot(pr *tracker.Progress, index uint64) { + r.metrics.FlowsEnteredStateSnapshot.Inc(1) + pr.BecomeSnapshot(index) +} + func (r *raft) hup(t CampaignType) { if r.state == pb.StateLeader { r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) @@ -2047,7 +2062,7 @@ func stepLeader(r *raft, m pb.Message) error { if pr.MaybeDecrTo(m.Index, nextProbeIdx) { r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) if pr.State == tracker.StateReplicate { - pr.BecomeProbe() + r.becomeProbe(pr) } r.maybeSendAppend(m.From) } @@ -2063,7 +2078,7 @@ func stepLeader(r *raft, m pb.Message) error { if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) { switch { case pr.State == tracker.StateProbe: - pr.BecomeReplicate() + r.becomeReplicate(pr) case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex(): // Note that we don't take into account PendingSnapshot to // enter this branch. No matter at which index a snapshot @@ -2077,8 +2092,8 @@ func stepLeader(r *raft, m pb.Message) error { // move to replicating state, that would only happen with // the next round of appends (but there may not be a next // round for a while, exposing an inconsistent RaftStatus). 
- pr.BecomeProbe() - pr.BecomeReplicate() + r.becomeProbe(pr) + r.becomeReplicate(pr) case pr.State == tracker.StateReplicate: pr.Inflights.FreeLE(m.Index) } @@ -2123,13 +2138,13 @@ func stepLeader(r *raft, m pb.Message) error { return nil } if !m.Reject { - pr.BecomeProbe() + r.becomeProbe(pr) r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) } else { // NB: the order here matters or we'll be probing erroneously from // the snapshot index, but the snapshot never applied. pr.PendingSnapshot = 0 - pr.BecomeProbe() + r.becomeProbe(pr) r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr) } // If snapshot finish, wait for the MsgAppResp from the remote node before sending @@ -2140,7 +2155,7 @@ func stepLeader(r *raft, m pb.Message) error { // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgApp is lost. if pr.State == tracker.StateReplicate { - pr.BecomeProbe() + r.becomeProbe(pr) } r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) case pb.MsgTransferLeader: From 4c67ac5885f59da1000612937db2a608a2be0391 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 10 Dec 2024 17:35:01 +0000 Subject: [PATCH 2/2] raft: count flow states when leader steps up Epic: none Release note: none --- pkg/raft/raft.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 3d9dc030b191..6a009952d75e 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -1367,15 +1367,26 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.setLead(r.id) r.state = pb.StateLeader - // Followers enter replicate mode when they've been successfully probed - // (perhaps after having received a snapshot as a result). The leader is - // trivially in this state. 
Note that r.reset() has initialized this - // progress with the last index already. - pr := r.trk.Progress(r.id) - r.becomeReplicate(pr) - // The leader always has RecentActive == true. The checkQuorumActive method - // makes sure to preserve this. - pr.RecentActive = true + // TODO(pav-kv): r.reset already scans the peers. Try avoiding another scan. + r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) { + if id == r.id { + // Followers enter replicate mode when they've been successfully probed + // (perhaps after having received a snapshot as a result). The leader is + // trivially in this state. Note that r.reset() has initialized this + // progress with the last index already. + r.becomeReplicate(pr) + // The leader always has RecentActive == true. The checkQuorumActive + // method makes sure to preserve this. + pr.RecentActive = true + return + } + // All peer flows, except the leader's own, are initially in StateProbe. + // Account for entering the probe state in metrics here. All subsequent flow + // state changes, while we are the leader, are counted in the corresponding + // methods: becomeProbe, becomeReplicate, becomeSnapshot. + assertTrue(pr.State == tracker.StateProbe, "peers must be in StateProbe on leader step up") + r.metrics.FlowsEnteredStateProbe.Inc(1) + }) // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's