-
Notifications
You must be signed in to change notification settings - Fork 1
/
reporter.go
141 lines (127 loc) · 3.6 KB
/
reporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package exporters
import (
"fmt"
"log"
"time"
"github.com/rcrowley/go-metrics"
)
// A reporter periodically cut metrics and publish to given publishers.
type Reporter struct {
registry metrics.Registry
interval time.Duration // poll and report interval
autoRemove bool // auto remove metric such as counter
emitters []Emitter
exit chan struct{} // signal when shutting down
labels map[string]string // global labels attach to each metric
reshape Reshape // metric transformer
logf func(format string, a ...any)
}
// Add more emitter to the reporter. Repeatedly apply it to add multiple emitters.
func (rep *Reporter) WithEmitter(emitter Emitter) *Reporter {
rep.emitters = append(rep.emitters, emitter)
return rep
}
// Apply a function to transform metric name, labels, fields before emitting.
func (rep *Reporter) WithReshape(fn Reshape) *Reporter {
rep.reshape = fn
return rep
}
// Add a global label to each metric. Repeatedly apply it to add multiple labels.
func (rep *Reporter) WithLabel(k, v string) *Reporter {
if rep.labels == nil {
rep.labels = make(map[string]string)
}
rep.labels[k] = v
return rep
}
// Auto remove (or not) metric from registry after polled. NOTE that all metrics
// must be dynamically registered to registry via `GetOrRegister`, otherwise
// they will be lost after polled.
func (rep *Reporter) WithAutoRemove(b bool) *Reporter {
rep.autoRemove = b
return rep
}
// Start and return reporter, the reporter should be Closed when shutting down.
func (rep *Reporter) Start() (*Reporter, error) {
rep.exit = make(chan struct{})
if len(rep.emitters) < 1 {
return nil, fmt.Errorf("Please specify at least one emitter to report metrics.")
}
go rep.loopPoll()
return rep, nil
}
// Close reporter and emitters gracefully
func (rep *Reporter) Close() error {
close(rep.exit)
rep.report()
var err error
for _, em := range rep.emitters {
err = em.Close()
}
return err
}
// Log to customized logger, default to log.Printf.
func (rep *Reporter) WithLogger(fn func(format string, a ...any)) *Reporter {
rep.logf = fn
return rep
}
// Poll metrics from registry
func (rep *Reporter) pollMetrics() []*Metric {
points := make([]*Metric, 0, 128)
rep.registry.Each(func(name string, metrik any) {
metric := CollectMetric(name, metrik)
if rep.autoRemove {
// remove metric to keep zero metrics from hanging all time
rep.registry.Unregister(name)
}
if metric != nil {
if metric.Labels == nil && len(rep.labels) > 0 {
metric.Labels = make(map[string]string)
}
for k, v := range rep.labels {
metric.Labels[k] = v
}
if rep.reshape != nil {
metric = rep.reshape(metric)
}
// do not emit if metric has zero fields
if len(metric.Fields) > 0 {
points = append(points, metric)
}
}
})
return points
}
func (rep *Reporter) loopPoll() {
rep.logf("Start reporting metrics (every %s) to %s ...", rep.interval, rep.emitters[0].Name())
ticker := time.Tick(rep.interval)
for {
select {
case <-rep.exit:
return
case <-ticker:
rep.report()
}
}
}
func (rep *Reporter) report() {
metrics := rep.pollMetrics()
if len(metrics) == 0 {
return
}
for _, em := range rep.emitters {
if err := em.Emit(metrics...); err != nil {
rep.logf("ERROR: Report %d metric points to %s error: %s\n", len(metrics), em.Name(), err.Error())
} else {
rep.logf("Reported %d metric points to %s\n", len(metrics), em.Name())
}
}
}
func NewReporter(registry metrics.Registry, pollInterval time.Duration) *Reporter {
rep := &Reporter{
registry: registry,
interval: pollInterval,
logf: log.Printf,
}
return rep
}