Skip to content

Commit

Permalink
fix(metrics): Added when condition support for realtime metrics
Browse files: browse the repository at this point in the history
Signed-off-by: Yuping Fan <[email protected]>
  • Loading branch information
fyp711 committed Aug 15, 2024
1 parent c86bbda commit e06457e
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 24 deletions.
28 changes: 19 additions & 9 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3664,13 +3664,15 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc
metricTmpl.Labels = metricTmplSubstituted.Labels
metricTmpl.When = metricTmplSubstituted.When

proceed, err := shouldExecute(metricTmpl.When)
if err != nil {
woc.reportMetricEmissionError(fmt.Sprintf("unable to compute 'when' clause for metric '%s': %s", woc.wf.ObjectMeta.Name, err))
continue
}
if !proceed {
continue
if !metricTmpl.IsRealtime() {
proceed, err := shouldExecute(metricTmpl.When)
if err != nil {
woc.reportMetricEmissionError(fmt.Sprintf("unable to compute 'when' clause for metric '%s': %s", woc.wf.ObjectMeta.Name, err))
continue
}
if !proceed {
continue
}
}

if metricTmpl.IsRealtime() {
Expand All @@ -3691,7 +3693,15 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc
woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricTmpl.Name, err))
continue
}
err = woc.controller.metrics.UpsertCustomMetric(metricTmpl.GetDesc(), string(woc.wf.UID), updatedMetric, true)
realTimeWhenFunc := func() bool {
proceed, err := shouldExecute(metricTmpl.When)
if err != nil {
woc.reportMetricEmissionError(fmt.Sprintf("unable to compute 'when' clause for metric '%s': %s", woc.wf.ObjectMeta.Name, err))
return false
}
return proceed
}
err = woc.controller.metrics.UpsertCustomMetric(metricTmpl.GetDesc(), string(woc.wf.UID), updatedMetric, true, realTimeWhenFunc)
if err != nil {
woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricTmpl.Name, err))
continue
Expand Down Expand Up @@ -3731,7 +3741,7 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc
woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricSpec.Name, err))
continue
}
err = woc.controller.metrics.UpsertCustomMetric(metricSpec.GetDesc(), string(woc.wf.UID), updatedMetric, false)
err = woc.controller.metrics.UpsertCustomMetric(metricSpec.GetDesc(), string(woc.wf.UID), updatedMetric, false, nil)
if err != nil {
woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricSpec.Name, err))
continue
Expand Down
33 changes: 25 additions & 8 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ func (s ServerConfig) SameServerAs(other ServerConfig) bool {
}

type metric struct {
metric prometheus.Metric
lastUpdated time.Time
realtime bool
completed bool
metric prometheus.Metric
lastUpdated time.Time
realtime bool
completed bool
realTimeWhenFunc func() bool
}

