Skip to content

Commit

Permalink
Remove api dependency (#34)
Browse files Browse the repository at this point in the history
* Increase timeout

* Updated logging scope in metric collectors to remove repeated fields

* Fetch volumes from local files instead of API
  • Loading branch information
Ricardo-Osorio authored May 27, 2022
1 parent c56cb5f commit 27e959b
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 318 deletions.
211 changes: 0 additions & 211 deletions api.go

This file was deleted.

20 changes: 8 additions & 12 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,19 @@ import (

type Collector interface {
Name() string
Collect(log *zap.SugaredLogger, ch chan<- prometheus.Metric, ondatVolumes []VolumePVC) error
Collect(log *zap.SugaredLogger, ch chan<- prometheus.Metric, ondatVolumes []*Volume) error
}

type CollectorGroup struct {
log *zap.SugaredLogger

apiSecretsPath string

collectors []Collector
}

func NewCollectorGroup(log *zap.SugaredLogger, apiSecretsPath string, c []Collector) CollectorGroup {
func NewCollectorGroup(log *zap.SugaredLogger, c []Collector) CollectorGroup {
return CollectorGroup{
log: log,
apiSecretsPath: apiSecretsPath,
collectors: c,
log: log,
collectors: c,
}
}

Expand All @@ -39,11 +36,10 @@ func (c CollectorGroup) Describe(ch chan<- *prometheus.Desc) {
// but also everything that has been gathered successfully.
// Can be called multiple times asynchronously from the prometheus default registry.
func (c CollectorGroup) Collect(ch chan<- prometheus.Metric) {
// All Ondat volumes fetched from the storageos container's API
// Every collectors requires it to match against PVC's
ondatVolumes, err := GetOndatVolumesAPI(c.log, c.apiSecretsPath)
// All local Ondat volumes fetched from the state files
ondatVolumes, err := GetVolumesFromLocalState(c.log)
if err != nil {
c.log.Errorw("failed to fetch Ondat volumes from API", "error", err)
c.log.Errorw("failed to get Ondat volumes from local state files", "error", err)
return
}

Expand All @@ -60,7 +56,7 @@ func (c CollectorGroup) Collect(ch chan<- prometheus.Metric) {
wg.Wait()
}

func execute(log *zap.SugaredLogger, c Collector, ch chan<- prometheus.Metric, ondatVolumes []VolumePVC) {
func execute(log *zap.SugaredLogger, c Collector, ch chan<- prometheus.Metric, ondatVolumes []*Volume) {
timeStart := time.Now()

// best effort
Expand Down
34 changes: 10 additions & 24 deletions diskstats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (c DiskStatsCollector) Name() string {
return DISKSTATS_COLLECTOR_NAME
}

func (c DiskStatsCollector) Collect(log *zap.SugaredLogger, ch chan<- prometheus.Metric, ondatVolumes []VolumePVC) error {
func (c DiskStatsCollector) Collect(log *zap.SugaredLogger, ch chan<- prometheus.Metric, ondatVolumes []*Volume) error {
log.Debug("starting diskstats metrics collector")
log = log.With("collector", DISKSTATS_COLLECTOR_NAME)

Expand All @@ -183,34 +183,20 @@ func (c DiskStatsCollector) Collect(log *zap.SugaredLogger, ch chan<- prometheus
return nil
}

volumesOnNode, err := GetOndatVolumesFS()
err := ExtractOndatVolumesNumbers(ondatVolumes)
if err != nil {
log.Errorw("error getting Ondat volumes", "error", err)
log.Errorw("error getting Ondat volumes major and minor numbers", "error", err)
return err
}

if len(volumesOnNode) == 0 {
log.Debug("no Ondat volumes on node, metrics collector finished early")
return nil
}

diskstats, err := ProcDiskstats()
if err != nil {
log.Errorw("error reading diskstats", "error", err)
return err
}

for _, localVol := range volumesOnNode {
// find the volume within the API response and retrieve the PVC info we are missing here
for _, apiVol := range ondatVolumes {
if localVol.ID == apiVol.ID {
localVol.PVC = apiVol.PVC
localVol.PVCNamespace = apiVol.Namespace
break
}
}

log = log.With("pvc", localVol.PVC, "pvc_namespace", localVol.PVCNamespace)
for _, localVol := range ondatVolumes {
logScope := log.With("pvc", localVol.Labels.PVC, "pvc_namespace", localVol.Labels.PVCNamespace)

for _, stats := range diskstats {
// match with Ondat volume through diskstat row's Major and Minor numbers
Expand All @@ -221,17 +207,17 @@ func (c DiskStatsCollector) Collect(log *zap.SugaredLogger, ch chan<- prometheus
// Build the info metric for each diskstate line (volume) processed.
// Its value is not relevant as we only care about the labels.
// Failure to do so shouldn't stop us from collecting any further metrics.
metric, err := prometheus.NewConstMetric(c.info.desc, c.info.valueType, 1.0, localVol.PVC, localVol.PVCNamespace, stats.DeviceName, fmt.Sprint(localVol.Major), fmt.Sprint(localVol.Minor))
metric, err := prometheus.NewConstMetric(c.info.desc, c.info.valueType, 1.0, localVol.Labels.PVC, localVol.Labels.PVCNamespace, stats.DeviceName, fmt.Sprint(localVol.Major), fmt.Sprint(localVol.Minor))
if err != nil {
log.Errorw("encountered error while building metric", "metric", c.info.desc.String(), "error", err)
logScope.Errorw("encountered error while building metric", "metric", c.info.desc.String(), "error", err)
} else {
ch <- metric
}

diskSectorSize := 512.0
logicalBlockSize, err := GetBlockDeviceLogicalBlockSize(stats.DeviceName)
if err != nil {
log.Errorw("error reading device logical block size, falling back to default", "error", err)
logScope.Errorw("error reading device logical block size, falling back to default", "error", err)
// continue with default sector size
} else {
diskSectorSize = float64(logicalBlockSize)
Expand Down Expand Up @@ -267,9 +253,9 @@ func (c DiskStatsCollector) Collect(log *zap.SugaredLogger, ch chan<- prometheus
break
}

metric, err := prometheus.NewConstMetric(c.metrics[i].desc, c.metrics[i].valueType, val, localVol.PVC, localVol.PVCNamespace)
metric, err := prometheus.NewConstMetric(c.metrics[i].desc, c.metrics[i].valueType, val, localVol.Labels.PVC, localVol.Labels.PVCNamespace)
if err != nil {
log.Errorw("encountered error while building metric", "metric", c.metrics[i].desc.String(), "error", err)
logScope.Errorw("encountered error while building metric", "metric", c.metrics[i].desc.String(), "error", err)
continue
}
ch <- metric
Expand Down
Loading

0 comments on commit 27e959b

Please sign in to comment.