diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 84279172351a..a0145f85a82a 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -3664,13 +3664,15 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc metricTmpl.Labels = metricTmplSubstituted.Labels metricTmpl.When = metricTmplSubstituted.When - proceed, err := shouldExecute(metricTmpl.When) - if err != nil { - woc.reportMetricEmissionError(fmt.Sprintf("unable to compute 'when' clause for metric '%s': %s", woc.wf.ObjectMeta.Name, err)) - continue - } - if !proceed { - continue + if !metricTmpl.IsRealtime() { + proceed, err := shouldExecute(metricTmpl.When) + if err != nil { + woc.reportMetricEmissionError(fmt.Sprintf("unable to compute 'when' clause for metric '%s': %s", woc.wf.ObjectMeta.Name, err)) + continue + } + if !proceed { + continue + } } if metricTmpl.IsRealtime() { @@ -3691,7 +3693,15 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricTmpl.Name, err)) continue } - err = woc.controller.metrics.UpsertCustomMetric(metricTmpl.GetDesc(), string(woc.wf.UID), updatedMetric, true) + realTimeWhenFunc := func() bool { + proceed, err := shouldExecute(metricTmpl.When) + if err != nil { + woc.reportMetricEmissionError(fmt.Sprintf("unable to compute 'when' clause for metric '%s': %s", woc.wf.ObjectMeta.Name, err)) + return false + } + return proceed + } + err = woc.controller.metrics.UpsertCustomMetric(metricTmpl.GetDesc(), string(woc.wf.UID), updatedMetric, true, realTimeWhenFunc) if err != nil { woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricTmpl.Name, err)) continue @@ -3731,7 +3741,7 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricSpec.Name, err)) continue } - err = woc.controller.metrics.UpsertCustomMetric(metricSpec.GetDesc(), string(woc.wf.UID), updatedMetric, false) + err = woc.controller.metrics.UpsertCustomMetric(metricSpec.GetDesc(), string(woc.wf.UID), updatedMetric, false, nil) if err != nil { woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricSpec.Name, err)) continue diff --git a/workflow/metrics/metrics.go b/workflow/metrics/metrics.go index fd001c864e92..5d850c938183 100644 --- a/workflow/metrics/metrics.go +++ b/workflow/metrics/metrics.go @@ -40,10 +40,11 @@ func (s ServerConfig) SameServerAs(other ServerConfig) bool { } type metric struct { - metric prometheus.Metric - lastUpdated time.Time - realtime bool - completed bool + metric prometheus.Metric + lastUpdated time.Time + realtime bool + completed bool + realTimeWhenFunc func() bool } type Metrics struct { @@ -141,10 +142,26 @@ func (m *Metrics) allMetrics() []prometheus.Metric { for _, metric := range m.workersBusy { allMetrics = append(allMetrics, metric) } + allCustomMetrics := m.allCustomMetrics() + allMetrics = append(allMetrics, allCustomMetrics...) + + return allMetrics +} + +func (m *Metrics) allCustomMetrics() []prometheus.Metric { + allCustomMetrics := []prometheus.Metric{} for _, metric := range m.customMetrics { - allMetrics = append(allMetrics, metric.metric) + if metric.realtime { + if metric.realTimeWhenFunc == nil { + allCustomMetrics = append(allCustomMetrics, metric.metric) + } else if metric.realTimeWhenFunc() { + allCustomMetrics = append(allCustomMetrics, metric.metric) + } + } else { + allCustomMetrics = append(allCustomMetrics, metric.metric) + } } - return allMetrics + return allCustomMetrics } func (m *Metrics) StopRealtimeMetricsForKey(key string) { @@ -195,7 +212,7 @@ func (m *Metrics) GetCustomMetric(key string) prometheus.Metric { return m.customMetrics[key].metric } -func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prometheus.Metric, realtime bool) error { +func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prometheus.Metric, realtime bool, realTimeWhenFunc func() bool) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -209,7 +226,7 @@ func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prom } else { m.metricNameHelps[name] = help } - m.customMetrics[key] = metric{metric: newMetric, lastUpdated: time.Now(), realtime: realtime} + m.customMetrics[key] = metric{metric: newMetric, lastUpdated: time.Now(), realtime: realtime, realTimeWhenFunc: realTimeWhenFunc} // If this is a realtime metric, track it if realtime { diff --git a/workflow/metrics/metrics_test.go b/workflow/metrics/metrics_test.go index 078bc7095f2a..89f02f668297 100644 --- a/workflow/metrics/metrics_test.go +++ b/workflow/metrics/metrics_test.go @@ -2,6 +2,7 @@ package metrics import ( "context" + "strings" "testing" "time" @@ -68,12 +69,12 @@ func TestMetrics(t *testing.T) { assert.Nil(t, m.GetCustomMetric("does-not-exist")) - err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false) + err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false, nil) if assert.NoError(t, err) { assert.NotNil(t, m.GetCustomMetric("metric")) } - err = m.UpsertCustomMetric("metric2", "", newCounter("test", "new test", nil), false) + err = m.UpsertCustomMetric("metric2", "", newCounter("test", "new test", nil), false, nil) assert.Error(t, err) badMetric, err := constructOrUpdateGaugeMetric(nil, &v1alpha1.Prometheus{ @@ -85,7 +86,7 @@ func TestMetrics(t *testing.T) { }, }) if assert.NoError(t, err) { - err = m.UpsertCustomMetric("asdf", "", badMetric, false) + err = m.UpsertCustomMetric("asdf", "", badMetric, false, nil) assert.Error(t, err) } } @@ -108,7 +109,7 @@ func TestMetricGC(t *testing.T) { m := New(config, config) assert.Empty(t, m.customMetrics) - err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false) + err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false, nil) if assert.NoError(t, err) { assert.Len(t, m.customMetrics, 1) } @@ -141,7 +142,7 @@ func TestRealtimeMetricGC(t *testing.T) { m := New(config, config) assert.Empty(t, m.customMetrics) - err := m.UpsertCustomMetric("realtime_metric", "workflow-uid", newCounter("test", "test", nil), true) + err := m.UpsertCustomMetric("realtime_metric", "workflow-uid", newCounter("test", "test", nil), true, nil) if assert.NoError(t, err) { assert.Len(t, m.customMetrics, 1) } @@ -213,7 +214,7 @@ func TestRealTimeMetricDeletion(t *testing.T) { rtMetric, err := ConstructRealTimeGaugeMetric(&v1alpha1.Prometheus{Name: "name", Help: "hello"}, func() float64 { return 0.0 }) assert.NoError(t, err) - err = m.UpsertCustomMetric("metrickey", "123", rtMetric, true) + err = m.UpsertCustomMetric("metrickey", "123", rtMetric, true, nil) assert.NoError(t, err) assert.NotEmpty(t, m.workflows["123"]) assert.Len(t, m.customMetrics, 1) @@ -225,7 +226,7 @@ func TestRealTimeMetricDeletion(t *testing.T) { metric, err := ConstructOrUpdateMetric(nil, &v1alpha1.Prometheus{Name: "name", Help: "hello", Gauge: &v1alpha1.Gauge{Value: "1"}}) assert.NoError(t, err) - err = m.UpsertCustomMetric("metrickey", "456", metric, false) + err = m.UpsertCustomMetric("metrickey", "456", metric, false, nil) assert.NoError(t, err) assert.Empty(t, m.workflows["456"]) assert.Len(t, m.customMetrics, 1) @@ -234,3 +235,55 @@ func TestRealTimeMetricDeletion(t *testing.T) { assert.Empty(t, m.workflows["456"]) assert.Len(t, m.customMetrics, 1) } + +func TestRealTimeWhenFunction(t *testing.T) { + config := ServerConfig{ + Enabled: true, + Path: DefaultMetricsServerPath, + Port: DefaultMetricsServerPort, + TTL: 1 * time.Second, + } + m := New(config, config) + + rtMetric, err := ConstructRealTimeGaugeMetric(&v1alpha1.Prometheus{Name: "name1", Help: "hello"}, func() float64 { return 0.0 }) + assert.NoError(t, err) + + realTimeWhenFalseFunc := func() bool { + return false + } + err = m.UpsertCustomMetric("metricFalseKey", "123", rtMetric, true, realTimeWhenFalseFunc) + assert.NoError(t, err) + metrics := m.allCustomMetrics() + assert.Empty(t, metrics) + assert.Len(t, metrics, 0) + + metric, err := ConstructRealTimeGaugeMetric(&v1alpha1.Prometheus{Name: "name2", Help: "hello2"}, func() float64 { return 1.0 }) + assert.NoError(t, err) + realTimeWhenTrueFunc := func() bool { + return true + } + err = m.UpsertCustomMetric("metrickey", "456", metric, false, realTimeWhenTrueFunc) + assert.NoError(t, err) + metrics = m.allCustomMetrics() + assert.NotEmpty(t, metrics) + assert.Len(t, metrics, 1) + tmpMetrics := &dto.Metric{} + metrics[0].Write(tmpMetrics) + value := tmpMetrics.Gauge.Value + assert.Equal(t, *value, float64(1)) + + name2found := false + name1found := false + allMetrics := m.allMetrics() + for _, metric := range allMetrics { + if strings.Contains(metric.Desc().String(), "name2") { + name2found = true + } + + if strings.Contains(metric.Desc().String(), "name1") { + name1found = false + } + } + assert.True(t, name2found) + assert.False(t, name1found) +}