Skip to content

Commit

Permalink
feat: run status metrics (#697)
Browse files Browse the repository at this point in the history
Adds metrics for total runs by status:

```
# HELP otf_runs_statuses Total runs by status
# TYPE otf_runs_statuses gauge
otf_runs_statuses{status="applied"} 0
otf_runs_statuses{status="apply_queued"} 0
otf_runs_statuses{status="applying"} 0
otf_runs_statuses{status="cost_estimated"} 0
otf_runs_statuses{status="pending"} 0
otf_runs_statuses{status="plan_queued"} 0
otf_runs_statuses{status="planning"} 1
```
  • Loading branch information
leg100 authored Oct 29, 2024
1 parent ece6de7 commit 4642e5d
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 0 deletions.
7 changes: 7 additions & 0 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,13 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
Cache: make(map[string]vcs.Status),
},
},
{
Name: "run_metrics",
Logger: d.Logger,
System: &run.MetricsCollector{
Service: d.Runs,
},
},
{
Name: "timeout",
Logger: d.Logger,
Expand Down
73 changes: 73 additions & 0 deletions internal/run/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package run

import (
"context"

"github.com/leg100/otf/internal/pubsub"
"github.com/leg100/otf/internal/resource"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
prometheus.MustRegister(runStatusMetric)
}

var runStatusMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "otf",
Subsystem: "runs",
Name: "statuses",
Help: "Total runs by status",
}, []string{"status"})

type MetricsCollector struct {
Service *Service
currentStatuses map[string]Status
}

func (mc *MetricsCollector) Start(ctx context.Context) error {
// subscribe to run events
sub, unsub := mc.Service.Watch(ctx)
defer unsub()

runs, err := resource.ListAll(func(opts resource.PageOptions) (*resource.Page[*Run], error) {
return mc.Service.List(ctx, ListOptions{PageOptions: opts})
})
if err != nil {
return err
}
mc.bootstrap(runs...)

for event := range sub {
mc.update(event)
}
return pubsub.ErrSubscriptionTerminated
}

func (mc *MetricsCollector) bootstrap(runs ...*Run) {
mc.currentStatuses = make(map[string]Status, len(runs))
for _, run := range runs {
mc.currentStatuses[run.ID] = run.Status
runStatusMetric.WithLabelValues(run.Status.String()).Inc()
}
}

func (mc *MetricsCollector) update(event pubsub.Event[*Run]) {
if event.Type == pubsub.DeletedEvent {
// Run has been deleted, so lookup its last status and decrement
// the tally.
if lastStatus, ok := mc.currentStatuses[event.Payload.ID]; ok {
runStatusMetric.WithLabelValues(lastStatus.String()).Dec()
delete(mc.currentStatuses, event.Payload.ID)
}
} else {
// Run has been created or updated.
if lastStatus, ok := mc.currentStatuses[event.Payload.ID]; ok {
// Decrement tally for its last status
runStatusMetric.WithLabelValues(lastStatus.String()).Dec()
}
// Record new status
mc.currentStatuses[event.Payload.ID] = event.Payload.Status
// Increment tally for new status
runStatusMetric.WithLabelValues(event.Payload.Status.String()).Inc()
}
}
90 changes: 90 additions & 0 deletions internal/run/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package run

import (
"strings"
"testing"

"github.com/leg100/otf/internal/pubsub"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
)

func TestMetricsCollector_bootstrap(t *testing.T) {
runStatusMetric.Reset()

const metadata = `
# HELP otf_runs_statuses Total runs by status
# TYPE otf_runs_statuses gauge
`

mc := &MetricsCollector{}
mc.bootstrap(
&Run{ID: "run-1", Status: RunPending},
&Run{ID: "run-2", Status: RunPending},
&Run{ID: "run-3", Status: RunPending},
&Run{ID: "run-4", Status: RunPending},
&Run{ID: "run-5", Status: RunPlanning},
&Run{ID: "run-6", Status: RunPlanning},
&Run{ID: "run-7", Status: RunPlanning},
&Run{ID: "run-8", Status: RunPlanning},
&Run{ID: "run-9", Status: RunApplied},
&Run{ID: "run-10", Status: RunApplied},
&Run{ID: "run-11", Status: RunApplied},
&Run{ID: "run-12", Status: RunApplied},
)
assert.Len(t, mc.currentStatuses, 12)
want := `
otf_runs_statuses{status="applied"} 4
otf_runs_statuses{status="pending"} 4
otf_runs_statuses{status="planning"} 4
`
assert.NoError(t, testutil.CollectAndCompare(runStatusMetric, strings.NewReader(metadata+want), "otf_runs_statuses"))
}

func TestMetricsCollector_update(t *testing.T) {
runStatusMetric.Reset()

const metadata = `
# HELP otf_runs_statuses Total runs by status
# TYPE otf_runs_statuses gauge
`

mc := &MetricsCollector{}
mc.bootstrap()

mc.update(pubsub.Event[*Run]{
Payload: &Run{ID: "run-1", Status: RunPending},
})
mc.update(pubsub.Event[*Run]{
Payload: &Run{ID: "run-2", Status: RunPending},
})
mc.update(pubsub.Event[*Run]{
Payload: &Run{ID: "run-2", Status: RunPlanning},
})
mc.update(pubsub.Event[*Run]{
Payload: &Run{ID: "run-3", Status: RunPending},
})
mc.update(pubsub.Event[*Run]{
Payload: &Run{ID: "run-3", Status: RunPlanning},
})
mc.update(pubsub.Event[*Run]{
Payload: &Run{ID: "run-3", Status: RunApplied},
})
mc.update(pubsub.Event[*Run]{
Payload: &Run{ID: "run-4", Status: RunPending},
})
mc.update(pubsub.Event[*Run]{
Type: pubsub.DeletedEvent,
Payload: &Run{ID: "run-4"},
})

assert.Len(t, mc.currentStatuses, 3)
want := `
otf_runs_statuses{status="applied"} 1
otf_runs_statuses{status="pending"} 1
otf_runs_statuses{status="planning"} 1
`
assert.NoError(t, testutil.CollectAndCompare(runStatusMetric, strings.NewReader(metadata+want), "otf_runs_statuses"))
}
2 changes: 2 additions & 0 deletions internal/run/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (s *Service) List(ctx context.Context, opts ListOptions) (*resource.Page[*R
// NOTE: this is an internal action, invoked by the scheduler only.
func (s *Service) EnqueuePlan(ctx context.Context, runID string) (run *Run, err error) {
err = s.db.Tx(ctx, func(ctx context.Context, q *sqlc.Queries) error {
// TODO: this does not need to be part of the tx
subject, err := s.CanAccess(ctx, rbac.EnqueuePlanAction, runID)
if err != nil {
return err
Expand Down Expand Up @@ -386,6 +387,7 @@ func (s *Service) watchWithOptions(ctx context.Context, opts WatchOptions) (<-ch
// Apply enqueues an apply for the run.
func (s *Service) Apply(ctx context.Context, runID string) error {
return s.db.Tx(ctx, func(ctx context.Context, q *sqlc.Queries) error {
// TODO: this does not need to be part of the tx
subject, err := s.CanAccess(ctx, rbac.ApplyRunAction, runID)
if err != nil {
return err
Expand Down

0 comments on commit 4642e5d

Please sign in to comment.