Skip to content

Commit

Permalink
Add metrics labels to all metrics (#270)
Browse files Browse the repository at this point in the history
* adding metric labels to all metrics

Co-authored-by: R.B. Boyer <[email protected]>
  • Loading branch information
huikang and rboyer authored Aug 8, 2022
1 parent 05405d0 commit e6ff9b2
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 26 deletions.
12 changes: 8 additions & 4 deletions awareness.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ type awareness struct {
// score is the current awareness score. Lower values are healthier and
// zero is the minimum value.
score int

// metricLabels is the slice of labels to put on all emitted metrics
metricLabels []metrics.Label
}

// newAwareness returns a new awareness object.
func newAwareness(max int) *awareness {
func newAwareness(max int, metricLabels []metrics.Label) *awareness {
return &awareness{
max: max,
score: 0,
max: max,
score: 0,
metricLabels: metricLabels,
}
}

Expand All @@ -47,7 +51,7 @@ func (a *awareness) ApplyDelta(delta int) {
a.Unlock()

if initial != final {
metrics.SetGauge([]string{"memberlist", "health", "score"}, float32(final))
metrics.SetGaugeWithLabels([]string{"memberlist", "health", "score"}, float32(final), a.metricLabels)
}
}

Expand Down
2 changes: 1 addition & 1 deletion awareness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestAwareness(t *testing.T) {
{-1, 0, 1 * time.Second},
}

a := newAwareness(8)
a := newAwareness(8, nil)
for i, c := range cases {
a.ApplyDelta(c.delta)
if a.GetHealthScore() != c.score {
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/armon/go-metrics"
multierror "github.com/hashicorp/go-multierror"
)

Expand Down Expand Up @@ -244,10 +245,14 @@ type Config struct {
// RequireNodeNames controls if the name of a node is required when sending
// a message to that node.
RequireNodeNames bool

// CIDRsAllowed lists the networks allowed to connect. If nil, any
// connection is allowed (the default); otherwise every allowed network must
// be listed (IPv4 and IPv6 networks must be specified separately).
// An empty, non-nil slice blocks all connections.
CIDRsAllowed []net.IPNet

// MetricLabels is an optional slice of labels to apply to all metrics emitted.
MetricLabels []metrics.Label
}

// ParseCIDRs returns a possibly empty list of all networks that have been parsed
Expand Down
14 changes: 10 additions & 4 deletions memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync/atomic"
"time"

"github.com/armon/go-metrics"
multierror "github.com/hashicorp/go-multierror"
sockaddr "github.com/hashicorp/go-sockaddr"
"github.com/miekg/dns"
Expand Down Expand Up @@ -77,6 +78,9 @@ type Memberlist struct {
broadcasts *TransmitLimitedQueue

logger *log.Logger

// metricLabels is the slice of labels to put on all emitted metrics
metricLabels []metrics.Label
}

// BuildVsnArray creates the array of Vsn
Expand Down Expand Up @@ -135,9 +139,10 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
transport := conf.Transport
if transport == nil {
nc := &NetTransportConfig{
BindAddrs: []string{conf.BindAddr},
BindPort: conf.BindPort,
Logger: logger,
BindAddrs: []string{conf.BindAddr},
BindPort: conf.BindPort,
Logger: logger,
MetricLabels: conf.MetricLabels,
}

// See comment below for details about the retry in here.
Expand Down Expand Up @@ -208,10 +213,11 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
lowPriorityMsgQueue: list.New(),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
awareness: newAwareness(conf.AwarenessMaxMultiplier, conf.MetricLabels),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
metricLabels: conf.MetricLabels,
}
m.broadcasts.NumNodes = func() int {
return m.estNumNodes()
Expand Down
8 changes: 4 additions & 4 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (m *Memberlist) handleConn(conn net.Conn) {
defer conn.Close()
m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))

metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "accept"}, 1, m.metricLabels)

conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))

Expand Down Expand Up @@ -869,7 +869,7 @@ func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error {
msg = buf.Bytes()
}

metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
metrics.IncrCounterWithLabels([]string{"memberlist", "udp", "sent"}, float32(len(msg)), m.metricLabels)
_, err := m.transport.WriteToAddress(msg, a)
return err
}
Expand Down Expand Up @@ -898,7 +898,7 @@ func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte, streamLabel
}