type Metrics struct {
Expand Down Expand Up @@ -141,10 +142,26 @@ func (m *Metrics) allMetrics() []prometheus.Metric {
for _, metric := range m.workersBusy {
allMetrics = append(allMetrics, metric)
}
allCustomMetrics := m.allCustomMetrics()
allMetrics = append(allMetrics, allCustomMetrics...)

return allMetrics
}

func (m *Metrics) allCustomMetrics() []prometheus.Metric {
allCustomMetrics := []prometheus.Metric{}
for _, metric := range m.customMetrics {
allMetrics = append(allMetrics, metric.metric)
if metric.realtime {
if metric.realTimeWhenFunc == nil {
allCustomMetrics = append(allCustomMetrics, metric.metric)
} else if metric.realTimeWhenFunc() {
allCustomMetrics = append(allCustomMetrics, metric.metric)
}
} else {
allCustomMetrics = append(allCustomMetrics, metric.metric)
}
}
return allMetrics
return allCustomMetrics
}

func (m *Metrics) StopRealtimeMetricsForKey(key string) {
Expand Down Expand Up @@ -195,7 +212,7 @@ func (m *Metrics) GetCustomMetric(key string) prometheus.Metric {
return m.customMetrics[key].metric
}

func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prometheus.Metric, realtime bool) error {
func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prometheus.Metric, realtime bool, realTimeWhenFunc func() bool) error {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -209,7 +226,7 @@ func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prom
} else {
m.metricNameHelps[name] = help
}
m.customMetrics[key] = metric{metric: newMetric, lastUpdated: time.Now(), realtime: realtime}
m.customMetrics[key] = metric{metric: newMetric, lastUpdated: time.Now(), realtime: realtime, realTimeWhenFunc: realTimeWhenFunc}

// If this is a realtime metric, track it
if realtime {
Expand Down
67 changes: 60 additions & 7 deletions workflow/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"context"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -68,12 +69,12 @@ func TestMetrics(t *testing.T) {

assert.Nil(t, m.GetCustomMetric("does-not-exist"))

err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false)
err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false, nil)
if assert.NoError(t, err) {
assert.NotNil(t, m.GetCustomMetric("metric"))
}

err = m.UpsertCustomMetric("metric2", "", newCounter("test", "new test", nil), false)
err = m.UpsertCustomMetric("metric2", "", newCounter("test", "new test", nil), false, nil)
assert.Error(t, err)

badMetric, err := constructOrUpdateGaugeMetric(nil, &v1alpha1.Prometheus{
Expand All @@ -85,7 +86,7 @@ func TestMetrics(t *testing.T) {
},
})
if assert.NoError(t, err) {
err = m.UpsertCustomMetric("asdf", "", badMetric, false)
err = m.UpsertCustomMetric("asdf", "", badMetric, false, nil)
assert.Error(t, err)
}
}
Expand All @@ -108,7 +109,7 @@ func TestMetricGC(t *testing.T) {
m := New(config, config)
assert.Empty(t, m.customMetrics)

err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false)
err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false, nil)
if assert.NoError(t, err) {
assert.Len(t, m.customMetrics, 1)
}
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestRealtimeMetricGC(t *testing.T) {
m := New(config, config)
assert.Empty(t, m.customMetrics)

err := m.UpsertCustomMetric("realtime_metric", "workflow-uid", newCounter("test", "test", nil), true)
err := m.UpsertCustomMetric("realtime_metric", "workflow-uid", newCounter("test", "test", nil), true, nil)
if assert.NoError(t, err) {
assert.Len(t, m.customMetrics, 1)
}
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestRealTimeMetricDeletion(t *testing.T) {
rtMetric, err := ConstructRealTimeGaugeMetric(&v1alpha1.Prometheus{Name: "name", Help: "hello"}, func() float64 { return 0.0 })
assert.NoError(t, err)

err = m.UpsertCustomMetric("metrickey", "123", rtMetric, true)
err = m.UpsertCustomMetric("metrickey", "123", rtMetric, true, nil)
assert.NoError(t, err)
assert.NotEmpty(t, m.workflows["123"])
assert.Len(t, m.customMetrics, 1)
Expand All @@ -225,7 +226,7 @@ func TestRealTimeMetricDeletion(t *testing.T) {
metric, err := ConstructOrUpdateMetric(nil, &v1alpha1.Prometheus{Name: "name", Help: "hello", Gauge: &v1alpha1.Gauge{Value: "1"}})
assert.NoError(t, err)

err = m.UpsertCustomMetric("metrickey", "456", metric, false)
err = m.UpsertCustomMetric("metrickey", "456", metric, false, nil)
assert.NoError(t, err)
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 1)
Expand All @@ -234,3 +235,55 @@ func TestRealTimeMetricDeletion(t *testing.T) {
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 1)
}

// TestRealTimeWhenFunction verifies that a realtime custom metric is only
// exported while its "when" callback returns true, both through
// allCustomMetrics and through the aggregated allMetrics list.
func TestRealTimeWhenFunction(t *testing.T) {
	config := ServerConfig{
		Enabled: true,
		Path:    DefaultMetricsServerPath,
		Port:    DefaultMetricsServerPort,
		TTL:     1 * time.Second,
	}
	m := New(config, config)

	// A realtime metric whose when-callback returns false must be filtered out.
	rtMetric, err := ConstructRealTimeGaugeMetric(&v1alpha1.Prometheus{Name: "name1", Help: "hello"}, func() float64 { return 0.0 })
	assert.NoError(t, err)

	realTimeWhenFalseFunc := func() bool {
		return false
	}
	err = m.UpsertCustomMetric("metricFalseKey", "123", rtMetric, true, realTimeWhenFalseFunc)
	assert.NoError(t, err)
	metrics := m.allCustomMetrics()
	assert.Empty(t, metrics)

	// A realtime metric whose when-callback returns true must be exported.
	// It is registered with realtime=true: the callback is only consulted for
	// realtime metrics, so realtime=false would bypass the code under test.
	metric, err := ConstructRealTimeGaugeMetric(&v1alpha1.Prometheus{Name: "name2", Help: "hello2"}, func() float64 { return 1.0 })
	assert.NoError(t, err)
	realTimeWhenTrueFunc := func() bool {
		return true
	}
	err = m.UpsertCustomMetric("metrickey", "456", metric, true, realTimeWhenTrueFunc)
	assert.NoError(t, err)
	metrics = m.allCustomMetrics()
	// Guard the index below so a wrong length fails the test instead of panicking.
	if assert.Len(t, metrics, 1) {
		tmpMetrics := &dto.Metric{}
		if assert.NoError(t, metrics[0].Write(tmpMetrics)) {
			// Nil-safe getters avoid a panic if the written metric is not a gauge.
			assert.Equal(t, float64(1), tmpMetrics.GetGauge().GetValue())
		}
	}

	// The same filtering must be visible through the aggregated metric list.
	name2found := false
	name1found := false
	for _, metric := range m.allMetrics() {
		desc := metric.Desc().String()
		if strings.Contains(desc, "name2") {
			name2found = true
		}
		// Record a hit so the assert.False below can actually fail if the
		// filtered metric leaks into the output.
		if strings.Contains(desc, "name1") {
			name1found = true
		}
	}
	assert.True(t, name2found)
	assert.False(t, name1found)
}

0 comments on commit e06457e

Please sign in to comment.