collector.go
package main

import (
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"

	configondatv1 "github.com/ondat/metrics-exporter/api/config.storageos.com/v1"
)
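
// Collector gathers one family of metrics for the local Ondat volumes and
// writes them to the provided prometheus channel.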
type Collector interface {
	Name() string
	Collect(log *zap.SugaredLogger, ch chan<- prometheus.Metric, ondatVolumes []*Volume) error
}
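
// CollectorGroup fans a scrape out to every enabled collector. Its Describe
// and Collect methods satisfy prometheus.Collector, so it can be registered
// with a prometheus registry.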
type CollectorGroup struct {
	log        *zap.SugaredLogger
	collectors []Collector
}
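
// NewCollectorGroup wraps the given collectors in a CollectorGroup that
// shares a single logger.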
func NewCollectorGroup(log *zap.SugaredLogger, c []Collector) CollectorGroup {
	return CollectorGroup{
		log:        log,
		collectors: c,
	}
}
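
// Describe advertises the exporter's own scrape metrics.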
func (c CollectorGroup) Describe(ch chan<- *prometheus.Desc) {
	ch <- scrapeDurationMetric.desc
	ch <- scrapeSuccessMetric.desc
}

// Collect gathers all the metrics and reports back both on the scrape process
// itself and on everything that was gathered successfully.
// It can be called multiple times asynchronously from the prometheus default registry.
func (c CollectorGroup) Collect(ch chan<- prometheus.Metric) {
	// All local Ondat volumes fetched from the state files.
	ondatVolumes, err := GetVolumesFromLocalState(c.log)
	if err != nil {
		c.log.Errorw("failed to get Ondat volumes from local state files", "error", err)
		return
	}

	wg := sync.WaitGroup{}
	wg.Add(len(c.collectors))
	for _, collector := range c.collectors {
		// Each collector gathers its metrics in parallel.
		go func(collector Collector) {
			log := c.log.With("req_id", uuid.New())
			execute(log, collector, ch, ondatVolumes)
			wg.Done()
		}(collector)
	}
	wg.Wait()
}
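
// execute runs a single collector and reports its scrape duration and
// success/failure alongside whatever metrics the collector itself produced.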
func execute(log *zap.SugaredLogger, c Collector, ch chan<- prometheus.Metric, ondatVolumes []*Volume) {
	timeStart := time.Now()

	// Best effort: even if there's an error processing a specific volume or
	// disk, all those that succeed still get reported.
	err := c.Collect(log, ch, ondatVolumes)
	duration := time.Since(timeStart)

	ch <- prometheus.MustNewConstMetric(scrapeDurationMetric.desc, scrapeDurationMetric.valueType, duration.Seconds(), c.Name())

	var success float64
	if err != nil {
		log.Errorw("collector failed", "collector", c.Name(), "error", err)
		success = 0
	} else {
		log.Debugw("collector succeeded", "collector", c.Name())
		success = 1
	}
	ch <- prometheus.MustNewConstMetric(scrapeSuccessMetric.desc, scrapeSuccessMetric.valueType, success, c.Name())
}
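
// GetEnabledMetricsCollectors builds every known collector except those
// listed as disabled in the exporter configuration.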
func GetEnabledMetricsCollectors(
	log *zap.SugaredLogger,
	disabled []configondatv1.MetricsExporterCollector,
) []Collector {
	var metricsCollectors []Collector
	for name, collectorFactory := range map[configondatv1.MetricsExporterCollector](func() Collector){
		configondatv1.MetricsExporterCollectorDiskStats:  func() Collector { return NewDiskStatsCollector() },
		configondatv1.MetricsExporterCollectorFileSystem: func() Collector { return NewFileSystemCollector() },
	} {
		if IsCollectorDisabled(disabled, name) {
			log.Infof("disabling %s collector", name)
			continue
		}
		metricsCollectors = append(metricsCollectors, collectorFactory())
	}
	return metricsCollectors
}
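
// IsCollectorDisabled reports whether collector appears in the disabled list.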
func IsCollectorDisabled(
	disabled []configondatv1.MetricsExporterCollector,
	collector configondatv1.MetricsExporterCollector,
) bool {
	for _, c := range disabled {
		if c == collector {
			return true
		}
	}
	return false
}