// Write out the entire send buffer
metrics.IncrCounter([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)))
metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)), m.metricLabels)

if n, err := conn.Write(sendBuf); err != nil {
return err
Expand Down Expand Up @@ -953,7 +953,7 @@ func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState,
}
defer conn.Close()
m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s %s", a.Name, conn.RemoteAddr())
metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "tcp", "connect"}, 1, m.metricLabels)

// Send our state
if err := m.sendLocalState(conn, join, m.config.Label); err != nil {
Expand Down
17 changes: 12 additions & 5 deletions net_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type NetTransportConfig struct {

// Logger is a logger for operator messages.
Logger *log.Logger

// MetricLabels is an optional slice of labels to apply to all metrics
// emitted by this transport.
MetricLabels []metrics.Label
}

// NetTransport is a Transport implementation that uses connectionless UDP for
Expand All @@ -48,6 +52,8 @@ type NetTransport struct {
tcpListeners []*net.TCPListener
udpListeners []*net.UDPConn
shutdown int32

metricLabels []metrics.Label
}

var _ NodeAwareTransport = (*NetTransport)(nil)
Expand All @@ -64,10 +70,11 @@ func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) {
// Build out the new transport.
var ok bool
t := NetTransport{
config: config,
packetCh: make(chan *Packet),
streamCh: make(chan net.Conn),
logger: config.Logger,
config: config,
packetCh: make(chan *Packet),
streamCh: make(chan net.Conn),
logger: config.Logger,
metricLabels: config.MetricLabels,
}

// Clean up listeners if there's an error.
Expand Down Expand Up @@ -341,7 +348,7 @@ func (t *NetTransport) udpListen(udpLn *net.UDPConn) {
}

// Ingest the packet.
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
metrics.IncrCounterWithLabels([]string{"memberlist", "udp", "received"}, float32(n), t.metricLabels)
t.packetCh <- &Packet{
Buf: buf[:n],
From: addr,
Expand Down
16 changes: 8 additions & 8 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,14 @@ func failedRemote(err error) bool {

// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"memberlist", "probeNode"}, time.Now(), m.metricLabels)

// We use our health awareness to scale the overall probe interval, so we
// slow down if we detect problems. The ticker that calls us can handle
// us running over the base interval, and will skip missed ticks.
probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
if probeInterval > m.config.ProbeInterval {
metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "degraded", "probe"}, 1, m.metricLabels)
}

// Prepare a ping message and setup an ack handler.
Expand Down Expand Up @@ -573,7 +573,7 @@ func (m *Memberlist) resetNodes() {
// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"memberlist", "gossip"}, time.Now(), m.metricLabels)

// Get some random live, suspect, or recently dead nodes
m.nodeLock.RLock()
Expand Down Expand Up @@ -653,7 +653,7 @@ func (m *Memberlist) pushPull() {

// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(a Address, join bool) error {
defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"memberlist", "pushPullNode"}, time.Now(), m.metricLabels)

// Attempt to send and receive with the node
remote, userState, err := m.sendAndReceiveState(a, join)
Expand Down Expand Up @@ -1125,7 +1125,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
}

// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "alive"}, 1, m.metricLabels)

// Notify the delegate of any relevant updates
if m.config.Events != nil {
Expand Down Expand Up @@ -1183,7 +1183,7 @@ func (m *Memberlist) suspectNode(s *suspect) {
}

// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "suspect"}, 1, m.metricLabels)

// Update the state
state.Incarnation = s.Incarnation
Expand Down Expand Up @@ -1221,7 +1221,7 @@ func (m *Memberlist) suspectNode(s *suspect) {

if timeout {
if k > 0 && numConfirmations < k {
metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "degraded", "timeout"}, 1, m.metricLabels)
}

m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func (m *Memberlist) deadNode(d *dead) {
}

// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)
metrics.IncrCounterWithLabels([]string{"memberlist", "msg", "dead"}, 1, m.metricLabels)

// Update the state
state.Incarnation = d.Incarnation
Expand Down

0 comments on commit e6ff9b2

Please sign in to comment.