Skip to content

Commit

Permalink
refactor: separate metrics code for reuse
Browse files Browse the repository at this point in the history
This PR is intended as a pure refactor to move code.

It separates the reusable metrics code so that it can be used from a
separate module.

This intent is demonstrated in a separate PR, which adds metrics to the
server.

The `version` metric is also moved so that it can be reused, as it is a
simple and obvious candidate for reuse in the server.

Labels are renamed to Attrib(utes) to match OpenTelemetry.

Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel committed Aug 29, 2024
1 parent fed83ca commit a9ce710
Show file tree
Hide file tree
Showing 41 changed files with 775 additions and 594 deletions.
43 changes: 43 additions & 0 deletions util/telemetry/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package telemetry

// Attribute keys attached to metric data points.
//
// Several constants deliberately share the same key string (for example
// "name", "namespace" and "phase"); each constant names the context in which
// the key is used rather than a globally unique key.
const (
	// Build and version details.
	AttribBuildVersion      string = "version"
	AttribBuildPlatform     string = "platform"
	AttribBuildGoVersion    string = "go_version"
	AttribBuildDate         string = "build_date"
	AttribBuildCompiler     string = "compiler"
	AttribBuildGitCommit    string = "git_commit"
	AttribBuildGitTreeState string = "git_treestate"
	AttribBuildGitTag       string = "git_tag"

	// Cron workflow attributes.
	AttribCronWFName string = "name"

	// Error attributes.
	AttribErrorCause string = "cause"

	// Logging attributes.
	AttribLogLevel string = "level"

	// Node attributes.
	AttribNodePhase string = "node_phase"

	// Pod attributes.
	AttribPodPhase         string = "phase"
	AttribPodNamespace     string = "namespace"
	AttribPodPendingReason string = "reason"

	// Queue attributes.
	AttribQueueName string = "queue_name"

	AttribRecentlyStarted string = "recently_started"

	// Request attributes (left untyped, as in the original declaration).
	AttribRequestKind = "kind"
	AttribRequestVerb = "verb"
	AttribRequestCode = "status_code"

	// Template attributes.
	AttribTemplateName      string = "name"
	AttribTemplateNamespace string = "namespace"
	AttribTemplateCluster   string = "cluster_scope"

	// Worker attributes.
	AttribWorkerType string = "worker_type"

	// Workflow attributes (status and type are left untyped, as in the
	// original declaration).
	AttribWorkflowNamespace string = "namespace"
	AttribWorkflowPhase     string = "phase"
	AttribWorkflowStatus           = "status"
	AttribWorkflowType             = "type"
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics
package telemetry

import (
"context"
Expand All @@ -20,8 +20,8 @@ import (
)

const (
defaultPrometheusServerPort = 9090
defaultPrometheusServerPath = "/metrics"
DefaultPrometheusServerPort = 9090
DefaultPrometheusServerPath = "/metrics"
)

func (config *Config) prometheusMetricsExporter(namespace string) (*prometheus.Exporter, error) {
Expand All @@ -39,14 +39,14 @@ func (config *Config) prometheusMetricsExporter(namespace string) (*prometheus.E

func (config *Config) path() string {
if config.Path == "" {
return defaultPrometheusServerPath
return DefaultPrometheusServerPath
}
return config.Path
}

func (config *Config) port() int {
if config.Port == 0 {
return defaultPrometheusServerPort
return DefaultPrometheusServerPort
}
return config.Port
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build !windows

package metrics
package telemetry

import (
"context"
Expand All @@ -14,19 +14,22 @@ import (
"github.com/stretchr/testify/require"
)

// testScopeName is the name that the metrics running under test will have
const testScopeName string = "argo-workflows-test"

func TestDisablePrometheusServer(t *testing.T) {
config := Config{
Enabled: false,
Path: defaultPrometheusServerPath,
Port: defaultPrometheusServerPort,
Path: DefaultPrometheusServerPath,
Port: DefaultPrometheusServerPort,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m, err := New(ctx, TestScopeName, &config, Callbacks{})
m, err := NewMetrics(ctx, testScopeName, testScopeName, &config)
require.NoError(t, err)
go m.RunPrometheusServer(ctx, false)
time.Sleep(1 * time.Second) // to confirm that the server doesn't start, even if we wait
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", defaultPrometheusServerPort, defaultPrometheusServerPath))
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultPrometheusServerPort, DefaultPrometheusServerPath))
if resp != nil {
defer resp.Body.Close()
}
Expand All @@ -37,16 +40,16 @@ func TestDisablePrometheusServer(t *testing.T) {
func TestPrometheusServer(t *testing.T) {
config := Config{
Enabled: true,
Path: defaultPrometheusServerPath,
Port: defaultPrometheusServerPort,
Path: DefaultPrometheusServerPath,
Port: DefaultPrometheusServerPort,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m, err := New(ctx, TestScopeName, &config, Callbacks{})
m, err := NewMetrics(ctx, testScopeName, testScopeName, &config)
require.NoError(t, err)
go m.RunPrometheusServer(ctx, false)
time.Sleep(1 * time.Second)
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", defaultPrometheusServerPort, defaultPrometheusServerPath))
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultPrometheusServerPort, DefaultPrometheusServerPath))
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

Expand All @@ -62,17 +65,17 @@ func TestPrometheusServer(t *testing.T) {
func TestDummyPrometheusServer(t *testing.T) {
config := Config{
Enabled: true,
Path: defaultPrometheusServerPath,
Port: defaultPrometheusServerPort,
Path: DefaultPrometheusServerPath,
Port: DefaultPrometheusServerPort,
Secure: false,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m, err := New(ctx, TestScopeName, &config, Callbacks{})
m, err := NewMetrics(ctx, testScopeName, testScopeName, &config)
require.NoError(t, err)
go m.RunPrometheusServer(ctx, true)
time.Sleep(1 * time.Second)
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", defaultPrometheusServerPort, defaultPrometheusServerPath))
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultPrometheusServerPort, DefaultPrometheusServerPath))
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

Expand Down
65 changes: 65 additions & 0 deletions util/telemetry/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package telemetry

import (
"context"

"go.opentelemetry.io/otel/sdk/metric"
)

// createDefaultTestMetrics builds test metrics from a Config whose only
// explicitly set field is Enabled (true), delegating to createTestMetrics.
func createDefaultTestMetrics() (*Metrics, *TestMetricsExporter, error) {
	return createTestMetrics(&Config{Enabled: true})
}

// createTestMetrics creates a Metrics instance for the given config that
// reports to a fresh TestMetricsExporter, then populates it with the version
// metric and the testing counter and histogram.
//
// If NewMetrics fails, both returned pointers are nil. If Populate fails, the
// metrics and exporter are still returned alongside the error.
func createTestMetrics(config *Config) (*Metrics, *TestMetricsExporter, error) {
	// A plain background context is used; it is never cancelled here.
	ctx := context.Background()
	exporter := NewTestMetricsExporter()

	tm, err := NewMetrics(ctx, TestScopeName, TestScopeName, config, metric.WithReader(exporter))
	if err != nil {
		return nil, nil, err
	}
	if err := tm.Populate(ctx, AddVersion, addTestingCounter, addTestingHistogram); err != nil {
		return tm, exporter, err
	}
	return tm, exporter, nil
}

const (
	// Instrument names registered by the testing helpers in this file.
	nameTestingHistogram = "testing_histogram"
	nameTestingCounter   = "testing_counter"

	// Values used for the AttribErrorCause attribute on the testing counter.
	errorCauseTestingA = "TestingA"
	errorCauseTestingB = "TestingB"
)

// addTestingHistogram registers the testing histogram (a Float64Histogram
// measured in seconds) on m as a built-in instrument. The context argument is
// unused.
func addTestingHistogram(_ context.Context, m *Metrics) error {
	// These boundaries are only the defaults; they can be overridden with
	// configmap defaults.
	defaultBuckets := []float64{0.0, 1.0, 5.0, 10.0}

	return m.CreateInstrument(
		Float64Histogram,
		nameTestingHistogram,
		"Testing Metric",
		"s",
		WithDefaultBuckets(defaultBuckets),
		WithAsBuiltIn(),
	)
}

// TestingHistogramRecord records value against the testing histogram with an
// empty attribute set.
func (m *Metrics) TestingHistogramRecord(ctx context.Context, value float64) {
	attribs := InstAttribs{}
	m.Record(ctx, nameTestingHistogram, value, attribs)
}

// addTestingCounter registers the testing error counter (an Int64Counter with
// unit "{errors}") on m as a built-in instrument.
//
// The context argument is unused; it is named _ to match addTestingHistogram.
// It returns whatever error CreateInstrument reports.
func addTestingCounter(_ context.Context, m *Metrics) error {
	return m.CreateInstrument(
		Int64Counter,
		nameTestingCounter,
		"Testing Error Counting Metric",
		"{errors}",
		WithAsBuiltIn(),
	)
}

// TestingErrorA adds one to the testing counter, tagging the increment with
// the error cause errorCauseTestingA.
func (m *Metrics) TestingErrorA(ctx context.Context) {
	// Must use cause A here: using cause B would make this indistinguishable
	// from TestingErrorB in the recorded data.
	m.AddInt(ctx, nameTestingCounter, 1, InstAttribs{{Name: AttribErrorCause, Value: errorCauseTestingA}})
}

// TestingErrorB adds one to the testing counter, tagging the increment with
// the error cause errorCauseTestingB.
func (m *Metrics) TestingErrorB(ctx context.Context) {
	attribs := InstAttribs{{Name: AttribErrorCause, Value: errorCauseTestingB}}
	m.AddInt(ctx, nameTestingCounter, 1, attribs)
}
66 changes: 43 additions & 23 deletions workflow/metrics/instrument.go → util/telemetry/instrument.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics
package telemetry

import (
"fmt"
Expand All @@ -9,15 +9,15 @@ import (
"github.com/argoproj/argo-workflows/v3/util/help"
)

type instrument struct {
type Instrument struct {
name string
description string
otel interface{}
userdata interface{}
}

func (m *Metrics) preCreateCheck(name string) error {
if _, exists := m.allInstruments[name]; exists {
if _, exists := m.AllInstruments[name]; exists {
return fmt.Errorf("Instrument called %s already exists", name)
}
return nil
Expand All @@ -30,13 +30,13 @@ func addHelpLink(name, description string) string {
type instrumentType int

const (
float64ObservableGauge instrumentType = iota
float64Histogram
float64UpDownCounter
float64ObservableUpDownCounter
int64ObservableGauge
int64UpDownCounter
int64Counter
Float64ObservableGauge instrumentType = iota
Float64Histogram
Float64UpDownCounter
Float64ObservableUpDownCounter
Int64ObservableGauge
Int64UpDownCounter
Int64Counter
)

// InstrumentOption applies options to all instruments.
Expand All @@ -47,13 +47,13 @@ type instrumentOptions struct {

type instrumentOption func(*instrumentOptions)

func withAsBuiltIn() instrumentOption {
func WithAsBuiltIn() instrumentOption {
return func(o *instrumentOptions) {
o.builtIn = true
}
}

func withDefaultBuckets(buckets []float64) instrumentOption {
func WithDefaultBuckets(buckets []float64) instrumentOption {
return func(o *instrumentOptions) {
o.defaultBuckets = buckets
}
Expand All @@ -67,10 +67,10 @@ func collectOptions(options ...instrumentOption) instrumentOptions {
return o
}

func (m *Metrics) createInstrument(instType instrumentType, name, desc, unit string, options ...instrumentOption) error {
func (m *Metrics) CreateInstrument(instType instrumentType, name, desc, unit string, options ...instrumentOption) error {
opts := collectOptions(options...)
m.mutex.Lock()
defer m.mutex.Unlock()
m.Mutex.Lock()
defer m.Mutex.Unlock()
err := m.preCreateCheck(name)
if err != nil {
return err
Expand All @@ -81,50 +81,50 @@ func (m *Metrics) createInstrument(instType instrumentType, name, desc, unit str
}
var instPtr interface{}
switch instType {
case float64ObservableGauge:
case Float64ObservableGauge:
inst, insterr := (*m.otelMeter).Float64ObservableGauge(name,
metric.WithDescription(desc),
metric.WithUnit(unit),
)
instPtr = &inst
err = insterr
case float64Histogram:
case Float64Histogram:
inst, insterr := (*m.otelMeter).Float64Histogram(name,
metric.WithDescription(desc),
metric.WithUnit(unit),
metric.WithExplicitBucketBoundaries(m.buckets(name, opts.defaultBuckets)...),
)
instPtr = &inst
err = insterr
case float64UpDownCounter:
case Float64UpDownCounter:
inst, insterr := (*m.otelMeter).Float64UpDownCounter(name,
metric.WithDescription(desc),
metric.WithUnit(unit),
)
instPtr = &inst
err = insterr
case float64ObservableUpDownCounter:
case Float64ObservableUpDownCounter:
inst, insterr := (*m.otelMeter).Float64ObservableUpDownCounter(name,
metric.WithDescription(desc),
metric.WithUnit(unit),
)
instPtr = &inst
err = insterr
case int64ObservableGauge:
case Int64ObservableGauge:
inst, insterr := (*m.otelMeter).Int64ObservableGauge(name,
metric.WithDescription(desc),
metric.WithUnit(unit),
)
instPtr = &inst
err = insterr
case int64UpDownCounter:
case Int64UpDownCounter:
inst, insterr := (*m.otelMeter).Int64UpDownCounter(name,
metric.WithDescription(desc),
metric.WithUnit(unit),
)
instPtr = &inst
err = insterr
case int64Counter:
case Int64Counter:
inst, insterr := (*m.otelMeter).Int64Counter(name,
metric.WithDescription(desc),
metric.WithUnit(unit),
Expand All @@ -137,7 +137,7 @@ func (m *Metrics) createInstrument(instType instrumentType, name, desc, unit str
if err != nil {
return err
}
m.allInstruments[name] = &instrument{
m.AllInstruments[name] = &Instrument{
name: name,
description: desc,
otel: instPtr,
Expand All @@ -155,3 +155,23 @@ func (m *Metrics) buckets(name string, defaultBuckets []float64) []float64 {
}
return defaultBuckets
}

// GetName returns the name the instrument was created with.
func (i *Instrument) GetName() string {
	return i.name
}

// GetDescription returns the description stored on the instrument.
func (i *Instrument) GetDescription() string {
	return i.description
}

// GetOtel returns the value held in the otel field. CreateInstrument stores a
// pointer to the OpenTelemetry instrument it created there, so callers need a
// type assertion to use it.
func (i *Instrument) GetOtel() interface{} {
	return i.otel
}

// SetUserdata stores data in the instrument's userdata field, replacing any
// previously stored value.
func (i *Instrument) SetUserdata(data interface{}) {
	i.userdata = data
}

// GetUserdata returns the value currently held in the userdata field (as set
// by SetUserdata).
func (i *Instrument) GetUserdata() interface{} {
	return i.userdata
}
Loading

0 comments on commit a9ce710

Please sign in to comment.