diff --git a/util/telemetry/attributes.go b/util/telemetry/attributes.go new file mode 100644 index 000000000000..fad80c8bec07 --- /dev/null +++ b/util/telemetry/attributes.go @@ -0,0 +1,43 @@ +package telemetry + +const ( + AttribBuildVersion string = `version` + AttribBuildPlatform string = `platform` + AttribBuildGoVersion string = `go_version` + AttribBuildDate string = `build_date` + AttribBuildCompiler string = `compiler` + AttribBuildGitCommit string = `git_commit` + AttribBuildGitTreeState string = `git_treestate` + AttribBuildGitTag string = `git_tag` + + AttribCronWFName string = `name` + + AttribErrorCause string = "cause" + + AttribLogLevel string = `level` + + AttribNodePhase string = `node_phase` + + AttribPodPhase string = `phase` + AttribPodNamespace string = `namespace` + AttribPodPendingReason string = `reason` + + AttribQueueName string = `queue_name` + + AttribRecentlyStarted string = `recently_started` + + AttribRequestKind = `kind` + AttribRequestVerb = `verb` + AttribRequestCode = `status_code` + + AttribTemplateName string = `name` + AttribTemplateNamespace string = `namespace` + AttribTemplateCluster string = `cluster_scope` + + AttribWorkerType string = `worker_type` + + AttribWorkflowNamespace string = `namespace` + AttribWorkflowPhase string = `phase` + AttribWorkflowStatus = `status` + AttribWorkflowType = `type` +) diff --git a/workflow/metrics/exporter_prometheus.go b/util/telemetry/exporter_prometheus.go similarity index 95% rename from workflow/metrics/exporter_prometheus.go rename to util/telemetry/exporter_prometheus.go index cbea7d80de56..1ed45a90c91e 100644 --- a/workflow/metrics/exporter_prometheus.go +++ b/util/telemetry/exporter_prometheus.go @@ -1,4 +1,4 @@ -package metrics +package telemetry import ( "context" @@ -20,8 +20,8 @@ import ( ) const ( - defaultPrometheusServerPort = 9090 - defaultPrometheusServerPath = "/metrics" + DefaultPrometheusServerPort = 9090 + DefaultPrometheusServerPath = "/metrics" ) func (config *Config) prometheusMetricsExporter(namespace string) (*prometheus.Exporter, error) { @@ -39,14 +39,14 @@ func (config *Config) prometheusMetricsExporter(namespace string) (*prometheus.E func (config *Config) path() string { if config.Path == "" { - return defaultPrometheusServerPath + return DefaultPrometheusServerPath } return config.Path } func (config *Config) port() int { if config.Port == 0 { - return defaultPrometheusServerPort + return DefaultPrometheusServerPort } return config.Port } diff --git a/workflow/metrics/exporter_prometheus_test.go b/util/telemetry/exporter_prometheus_test.go similarity index 64% rename from workflow/metrics/exporter_prometheus_test.go rename to util/telemetry/exporter_prometheus_test.go index c80a3aa45057..5106fc354251 100644 --- a/workflow/metrics/exporter_prometheus_test.go +++ b/util/telemetry/exporter_prometheus_test.go @@ -1,6 +1,6 @@ //go:build !windows -package metrics +package telemetry import ( "context" @@ -14,19 +14,22 @@ import ( "github.com/stretchr/testify/require" ) +// testScopeName is the name that the metrics running under test will have +const testScopeName string = "argo-workflows-test" + func TestDisablePrometheusServer(t *testing.T) { config := Config{ Enabled: false, - Path: defaultPrometheusServerPath, - Port: defaultPrometheusServerPort, + Path: DefaultPrometheusServerPath, + Port: DefaultPrometheusServerPort, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := New(ctx, TestScopeName, &config, Callbacks{}) + m, err := NewMetrics(ctx, testScopeName, testScopeName, &config) require.NoError(t, err) go m.RunPrometheusServer(ctx, false) time.Sleep(1 * time.Second) // to confirm that the server doesn't start, even if we wait - resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", defaultPrometheusServerPort, defaultPrometheusServerPath)) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultPrometheusServerPort, DefaultPrometheusServerPath)) if resp != nil { defer resp.Body.Close() } @@ -37,16 +40,16 @@ func TestDisablePrometheusServer(t *testing.T) { func TestPrometheusServer(t *testing.T) { config := Config{ Enabled: true, - Path: defaultPrometheusServerPath, - Port: defaultPrometheusServerPort, + Path: DefaultPrometheusServerPath, + Port: DefaultPrometheusServerPort, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := New(ctx, TestScopeName, &config, Callbacks{}) + m, err := NewMetrics(ctx, testScopeName, testScopeName, &config) require.NoError(t, err) go m.RunPrometheusServer(ctx, false) time.Sleep(1 * time.Second) - resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", defaultPrometheusServerPort, defaultPrometheusServerPath)) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultPrometheusServerPort, DefaultPrometheusServerPath)) require.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) @@ -62,17 +65,17 @@ func TestPrometheusServer(t *testing.T) { func TestDummyPrometheusServer(t *testing.T) { config := Config{ Enabled: true, - Path: defaultPrometheusServerPath, - Port: defaultPrometheusServerPort, + Path: DefaultPrometheusServerPath, + Port: DefaultPrometheusServerPort, Secure: false, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := New(ctx, TestScopeName, &config, Callbacks{}) + m, err := NewMetrics(ctx, testScopeName, testScopeName, &config) require.NoError(t, err) go m.RunPrometheusServer(ctx, true) time.Sleep(1 * time.Second) - resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", defaultPrometheusServerPort, defaultPrometheusServerPath)) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", DefaultPrometheusServerPort, DefaultPrometheusServerPath)) require.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) diff --git a/util/telemetry/helpers_test.go b/util/telemetry/helpers_test.go new file mode 100644 index 000000000000..2aedaed192d6 --- /dev/null +++ b/util/telemetry/helpers_test.go @@ -0,0 +1,65 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel/sdk/metric" +) + +func createDefaultTestMetrics() (*Metrics, *TestMetricsExporter, error) { + config := Config{ + Enabled: true, + } + return createTestMetrics(&config) +} + +func createTestMetrics(config *Config) (*Metrics, *TestMetricsExporter, error) { + ctx /* with cancel*/ := context.Background() + te := NewTestMetricsExporter() + + m, err := NewMetrics(ctx, TestScopeName, TestScopeName, config, metric.WithReader(te)) + if err != nil { + return nil, nil, err + } + err = m.Populate(ctx, AddVersion, addTestingCounter, addTestingHistogram) + return m, te, err +} + +const ( + nameTestingHistogram = `testing_histogram` + nameTestingCounter = `testing_counter` + errorCauseTestingA = "TestingA" + errorCauseTestingB = "TestingB" +) + +func addTestingHistogram(_ context.Context, m *Metrics) error { + // The buckets here are only the 'defaults' and can be overridden with configmap defaults + return m.CreateInstrument(Float64Histogram, + nameTestingHistogram, + "Testing Metric", + "s", + WithDefaultBuckets([]float64{0.0, 1.0, 5.0, 10.0}), + WithAsBuiltIn(), + ) +} + +func (m *Metrics) TestingHistogramRecord(ctx context.Context, value float64) { + m.Record(ctx, nameTestingHistogram, value, InstAttribs{}) +} + +func addTestingCounter(ctx context.Context, m *Metrics) error { + return m.CreateInstrument(Int64Counter, + nameTestingCounter, + "Testing Error Counting Metric", + "{errors}", + WithAsBuiltIn(), + ) +} + +func (m *Metrics) TestingErrorA(ctx context.Context) { + m.AddInt(ctx, nameTestingCounter, 1, InstAttribs{{Name: AttribErrorCause, Value: errorCauseTestingB}}) +} + +func (m *Metrics) TestingErrorB(ctx context.Context) { + m.AddInt(ctx, nameTestingCounter, 1, InstAttribs{{Name: AttribErrorCause, Value: errorCauseTestingB}}) +} diff --git a/workflow/metrics/instrument.go b/util/telemetry/instrument.go similarity index 73% rename from workflow/metrics/instrument.go rename to util/telemetry/instrument.go index 35ba4ea28acd..6831dd5be804 100644 --- a/workflow/metrics/instrument.go +++ b/util/telemetry/instrument.go @@ -1,4 +1,4 @@ -package metrics +package telemetry import ( "fmt" @@ -9,7 +9,7 @@ import ( "github.com/argoproj/argo-workflows/v3/util/help" ) -type instrument struct { +type Instrument struct { name string description string otel interface{} @@ -17,7 +17,7 @@ type instrument struct { } func (m *Metrics) preCreateCheck(name string) error { - if _, exists := m.allInstruments[name]; exists { + if _, exists := m.AllInstruments[name]; exists { return fmt.Errorf("Instrument called %s already exists", name) } return nil @@ -30,13 +30,13 @@ func addHelpLink(name, description string) string { type instrumentType int const ( - float64ObservableGauge instrumentType = iota - float64Histogram - float64UpDownCounter - float64ObservableUpDownCounter - int64ObservableGauge - int64UpDownCounter - int64Counter + Float64ObservableGauge instrumentType = iota + Float64Histogram + Float64UpDownCounter + Float64ObservableUpDownCounter + Int64ObservableGauge + Int64UpDownCounter + Int64Counter ) // InstrumentOption applies options to all instruments. @@ -47,13 +47,13 @@ type instrumentOptions struct { type instrumentOption func(*instrumentOptions) -func withAsBuiltIn() instrumentOption { +func WithAsBuiltIn() instrumentOption { return func(o *instrumentOptions) { o.builtIn = true } } -func withDefaultBuckets(buckets []float64) instrumentOption { +func WithDefaultBuckets(buckets []float64) instrumentOption { return func(o *instrumentOptions) { o.defaultBuckets = buckets } @@ -67,10 +67,10 @@ func collectOptions(options ...instrumentOption) instrumentOptions { return o } -func (m *Metrics) createInstrument(instType instrumentType, name, desc, unit string, options ...instrumentOption) error { +func (m *Metrics) CreateInstrument(instType instrumentType, name, desc, unit string, options ...instrumentOption) error { opts := collectOptions(options...) - m.mutex.Lock() - defer m.mutex.Unlock() + m.Mutex.Lock() + defer m.Mutex.Unlock() err := m.preCreateCheck(name) if err != nil { return err @@ -81,14 +81,14 @@ func (m *Metrics) createInstrument(instType instrumentType, name, desc, unit str } var instPtr interface{} switch instType { - case float64ObservableGauge: + case Float64ObservableGauge: inst, insterr := (*m.otelMeter).Float64ObservableGauge(name, metric.WithDescription(desc), metric.WithUnit(unit), ) instPtr = &inst err = insterr - case float64Histogram: + case Float64Histogram: inst, insterr := (*m.otelMeter).Float64Histogram(name, metric.WithDescription(desc), metric.WithUnit(unit), @@ -96,35 +96,35 @@ func (m *Metrics) createInstrument(instType instrumentType, name, desc, unit str ) instPtr = &inst err = insterr - case float64UpDownCounter: + case Float64UpDownCounter: inst, insterr := (*m.otelMeter).Float64UpDownCounter(name, metric.WithDescription(desc), metric.WithUnit(unit), ) instPtr = &inst err = insterr - case float64ObservableUpDownCounter: + case Float64ObservableUpDownCounter: inst, insterr := (*m.otelMeter).Float64ObservableUpDownCounter(name, metric.WithDescription(desc), metric.WithUnit(unit), ) instPtr = &inst err = insterr - case int64ObservableGauge: + case Int64ObservableGauge: inst, insterr := (*m.otelMeter).Int64ObservableGauge(name, metric.WithDescription(desc), metric.WithUnit(unit), ) instPtr = &inst err = insterr - case int64UpDownCounter: + case Int64UpDownCounter: inst, insterr := (*m.otelMeter).Int64UpDownCounter(name, metric.WithDescription(desc), metric.WithUnit(unit), ) instPtr = &inst err = insterr - case int64Counter: + case Int64Counter: inst, insterr := (*m.otelMeter).Int64Counter(name, metric.WithDescription(desc), metric.WithUnit(unit), @@ -137,7 +137,7 @@ func (m *Metrics) createInstrument(instType instrumentType, name, desc, unit str if err != nil { return err } - m.allInstruments[name] = &instrument{ + m.AllInstruments[name] = &Instrument{ name: name, description: desc, otel: instPtr, @@ -155,3 +155,23 @@ func (m *Metrics) buckets(name string, defaultBuckets []float64) []float64 { } return defaultBuckets } + +func (i *Instrument) GetName() string { + return i.name +} + +func (i *Instrument) GetDescription() string { + return i.description +} + +func (i *Instrument) GetOtel() interface{} { + return i.otel +} + +func (i *Instrument) SetUserdata(data interface{}) { + i.userdata = data +} + +func (i *Instrument) GetUserdata() interface{} { + return i.userdata +} diff --git a/util/telemetry/metrics.go b/util/telemetry/metrics.go new file mode 100644 index 000000000000..2a6be32c38fd --- /dev/null +++ b/util/telemetry/metrics.go @@ -0,0 +1,120 @@ +package telemetry + +import ( + "context" + "os" + "sync" + "time" + + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" + + wfconfig "github.com/argoproj/argo-workflows/v3/config" + + "go.opentelemetry.io/contrib/instrumentation/runtime" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/metric" + metricsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +type Config struct { + Enabled bool + Path string + Port int + TTL time.Duration + IgnoreErrors bool + Secure bool + Modifiers map[string]Modifier + Temporality wfconfig.MetricsTemporality +} + +type Metrics struct { + // Ensures mutual exclusion in workflows map + Mutex sync.RWMutex + + // Evil context for compatibility with legacy context free interfaces + Ctx context.Context + otelMeter *metric.Meter + config *Config + + AllInstruments map[string]*Instrument +} + +func NewMetrics(ctx context.Context, serviceName, prometheusName string, config *Config, extraOpts ...metricsdk.Option) (*Metrics, error) { + res := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(serviceName), + ) + + options := make([]metricsdk.Option, 0) + options = append(options, metricsdk.WithResource(res)) + _, otlpEnabled := os.LookupEnv(`OTEL_EXPORTER_OTLP_ENDPOINT`) + _, otlpMetricsEnabled := os.LookupEnv(`OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`) + if otlpEnabled || otlpMetricsEnabled { + log.Info("Starting OTLP metrics exporter") + otelExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithTemporalitySelector(getTemporality(config))) + if err != nil { + return nil, err + } + options = append(options, metricsdk.WithReader(metricsdk.NewPeriodicReader(otelExporter))) + } + + if config.Enabled { + log.Info("Starting Prometheus metrics exporter") + promExporter, err := config.prometheusMetricsExporter(prometheusName) + if err != nil { + return nil, err + } + options = append(options, metricsdk.WithReader(promExporter)) + } + options = append(options, extraOpts...) + options = append(options, view(config)) + + provider := metricsdk.NewMeterProvider(options...) + otel.SetMeterProvider(provider) + + // Add runtime metrics + err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second)) + if err != nil { + return nil, err + } + + meter := provider.Meter(serviceName) + metrics := &Metrics{ + Ctx: ctx, + otelMeter: &meter, + config: config, + AllInstruments: make(map[string]*Instrument), + } + + return metrics, nil +} + +type AddMetric func(context.Context, *Metrics) error + +func (m *Metrics) Populate(ctx context.Context, adders ...AddMetric) error { + for _, adder := range adders { + if err := adder(ctx, m); err != nil { + return err + } + } + return nil +} + +func getTemporality(config *Config) metricsdk.TemporalitySelector { + switch config.Temporality { + case wfconfig.MetricsTemporalityCumulative: + return func(metricsdk.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality + } + case wfconfig.MetricsTemporalityDelta: + return func(metricsdk.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + } + default: + return metricsdk.DefaultTemporalitySelector + } +} diff --git a/workflow/metrics/modifiers.go b/util/telemetry/modifiers.go similarity index 98% rename from workflow/metrics/modifiers.go rename to util/telemetry/modifiers.go index 984c21867361..eff3ef04f8ca 100644 --- a/workflow/metrics/modifiers.go +++ b/util/telemetry/modifiers.go @@ -1,4 +1,4 @@ -package metrics +package telemetry import ( "go.opentelemetry.io/otel/attribute" diff --git a/workflow/metrics/modifiers_test.go b/util/telemetry/modifiers_test.go similarity index 64% rename from workflow/metrics/modifiers_test.go rename to util/telemetry/modifiers_test.go index 818432e35778..5fa8886566b3 100644 --- a/workflow/metrics/modifiers_test.go +++ b/util/telemetry/modifiers_test.go @@ -1,4 +1,4 @@ -package metrics +package telemetry import ( "context" @@ -13,17 +13,15 @@ func TestViewDisable(t *testing.T) { // Same metric as TestMetrics, but disabled by a view m, te, err := createTestMetrics(&Config{ Modifiers: map[string]Modifier{ - nameOperationDuration: { + nameTestingHistogram: { Disabled: true, }, }, - }, - Callbacks{}, - ) + }) require.NoError(t, err) - m.OperationCompleted(m.ctx, 5) + m.TestingHistogramRecord(m.Ctx, 5) attribs := attribute.NewSet() - _, err = te.GetFloat64HistogramData(nameOperationDuration, &attribs) + _, err = te.GetFloat64HistogramData(nameTestingHistogram, &attribs) require.Error(t, err) } @@ -31,24 +29,22 @@ func TestViewDisabledAttributes(t *testing.T) { // Disable the error cause label m, te, err := createTestMetrics(&Config{ Modifiers: map[string]Modifier{ - nameErrorCount: { - DisabledAttributes: []string{labelErrorCause}, + nameTestingCounter: { + DisabledAttributes: []string{AttribErrorCause}, }, }, - }, - Callbacks{}, - ) + }) require.NoError(t, err) // Submit a couple of errors - m.OperationPanic(context.Background()) - m.CronWorkflowSubmissionError(context.Background()) + m.TestingErrorA(context.Background()) + m.TestingErrorB(context.Background()) // See if we can find this with the attributes, we should not be able to - attribsFail := attribute.NewSet(attribute.String(labelErrorCause, string(ErrorCauseOperationPanic))) - _, err = te.GetInt64CounterValue(nameErrorCount, &attribsFail) + attribsFail := attribute.NewSet(attribute.String(AttribErrorCause, string(errorCauseTestingA))) + _, err = te.GetInt64CounterValue(nameTestingCounter, &attribsFail) require.Error(t, err) // Find a sum of all error types attribsSuccess := attribute.NewSet() - val, err := te.GetInt64CounterValue(nameErrorCount, &attribsSuccess) + val, err := te.GetInt64CounterValue(nameTestingCounter, &attribsSuccess) require.NoError(t, err) // Sum of the two submitted errors is 2 assert.Equal(t, int64(2), val) @@ -59,17 +55,15 @@ func TestViewHistogramBuckets(t *testing.T) { bounds := []float64{1.0, 3.0, 5.0, 10.0} m, te, err := createTestMetrics(&Config{ Modifiers: map[string]Modifier{ - nameOperationDuration: { + nameTestingHistogram: { HistogramBuckets: bounds, }, }, - }, - Callbacks{}, - ) + }) require.NoError(t, err) - m.OperationCompleted(m.ctx, 5) + m.TestingHistogramRecord(m.Ctx, 5) attribs := attribute.NewSet() - val, err := te.GetFloat64HistogramData(nameOperationDuration, &attribs) + val, err := te.GetFloat64HistogramData(nameTestingHistogram, &attribs) require.NoError(t, err) assert.Equal(t, bounds, val.Bounds) assert.Equal(t, []uint64{0, 0, 1, 0, 0}, val.BucketCounts) diff --git a/workflow/metrics/operators.go b/util/telemetry/operators.go similarity index 61% rename from workflow/metrics/operators.go rename to util/telemetry/operators.go index afe7b33c5459..23eb1f11f58d 100644 --- a/workflow/metrics/operators.go +++ b/util/telemetry/operators.go @@ -1,4 +1,4 @@ -package metrics +package telemetry import ( "context" @@ -9,15 +9,15 @@ import ( "go.opentelemetry.io/otel/metric" ) -func (m *Metrics) addInt(ctx context.Context, name string, val int64, labels instAttribs) { - if instrument, ok := m.allInstruments[name]; ok { - instrument.addInt(ctx, val, labels) +func (m *Metrics) AddInt(ctx context.Context, name string, val int64, labels InstAttribs) { + if instrument, ok := m.AllInstruments[name]; ok { + instrument.AddInt(ctx, val, labels) } else { log.Errorf("Metrics addInt() to non-existent metric %s", name) } } -func (i *instrument) addInt(ctx context.Context, val int64, labels instAttribs) { +func (i *Instrument) AddInt(ctx context.Context, val int64, labels InstAttribs) { switch inst := i.otel.(type) { case *metric.Int64UpDownCounter: (*inst).Add(ctx, val, i.attributes(labels)) @@ -28,15 +28,15 @@ func (i *instrument) addInt(ctx context.Context, val int64, labels instAttribs) } } -func (m *Metrics) record(ctx context.Context, name string, val float64, labels instAttribs) { - if instrument, ok := m.allInstruments[name]; ok { - instrument.record(ctx, val, labels) +func (m *Metrics) Record(ctx context.Context, name string, val float64, labels InstAttribs) { + if instrument, ok := m.AllInstruments[name]; ok { + instrument.Record(ctx, val, labels) } else { log.Errorf("Metrics record() to non-existent metric %s", name) } } -func (i *instrument) record(ctx context.Context, val float64, labels instAttribs) { +func (i *Instrument) Record(ctx context.Context, val float64, labels InstAttribs) { switch inst := i.otel.(type) { case *metric.Float64Histogram: (*inst).Record(ctx, val, i.attributes(labels)) @@ -45,7 +45,7 @@ func (i *instrument) record(ctx context.Context, val float64, labels instAttribs } } -func (i *instrument) registerCallback(m *Metrics, f metric.Callback) error { +func (i *Instrument) RegisterCallback(m *Metrics, f metric.Callback) error { switch inst := i.otel.(type) { case *metric.Float64ObservableUpDownCounter: _, err := (*m.otelMeter).RegisterCallback(f, *inst) @@ -61,7 +61,7 @@ func (i *instrument) registerCallback(m *Metrics, f metric.Callback) error { } } -func (i *instrument) observeInt(o metric.Observer, val int64, labels instAttribs) { +func (i *Instrument) ObserveInt(o metric.Observer, val int64, labels InstAttribs) { switch inst := i.otel.(type) { case *metric.Int64ObservableGauge: o.ObserveInt64(*inst, val, i.attributes(labels)) @@ -70,7 +70,7 @@ func (i *instrument) observeInt(o metric.Observer, val int64, labels instAttribs } } -func (i *instrument) observeFloat(o metric.Observer, val float64, labels instAttribs) { +func (i *Instrument) ObserveFloat(o metric.Observer, val float64, labels InstAttribs) { switch inst := i.otel.(type) { case *metric.Float64ObservableGauge: o.ObserveFloat64(*inst, val, i.attributes(labels)) @@ -81,26 +81,26 @@ func (i *instrument) observeFloat(o metric.Observer, val float64, labels instAtt } } -type instAttribs []instAttrib -type instAttrib struct { - name string - value interface{} +type InstAttribs []InstAttrib +type InstAttrib struct { + Name string + Value interface{} } -func (i *instrument) attributes(labels instAttribs) metric.MeasurementOption { +func (i *Instrument) attributes(labels InstAttribs) metric.MeasurementOption { attribs := make([]attribute.KeyValue, 0) for _, label := range labels { - switch value := label.value.(type) { + switch value := label.Value.(type) { case string: - attribs = append(attribs, attribute.String(label.name, value)) + attribs = append(attribs, attribute.String(label.Name, value)) case bool: - attribs = append(attribs, attribute.Bool(label.name, value)) + attribs = append(attribs, attribute.Bool(label.Name, value)) case int: - attribs = append(attribs, attribute.Int(label.name, value)) + attribs = append(attribs, attribute.Int(label.Name, value)) case int64: - attribs = append(attribs, attribute.Int64(label.name, value)) + attribs = append(attribs, attribute.Int64(label.Name, value)) case float64: - attribs = append(attribs, attribute.Float64(label.name, value)) + attribs = append(attribs, attribute.Float64(label.Name, value)) default: log.Errorf("Attempt to use label of unhandled type in metric %s", i.name) } diff --git a/workflow/metrics/test_exporter.go b/util/telemetry/test_metrics_exporter.go similarity index 61% rename from workflow/metrics/test_exporter.go rename to util/telemetry/test_metrics_exporter.go index 071e436386ac..4a49b1965306 100644 --- a/workflow/metrics/test_exporter.go +++ b/util/telemetry/test_metrics_exporter.go @@ -1,84 +1,39 @@ -package metrics +package telemetry import ( "context" "fmt" - "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "k8s.io/client-go/util/workqueue" ) +// TestScopeName is the name that the metrics running under test will have +const TestScopeName string = "argo-workflows-test" + // TestExporter is an opentelemetry metrics exporter, purely for use within // tests. It is not possible to query the values of an instrument via the otel // SDK, so this exporter provides methods by which you can request // metrics by name+attributes and therefore inspect whether they exist, and // their values for the purposes of testing only. // This is a public structure as it is used outside of this module also. -type TestExporter struct { +type TestMetricsExporter struct { metric.Reader } -// TestScopeName is the name that the metrics running under test will have -const TestScopeName string = "argo-workflows-test" - -var _ metric.Reader = &TestExporter{} - -var sharedMetrics *Metrics = nil -var sharedTE *TestExporter = nil - -// getSharedMetrics returns a singleton metrics with test exporter -// This is necessary because only the first call to workqueue.SetProvider -// takes effect within a single binary -// This can be fixed when we update to client-go 0.27 or later and we can -// create workqueues with https://godocs.io/k8s.io/client-go/util/workqueue#NewRateLimitingQueueWithConfig -func getSharedMetrics() (*Metrics, *TestExporter, error) { - if sharedMetrics == nil { - config := Config{ - Enabled: true, - TTL: 1 * time.Second, - } - var err error - sharedMetrics, sharedTE, err = createTestMetrics(&config, Callbacks{}) - if err != nil { - return nil, nil, err - } - - workqueue.SetProvider(sharedMetrics) - } - return sharedMetrics, sharedTE, nil -} - -// CreateDefaultTestMetrics creates a boring testExporter enabled -// metrics, suitable for many tests -func CreateDefaultTestMetrics() (*Metrics, *TestExporter, error) { - config := Config{ - Enabled: true, - } - return createTestMetrics(&config, Callbacks{}) -} - -func createTestMetrics(config *Config, callbacks Callbacks) (*Metrics, *TestExporter, error) { - ctx /* with cancel*/ := context.Background() - te := newTestExporter() - - m, err := New(ctx, TestScopeName, config, callbacks, metric.WithReader(te)) - return m, te, err - -} +var _ metric.Reader = &TestMetricsExporter{} -func newTestExporter() *TestExporter { +func NewTestMetricsExporter() *TestMetricsExporter { reader := metric.NewManualReader() - e := &TestExporter{ + e := &TestMetricsExporter{ Reader: reader, } return e } -func (t *TestExporter) getOurMetrics() (*[]metricdata.Metrics, error) { +func (t *TestMetricsExporter) getOurMetrics() (*[]metricdata.Metrics, error) { metrics := metricdata.ResourceMetrics{} err := t.Collect(context.TODO(), &metrics) if err != nil { @@ -92,7 +47,7 @@ func (t *TestExporter) getOurMetrics() (*[]metricdata.Metrics, error) { return nil, fmt.Errorf("%s scope not found", TestScopeName) } -func (t *TestExporter) getNamedMetric(name string) (*metricdata.Metrics, error) { +func (t *TestMetricsExporter) getNamedMetric(name string) (*metricdata.Metrics, error) { mtcs, err := t.getOurMetrics() if err != nil { return nil, err @@ -105,7 +60,7 @@ func (t *TestExporter) getNamedMetric(name string) (*metricdata.Metrics, error) return nil, fmt.Errorf("%s named metric not found in %v", name, mtcs) } -func (t *TestExporter) getNamedInt64CounterData(name string, attribs *attribute.Set) (*metricdata.DataPoint[int64], error) { +func (t *TestMetricsExporter) getNamedInt64CounterData(name string, attribs *attribute.Set) (*metricdata.DataPoint[int64], error) { mtc, err := t.getNamedMetric(name) if err != nil { return nil, err @@ -122,7 +77,7 @@ func (t *TestExporter) getNamedInt64CounterData(name string, attribs *attribute. return nil, fmt.Errorf("%s type counter[int64] not found in %v", name, mtc) } -func (t *TestExporter) getNamedFloat64GaugeData(name string, attribs *attribute.Set) (*metricdata.DataPoint[float64], error) { +func (t *TestMetricsExporter) getNamedFloat64GaugeData(name string, attribs *attribute.Set) (*metricdata.DataPoint[float64], error) { mtc, err := t.getNamedMetric(name) if err != nil { return nil, err @@ -139,7 +94,7 @@ func (t *TestExporter) getNamedFloat64GaugeData(name string, attribs *attribute. return nil, fmt.Errorf("%s type gauge[float64] not found in %v", name, mtc) } -func (t *TestExporter) getNamedInt64GaugeData(name string, attribs *attribute.Set) (*metricdata.DataPoint[int64], error) { +func (t *TestMetricsExporter) getNamedInt64GaugeData(name string, attribs *attribute.Set) (*metricdata.DataPoint[int64], error) { mtc, err := t.getNamedMetric(name) if err != nil { return nil, err @@ -158,7 +113,7 @@ func (t *TestExporter) getNamedInt64GaugeData(name string, attribs *attribute.Se return nil, fmt.Errorf("%s named gauge[float64] with attribs %v not found in %v", name, attribs, mtc) } -func (t *TestExporter) getNamedFloat64CounterData(name string, attribs *attribute.Set) (*metricdata.DataPoint[float64], error) { +func (t *TestMetricsExporter) getNamedFloat64CounterData(name string, attribs *attribute.Set) (*metricdata.DataPoint[float64], error) { mtc, err := t.getNamedMetric(name) if err != nil { return nil, err @@ -175,7 +130,7 @@ func (t *TestExporter) getNamedFloat64CounterData(name string, attribs *attribut return nil, fmt.Errorf("%s type counter[float64] not found in %v", name, mtc) } -func (t *TestExporter) getNamedFloat64HistogramData(name string, attribs *attribute.Set) (*metricdata.HistogramDataPoint[float64], error) { +func (t *TestMetricsExporter) getNamedFloat64HistogramData(name string, attribs *attribute.Set) (*metricdata.HistogramDataPoint[float64], error) { mtc, err := t.getNamedMetric(name) if err != nil { return nil, err @@ -193,13 +148,13 @@ func (t *TestExporter) getNamedFloat64HistogramData(name string, attribs *attrib } // GetFloat64HistogramData returns an otel histogram float64 data point for test reads -func (t *TestExporter) GetFloat64HistogramData(name string, attribs *attribute.Set) (*metricdata.HistogramDataPoint[float64], error) { +func (t *TestMetricsExporter) GetFloat64HistogramData(name string, attribs *attribute.Set) (*metricdata.HistogramDataPoint[float64], error) { data, err := t.getNamedFloat64HistogramData(name, attribs) return data, err } // GetInt64CounterValue returns an otel int64 counter value for test reads -func (t *TestExporter) GetInt64CounterValue(name string, attribs *attribute.Set) (int64, error) { +func (t *TestMetricsExporter) GetInt64CounterValue(name string, attribs *attribute.Set) (int64, error) { counter, err := t.getNamedInt64CounterData(name, attribs) if err != nil { return 0, err @@ -208,7 +163,7 @@ func (t *TestExporter) GetInt64CounterValue(name string, attribs *attribute.Set) } // GetFloat64GaugeValue returns an otel float64 gauge value for test reads -func (t *TestExporter) GetFloat64GaugeValue(name string, attribs *attribute.Set) (float64, error) { +func (t *TestMetricsExporter) GetFloat64GaugeValue(name string, attribs *attribute.Set) (float64, error) { gauge, err := t.getNamedFloat64GaugeData(name, attribs) if err != nil { return 0, err @@ -217,7 +172,7 @@ func (t *TestExporter) GetFloat64GaugeValue(name string, attribs *attribute.Set) } // GetInt64GaugeValue returns an otel int64 gauge value for test reads -func (t *TestExporter) GetInt64GaugeValue(name string, attribs *attribute.Set) (int64, error) { +func (t *TestMetricsExporter) GetInt64GaugeValue(name string, attribs *attribute.Set) (int64, error) { gauge, err := t.getNamedInt64GaugeData(name, attribs) if err != nil { return 0, err @@ -226,7 +181,7 @@ func (t *TestExporter) GetInt64GaugeValue(name string, attribs *attribute.Set) ( } // GetFloat64CounterValue returns an otel float64 counter value for test reads -func (t *TestExporter) GetFloat64CounterValue(name string, attribs *attribute.Set) (float64, error) { +func (t *TestMetricsExporter) GetFloat64CounterValue(name string, attribs *attribute.Set) (float64, error) { counter, err := t.getNamedFloat64CounterData(name, attribs) if err != nil { return 0, err diff --git a/util/telemetry/version.go b/util/telemetry/version.go new file mode 100644 index 000000000000..055aa038bf74 --- /dev/null +++ b/util/telemetry/version.go @@ -0,0 +1,33 @@ +package telemetry + +import ( + "context" + + "github.com/argoproj/argo-workflows/v3" +) + +func AddVersion(ctx context.Context, m *Metrics) error { + const nameVersion = `version` + err := m.CreateInstrument(Int64Counter, + nameVersion, + "Build metadata for this Controller", + "{unused}", + WithAsBuiltIn(), + ) + if err != nil { + return err + } + + version := argo.GetVersion() + m.AddInt(ctx, nameVersion, 1, InstAttribs{ + {Name: AttribBuildVersion, Value: version.Version}, + {Name: AttribBuildPlatform, Value: version.Platform}, + {Name: AttribBuildGoVersion, Value: version.GoVersion}, + {Name: AttribBuildDate, Value: version.BuildDate}, + {Name: AttribBuildCompiler, Value: version.Compiler}, + {Name: AttribBuildGitCommit, Value: version.GitCommit}, + {Name: AttribBuildGitTreeState, Value: version.GitTreeState}, + {Name: AttribBuildGitTag, Value: version.GitTag}, + }) + return nil +} diff --git a/util/telemetry/version_test.go b/util/telemetry/version_test.go new file mode 100644 index 000000000000..1337c1520b21 --- /dev/null +++ b/util/telemetry/version_test.go @@ -0,0 +1,31 @@ +package telemetry + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + + "github.com/argoproj/argo-workflows/v3" +) + +func TestVersion(t *testing.T) { + _, te, err := createDefaultTestMetrics() + require.NoError(t, err) + assert.NotNil(t, te) + version := argo.GetVersion() + attribs := attribute.NewSet( + attribute.String(AttribBuildVersion, version.Version), + attribute.String(AttribBuildPlatform, version.Platform), + attribute.String(AttribBuildGoVersion, version.GoVersion), + attribute.String(AttribBuildDate, version.BuildDate), + attribute.String(AttribBuildCompiler, version.Compiler), + attribute.String(AttribBuildGitCommit, version.GitCommit), + attribute.String(AttribBuildGitTreeState, version.GitTreeState), + attribute.String(AttribBuildGitTag, version.GitTag), + ) + val, err := te.GetInt64CounterValue(`version`, &attribs) + require.NoError(t, err) + assert.Equal(t, int64(1), val) +} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 50d222dcb55a..dc3ca0fd4c47 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -52,6 +52,7 @@ import ( "github.com/argoproj/argo-workflows/v3/util/diff" "github.com/argoproj/argo-workflows/v3/util/env" errorsutil "github.com/argoproj/argo-workflows/v3/util/errors" + "github.com/argoproj/argo-workflows/v3/util/telemetry" "github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories" "github.com/argoproj/argo-workflows/v3/workflow/common" controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache" @@ -221,6 +222,7 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli wfc.maxStackDepth = wfc.getMaxStackDepth() wfc.metrics, err = metrics.New(ctx, `workflows-controller`, + `argo_workflows`, wfc.getMetricsServerConfig(), metrics.Callbacks{ PodPhase: wfc.getPodPhaseMetrics, @@ -1391,18 +1393,18 @@ func (wfc *WorkflowController) getMaxStackDepth() int { return maxAllowedStackDepth } -func (wfc *WorkflowController) getMetricsServerConfig() *metrics.Config { +func (wfc *WorkflowController) getMetricsServerConfig() *telemetry.Config { // Metrics config - modifiers := make(map[string]metrics.Modifier) + modifiers := make(map[string]telemetry.Modifier) for name, modifier := range wfc.Config.MetricsConfig.Modifiers { - modifiers[name] = metrics.Modifier{ + modifiers[name] = telemetry.Modifier{ Disabled: modifier.Disabled, DisabledAttributes: modifier.DisabledAttributes, HistogramBuckets: modifier.HistogramBuckets, } } - metricsConfig := metrics.Config{ + metricsConfig := telemetry.Config{ Enabled: wfc.Config.MetricsConfig.Enabled == nil || *wfc.Config.MetricsConfig.Enabled, Path: wfc.Config.MetricsConfig.Path, Port: wfc.Config.MetricsConfig.Port, diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index c1b2c52b59fa..973b069eeeeb 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -32,6 +32,7 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/scheme" wfextv "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions" envutil "github.com/argoproj/argo-workflows/v3/util/env" + "github.com/argoproj/argo-workflows/v3/util/telemetry" armocks "github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories/mocks" "github.com/argoproj/argo-workflows/v3/workflow/common" controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache" @@ -248,7 +249,7 @@ var defaultServiceAccount = &apiv1.ServiceAccount{ } // test exporter extract metric values from the metrics subsystem -var testExporter *metrics.TestExporter +var testExporter *telemetry.TestMetricsExporter func newController(options ...interface{}) (context.CancelFunc, *WorkflowController) { // get all the objects and add to the fake diff --git a/workflow/cron/operator_test.go b/workflow/cron/operator_test.go index eb10bce8b55c..e162865582ca 100644 --- a/workflow/cron/operator_test.go +++ b/workflow/cron/operator_test.go @@ -13,6 +13,7 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" + "github.com/argoproj/argo-workflows/v3/util/telemetry" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/metrics" "github.com/argoproj/argo-workflows/v3/workflow/util" @@ -181,7 +182,7 @@ func TestCronWorkflowConditionSubmissionError(t *testing.T) { v1alpha1.MustUnmarshal([]byte(invalidWf), &cronWf) cs := fake.NewSimpleClientset() - testMetrics, err := metrics.New(context.Background(), metrics.TestScopeName, &metrics.Config{}, metrics.Callbacks{}) + testMetrics, err := metrics.New(context.Background(), telemetry.TestScopeName, telemetry.TestScopeName, &telemetry.Config{}, metrics.Callbacks{}) require.NoError(t, err) woc := &cronWfOperationCtx{ wfClientset: cs, @@ -237,7 +238,7 @@ func TestSpecError(t *testing.T) { cs := fake.NewSimpleClientset() ctx := context.Background() - testMetrics, err := metrics.New(ctx, metrics.TestScopeName, &metrics.Config{}, metrics.Callbacks{}) + testMetrics, err := metrics.New(ctx, telemetry.TestScopeName, telemetry.TestScopeName, &telemetry.Config{}, metrics.Callbacks{}) require.NoError(t, err) woc := &cronWfOperationCtx{ wfClientset: cs, @@ -262,7 +263,7 @@ func TestScheduleTimeParam(t *testing.T) { v1alpha1.MustUnmarshal([]byte(scheduledWf), &cronWf) cs := fake.NewSimpleClientset() - testMetrics, _ := metrics.New(context.Background(), metrics.TestScopeName, &metrics.Config{}, metrics.Callbacks{}) + testMetrics, _ := metrics.New(context.Background(), telemetry.TestScopeName, telemetry.TestScopeName, &telemetry.Config{}, metrics.Callbacks{}) woc := &cronWfOperationCtx{ wfClientset: cs, wfClient: cs.ArgoprojV1alpha1().Workflows(""), @@ -312,7 +313,7 @@ func TestLastUsedSchedule(t *testing.T) { v1alpha1.MustUnmarshal([]byte(lastUsedSchedule), &cronWf) cs := fake.NewSimpleClientset() - testMetrics, err := metrics.New(context.Background(), metrics.TestScopeName, &metrics.Config{}, metrics.Callbacks{}) + testMetrics, err := metrics.New(context.Background(), telemetry.TestScopeName, telemetry.TestScopeName, &telemetry.Config{}, metrics.Callbacks{}) require.NoError(t, err) woc := &cronWfOperationCtx{ wfClientset: cs, @@ -441,7 +442,7 @@ func TestMultipleSchedules(t *testing.T) { v1alpha1.MustUnmarshal([]byte(multipleSchedulesWf), &cronWf) cs := fake.NewSimpleClientset() - testMetrics, err := metrics.New(context.Background(), metrics.TestScopeName, &metrics.Config{}, metrics.Callbacks{}) + testMetrics, err := metrics.New(context.Background(), telemetry.TestScopeName, telemetry.TestScopeName, &telemetry.Config{}, metrics.Callbacks{}) require.NoError(t, err) woc := &cronWfOperationCtx{ wfClientset: cs, @@ -504,7 +505,7 @@ func TestSpecErrorWithScheduleAndSchedules(t *testing.T) { cs := fake.NewSimpleClientset() ctx := context.Background() - testMetrics, err := metrics.New(ctx, metrics.TestScopeName, &metrics.Config{}, metrics.Callbacks{}) + testMetrics, err := metrics.New(ctx, telemetry.TestScopeName, telemetry.TestScopeName, &telemetry.Config{}, metrics.Callbacks{}) require.NoError(t, err) woc := &cronWfOperationCtx{ wfClientset: cs, @@ -565,7 +566,7 @@ func TestSpecErrorWithValidAndInvalidSchedules(t *testing.T) { cs := fake.NewSimpleClientset() ctx := context.Background() - testMetrics, err := metrics.New(ctx, metrics.TestScopeName, &metrics.Config{}, metrics.Callbacks{}) + testMetrics, err := metrics.New(ctx, telemetry.TestScopeName, telemetry.TestScopeName, &telemetry.Config{}, metrics.Callbacks{}) require.NoError(t, err) woc := &cronWfOperationCtx{ wfClientset: cs, diff --git a/workflow/gccontroller/gc_controller_test.go b/workflow/gccontroller/gc_controller_test.go index 5ac263ecba85..e1e77cdd4f09 100644 --- a/workflow/gccontroller/gc_controller_test.go +++ b/workflow/gccontroller/gc_controller_test.go @@ -15,6 +15,7 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" fakewfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" + "github.com/argoproj/argo-workflows/v3/util/telemetry" "github.com/argoproj/argo-workflows/v3/workflow/metrics" "github.com/argoproj/argo-workflows/v3/workflow/util" ) @@ -345,7 +346,7 @@ func newTTLController(t *testing.T) *Controller { clock := testingclock.NewFakeClock(time.Now()) wfclientset := fakewfclientset.NewSimpleClientset() wfInformer := cache.NewSharedIndexInformer(nil, nil, 0, nil) - gcMetrics, err := metrics.New(context.Background(), metrics.TestScopeName, &metrics.Config{}, metrics.Callbacks{}) + gcMetrics, err := metrics.New(context.Background(), telemetry.TestScopeName, telemetry.TestScopeName, &telemetry.Config{}, metrics.Callbacks{}) require.NoError(t, err) return &Controller{ wfclientset: wfclientset, diff --git a/workflow/metrics/counter_cronworkflow_trigger.go b/workflow/metrics/counter_cronworkflow_trigger.go index 2f1950e4331e..f77c488b6b36 100644 --- a/workflow/metrics/counter_cronworkflow_trigger.go +++ b/workflow/metrics/counter_cronworkflow_trigger.go @@ -2,6 +2,8 @@ package metrics import ( "context" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) const ( @@ -9,17 +11,17 @@ const ( ) func addCronWfTriggerCounter(_ context.Context, m *Metrics) error { - return m.createInstrument(int64Counter, + return m.CreateInstrument(telemetry.Int64Counter, nameCronTriggered, "Total number of cron workflows triggered", "{cronworkflow}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) } func (m *Metrics) CronWfTrigger(ctx context.Context, name, namespace string) { - m.addInt(ctx, nameCronTriggered, 1, instAttribs{ - {name: labelCronWFName, value: name}, - {name: labelWorkflowNamespace, value: namespace}, + m.AddInt(ctx, nameCronTriggered, 1, telemetry.InstAttribs{ + {Name: telemetry.AttribCronWFName, Value: name}, + {Name: telemetry.AttribWorkflowNamespace, Value: namespace}, }) } diff --git a/workflow/metrics/counter_error.go b/workflow/metrics/counter_error.go index f53a14f44c13..f71f7e6d8315 100644 --- a/workflow/metrics/counter_error.go +++ b/workflow/metrics/counter_error.go @@ -2,6 +2,8 @@ package metrics import ( "context" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) type ErrorCause string @@ -14,30 +16,30 @@ const ( ) func addErrorCounter(ctx context.Context, m *Metrics) error { - err := m.createInstrument(int64Counter, + err := m.CreateInstrument(telemetry.Int64Counter, nameErrorCount, "Number of errors encountered by the controller by cause", "{error}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } // Initialise all values to zero for _, cause := range []ErrorCause{ErrorCauseOperationPanic, ErrorCauseCronWorkflowSubmissionError, ErrorCauseCronWorkflowSpecError} { - m.addInt(ctx, nameErrorCount, 0, instAttribs{{name: labelErrorCause, value: string(cause)}}) + m.AddInt(ctx, nameErrorCount, 0, telemetry.InstAttribs{{Name: telemetry.AttribErrorCause, Value: string(cause)}}) } return nil } func (m *Metrics) OperationPanic(ctx context.Context) { - m.addInt(ctx, nameErrorCount, 1, instAttribs{{name: labelErrorCause, value: string(ErrorCauseOperationPanic)}}) + m.AddInt(ctx, nameErrorCount, 1, telemetry.InstAttribs{{Name: telemetry.AttribErrorCause, Value: string(ErrorCauseOperationPanic)}}) } func (m *Metrics) CronWorkflowSubmissionError(ctx context.Context) { - m.addInt(ctx, nameErrorCount, 1, instAttribs{{name: labelErrorCause, value: string(ErrorCauseCronWorkflowSubmissionError)}}) + m.AddInt(ctx, nameErrorCount, 1, telemetry.InstAttribs{{Name: telemetry.AttribErrorCause, Value: string(ErrorCauseCronWorkflowSubmissionError)}}) } func (m *Metrics) CronWorkflowSpecError(ctx context.Context) { - m.addInt(ctx, nameErrorCount, 1, instAttribs{{name: labelErrorCause, value: string(ErrorCauseCronWorkflowSpecError)}}) + m.AddInt(ctx, nameErrorCount, 1, telemetry.InstAttribs{{Name: telemetry.AttribErrorCause, Value: string(ErrorCauseCronWorkflowSpecError)}}) } diff --git a/workflow/metrics/counter_log.go b/workflow/metrics/counter_log.go index 96ed960943ab..b9cea55952ab 100644 --- a/workflow/metrics/counter_log.go +++ b/workflow/metrics/counter_log.go @@ -3,28 +3,30 @@ package metrics import ( "context" + "github.com/argoproj/argo-workflows/v3/util/telemetry" + log "github.com/sirupsen/logrus" ) type logMetric struct { - counter *instrument + counter *telemetry.Instrument } func addLogCounter(ctx context.Context, m *Metrics) error { const nameLogMessages = `log_messages` - err := m.createInstrument(int64Counter, + err := m.CreateInstrument(telemetry.Int64Counter, nameLogMessages, "Total number of log messages.", "{message}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) lm := logMetric{ - counter: m.allInstruments[nameLogMessages], + counter: m.AllInstruments[nameLogMessages], } log.AddHook(lm) for _, level := range lm.Levels() { - m.addInt(ctx, nameLogMessages, 0, instAttribs{ - {name: labelLogLevel, value: level.String()}, + m.AddInt(ctx, nameLogMessages, 0, telemetry.InstAttribs{ + {Name: telemetry.AttribLogLevel, Value: level.String()}, }) } @@ -36,8 +38,8 @@ func (m logMetric) Levels() []log.Level { } func (m logMetric) Fire(entry *log.Entry) error { - (*m.counter).addInt(entry.Context, 1, instAttribs{ - {name: labelLogLevel, value: entry.Level.String()}, + (*m.counter).AddInt(entry.Context, 1, telemetry.InstAttribs{ + {Name: telemetry.AttribLogLevel, Value: entry.Level.String()}, }) return nil } diff --git a/workflow/metrics/counter_pod_missing.go b/workflow/metrics/counter_pod_missing.go index 991a2184796b..8f3b227d8ef0 100644 --- a/workflow/metrics/counter_pod_missing.go +++ b/workflow/metrics/counter_pod_missing.go @@ -2,6 +2,8 @@ package metrics import ( "context" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) const ( @@ -9,18 +11,18 @@ const ( ) func addPodMissingCounter(_ context.Context, m *Metrics) error { - return m.createInstrument(int64Counter, + return m.CreateInstrument(telemetry.Int64Counter, namePodMissing, "Incidents of pod missing.", "{pod}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) } func (m *Metrics) incPodMissing(ctx context.Context, val int64, recentlyStarted bool, phase string) { - m.addInt(ctx, namePodMissing, val, instAttribs{ - {name: labelRecentlyStarted, value: recentlyStarted}, - {name: labelNodePhase, value: phase}, + m.AddInt(ctx, namePodMissing, val, telemetry.InstAttribs{ + {Name: telemetry.AttribRecentlyStarted, Value: recentlyStarted}, + {Name: telemetry.AttribNodePhase, Value: phase}, }) } diff --git a/workflow/metrics/counter_pod_pending.go b/workflow/metrics/counter_pod_pending.go index 4c47fbb4a22f..5138df86abea 100644 --- a/workflow/metrics/counter_pod_pending.go +++ b/workflow/metrics/counter_pod_pending.go @@ -3,6 +3,8 @@ package metrics import ( "context" "strings" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) const ( @@ -10,11 +12,11 @@ const ( ) func addPodPendingCounter(_ context.Context, m *Metrics) error { - return m.createInstrument(int64Counter, + return m.CreateInstrument(telemetry.Int64Counter, namePodPending, "Total number of pods that started pending by reason", "{pod}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) } @@ -28,9 +30,9 @@ func (m *Metrics) ChangePodPending(ctx context.Context, reason, namespace string // the pod_phase metric can cope with this being visible return default: - m.addInt(ctx, namePodPending, 1, instAttribs{ - {name: labelPodPendingReason, value: splitReason[0]}, - {name: labelPodNamespace, value: namespace}, + m.AddInt(ctx, namePodPending, 1, telemetry.InstAttribs{ + {Name: telemetry.AttribPodPendingReason, Value: splitReason[0]}, + {Name: telemetry.AttribPodNamespace, Value: namespace}, }) } } diff --git a/workflow/metrics/counter_pod_phase.go b/workflow/metrics/counter_pod_phase.go index 530be09ad5ce..37686c5fc4c3 100644 --- a/workflow/metrics/counter_pod_phase.go +++ b/workflow/metrics/counter_pod_phase.go @@ -2,6 +2,8 @@ package metrics import ( "context" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) const ( @@ -9,17 +11,17 @@ const ( ) func addPodPhaseCounter(_ context.Context, m *Metrics) error { - return m.createInstrument(int64Counter, + return m.CreateInstrument(telemetry.Int64Counter, namePodPhase, "Total number of Pods that have entered each phase", "{pod}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) } func (m *Metrics) ChangePodPhase(ctx context.Context, phase, namespace string) { - m.addInt(ctx, namePodPhase, 1, instAttribs{ - {name: labelPodPhase, value: phase}, - {name: labelPodNamespace, value: namespace}, + m.AddInt(ctx, namePodPhase, 1, telemetry.InstAttribs{ + {Name: telemetry.AttribPodPhase, Value: phase}, + {Name: telemetry.AttribPodNamespace, Value: namespace}, }) } diff --git a/workflow/metrics/counter_template.go b/workflow/metrics/counter_template.go index 85861179cf42..d4dbd0ed91e9 100644 --- a/workflow/metrics/counter_template.go +++ b/workflow/metrics/counter_template.go @@ -2,6 +2,8 @@ package metrics import ( "context" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) const ( @@ -9,25 +11,25 @@ const ( ) func addWorkflowTemplateCounter(_ context.Context, m *Metrics) error { - return m.createInstrument(int64Counter, + return m.CreateInstrument(telemetry.Int64Counter, nameWFTemplateTriggered, "Total number of workflow templates triggered by workflowTemplateRef", "{workflow_template}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) } -func templateLabels(name, namespace string, cluster bool) instAttribs { - return instAttribs{ - {name: labelTemplateName, value: name}, - {name: labelTemplateNamespace, value: namespace}, - {name: labelTemplateCluster, value: cluster}, +func templateAttribs(name, namespace string, cluster bool) telemetry.InstAttribs { + return telemetry.InstAttribs{ + {Name: telemetry.AttribTemplateName, Value: name}, + {Name: telemetry.AttribTemplateNamespace, Value: namespace}, + {Name: telemetry.AttribTemplateCluster, Value: cluster}, } } func (m *Metrics) CountWorkflowTemplate(ctx context.Context, phase MetricWorkflowPhase, name, namespace string, cluster bool) { - labels := templateLabels(name, namespace, cluster) - labels = append(labels, instAttrib{name: labelWorkflowPhase, value: string(phase)}) + labels := templateAttribs(name, namespace, cluster) + labels = append(labels, telemetry.InstAttrib{Name: telemetry.AttribWorkflowPhase, Value: string(phase)}) - m.addInt(ctx, nameWFTemplateTriggered, 1, labels) + m.AddInt(ctx, nameWFTemplateTriggered, 1, labels) } diff --git a/workflow/metrics/counter_workflow_phase.go b/workflow/metrics/counter_workflow_phase.go index 693dcae12cf4..91c56eef4853 100644 --- a/workflow/metrics/counter_workflow_phase.go +++ b/workflow/metrics/counter_workflow_phase.go @@ -4,6 +4,7 @@ import ( "context" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) const ( @@ -40,17 +41,17 @@ func ConvertWorkflowPhase(inPhase wfv1.WorkflowPhase) MetricWorkflowPhase { } func addWorkflowPhaseCounter(_ context.Context, m *Metrics) error { - return m.createInstrument(int64Counter, + return m.CreateInstrument(telemetry.Int64Counter, nameWorkflowPhaseCounter, "Total number of workflows that have entered each phase", "{workflow}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) } func (m *Metrics) ChangeWorkflowPhase(ctx context.Context, phase MetricWorkflowPhase, namespace string) { - m.addInt(ctx, nameWorkflowPhaseCounter, 1, instAttribs{ - {name: labelWorkflowPhase, value: string(phase)}, - {name: labelWorkflowNamespace, value: namespace}, + m.AddInt(ctx, nameWorkflowPhaseCounter, 1, telemetry.InstAttribs{ + {Name: telemetry.AttribWorkflowPhase, Value: string(phase)}, + {Name: telemetry.AttribWorkflowNamespace, Value: namespace}, }) } diff --git a/workflow/metrics/gauge_pod_phase.go b/workflow/metrics/gauge_pod_phase.go index d9514ee90c79..b93e9c33729b 100644 --- a/workflow/metrics/gauge_pod_phase.go +++ b/workflow/metrics/gauge_pod_phase.go @@ -3,6 +3,8 @@ package metrics import ( "context" + "github.com/argoproj/argo-workflows/v3/util/telemetry" + "go.opentelemetry.io/otel/metric" ) @@ -11,16 +13,16 @@ type PodPhaseCallback func() map[string]int64 type podPhaseGauge struct { callback PodPhaseCallback - gauge *instrument + gauge *telemetry.Instrument } func addPodPhaseGauge(ctx context.Context, m *Metrics) error { const namePodsPhase = `pods_gauge` - err := m.createInstrument(int64ObservableGauge, + err := m.CreateInstrument(telemetry.Int64ObservableGauge, namePodsPhase, "Number of Pods from Workflows currently accessible by the controller by status.", "{pod}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err @@ -29,9 +31,9 @@ func addPodPhaseGauge(ctx context.Context, m *Metrics) error { if m.callbacks.PodPhase != nil { ppGauge := podPhaseGauge{ callback: m.callbacks.PodPhase, - gauge: m.allInstruments[namePodsPhase], + gauge: m.AllInstruments[namePodsPhase], } - return m.allInstruments[namePodsPhase].registerCallback(m, ppGauge.update) + return m.AllInstruments[namePodsPhase].RegisterCallback(m.Metrics, ppGauge.update) } return nil } @@ -39,7 +41,7 @@ func addPodPhaseGauge(ctx context.Context, m *Metrics) error { func (p *podPhaseGauge) update(_ context.Context, o metric.Observer) error { phases := p.callback() for phase, val := range phases { - p.gauge.observeInt(o, val, instAttribs{{name: labelPodPhase, value: phase}}) + p.gauge.ObserveInt(o, val, telemetry.InstAttribs{{Name: telemetry.AttribPodPhase, Value: phase}}) } return nil } diff --git a/workflow/metrics/gauge_workflow_condition.go b/workflow/metrics/gauge_workflow_condition.go index 19e01917643f..3709cd07d9d1 100644 --- a/workflow/metrics/gauge_workflow_condition.go +++ b/workflow/metrics/gauge_workflow_condition.go @@ -6,6 +6,7 @@ import ( "go.opentelemetry.io/otel/metric" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) // WorkflowConditionCallback is the function prototype to provide this gauge with the condition of the workflows @@ -13,16 +14,16 @@ type WorkflowConditionCallback func() map[wfv1.Condition]int64 type workflowConditionGauge struct { callback WorkflowConditionCallback - gauge *instrument + gauge *telemetry.Instrument } func addWorkflowConditionGauge(_ context.Context, m *Metrics) error { const nameWorkflowCondition = `workflow_condition` - err := m.createInstrument(int64ObservableGauge, + err := m.CreateInstrument(telemetry.Int64ObservableGauge, nameWorkflowCondition, "Workflow condition.", "{unit}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err @@ -31,9 +32,9 @@ func addWorkflowConditionGauge(_ context.Context, m *Metrics) error { if m.callbacks.WorkflowCondition != nil { wfcGauge := workflowConditionGauge{ callback: m.callbacks.WorkflowCondition, - gauge: m.allInstruments[nameWorkflowCondition], + gauge: m.AllInstruments[nameWorkflowCondition], } - return m.allInstruments[nameWorkflowCondition].registerCallback(m, wfcGauge.update) + return m.AllInstruments[nameWorkflowCondition].RegisterCallback(m.Metrics, wfcGauge.update) } return nil // TODO init all phases? @@ -42,9 +43,9 @@ func addWorkflowConditionGauge(_ context.Context, m *Metrics) error { func (c *workflowConditionGauge) update(_ context.Context, o metric.Observer) error { conditions := c.callback() for condition, val := range conditions { - c.gauge.observeInt(o, val, instAttribs{ - {name: labelWorkflowType, value: string(condition.Type)}, - {name: labelWorkflowStatus, value: string(condition.Status)}, + c.gauge.ObserveInt(o, val, telemetry.InstAttribs{ + {Name: telemetry.AttribWorkflowType, Value: string(condition.Type)}, + {Name: telemetry.AttribWorkflowStatus, Value: string(condition.Status)}, }) } return nil diff --git a/workflow/metrics/gauge_workflow_phase.go b/workflow/metrics/gauge_workflow_phase.go index 357feaf19863..59a6e670e413 100644 --- a/workflow/metrics/gauge_workflow_phase.go +++ b/workflow/metrics/gauge_workflow_phase.go @@ -3,6 +3,8 @@ package metrics import ( "context" + "github.com/argoproj/argo-workflows/v3/util/telemetry" + "go.opentelemetry.io/otel/metric" ) @@ -11,16 +13,16 @@ type WorkflowPhaseCallback func() map[string]int64 type workflowPhaseGauge struct { callback WorkflowPhaseCallback - gauge *instrument + gauge *telemetry.Instrument } func addWorkflowPhaseGauge(_ context.Context, m *Metrics) error { const nameWorkflowPhaseGauge = `gauge` - err := m.createInstrument(int64ObservableGauge, + err := m.CreateInstrument(telemetry.Int64ObservableGauge, nameWorkflowPhaseGauge, "number of Workflows currently accessible by the controller by status", "{workflow}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err @@ -29,9 +31,9 @@ func addWorkflowPhaseGauge(_ context.Context, m *Metrics) error { if m.callbacks.WorkflowPhase != nil { wfpGauge := workflowPhaseGauge{ callback: m.callbacks.WorkflowPhase, - gauge: m.allInstruments[nameWorkflowPhaseGauge], + gauge: m.AllInstruments[nameWorkflowPhaseGauge], } - return m.allInstruments[nameWorkflowPhaseGauge].registerCallback(m, wfpGauge.update) + return m.AllInstruments[nameWorkflowPhaseGauge].RegisterCallback(m.Metrics, wfpGauge.update) } return nil // TODO init all phases? @@ -40,7 +42,7 @@ func addWorkflowPhaseGauge(_ context.Context, m *Metrics) error { func (p *workflowPhaseGauge) update(_ context.Context, o metric.Observer) error { phases := p.callback() for phase, val := range phases { - p.gauge.observeInt(o, val, instAttribs{{name: labelWorkflowStatus, value: phase}}) + p.gauge.ObserveInt(o, val, telemetry.InstAttribs{{Name: telemetry.AttribWorkflowStatus, Value: phase}}) } return nil } diff --git a/workflow/metrics/histogram_durations.go b/workflow/metrics/histogram_durations.go index 88c00ad28bbb..6b6038e31ff2 100644 --- a/workflow/metrics/histogram_durations.go +++ b/workflow/metrics/histogram_durations.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/argoproj/argo-workflows/v3/util/telemetry" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -24,15 +26,15 @@ func addOperationDurationHistogram(_ context.Context, m *Metrics) error { } bucketWidth := maxOperationTimeSeconds / float64(operationDurationMetricBucketCount) // The buckets here are only the 'defaults' and can be overridden with configmap defaults - return m.createInstrument(float64Histogram, + return m.CreateInstrument(telemetry.Float64Histogram, nameOperationDuration, "Histogram of durations of operations", "s", - withDefaultBuckets(prometheus.LinearBuckets(bucketWidth, bucketWidth, operationDurationMetricBucketCount)), - withAsBuiltIn(), + telemetry.WithDefaultBuckets(prometheus.LinearBuckets(bucketWidth, bucketWidth, operationDurationMetricBucketCount)), + telemetry.WithAsBuiltIn(), ) } func (m *Metrics) OperationCompleted(ctx context.Context, durationSeconds float64) { - m.record(ctx, nameOperationDuration, durationSeconds, instAttribs{}) + m.Record(ctx, nameOperationDuration, durationSeconds, telemetry.InstAttribs{}) } diff --git a/workflow/metrics/histogram_template.go b/workflow/metrics/histogram_template.go index 7fe50895b5d8..a466b9112fa3 100644 --- a/workflow/metrics/histogram_template.go +++ b/workflow/metrics/histogram_template.go @@ -3,6 +3,8 @@ package metrics import ( "context" "time" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) const ( @@ -10,14 +12,14 @@ const ( ) func addWorkflowTemplateHistogram(_ context.Context, m *Metrics) error { - return m.createInstrument(float64Histogram, + return m.CreateInstrument(telemetry.Float64Histogram, nameWorkflowTemplateRuntime, "Duration of workflow template runs run through workflowTemplateRefs", "s", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) } func (m *Metrics) RecordWorkflowTemplateTime(ctx context.Context, duration time.Duration, name, namespace string, cluster bool) { - m.record(ctx, nameWorkflowTemplateRuntime, duration.Seconds(), templateLabels(name, namespace, cluster)) + m.Record(ctx, nameWorkflowTemplateRuntime, duration.Seconds(), templateAttribs(name, namespace, cluster)) } diff --git a/workflow/metrics/labels.go b/workflow/metrics/labels.go deleted file mode 100644 index 47de721689be..000000000000 --- a/workflow/metrics/labels.go +++ /dev/null @@ -1,43 +0,0 @@ -package metrics - -const ( - labelBuildVersion string = `version` - labelBuildPlatform string = `platform` - labelBuildGoVersion string = `go_version` - labelBuildDate string = `build_date` - labelBuildCompiler string = `compiler` - labelBuildGitCommit string = `git_commit` - labelBuildGitTreeState string = `git_treestate` - labelBuildGitTag string = `git_tag` - - labelCronWFName string = `name` - - labelErrorCause string = "cause" - - labelLogLevel string = `level` - - labelNodePhase string = `node_phase` - - labelPodPhase string = `phase` - labelPodNamespace string = `namespace` - labelPodPendingReason string = `reason` - - labelQueueName string = `queue_name` - - labelRecentlyStarted string = `recently_started` - - labelRequestKind = `kind` - labelRequestVerb = `verb` - labelRequestCode = `status_code` - - labelTemplateName string = `name` - labelTemplateNamespace string = `namespace` - labelTemplateCluster string = `cluster_scope` - - labelWorkerType string = `worker_type` - - labelWorkflowNamespace string = `namespace` - labelWorkflowPhase string = `phase` - labelWorkflowStatus = `status` - labelWorkflowType = `type` -) diff --git a/workflow/metrics/leader.go b/workflow/metrics/leader.go index ec58dbc1e774..ff3562b66b52 100644 --- a/workflow/metrics/leader.go +++ b/workflow/metrics/leader.go @@ -3,6 +3,8 @@ package metrics import ( "context" + "github.com/argoproj/argo-workflows/v3/util/telemetry" + "go.opentelemetry.io/otel/metric" ) @@ -10,16 +12,16 @@ type IsLeaderCallback func() bool type leaderGauge struct { callback IsLeaderCallback - gauge *instrument + gauge *telemetry.Instrument } func addIsLeader(ctx context.Context, m *Metrics) error { const nameLeader = `is_leader` - err := m.createInstrument(int64ObservableGauge, + err := m.CreateInstrument(telemetry.Int64ObservableGauge, nameLeader, "Emits 1 if leader, 0 otherwise. Always 1 if leader election is disabled.", "{leader}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err @@ -29,9 +31,9 @@ func addIsLeader(ctx context.Context, m *Metrics) error { } lGauge := leaderGauge{ callback: m.callbacks.IsLeader, - gauge: m.allInstruments[nameLeader], + gauge: m.AllInstruments[nameLeader], } - return m.allInstruments[nameLeader].registerCallback(m, lGauge.update) + return m.AllInstruments[nameLeader].RegisterCallback(m.Metrics, lGauge.update) } func (l *leaderGauge) update(_ context.Context, o metric.Observer) error { @@ -39,6 +41,6 @@ func (l *leaderGauge) update(_ context.Context, o metric.Observer) error { if l.callback() { val = 1 } - l.gauge.observeInt(o, val, instAttribs{}) + l.gauge.ObserveInt(o, val, telemetry.InstAttribs{}) return nil } diff --git a/workflow/metrics/leader_test.go b/workflow/metrics/leader_test.go index 623c5931464d..76514ab4fa51 100644 --- a/workflow/metrics/leader_test.go +++ b/workflow/metrics/leader_test.go @@ -6,11 +6,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) func TestIsLeader(t *testing.T) { _, te, err := createTestMetrics( - &Config{}, + &telemetry.Config{}, Callbacks{ IsLeader: func() bool { return true @@ -27,7 +29,7 @@ func TestIsLeader(t *testing.T) { func TestNotLeader(t *testing.T) { _, te, err := createTestMetrics( - &Config{}, + &telemetry.Config{}, Callbacks{ IsLeader: func() bool { return false diff --git a/workflow/metrics/metrics.go b/workflow/metrics/metrics.go index f2377333aa40..622b3147a9c4 100644 --- a/workflow/metrics/metrics.go +++ b/workflow/metrics/metrics.go @@ -2,96 +2,38 @@ package metrics import ( "context" - "os" - "sync" - "time" - log "github.com/sirupsen/logrus" - "go.opentelemetry.io/otel" + "github.com/argoproj/argo-workflows/v3/util/telemetry" - wfconfig "github.com/argoproj/argo-workflows/v3/config" - - "go.opentelemetry.io/contrib/instrumentation/runtime" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - "go.opentelemetry.io/otel/metric" metricsdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.24.0" ) -type Config struct { - Enabled bool - Path string - Port int - TTL time.Duration - IgnoreErrors bool - Secure bool - Modifiers map[string]Modifier - Temporality wfconfig.MetricsTemporality -} - type Metrics struct { - // Ensures mutual exclusion in workflows map - mutex sync.RWMutex + *telemetry.Metrics - // Evil context for compatibility with legacy context free interfaces - ctx context.Context - otelMeter *metric.Meter - callbacks Callbacks - config *Config - - allInstruments map[string]*instrument + callbacks Callbacks realtimeWorkflows map[string][]realtimeTracker } -func New(ctx context.Context, serviceName string, config *Config, callbacks Callbacks, extraOpts ...metricsdk.Option) (*Metrics, error) { - res := resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceName(serviceName), - ) - - options := make([]metricsdk.Option, 0) - options = append(options, metricsdk.WithResource(res)) - _, otlpEnabled := os.LookupEnv(`OTEL_EXPORTER_OTLP_ENDPOINT`) - _, otlpMetricsEnabled := os.LookupEnv(`OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`) - if otlpEnabled || otlpMetricsEnabled { - log.Info("Starting OTLP metrics exporter") - otelExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithTemporalitySelector(getTemporality(config))) - if err != nil { - return nil, err - } - options = append(options, metricsdk.WithReader(metricsdk.NewPeriodicReader(otelExporter))) - } - - if config.Enabled { - log.Info("Starting Prometheus metrics exporter") - promExporter, err := config.prometheusMetricsExporter(`argo_workflows`) - if err != nil { - return nil, err - } - options = append(options, metricsdk.WithReader(promExporter)) +func New(ctx context.Context, serviceName, prometheusName string, config *telemetry.Config, callbacks Callbacks, extraOpts ...metricsdk.Option) (*Metrics, error) { + m, err := telemetry.NewMetrics(ctx, serviceName, prometheusName, config, extraOpts...) + if err != nil { + return nil, err } - options = append(options, extraOpts...) - options = append(options, view(config)) - - provider := metricsdk.NewMeterProvider(options...) - otel.SetMeterProvider(provider) - // Add runtime metrics - err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second)) + err = m.Populate(ctx, + telemetry.AddVersion, + ) if err != nil { return nil, err } - meter := provider.Meter(serviceName) metrics := &Metrics{ - ctx: ctx, - otelMeter: &meter, + Metrics: m, callbacks: callbacks, - config: config, realtimeWorkflows: make(map[string][]realtimeTracker), } + err = metrics.populate(ctx, addIsLeader, addPodPhaseGauge, @@ -107,7 +49,6 @@ func New(ctx context.Context, serviceName string, config *Config, callbacks Call addErrorCounter, addLogCounter, addK8sRequests, - addVersion, addWorkflowConditionGauge, addWorkQueueMetrics, ) @@ -123,7 +64,6 @@ func New(ctx context.Context, serviceName string, config *Config, callbacks Call type addMetric func(context.Context, *Metrics) error func (m *Metrics) populate(ctx context.Context, adders ...addMetric) error { - m.allInstruments = make(map[string]*instrument) for _, adder := range adders { if err := adder(ctx, m); err != nil { return err @@ -131,18 +71,3 @@ func (m *Metrics) populate(ctx context.Context, adders ...addMetric) error { } return nil } - -func getTemporality(config *Config) metricsdk.TemporalitySelector { - switch config.Temporality { - case wfconfig.MetricsTemporalityCumulative: - return func(metricsdk.InstrumentKind) metricdata.Temporality { - return metricdata.CumulativeTemporality - } - case wfconfig.MetricsTemporalityDelta: - return func(metricsdk.InstrumentKind) metricdata.Temporality { - return metricdata.DeltaTemporality - } - default: - return metricsdk.DefaultTemporalitySelector - } -} diff --git a/workflow/metrics/metrics_custom.go b/workflow/metrics/metrics_custom.go index 3b6957e69576..f4057540408d 100644 --- a/workflow/metrics/metrics_custom.go +++ b/workflow/metrics/metrics_custom.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/otel/metric" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) type RealTimeValueFunc func() float64 @@ -28,52 +29,56 @@ type customMetricValue struct { } type realtimeTracker struct { - inst *instrument + inst *telemetry.Instrument key string } -func (cmv *customMetricValue) getLabels() instAttribs { - labels := make(instAttribs, len(cmv.labels)) +func (cmv *customMetricValue) getLabels() telemetry.InstAttribs { + labels := make(telemetry.InstAttribs, len(cmv.labels)) for i := range cmv.labels { - labels[i] = instAttrib{name: cmv.labels[i].Key, value: cmv.labels[i].Value} + labels[i] = telemetry.InstAttrib{Name: cmv.labels[i].Key, Value: cmv.labels[i].Value} } return labels } -func (i *instrument) customUserdata(requireSuccess bool) map[string]*customMetricValue { - switch val := i.userdata.(type) { +func customUserdata(i *telemetry.Instrument, requireSuccess bool) map[string]*customMetricValue { + switch val := i.GetUserdata().(type) { case map[string]*customMetricValue: return val default: if requireSuccess { - panic(fmt.Errorf("internal error: unexpected userdata on custom metric %s", i.name)) + panic(fmt.Errorf("internal error: unexpected userdata on custom metric %s", i.GetName())) } return make(map[string]*customMetricValue) } } -func (i *instrument) getOrCreateValue(key string, labels []*wfv1.MetricLabel) *customMetricValue { - if value, ok := i.customUserdata(true)[key]; ok { +func getOrCreateValue(i *telemetry.Instrument, key string, labels []*wfv1.MetricLabel) *customMetricValue { + if value, ok := customUserdata(i, true)[key]; ok { return value } newValue := customMetricValue{ key: key, labels: labels, } - i.customUserdata(true)[key] = &newValue + customUserdata(i, true)[key] = &newValue return &newValue } +type customInstrument struct { + *telemetry.Instrument +} + // Common callback for realtime and gauges // For realtime this acts as a thunk to the calling convention // For non-realtime we have to fake observability as prometheus provides // up/down and set on the same gauge type, which otel forbids. -func (i *instrument) customCallback(_ context.Context, o metric.Observer) error { - for _, value := range i.customUserdata(true) { +func (i *customInstrument) customCallback(_ context.Context, o metric.Observer) error { + for _, value := range customUserdata(i.Instrument, true) { if value.rtValueFunc != nil { - i.observeFloat(o, value.rtValueFunc(), value.getLabels()) + i.ObserveFloat(o, value.rtValueFunc(), value.getLabels()) } else { - i.observeFloat(o, value.prometheusValue, value.getLabels()) + i.ObserveFloat(o, value.prometheusValue, value.getLabels()) } } return nil @@ -86,33 +91,33 @@ func (i *instrument) customCallback(_ context.Context, o metric.Observer) error // GetCustomMetric returns a custom (or any) metric from it's key // This is exported for legacy testing only -func (m *Metrics) GetCustomMetric(key string) *instrument { - m.mutex.RLock() - defer m.mutex.RUnlock() +func (m *Metrics) GetCustomMetric(key string) *telemetry.Instrument { + m.Mutex.RLock() + defer m.Mutex.RUnlock() // It's okay to return nil metrics in this function - return m.allInstruments[key] + return m.AllInstruments[key] } // CustomMetricExists returns if metric exists from its key // This is exported for testing only func (m *Metrics) CustomMetricExists(key string) bool { - m.mutex.RLock() - defer m.mutex.RUnlock() + m.Mutex.RLock() + defer m.Mutex.RUnlock() // It's okay to return nil metrics in this function - return m.allInstruments[key] != nil + return m.AllInstruments[key] != nil } // TODO labels on custom metrics -func (m *Metrics) matchExistingMetric(metricSpec *wfv1.Prometheus) (*instrument, error) { +func (m *Metrics) matchExistingMetric(metricSpec *wfv1.Prometheus) (*telemetry.Instrument, error) { key := metricSpec.Name - if inst, ok := m.allInstruments[key]; ok { - if inst.description != metricSpec.Help { - return nil, fmt.Errorf("Help for metric %s is already set to %s, it cannot be changed", metricSpec.Name, inst.description) + if inst, ok := m.AllInstruments[key]; ok { + if inst.GetDescription() != metricSpec.Help { + return nil, fmt.Errorf("Help for metric %s is already set to %s, it cannot be changed", metricSpec.Name, inst.GetDescription()) } wantedType := metricSpec.GetMetricType() - switch inst.otel.(type) { + switch inst.GetOtel().(type) { case *metric.Float64ObservableGauge: if wantedType != wfv1.MetricTypeGauge && !metricSpec.IsRealtime() { return nil, fmt.Errorf("Found existing gauge for custom metric %s of type %s", metricSpec.Name, wantedType) @@ -126,14 +131,14 @@ func (m *Metrics) matchExistingMetric(metricSpec *wfv1.Prometheus) (*instrument, return nil, fmt.Errorf("Found existing histogram for custom metric %s of type %s", metricSpec.Name, wantedType) } default: - return nil, fmt.Errorf("Found unwanted type %s for custom metric %s of type %s", reflect.TypeOf(inst.otel), metricSpec.Name, wantedType) + return nil, fmt.Errorf("Found unwanted type %s for custom metric %s of type %s", reflect.TypeOf(inst.GetOtel()), metricSpec.Name, wantedType) } return inst, nil } return nil, nil } -func (m *Metrics) ensureBaseMetric(metricSpec *wfv1.Prometheus, ownerKey string) (*instrument, error) { +func (m *Metrics) ensureBaseMetric(metricSpec *wfv1.Prometheus, ownerKey string) (*telemetry.Instrument, error) { metric, err := m.matchExistingMetric(metricSpec) if err != nil { return nil, err @@ -147,11 +152,11 @@ func (m *Metrics) ensureBaseMetric(metricSpec *wfv1.Prometheus, ownerKey string) return nil, err } m.attachCustomMetricToWorkflow(metricSpec, ownerKey) - inst := m.allInstruments[metricSpec.Name] + inst := m.AllInstruments[metricSpec.Name] if inst == nil { return nil, fmt.Errorf("Failed to create new metric %s", metricSpec.Name) } - inst.userdata = make(map[string]*customMetricValue) + inst.SetUserdata(make(map[string]*customMetricValue)) return inst, nil } @@ -163,7 +168,7 @@ func (m *Metrics) UpsertCustomMetric(ctx context.Context, metricSpec *wfv1.Prome if err != nil { return err } - metricValue := baseMetric.getOrCreateValue(metricSpec.GetKey(), metricSpec.Labels) + metricValue := getOrCreateValue(baseMetric, metricSpec.GetKey(), metricSpec.Labels) metricValue.lastUpdated = time.Now() metricType := metricSpec.GetMetricType() @@ -191,7 +196,7 @@ func (m *Metrics) UpsertCustomMetric(ctx context.Context, metricSpec *wfv1.Prome if err != nil { return err } - baseMetric.record(ctx, val, metricValue.getLabels()) + baseMetric.Record(ctx, val, metricValue.getLabels()) case metricType == wfv1.MetricTypeCounter: val, err := strconv.ParseFloat(metricSpec.Counter.Value, 64) if err != nil { @@ -213,7 +218,7 @@ func (m *Metrics) attachCustomMetricToWorkflow(metricSpec *wfv1.Prometheus, owne } } m.realtimeWorkflows[ownerKey] = append(m.realtimeWorkflows[ownerKey], realtimeTracker{ - inst: m.allInstruments[metricSpec.Name], + inst: m.AllInstruments[metricSpec.Name], key: metricSpec.GetKey(), }) } @@ -231,33 +236,35 @@ func (m *Metrics) createCustomMetric(metricSpec *wfv1.Prometheus) error { case metricType == wfv1.MetricTypeGauge: return m.createCustomGauge(metricSpec) case metricType == wfv1.MetricTypeHistogram: - return m.createInstrument(float64Histogram, metricSpec.Name, metricSpec.Help, "{item}", withDefaultBuckets(metricSpec.Histogram.GetBuckets())) + return m.CreateInstrument(telemetry.Float64Histogram, metricSpec.Name, metricSpec.Help, "{item}", telemetry.WithDefaultBuckets(metricSpec.Histogram.GetBuckets())) case metricType == wfv1.MetricTypeCounter: - err := m.createInstrument(float64ObservableUpDownCounter, metricSpec.Name, metricSpec.Help, "{item}") + err := m.CreateInstrument(telemetry.Float64ObservableUpDownCounter, metricSpec.Name, metricSpec.Help, "{item}") if err != nil { return err } - inst := m.allInstruments[metricSpec.Name] - return inst.registerCallback(m, inst.customCallback) + inst := m.AllInstruments[metricSpec.Name] + customInst := customInstrument{Instrument: inst} + return inst.RegisterCallback(m.Metrics, customInst.customCallback) default: return fmt.Errorf("invalid metric spec") } } func (m *Metrics) createCustomGauge(metricSpec *wfv1.Prometheus) error { - err := m.createInstrument(float64ObservableGauge, metricSpec.Name, metricSpec.Help, "{item}") + err := m.CreateInstrument(telemetry.Float64ObservableGauge, metricSpec.Name, metricSpec.Help, "{item}") if err != nil { return err } - inst := m.allInstruments[metricSpec.Name] - return inst.registerCallback(m, inst.customCallback) + inst := m.AllInstruments[metricSpec.Name] + customInst := customInstrument{Instrument: inst} + return inst.RegisterCallback(m.Metrics, customInst.customCallback) } func (m *Metrics) runCustomGC(ttl time.Duration) { - m.mutex.Lock() - defer m.mutex.Unlock() - for _, baseMetric := range m.allInstruments { - custom := baseMetric.customUserdata(false) + m.Mutex.Lock() + defer m.Mutex.Unlock() + for _, baseMetric := range m.AllInstruments { + custom := customUserdata(baseMetric, false) for key, value := range custom { if time.Since(value.lastUpdated) > ttl { delete(custom, key) @@ -284,8 +291,8 @@ func (m *Metrics) customMetricsGC(ctx context.Context, ttl time.Duration) { } func (m *Metrics) StopRealtimeMetricsForWfUID(key string) { - m.mutex.Lock() - defer m.mutex.Unlock() + m.Mutex.Lock() + defer m.Mutex.Unlock() if _, exists := m.realtimeWorkflows[key]; !exists { return @@ -293,7 +300,7 @@ func (m *Metrics) StopRealtimeMetricsForWfUID(key string) { realtimeMetrics := m.realtimeWorkflows[key] for _, metric := range realtimeMetrics { - delete(metric.inst.customUserdata(true), metric.key) + delete(customUserdata(metric.inst, true), metric.key) } delete(m.realtimeWorkflows, key) diff --git a/workflow/metrics/metrics_k8s_request.go b/workflow/metrics/metrics_k8s_request.go index b361baeae848..70fc46c29110 100644 --- a/workflow/metrics/metrics_k8s_request.go +++ b/workflow/metrics/metrics_k8s_request.go @@ -8,6 +8,7 @@ import ( "k8s.io/client-go/rest" "github.com/argoproj/argo-workflows/v3/util/k8s" + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) const ( @@ -16,21 +17,21 @@ const ( ) func addK8sRequests(_ context.Context, m *Metrics) error { - err := m.createInstrument(int64Counter, + err := m.CreateInstrument(telemetry.Int64Counter, nameK8sRequestTotal, "Number of kubernetes requests executed.", "{request}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } - err = m.createInstrument(float64Histogram, + err = m.CreateInstrument(telemetry.Float64Histogram, nameK8sRequestDuration, "Duration of kubernetes requests executed.", "s", - withDefaultBuckets([]float64{0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 60.0, 180.0}), - withAsBuiltIn(), + telemetry.WithDefaultBuckets([]float64{0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 60.0, 180.0}), + telemetry.WithAsBuiltIn(), ) // Register this metrics with the global k8sMetrics.metrics = m @@ -53,13 +54,13 @@ func (m metricsRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) duration := time.Since(startTime) if x != nil && m.metrics != nil { verb, kind := k8s.ParseRequest(r) - attribs := instAttribs{ - {name: labelRequestKind, value: kind}, - {name: labelRequestVerb, value: verb}, - {name: labelRequestCode, value: x.StatusCode}, + attribs := telemetry.InstAttribs{ + {Name: telemetry.AttribRequestKind, Value: kind}, + {Name: telemetry.AttribRequestVerb, Value: verb}, + {Name: telemetry.AttribRequestCode, Value: x.StatusCode}, } - (*m.metrics).addInt(m.ctx, nameK8sRequestTotal, 1, attribs) - (*m.metrics).record(m.ctx, nameK8sRequestDuration, duration.Seconds(), attribs) + (*m.metrics).AddInt(m.ctx, nameK8sRequestTotal, 1, attribs) + (*m.metrics).Record(m.ctx, nameK8sRequestDuration, duration.Seconds(), attribs) } return x, err } diff --git a/workflow/metrics/metrics_test.go b/workflow/metrics/metrics_test.go index 113fa4d44cb6..de2b224f9bc3 100644 --- a/workflow/metrics/metrics_test.go +++ b/workflow/metrics/metrics_test.go @@ -12,13 +12,14 @@ import ( "k8s.io/utils/pointer" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) func TestMetrics(t *testing.T) { m, te, err := CreateDefaultTestMetrics() require.NoError(t, err) // Default buckets: {5, 10, 15, 20, 25, 30} - m.OperationCompleted(m.ctx, 5) + m.OperationCompleted(m.Ctx, 5) assert.NotNil(t, te) attribs := attribute.NewSet() val, err := te.GetFloat64HistogramData(nameOperationDuration, &attribs) @@ -33,12 +34,12 @@ func TestErrors(t *testing.T) { assert.Nil(t, m.GetCustomMetric("does-not-exist")) require.NoError(t, err) - err = m.UpsertCustomMetric(m.ctx, &wfv1.Prometheus{ + err = m.UpsertCustomMetric(m.Ctx, &wfv1.Prometheus{ Name: "invalid.name", }, "owner", func() float64 { return 0.0 }) require.Error(t, err) - err = m.UpsertCustomMetric(m.ctx, &wfv1.Prometheus{ + err = m.UpsertCustomMetric(m.Ctx, &wfv1.Prometheus{ Name: "name", Labels: []*wfv1.MetricLabel{{ Key: "invalid-key", @@ -49,10 +50,10 @@ func TestErrors(t *testing.T) { } func TestMetricGC(t *testing.T) { - config := Config{ + config := telemetry.Config{ Enabled: true, - Path: defaultPrometheusServerPath, - Port: defaultPrometheusServerPort, + Path: telemetry.DefaultPrometheusServerPath, + Port: telemetry.DefaultPrometheusServerPort, TTL: 1 * time.Second, } @@ -63,7 +64,7 @@ func TestMetricGC(t *testing.T) { labels := []*wfv1.MetricLabel{ {Key: "foo", Value: "bar"}, } - err = m.UpsertCustomMetric(m.ctx, &wfv1.Prometheus{ + err = m.UpsertCustomMetric(m.Ctx, &wfv1.Prometheus{ Name: key, Labels: labels, Help: "none", @@ -73,7 +74,7 @@ func TestMetricGC(t *testing.T) { baseCm := m.GetCustomMetric(key) assert.NotNil(t, baseCm) - cm := baseCm.customUserdata(true) + cm := customUserdata(baseCm, true) assert.Len(t, cm, 1) // Ensure we get at least one TTL run @@ -92,15 +93,15 @@ func TestMetricGC(t *testing.T) { } func TestRealtimeMetricGC(t *testing.T) { - config := Config{ + config := telemetry.Config{ Enabled: true, - Path: defaultPrometheusServerPath, - Port: defaultPrometheusServerPort, + Path: telemetry.DefaultPrometheusServerPath, + Port: telemetry.DefaultPrometheusServerPort, TTL: 1 * time.Second, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := New(ctx, TestScopeName, &config, Callbacks{}) + m, err := New(ctx, telemetry.TestScopeName, telemetry.TestScopeName, &config, Callbacks{}) require.NoError(t, err) labels := []*wfv1.MetricLabel{ @@ -108,7 +109,7 @@ func TestRealtimeMetricGC(t *testing.T) { } name := "realtime_metric" wfKey := "workflow-uid" - err = m.UpsertCustomMetric(m.ctx, &wfv1.Prometheus{ + err = m.UpsertCustomMetric(m.Ctx, &wfv1.Prometheus{ Name: name, Labels: labels, Help: "None", @@ -146,31 +147,31 @@ func TestRealtimeMetricGC(t *testing.T) { func TestWorkflowQueueMetrics(t *testing.T) { m, te, err := getSharedMetrics() require.NoError(t, err) - attribs := attribute.NewSet(attribute.String(labelQueueName, "workflow_queue")) - wfQueue := m.RateLimiterWithBusyWorkers(m.ctx, workqueue.DefaultControllerRateLimiter(), "workflow_queue") + attribs := attribute.NewSet(attribute.String(telemetry.AttribQueueName, "workflow_queue")) + wfQueue := m.RateLimiterWithBusyWorkers(m.Ctx, workqueue.DefaultControllerRateLimiter(), "workflow_queue") defer wfQueue.ShutDown() - assert.NotNil(t, m.allInstruments[nameWorkersQueueDepth]) - assert.NotNil(t, m.allInstruments[nameWorkersQueueLatency]) + assert.NotNil(t, m.AllInstruments[nameWorkersQueueDepth]) + assert.NotNil(t, m.AllInstruments[nameWorkersQueueLatency]) wfQueue.Add("hello") - require.NotNil(t, m.allInstruments[nameWorkersQueueAdds]) + require.NotNil(t, m.AllInstruments[nameWorkersQueueAdds]) val, err := te.GetInt64CounterValue(nameWorkersQueueAdds, &attribs) require.NoError(t, err) assert.Equal(t, int64(1), val) } func TestRealTimeMetricDeletion(t *testing.T) { - config := Config{ + config := telemetry.Config{ Enabled: true, - Path: defaultPrometheusServerPath, - Port: defaultPrometheusServerPort, + Path: telemetry.DefaultPrometheusServerPath, + Port: telemetry.DefaultPrometheusServerPort, TTL: 1 * time.Second, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := New(ctx, TestScopeName, &config, Callbacks{}) + m, err := New(ctx, telemetry.TestScopeName, telemetry.TestScopeName, &config, Callbacks{}) require.NoError(t, err) // We've not yet fed a metric in for 123 @@ -200,7 +201,7 @@ func TestRealTimeMetricDeletion(t *testing.T) { m.StopRealtimeMetricsForWfUID("456") assert.Empty(t, m.realtimeWorkflows["456"]) - cm := baseCm.customUserdata(true) + cm := customUserdata(baseCm, true) assert.Len(t, cm, 1) assert.Len(t, m.realtimeWorkflows["123"], 1) diff --git a/workflow/metrics/test_helpers.go b/workflow/metrics/test_helpers.go new file mode 100644 index 000000000000..944d46a1cff4 --- /dev/null +++ b/workflow/metrics/test_helpers.go @@ -0,0 +1,53 @@ +package metrics + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/sdk/metric" + "k8s.io/client-go/util/workqueue" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" +) + +var sharedMetrics *Metrics = nil +var sharedTE *telemetry.TestMetricsExporter = nil + +// getSharedMetrics returns a singleton metrics with test exporter +// This is necessary because only the first call to workqueue.SetProvider +// takes effect within a single binary +// This can be fixed when we update to client-go 0.27 or later and we can +// create workqueues with https://godocs.io/k8s.io/client-go/util/workqueue#NewRateLimitingQueueWithConfig +func getSharedMetrics() (*Metrics, *telemetry.TestMetricsExporter, error) { + if sharedMetrics == nil { + config := telemetry.Config{ + Enabled: true, + TTL: 1 * time.Second, + } + var err error + sharedMetrics, sharedTE, err = createTestMetrics(&config, Callbacks{}) + if err != nil { + return nil, nil, err + } + + workqueue.SetProvider(sharedMetrics) + } + return sharedMetrics, sharedTE, nil +} + +// CreateDefaultTestMetrics creates a boring testExporter enabled +// metrics, suitable for many tests +func CreateDefaultTestMetrics() (*Metrics, *telemetry.TestMetricsExporter, error) { + config := telemetry.Config{ + Enabled: true, + } + return createTestMetrics(&config, Callbacks{}) +} + +func createTestMetrics(config *telemetry.Config, callbacks Callbacks) (*Metrics, *telemetry.TestMetricsExporter, error) { + ctx /* with cancel*/ := context.Background() + te := telemetry.NewTestMetricsExporter() + + m, err := New(ctx, telemetry.TestScopeName, telemetry.TestScopeName, config, callbacks, metric.WithReader(te)) + return m, te, err +} diff --git a/workflow/metrics/version.go b/workflow/metrics/version.go deleted file mode 100644 index afc2b1c0a3d8..000000000000 --- a/workflow/metrics/version.go +++ /dev/null @@ -1,33 +0,0 @@ -package metrics - -import ( - "context" - - "github.com/argoproj/argo-workflows/v3" -) - -func addVersion(ctx context.Context, m *Metrics) error { - const nameVersion = `version` - err := m.createInstrument(int64Counter, - nameVersion, - "Build metadata for this Controller", - "{unused}", - withAsBuiltIn(), - ) - if err != nil { - return err - } - - version := argo.GetVersion() - m.addInt(ctx, nameVersion, 1, instAttribs{ - {name: labelBuildVersion, value: version.Version}, - {name: labelBuildPlatform, value: version.Platform}, - {name: labelBuildGoVersion, value: version.GoVersion}, - {name: labelBuildDate, value: version.BuildDate}, - {name: labelBuildCompiler, value: version.Compiler}, - {name: labelBuildGitCommit, value: version.GitCommit}, - {name: labelBuildGitTreeState, value: version.GitTreeState}, - {name: labelBuildGitTag, value: version.GitTag}, - }) - return nil -} diff --git a/workflow/metrics/version_test.go b/workflow/metrics/version_test.go deleted file mode 100644 index 5d2c22d8165d..000000000000 --- a/workflow/metrics/version_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package metrics - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/attribute" - - "github.com/argoproj/argo-workflows/v3" -) - -func TestVersion(t *testing.T) { - _, te, err := CreateDefaultTestMetrics() - require.NoError(t, err) - assert.NotNil(t, te) - version := argo.GetVersion() - attribs := attribute.NewSet( - attribute.String(labelBuildVersion, version.Version), - attribute.String(labelBuildPlatform, version.Platform), - attribute.String(labelBuildGoVersion, version.GoVersion), - attribute.String(labelBuildDate, version.BuildDate), - attribute.String(labelBuildCompiler, version.Compiler), - attribute.String(labelBuildGitCommit, version.GitCommit), - attribute.String(labelBuildGitTreeState, version.GitTreeState), - attribute.String(labelBuildGitTag, version.GitTag), - ) - val, err := te.GetInt64CounterValue(`version`, &attribs) - require.NoError(t, err) - assert.Equal(t, int64(1), val) -} diff --git a/workflow/metrics/work_queue.go b/workflow/metrics/work_queue.go index acca059b73b3..366188a63f95 100644 --- a/workflow/metrics/work_queue.go +++ b/workflow/metrics/work_queue.go @@ -3,6 +3,8 @@ package metrics import ( "context" + "github.com/argoproj/argo-workflows/v3/util/telemetry" + log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/metric" "k8s.io/client-go/util/workqueue" @@ -26,100 +28,100 @@ var _ workqueue.MetricsProvider = &Metrics{} type workersBusyRateLimiterWorkQueue struct { workqueue.RateLimitingInterface workerType string - busyGauge *instrument + busyGauge *telemetry.Instrument // Evil storage of context for compatibility with legacy interface to workqueue ctx context.Context } func addWorkQueueMetrics(_ context.Context, m *Metrics) error { - err := m.createInstrument(int64UpDownCounter, + err := m.CreateInstrument(telemetry.Int64UpDownCounter, nameWorkersBusy, "Number of workers currently busy", "{worker}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } - err = m.createInstrument(int64UpDownCounter, + err = m.CreateInstrument(telemetry.Int64UpDownCounter, nameWorkersQueueDepth, "Depth of the queue", "{item}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } - err = m.createInstrument(int64Counter, + err = m.CreateInstrument(telemetry.Int64Counter, nameWorkersQueueAdds, "Adds to the queue", "{item}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } - err = m.createInstrument(float64Histogram, + err = m.CreateInstrument(telemetry.Float64Histogram, nameWorkersQueueLatency, "Time objects spend waiting in the queue", "s", - withDefaultBuckets([]float64{1.0, 5.0, 20.0, 60.0, 180.0}), - withAsBuiltIn(), + telemetry.WithDefaultBuckets([]float64{1.0, 5.0, 20.0, 60.0, 180.0}), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } - err = m.createInstrument(float64Histogram, + err = m.CreateInstrument(telemetry.Float64Histogram, nameWorkersQueueDuration, "Time objects spend being processed from the queue", "s", - withDefaultBuckets([]float64{0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 60.0, 180.0}), - withAsBuiltIn(), + telemetry.WithDefaultBuckets([]float64{0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 60.0, 180.0}), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } - err = m.createInstrument(int64Counter, + err = m.CreateInstrument(telemetry.Int64Counter, nameWorkersRetries, "Retries in the queues", "{item}", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } - err = m.createInstrument(float64ObservableGauge, + err = m.CreateInstrument(telemetry.Float64ObservableGauge, nameWorkersUnfinishedWork, "Unfinished work time", "s", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } unfinishedCallback := queueUserdata{ - gauge: m.allInstruments[nameWorkersUnfinishedWork], + gauge: m.AllInstruments[nameWorkersUnfinishedWork], } - m.allInstruments[nameWorkersUnfinishedWork].userdata = &unfinishedCallback - err = m.allInstruments[nameWorkersUnfinishedWork].registerCallback(m, unfinishedCallback.update) + m.AllInstruments[nameWorkersUnfinishedWork].SetUserdata(&unfinishedCallback) + err = m.AllInstruments[nameWorkersUnfinishedWork].RegisterCallback(m.Metrics, unfinishedCallback.update) if err != nil { return err } - err = m.createInstrument(float64ObservableGauge, + err = m.CreateInstrument(telemetry.Float64ObservableGauge, nameWorkersLongestRunning, "Longest running worker", "s", - withAsBuiltIn(), + telemetry.WithAsBuiltIn(), ) if err != nil { return err } longestRunningCallback := queueUserdata{ - gauge: m.allInstruments[nameWorkersLongestRunning], + gauge: m.AllInstruments[nameWorkersLongestRunning], } - m.allInstruments[nameWorkersLongestRunning].userdata = &longestRunningCallback - err = m.allInstruments[nameWorkersLongestRunning].registerCallback(m, longestRunningCallback.update) + m.AllInstruments[nameWorkersLongestRunning].SetUserdata(&longestRunningCallback) + err = m.AllInstruments[nameWorkersLongestRunning].RegisterCallback(m.Metrics, longestRunningCallback.update) if err != nil { return err } @@ -130,27 +132,27 @@ func (m *Metrics) RateLimiterWithBusyWorkers(ctx context.Context, workQueue work queue := workersBusyRateLimiterWorkQueue{ RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workQueue, queueName), workerType: queueName, - busyGauge: m.allInstruments[nameWorkersBusy], + busyGauge: m.AllInstruments[nameWorkersBusy], ctx: ctx, } queue.newWorker(ctx) return queue } -func (w *workersBusyRateLimiterWorkQueue) attributes() instAttribs { - return instAttribs{{name: labelWorkerType, value: w.workerType}} +func (w *workersBusyRateLimiterWorkQueue) attributes() telemetry.InstAttribs { + return telemetry.InstAttribs{{Name: telemetry.AttribWorkerType, Value: w.workerType}} } func (w *workersBusyRateLimiterWorkQueue) newWorker(ctx context.Context) { - w.busyGauge.addInt(ctx, 0, w.attributes()) + w.busyGauge.AddInt(ctx, 0, w.attributes()) } func (w *workersBusyRateLimiterWorkQueue) workerBusy(ctx context.Context) { - w.busyGauge.addInt(ctx, 1, w.attributes()) + w.busyGauge.AddInt(ctx, 1, w.attributes()) } func (w *workersBusyRateLimiterWorkQueue) workerFree(ctx context.Context) { - w.busyGauge.addInt(ctx, -1, w.attributes()) + w.busyGauge.AddInt(ctx, -1, w.attributes()) } func (w workersBusyRateLimiterWorkQueue) Get() (interface{}, bool) { @@ -168,29 +170,29 @@ func (w workersBusyRateLimiterWorkQueue) Done(item interface{}) { type queueMetric struct { ctx context.Context name string - inst *instrument + inst *telemetry.Instrument value *float64 } type queueUserdata struct { - gauge *instrument + gauge *telemetry.Instrument metrics []queueMetric } -func (q *queueMetric) attributes() instAttribs { - return instAttribs{{name: labelQueueName, value: q.name}} +func (q *queueMetric) attributes() telemetry.InstAttribs { + return telemetry.InstAttribs{{Name: telemetry.AttribQueueName, Value: q.name}} } func (q queueMetric) Inc() { - q.inst.addInt(q.ctx, 1, q.attributes()) + q.inst.AddInt(q.ctx, 1, q.attributes()) } func (q queueMetric) Dec() { - q.inst.addInt(q.ctx, -1, q.attributes()) + q.inst.AddInt(q.ctx, -1, q.attributes()) } func (q queueMetric) Observe(val float64) { - q.inst.record(q.ctx, val, q.attributes()) + q.inst.Record(q.ctx, val, q.attributes()) } // Observable gauge stores in the shim @@ -198,83 +200,83 @@ func (q queueMetric) Set(val float64) { *(q.value) = val } -func (i *instrument) queueUserdata() *queueUserdata { - switch val := i.userdata.(type) { +func getQueueUserdata(i *telemetry.Instrument) *queueUserdata { + switch val := i.GetUserdata().(type) { case *queueUserdata: return val default: - log.Errorf("internal error: unexpected userdata on queue metric %s", i.name) + log.Errorf("internal error: unexpected userdata on queue metric %s", i.GetName()) return &queueUserdata{} } } func (q *queueUserdata) update(_ context.Context, o metric.Observer) error { for _, metric := range q.metrics { - q.gauge.observeFloat(o, *metric.value, metric.attributes()) + q.gauge.ObserveFloat(o, *metric.value, metric.attributes()) } return nil } func (m *Metrics) NewDepthMetric(name string) workqueue.GaugeMetric { return queueMetric{ - ctx: m.ctx, + ctx: m.Ctx, name: name, - inst: m.allInstruments[nameWorkersQueueDepth], + inst: m.AllInstruments[nameWorkersQueueDepth], } } func (m *Metrics) NewAddsMetric(name string) workqueue.CounterMetric { return queueMetric{ - ctx: m.ctx, + ctx: m.Ctx, name: name, - inst: m.allInstruments[nameWorkersQueueAdds], + inst: m.AllInstruments[nameWorkersQueueAdds], } } func (m *Metrics) NewLatencyMetric(name string) workqueue.HistogramMetric { return queueMetric{ - ctx: m.ctx, + ctx: m.Ctx, name: name, - inst: m.allInstruments[nameWorkersQueueLatency], + inst: m.AllInstruments[nameWorkersQueueLatency], } } func (m *Metrics) NewWorkDurationMetric(name string) workqueue.HistogramMetric { return queueMetric{ - ctx: m.ctx, + ctx: m.Ctx, name: name, - inst: m.allInstruments[nameWorkersQueueDuration], + inst: m.AllInstruments[nameWorkersQueueDuration], } } func (m *Metrics) NewRetriesMetric(name string) workqueue.CounterMetric { return queueMetric{ - ctx: m.ctx, + ctx: m.Ctx, name: name, - inst: m.allInstruments[nameWorkersRetries], + inst: m.AllInstruments[nameWorkersRetries], } } func (m *Metrics) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { metric := queueMetric{ - ctx: m.ctx, + ctx: m.Ctx, name: name, - inst: m.allInstruments[nameWorkersUnfinishedWork], + inst: m.AllInstruments[nameWorkersUnfinishedWork], value: pointer.Float64(0.0), } - ud := metric.inst.queueUserdata() + ud := getQueueUserdata(metric.inst) ud.metrics = append(ud.metrics, metric) return metric } func (m *Metrics) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { metric := queueMetric{ - ctx: m.ctx, + ctx: m.Ctx, name: name, - inst: m.allInstruments[nameWorkersLongestRunning], + inst: m.AllInstruments[nameWorkersLongestRunning], value: pointer.Float64(0.0), } - ud := metric.inst.queueUserdata() + ud := getQueueUserdata(metric.inst) ud.metrics = append(ud.metrics, metric) return metric } diff --git a/workflow/metrics/work_queue_test.go b/workflow/metrics/work_queue_test.go index f1a998ba7255..9c7f9766936b 100644 --- a/workflow/metrics/work_queue_test.go +++ b/workflow/metrics/work_queue_test.go @@ -7,21 +7,23 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "k8s.io/client-go/util/workqueue" + + "github.com/argoproj/argo-workflows/v3/util/telemetry" ) func TestMetricsWorkQueue(t *testing.T) { m, te, err := getSharedMetrics() require.NoError(t, err) - attribsWT := attribute.NewSet(attribute.String(labelWorkerType, "test")) + attribsWT := attribute.NewSet(attribute.String(telemetry.AttribWorkerType, "test")) - queue := m.RateLimiterWithBusyWorkers(m.ctx, workqueue.DefaultControllerRateLimiter(), "test") + queue := m.RateLimiterWithBusyWorkers(m.Ctx, workqueue.DefaultControllerRateLimiter(), "test") defer queue.ShutDown() val, err := te.GetInt64CounterValue(nameWorkersBusy, &attribsWT) require.NoError(t, err) assert.Equal(t, int64(0), val) - attribsQN := attribute.NewSet(attribute.String(labelQueueName, "test")) + attribsQN := attribute.NewSet(attribute.String(telemetry.AttribQueueName, "test")) queue.Add("A") val, err = te.GetInt64CounterValue(nameWorkersBusy, &attribsWT) require.NoError(t, err)