-
Notifications
You must be signed in to change notification settings - Fork 6
/
saramaprom.go
97 lines (82 loc) · 2.31 KB
/
saramaprom.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package saramaprom
import (
"context"
"fmt"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// Options carries the optional settings accepted by ExportMetrics.
// A zero-valued field selects the default noted on that field.
type Options struct {
	// PrometheusRegistry is the prometheus registerer to use.
	// Default prometheus.DefaultRegisterer.
	PrometheusRegistry prometheus.Registerer

	// Namespace and Subsystem form the metric name prefix.
	// Default Subsystem is "sarama".
	Namespace, Subsystem string

	// Label specifies the value of the "label" label. It is recommended
	// to always set it to avoid collisions between sarama instances.
	// Default "".
	Label string

	// FlushInterval specifies the interval between metric updates. Default 1s.
	FlushInterval time.Duration

	// OnError is the error handler. The default handler panics when an
	// error occurs.
	OnError func(err error)

	// Debug turns on debug logging.
	Debug bool
}
// ExportMetrics exports metrics from go-metrics to prometheus.
//
// Unset fields of opt are replaced by defaults before use: a nil
// PrometheusRegistry becomes prometheus.DefaultRegisterer, an empty Subsystem
// becomes "sarama", a non-positive FlushInterval becomes one second, and a nil
// OnError becomes a handler that panics with the error wrapped as
// "saramaprom: ...".
//
// One update is performed synchronously; if it fails, the error is returned
// (wrapped with the "saramaprom" prefix) and no background goroutine is
// started. Otherwise a goroutine runs an update on every FlushInterval tick
// until ctx is done, at which point it calls unregisterGauges on the exporter
// and exits. Errors occurring inside the goroutine are passed to opt.OnError.
func ExportMetrics(ctx context.Context, metricsRegistry MetricsRegistry, opt Options) error {
	if opt.PrometheusRegistry == nil {
		opt.PrometheusRegistry = prometheus.DefaultRegisterer
	}
	if opt.Subsystem == "" {
		opt.Subsystem = "sarama"
	}
	// time.NewTicker panics on a non-positive duration, and that panic would
	// fire inside the background goroutine where the caller cannot recover
	// it. Treat negative values like the zero value and use the default.
	if opt.FlushInterval <= 0 {
		opt.FlushInterval = time.Second
	}
	if opt.OnError == nil {
		opt.OnError = func(err error) {
			panic(fmt.Errorf("saramaprom: %w", err))
		}
	}

	// The exporter receives opt after defaults have been applied.
	exp := &exporter{
		opt:              opt,
		registry:         metricsRegistry,
		promRegistry:     opt.PrometheusRegistry,
		gauges:           make(map[string]prometheus.Gauge),
		customMetrics:    make(map[string]*customCollector),
		histogramBuckets: []float64{0.05, 0.1, 0.25, 0.50, 0.75, 0.9, 0.95, 0.99},
		timerBuckets:     []float64{0.50, 0.95, 0.99, 0.999},
		mutex:            new(sync.Mutex),
	}

	// Run the first update synchronously so that a failure is reported to the
	// caller directly instead of through OnError.
	if err := exp.update(); err != nil {
		return fmt.Errorf("saramaprom: %w", err)
	}

	go func() {
		t := time.NewTicker(opt.FlushInterval)
		defer t.Stop()

		for {
			select {
			case <-t.C:
				if err := exp.update(); err != nil {
					opt.OnError(err)
				}
			case <-ctx.Done():
				// Context finished: clean up and stop the goroutine.
				if err := exp.unregisterGauges(); err != nil {
					opt.OnError(err)
				}
				return
			}
		}
	}()

	return nil
}
// MetricsRegistry describes the part of 'github.com/rcrowley/go-metrics'.Registry
// that this package needs; sarama uses that registry for its metrics.
type MetricsRegistry interface {
	// Each hands metrics to fn as a name plus an untyped metric value.
	// NOTE(review): presumably fn is called once per registered metric, as
	// with go-metrics' Registry.Each — confirm against the implementation used.
	Each(fn func(metricName string, metric interface{}))
}