Skip to content

Commit

Permalink
Instrument writers, secondary registry for local metrics (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
hoenn authored Aug 8, 2019
1 parent cfeb60f commit 7dadc14
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 9 deletions.
6 changes: 3 additions & 3 deletions cmd/do-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ func checkConfig() error {
return nil
}

// initWriter builds the metric writer together with the limiter that paces it.
//
// wc is the counter vector handed to the writer constructor so the writer can
// count its writes. When config.stdoutOnly is set, the writer is a
// writer.NewFile on os.Stdout and the limiter is a constThrottler whose wait is
// 10 seconds. Otherwise a timeseries client is created; it backs the
// writer.NewSonar writer and is also returned as the limiter.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers, so
// each changed statement appears twice (the form without wc first, then the
// form with wc) — confirm against the repository.
func initWriter() (metricWriter, limiter) {
func initWriter(wc *prometheus.CounterVec) (metricWriter, limiter) {
if config.stdoutOnly {
return writer.NewFile(os.Stdout), &constThrottler{wait: 10 * time.Second}
return writer.NewFile(os.Stdout, wc), &constThrottler{wait: 10 * time.Second}
}

tsc := newTimeseriesClient()
return writer.NewSonar(tsc), tsc
return writer.NewSonar(tsc, wc), tsc
}

func initDecorator() decorate.Chain {
Expand Down
8 changes: 6 additions & 2 deletions cmd/do-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ func main() {
reg.MustRegister(cols...)

if config.webListen {
// Create a secondary registry for local-only metrics
localReg := prometheus.NewRegistry()
localCols := append(cols, metricWriterDiagnostics)
localReg.MustRegister(localCols...)
go func() {
http.Handle("/", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
http.Handle("/", promhttp.HandlerFor(localReg, promhttp.HandlerOpts{}))
err := http.ListenAndServe(config.webListenAddress, nil)
if err != nil {
log.Error("failed to init HTTP listener: %+v", err.Error())
}
}()
}

w, th := initWriter()
w, th := initWriter(metricWriterDiagnostics)
d := initDecorator()
aggregateSpecs := initAggregatorSpecs()

Expand Down
16 changes: 14 additions & 2 deletions cmd/do-agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,28 @@ import (
)

const (
// diagnosticMetricName is the Name used for the diagnosticMetric counter.
// NOTE(review): the declaration appears twice because this listing seems to
// be a diff rendered without +/- markers — confirm against the repository.
diagnosticMetricName = "sonar_diagnostic"
diagnosticMetricName = "sonar_diagnostic"
// metricWriterDiagnosticsName is the Name used for the
// metricWriterDiagnostics counter.
metricWriterDiagnosticsName = "metric_writes"
)

var (
// ErrAggregationFailed is the error value for a failed metric aggregation.
ErrAggregationFailed = fmt.Errorf("metric aggregation failed")
diagnosticMetric = prometheus.NewCounterVec(prometheus.CounterOpts{

// diagnosticMetric is a counter vector named diagnosticMetricName that
// carries do-agent diagnostic information, partitioned by the "error" label.
// NOTE(review): the opening line appears twice because this listing seems to
// be a diff rendered without +/- markers — confirm against the repository.
diagnosticMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "",
Name: diagnosticMetricName,
Help: "do-agent diagnostic information",
}, []string{"error"})

// metricWriterDiagnostics is a counter vector named
// metricWriterDiagnosticsName that counts metric writer successes and
// failures. Its labels are, in order: "writer", "result" and "reason".
metricWriterDiagnostics = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "",
Name: metricWriterDiagnosticsName,
Help: "Total successes and failures of metric writers",
},
[]string{"writer", "result", "reason"},
)
)

type metricWriter interface {
Expand Down
7 changes: 6 additions & 1 deletion pkg/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@ import (
"sync"

"github.com/digitalocean/do-agent/pkg/aggregate"
"github.com/prometheus/client_golang/prometheus"
)

// File is a metric writer that prints metrics to an io.Writer.
type File struct {
// w is the destination the formatted metrics are printed to.
w io.Writer
// m is the mutex allocated by NewFile.
// NOTE(review): presumably it serializes writes to w; the locking code is
// outside this view — confirm.
m *sync.Mutex
// c counts writes. NewFile curries it with the label writer="file", so only
// the remaining label values are supplied when incrementing.
c *prometheus.CounterVec
}

// NewFile creates a new File writer that prints metrics to w.
//
// c is the counter vector used to count writes. It is curried with the label
// writer="file" before being stored, and the File gets a freshly allocated
// mutex.
//
// NOTE(review): MustCurryWith is a Must-style helper and presumably panics if
// c has no "writer" label; a nil c would also fail here — confirm against the
// prometheus client documentation.
// NOTE(review): the signature appears twice because this listing seems to be a
// diff rendered without +/- markers — confirm against the repository.
func NewFile(w io.Writer) *File {
func NewFile(w io.Writer, c *prometheus.CounterVec) *File {
c = c.MustCurryWith(prometheus.Labels{"writer": "file"})
return &File{
w: w,
m: new(sync.Mutex),
c: c,
}
}

Expand All @@ -29,6 +33,7 @@ func (w *File) Write(mets []aggregate.MetricWithValue) error {
for _, met := range mets {
fmt.Fprintf(w.w, "[%s]: %v: %v\n", met.LFM["__name__"], met.LFM, met.Value)
}
w.c.WithLabelValues("success", "").Inc()
return nil
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/writer/sonar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/digitalocean/do-agent/internal/log"
"github.com/digitalocean/do-agent/pkg/aggregate"
"github.com/digitalocean/do-agent/pkg/clients/tsclient"
"github.com/prometheus/client_golang/prometheus"

"github.com/pkg/errors"
)
Expand All @@ -26,30 +27,36 @@ var (
type Sonar struct {
// client is the timeseries client. Write checks batch size and metric
// length against its limits and adds each metric to it.
client tsclient.Client
// firstWriteSent is initialized to false by NewSonar and set to true by
// Write.
firstWriteSent bool
// c counts write outcomes. NewSonar curries it with the label
// writer="sonar", so Write supplies only the remaining label values.
c *prometheus.CounterVec
}

// NewSonar creates a new Sonar writer backed by client.
//
// c is the counter vector used to count write outcomes. It is curried with the
// label writer="sonar" before being stored. The returned writer starts with
// firstWriteSent set to false.
//
// NOTE(review): MustCurryWith is a Must-style helper and presumably panics if
// c has no "writer" label; a nil c would also fail here — confirm against the
// prometheus client documentation.
// NOTE(review): the signature appears twice because this listing seems to be a
// diff rendered without +/- markers — confirm against the repository.
func NewSonar(client tsclient.Client) *Sonar {
func NewSonar(client tsclient.Client, c *prometheus.CounterVec) *Sonar {
c = c.MustCurryWith(prometheus.Labels{"writer": "sonar"})
return &Sonar{
client: client,
firstWriteSent: false,
c: c,
}
}

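// Write writes the metrics to Sonar. It returns an error when there are too
// many metrics for one batch, a metric exceeds the maximum length, a metric
// cannot be added to the batch, or the flush fails; otherwise it returns nil.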
func (s *Sonar) Write(mets []aggregate.MetricWithValue) error {
if len(mets) > s.client.MaxBatchSize() {
s.c.WithLabelValues("failure", "too many metrics").Inc()
return errors.Wrap(ErrTooManyMetrics, "cannot write metrics")
}

for _, m := range mets {
lfmEncoded := tsclient.ConvertLFMMapToPrometheusEncodedName(m.LFM)
if len(lfmEncoded) > s.client.MaxMetricLength() {
s.c.WithLabelValues("failure", "metric exceeds max length").Inc()
return errors.Wrapf(ErrMetricTooLong, "cannot send metric: %q", lfmEncoded)
}
err := s.client.AddMetric(tsclient.NewDefinitionFromMap(m.LFM), m.Value)
if err != nil {
s.c.WithLabelValues("failure", "could not add metric to batch").Inc()
return err
}
}
Expand All @@ -62,9 +69,11 @@ func (s *Sonar) Write(mets []aggregate.MetricWithValue) error {
s.firstWriteSent = true

if err == nil {
s.c.WithLabelValues("success", "").Inc()
return nil
}

s.c.WithLabelValues("failure", "failed to flush").Inc()
log.Error("failed to flush: %+v", err)
return ErrFlushFailure
}
Expand Down

0 comments on commit 7dadc14

Please sign in to comment.