From 790306ac22b17dfd0ff889da4a5dc22cb4753765 Mon Sep 17 00:00:00 2001 From: Nicholas Wiersma Date: Sat, 4 Jun 2022 17:52:14 +0200 Subject: [PATCH] feat: support victoria metrics (#67) --- go.mod | 1 + go.sum | 4 + reporter/victoriametrics/victoria_metrics.go | 185 ++++++++++++++++++ .../victoriametrics/victoria_metrics_test.go | 121 ++++++++++++ 4 files changed, 311 insertions(+) create mode 100644 reporter/victoriametrics/victoria_metrics.go create mode 100644 reporter/victoriametrics/victoria_metrics_test.go diff --git a/go.mod b/go.mod index 9048891..9afb387 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/hamba/statter/v2 go 1.16 require ( + github.com/VictoriaMetrics/metrics v1.18.1 github.com/cactus/go-statsd-client/v5 v5.0.0 github.com/hamba/logger/v2 v2.3.0 github.com/prometheus/client_golang v1.12.2 diff --git a/go.sum b/go.sum index 51e985c..78f1843 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0= +github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -201,6 +203,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= +github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= +github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/reporter/victoriametrics/victoria_metrics.go b/reporter/victoriametrics/victoria_metrics.go new file mode 100644 index 0000000..ef8071c --- /dev/null +++ b/reporter/victoriametrics/victoria_metrics.go @@ -0,0 +1,185 @@ +// Package victoriametrics implements an victoria metrics stats reporter. +package victoriametrics + +import ( + "math" + "net/http" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/VictoriaMetrics/metrics" + "github.com/hamba/statter/v2/internal/bytes" +) + +// VictoriaMetrics is a victoria metrics stats reporter. +type VictoriaMetrics struct { + fqn *fqn + + mu sync.RWMutex + gauges map[string]*gauge + + set *metrics.Set +} + +// New returns a new victoria metrics reporter. +func New() *VictoriaMetrics { + fqn := newFQN() + + return &VictoriaMetrics{ + fqn: fqn, + set: metrics.NewSet(), + gauges: map[string]*gauge{}, + } +} + +// Handler gets the victoria metrics HTTP handler. +func (m *VictoriaMetrics) Handler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + m.set.WritePrometheus(w) + }) +} + +// Counter reports a counter value. +func (m *VictoriaMetrics) Counter(name string, v int64, tags [][2]string) { + lbls := formatTags(tags, m.fqn) + key := createKey(name, lbls, m.fqn) + + c := m.set.GetOrCreateCounter(key) + + c.Add(int(v)) +} + +type gauge struct { + val uint64 +} + +func (g *gauge) Get() float64 { + v := atomic.LoadUint64(&g.val) + return math.Float64frombits(v) +} + +func (g *gauge) Set(v float64) { + atomic.StoreUint64(&g.val, math.Float64bits(v)) +} + +// Gauge reports a gauge value. +func (m *VictoriaMetrics) Gauge(name string, v float64, tags [][2]string) { + lbls := formatTags(tags, m.fqn) + key := createKey(name, lbls, m.fqn) + + if m.setExistingGauge(key, v) { + return + } + + m.mu.Lock() + defer m.mu.Unlock() + + // Double check that it was not added while we queued for the lock. + g, ok := m.gauges[key] + if ok { + g.Set(v) + return + } + + g = &gauge{} + m.gauges[key] = g + + m.set.NewGauge(key, g.Get) + + g.Set(v) +} + +func (m *VictoriaMetrics) setExistingGauge(key string, v float64) bool { + m.mu.RLock() + defer m.mu.RUnlock() + + g, ok := m.gauges[key] + if ok { + g.Set(v) + return true + } + return false +} + +// Histogram reports a histogram value. +func (m *VictoriaMetrics) Histogram(name string, tags [][2]string) func(v float64) { + lbls := formatTags(tags, m.fqn) + key := createKey(name, lbls, m.fqn) + + h := m.set.GetOrCreateHistogram(key) + + return func(v float64) { + h.Update(v) + } +} + +// Timing reports a timing value as a histogram in seconds. +func (m *VictoriaMetrics) Timing(name string, tags [][2]string) func(v time.Duration) { + lbls := formatTags(tags, m.fqn) + key := createKey(name, lbls, m.fqn) + + h := m.set.GetOrCreateHistogram(key) + + return func(v time.Duration) { + h.Update(float64(v) / float64(time.Second)) + } +} + +// Close closes the client and flushes buffered stats, if applicable. +func (m *VictoriaMetrics) Close() error { + return nil +} + +// createKey creates a unique metric key. +func createKey(name, lbls string, fqn *fqn) string { + if lbls == "" { + return fqn.Format(name) + } + return fqn.Format(name) + "{" + lbls + "}" +} + +var pool = bytes.NewPool(512) + +// formatTags create a prometheus Label map from tags. +func formatTags(tags [][2]string, fqn *fqn) string { + if len(tags) == 0 { + return "" + } + + sort.Slice(tags, func(i, j int) bool { + return tags[i][0] < tags[j][0] + }) + + buf := pool.Get() + for i, tag := range tags { + if i > 0 { + buf.WriteByte(',') + } + buf.WriteString(fqn.Format(tag[0])) + buf.WriteByte('=') + buf.WriteByte('"') + buf.WriteString(tag[1]) + buf.WriteByte('"') + } + + s := string(buf.Bytes()) + pool.Put(buf) + return s +} + +type fqn struct { + r *strings.Replacer +} + +func newFQN() *fqn { + return &fqn{ + r: strings.NewReplacer(".", "_", "-", "_"), + } +} + +func (f *fqn) Format(name string) string { + return f.r.Replace(name) +} diff --git a/reporter/victoriametrics/victoria_metrics_test.go b/reporter/victoriametrics/victoria_metrics_test.go new file mode 100644 index 0000000..dcfe00f --- /dev/null +++ b/reporter/victoriametrics/victoria_metrics_test.go @@ -0,0 +1,121 @@ +package victoriametrics_test + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/hamba/statter/v2" + "github.com/hamba/statter/v2/reporter/victoriametrics" + "github.com/stretchr/testify/assert" +) + +func TestVictoriaMetrics_Handler(t *testing.T) { + p := victoriametrics.New() + t.Cleanup(func() { _ = p.Close() }) + + h := p.Handler() + + assert.Implements(t, (*http.Handler)(nil), h) +} + +func TestNew(t *testing.T) { + p := victoriametrics.New() + t.Cleanup(func() { _ = p.Close() }) + + assert.Implements(t, (*statter.Reporter)(nil), p) + assert.Implements(t, (*statter.HistogramReporter)(nil), p) + assert.Implements(t, (*statter.TimingReporter)(nil), p) +} + +func TestVictoriaMetrics_Counter(t *testing.T) { + p := victoriametrics.New() + t.Cleanup(func() { _ = p.Close() }) + + p.Counter("test.test.test", 2, [][2]string{{"test", "test"}, {"foo", "bar"}}) + + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + p.Handler().ServeHTTP(rr, req) + + assert.Contains(t, rr.Body.String(), "test_test_test{foo=\"bar\",test=\"test\"} 2") +} + +func TestVictoriaMetrics_Gauge(t *testing.T) { + p := victoriametrics.New() + t.Cleanup(func() { _ = p.Close() }) + + p.Gauge("test.test.test", 2.1, [][2]string{{"foo", "bar"}}) + + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + p.Handler().ServeHTTP(rr, req) + + assert.Contains(t, rr.Body.String(), "test_test_test{foo=\"bar\"} 2.1") +} + +func TestVictoriaMetrics_Histogram(t *testing.T) { + p := victoriametrics.New() + t.Cleanup(func() { _ = p.Close() }) + + p.Histogram("test.test.test", [][2]string{{"foo", "bar"}})(0.0123) + + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + p.Handler().ServeHTTP(rr, req) + + assert.Contains(t, rr.Body.String(), "test_test_test_bucket{foo=\"bar\",vmrange=\"1.136e-02...1.292e-02\"} 1") + assert.Contains(t, rr.Body.String(), "test_test_test_sum{foo=\"bar\"} 0.0123") + assert.Contains(t, rr.Body.String(), "test_test_test_count{foo=\"bar\"} 1") +} + +func TestVictoriaMetrics_Timing(t *testing.T) { + p := victoriametrics.New() + t.Cleanup(func() { _ = p.Close() }) + + p.Timing("test.test.test", [][2]string{{"foo", "bar"}})(1234500 * time.Nanosecond) + + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + p.Handler().ServeHTTP(rr, req) + + assert.Contains(t, rr.Body.String(), "test_test_test_bucket{foo=\"bar\",vmrange=\"1.136e-03...1.292e-03\"} 1") + assert.Contains(t, rr.Body.String(), "test_test_test_sum{foo=\"bar\"} 0.0012345") + assert.Contains(t, rr.Body.String(), "test_test_test_count{foo=\"bar\"} 1") +} + +func TestVictoriaMetrics_ConvertsLabels(t *testing.T) { + p := victoriametrics.New() + t.Cleanup(func() { _ = p.Close() }) + + p.Counter("foo.bar.baz", 2, [][2]string{{"test-label", "test"}, {"a", "b"}}) + + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + p.Handler().ServeHTTP(rr, req) + + assert.Contains(t, rr.Body.String(), "foo_bar_baz{a=\"b\",test_label=\"test\"} 2") +} + +func TestVictoriaMetrics_NoTags(t *testing.T) { + p := victoriametrics.New() + t.Cleanup(func() { _ = p.Close() }) + + p.Counter("test", 2, [][2]string{}) + + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + p.Handler().ServeHTTP(rr, req) + + assert.Contains(t, rr.Body.String(), "test 2") +} + +func TestVictoriaMetrics_Close(t *testing.T) { + p := victoriametrics.New() + t.Cleanup(func() { _ = p.Close() }) + + err := p.Close() + + assert.NoError(t, err) +}