From a5bc06ccef1d7378c8fd441f849a05ccf2603975 Mon Sep 17 00:00:00 2001
From: Kaviraj Kanagaraj <kavirajkanagaraj@gmail.com>
Date: Thu, 12 Dec 2024 14:19:17 +0100
Subject: [PATCH] chore: Add flag to skip legacy duplicate telemetry (#630)

Currently we have some legacy metrics with `peer_id` in the metrics suffix (in
addition to same metrics with `peer_id`d as label)
1. `raft_replication_appendEntries_rpc_peer0`
2. `raft_replication_appendEntries_logs_peer0`
3. `raft_replication_heartbeat_peer0`
4. `raft_replication_installSnapshot_peer0`

These metrics may have additional `_count` or `_sum` metrics. And each metrics
are multiplicative. Meaning if I have 10 peers, these metrics will be 10x.

This PR adds a flag `noLegacyTelemetry` (default: false) which by setting to
`true` you can skip those duplicate metrics.

---------

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
Co-authored-by: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com>
---
 CHANGELOG.md   |  4 ++++
 api.go         | 15 ++++++++++++---
 config.go      |  5 +++++
 replication.go | 30 ++++++++++++++++++++----------
 4 files changed, 41 insertions(+), 13 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2222897f..f88d1621 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # UNRELEASED
 
+IMPROVEMENETS
+
+* Added a flag to skip legacy duplicate telemetry. [GH-630](https://github.com/hashicorp/raft/pull/630)
+
 # 1.7.0 (June 5th, 2024)
 
 CHANGES
diff --git a/api.go b/api.go
index 98a4b18a..68c8af80 100644
--- a/api.go
+++ b/api.go
@@ -217,6 +217,11 @@ type Raft struct {
 	// preVoteDisabled control if the pre-vote feature is activated,
 	// prevote feature is disabled if set to true.
 	preVoteDisabled bool
+
+	// noLegacyTelemetry allows to skip the legacy metrics to avoid duplicates.
+	// legacy metrics are those that have `_peer_name` as metric suffix instead as labels.
+	// e.g: raft_replication_heartbeat_peer0
+	noLegacyTelemetry bool
 }
 
 // BootstrapCluster initializes a server's storage with the given cluster
@@ -232,7 +237,8 @@ type Raft struct {
 // listing just itself as a Voter, then invoke AddVoter() on it to add other
 // servers to the cluster.
 func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
-	snaps SnapshotStore, trans Transport, configuration Configuration) error {
+	snaps SnapshotStore, trans Transport, configuration Configuration,
+) error {
 	// Validate the Raft server config.
 	if err := ValidateConfig(conf); err != nil {
 		return err
@@ -305,7 +311,8 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
 // the sole voter, and then join up other new clean-state peer servers using
 // the usual APIs in order to bring the cluster back into a known state.
 func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
-	snaps SnapshotStore, trans Transport, configuration Configuration) error {
+	snaps SnapshotStore, trans Transport, configuration Configuration,
+) error {
 	// Validate the Raft server config.
 	if err := ValidateConfig(conf); err != nil {
 		return err
@@ -436,7 +443,8 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
 // without starting a Raft instance or connecting to the cluster. This function
 // has identical behavior to Raft.GetConfiguration.
 func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
-	snaps SnapshotStore, trans Transport) (Configuration, error) {
+	snaps SnapshotStore, trans Transport,
+) (Configuration, error) {
 	conf.skipStartup = true
 	r, err := NewRaft(conf, fsm, logs, stable, snaps, trans)
 	if err != nil {
@@ -566,6 +574,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		followerNotifyCh:      make(chan struct{}, 1),
 		mainThreadSaturation:  newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
 		preVoteDisabled:       conf.PreVoteDisabled || !transportSupportPreVote,
+		noLegacyTelemetry:     conf.NoLegacyTelemetry,
 	}
 	if !transportSupportPreVote && !conf.PreVoteDisabled {
 		r.logger.Warn("pre-vote is disabled because it is not supported by the Transport")
diff --git a/config.go b/config.go
index d14392fc..0f586973 100644
--- a/config.go
+++ b/config.go
@@ -235,6 +235,11 @@ type Config struct {
 	// PreVoteDisabled deactivate the pre-vote feature when set to true
 	PreVoteDisabled bool
 
+	// NoLegacyTelemetry allows to skip the legacy metrics to avoid duplicates.
+	// legacy metrics are those that have `_peer_name` as metric suffix instead as labels.
+	// e.g: raft_replication_heartbeat_peer0
+	NoLegacyTelemetry bool
+
 	// skipStartup allows NewRaft() to bypass all background work goroutines
 	skipStartup bool
 }
diff --git a/replication.go b/replication.go
index 5051863b..2241662b 100644
--- a/replication.go
+++ b/replication.go
@@ -233,7 +233,7 @@ START:
 		s.failures++
 		return
 	}
-	appendStats(string(peer.ID), start, float32(len(req.Entries)))
+	appendStats(string(peer.ID), start, float32(len(req.Entries)), r.noLegacyTelemetry)
 
 	// Check for a newer term, stop running
 	if resp.Term > req.Term {
@@ -347,8 +347,11 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
 	}
 	labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
 	metrics.MeasureSinceWithLabels([]string{"raft", "replication", "installSnapshot"}, start, labels)
-	// Duplicated information. Kept for backward compatibility.
-	metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(peer.ID)}, start)
+
+	if !r.noLegacyTelemetry {
+		// Duplicated information. Kept for backward compatibility.
+		metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(peer.ID)}, start)
+	}
 
 	// Check for a newer term, stop running
 	if resp.Term > req.Term {
@@ -423,8 +426,12 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
 			failures = 0
 			labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
 			metrics.MeasureSinceWithLabels([]string{"raft", "replication", "heartbeat"}, start, labels)
-			// Duplicated information. Kept for backward compatibility.
-			metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(peer.ID)}, start)
+
+			if !r.noLegacyTelemetry {
+				// Duplicated information. Kept for backward compatibility.
+				metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(peer.ID)}, start)
+			}
+
 			s.notifyAll(resp.Success)
 		}
 	}
@@ -533,7 +540,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
 			s.peerLock.RUnlock()
 
 			req, resp := ready.Request(), ready.Response()
-			appendStats(string(peer.ID), ready.Start(), float32(len(req.Entries)))
+			appendStats(string(peer.ID), ready.Start(), float32(len(req.Entries)), r.noLegacyTelemetry)
 
 			// Check for a newer term, stop running
 			if resp.Term > req.Term {
@@ -621,13 +628,16 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64
 }
 
 // appendStats is used to emit stats about an AppendEntries invocation.
-func appendStats(peer string, start time.Time, logs float32) {
+func appendStats(peer string, start time.Time, logs float32, skipLegacy bool) {
 	labels := []metrics.Label{{Name: "peer_id", Value: peer}}
 	metrics.MeasureSinceWithLabels([]string{"raft", "replication", "appendEntries", "rpc"}, start, labels)
 	metrics.IncrCounterWithLabels([]string{"raft", "replication", "appendEntries", "logs"}, logs, labels)
-	// Duplicated information. Kept for backward compatibility.
-	metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
-	metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)
+
+	if !skipLegacy {
+		// Duplicated information. Kept for backward compatibility.
+		metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
+		metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)
+	}
 }
 
 // handleStaleTerm is used when a follower indicates that we have a stale term.