diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 4e8aa3de4..38a0cb59c 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -483,6 +483,13 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error { Cache: make(map[string]vcs.Status), }, }, + { + Name: "run_metrics", + Logger: d.Logger, + System: &run.MetricsCollector{ + Service: d.Runs, + }, + }, { Name: "timeout", Logger: d.Logger, diff --git a/internal/run/metrics.go b/internal/run/metrics.go new file mode 100644 index 000000000..3a25e968a --- /dev/null +++ b/internal/run/metrics.go @@ -0,0 +1,73 @@ +package run + +import ( + "context" + + "github.com/leg100/otf/internal/pubsub" + "github.com/leg100/otf/internal/resource" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + prometheus.MustRegister(runStatusMetric) +} + +var runStatusMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "otf", + Subsystem: "runs", + Name: "statuses", + Help: "Total runs by status", +}, []string{"status"}) + +type MetricsCollector struct { + Service *Service + currentStatuses map[string]Status +} + +func (mc *MetricsCollector) Start(ctx context.Context) error { + // subscribe to run events + sub, unsub := mc.Service.Watch(ctx) + defer unsub() + + runs, err := resource.ListAll(func(opts resource.PageOptions) (*resource.Page[*Run], error) { + return mc.Service.List(ctx, ListOptions{PageOptions: opts}) + }) + if err != nil { + return err + } + mc.bootstrap(runs...) + + for event := range sub { + mc.update(event) + } + return pubsub.ErrSubscriptionTerminated +} + +func (mc *MetricsCollector) bootstrap(runs ...*Run) { + mc.currentStatuses = make(map[string]Status, len(runs)) + for _, run := range runs { + mc.currentStatuses[run.ID] = run.Status + runStatusMetric.WithLabelValues(run.Status.String()).Inc() + } +} + +func (mc *MetricsCollector) update(event pubsub.Event[*Run]) { + if event.Type == pubsub.DeletedEvent { + // Run has been deleted, so lookup its last status and decrement + // the tally. + if lastStatus, ok := mc.currentStatuses[event.Payload.ID]; ok { + runStatusMetric.WithLabelValues(lastStatus.String()).Dec() + delete(mc.currentStatuses, event.Payload.ID) + } + } else { + // Run has been created or updated. + if lastStatus, ok := mc.currentStatuses[event.Payload.ID]; ok { + // Decrement tally for its last status + runStatusMetric.WithLabelValues(lastStatus.String()).Dec() + } + // Record new status + mc.currentStatuses[event.Payload.ID] = event.Payload.Status + // Increment tally for new status + runStatusMetric.WithLabelValues(event.Payload.Status.String()).Inc() + } +} diff --git a/internal/run/metrics_test.go b/internal/run/metrics_test.go new file mode 100644 index 000000000..3f4dfc3a4 --- /dev/null +++ b/internal/run/metrics_test.go @@ -0,0 +1,90 @@ +package run + +import ( + "strings" + "testing" + + "github.com/leg100/otf/internal/pubsub" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" +) + +func TestMetricsCollector_bootstrap(t *testing.T) { + runStatusMetric.Reset() + + const metadata = ` + # HELP otf_runs_statuses Total runs by status + # TYPE otf_runs_statuses gauge +` + + mc := &MetricsCollector{} + mc.bootstrap( + &Run{ID: "run-1", Status: RunPending}, + &Run{ID: "run-2", Status: RunPending}, + &Run{ID: "run-3", Status: RunPending}, + &Run{ID: "run-4", Status: RunPending}, + &Run{ID: "run-5", Status: RunPlanning}, + &Run{ID: "run-6", Status: RunPlanning}, + &Run{ID: "run-7", Status: RunPlanning}, + &Run{ID: "run-8", Status: RunPlanning}, + &Run{ID: "run-9", Status: RunApplied}, + &Run{ID: "run-10", Status: RunApplied}, + &Run{ID: "run-11", Status: RunApplied}, + &Run{ID: "run-12", Status: RunApplied}, + ) + assert.Len(t, mc.currentStatuses, 12) + want := ` + + otf_runs_statuses{status="applied"} 4 + otf_runs_statuses{status="pending"} 4 + otf_runs_statuses{status="planning"} 4 + ` + assert.NoError(t, testutil.CollectAndCompare(runStatusMetric, strings.NewReader(metadata+want), "otf_runs_statuses")) +} + +func TestMetricsCollector_update(t *testing.T) { + runStatusMetric.Reset() + + const metadata = ` + # HELP otf_runs_statuses Total runs by status + # TYPE otf_runs_statuses gauge +` + + mc := &MetricsCollector{} + mc.bootstrap() + + mc.update(pubsub.Event[*Run]{ + Payload: &Run{ID: "run-1", Status: RunPending}, + }) + mc.update(pubsub.Event[*Run]{ + Payload: &Run{ID: "run-2", Status: RunPending}, + }) + mc.update(pubsub.Event[*Run]{ + Payload: &Run{ID: "run-2", Status: RunPlanning}, + }) + mc.update(pubsub.Event[*Run]{ + Payload: &Run{ID: "run-3", Status: RunPending}, + }) + mc.update(pubsub.Event[*Run]{ + Payload: &Run{ID: "run-3", Status: RunPlanning}, + }) + mc.update(pubsub.Event[*Run]{ + Payload: &Run{ID: "run-3", Status: RunApplied}, + }) + mc.update(pubsub.Event[*Run]{ + Payload: &Run{ID: "run-4", Status: RunPending}, + }) + mc.update(pubsub.Event[*Run]{ + Type: pubsub.DeletedEvent, + Payload: &Run{ID: "run-4"}, + }) + + assert.Len(t, mc.currentStatuses, 3) + want := ` + + otf_runs_statuses{status="applied"} 1 + otf_runs_statuses{status="pending"} 1 + otf_runs_statuses{status="planning"} 1 + ` + assert.NoError(t, testutil.CollectAndCompare(runStatusMetric, strings.NewReader(metadata+want), "otf_runs_statuses")) +} diff --git a/internal/run/service.go b/internal/run/service.go index f3c63f58b..54f51a7a2 100644 --- a/internal/run/service.go +++ b/internal/run/service.go @@ -234,6 +234,7 @@ func (s *Service) List(ctx context.Context, opts ListOptions) (*resource.Page[*R // NOTE: this is an internal action, invoked by the scheduler only. func (s *Service) EnqueuePlan(ctx context.Context, runID string) (run *Run, err error) { err = s.db.Tx(ctx, func(ctx context.Context, q *sqlc.Queries) error { + // TODO: this does not need to be part of the tx subject, err := s.CanAccess(ctx, rbac.EnqueuePlanAction, runID) if err != nil { return err @@ -386,6 +387,7 @@ func (s *Service) watchWithOptions(ctx context.Context, opts WatchOptions) (<-ch // Apply enqueues an apply for the run. func (s *Service) Apply(ctx context.Context, runID string) error { return s.db.Tx(ctx, func(ctx context.Context, q *sqlc.Queries) error { + // TODO: this does not need to be part of the tx subject, err := s.CanAccess(ctx, rbac.ApplyRunAction, runID) if err != nil { return err