diff --git a/docs/metrics.md b/docs/metrics.md index 06e9c9205dfe..a74b71ebc27c 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -279,6 +279,16 @@ You should only see this under high load. `recently_started` is controlled by the [environment variable](environment-variables.md) `RECENTLY_STARTED_POD_DURATION` and defaults to 10 seconds. +#### `pods_total_count` + +A gauge of the number of pods which have entered each phase and then observed by the controller. +This is not directly controlled by the workflow controller, so it is possible for some pod phases to be missed. + +| attribute | explanation | +|-------------|-------------------------------------------| +| `phase` | The phase that the pod is in | +| `namespace` | The namespace in which the pod is running | + #### `queue_adds_count` A counter of additions to the work queues inside the controller. diff --git a/docs/upgrading.md b/docs/upgrading.md index 9f190c04f3b1..47d3da0baecc 100644 --- a/docs/upgrading.md +++ b/docs/upgrading.md @@ -26,6 +26,7 @@ These notes explain the differences in using the Prometheus `/metrics` endpoint The following are new metrics: * `is_leader` +* `pods_total_count` * `queue_duration` * `queue_longest_running` * `queue_retries` diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index b574e89d262a..b01e8d0bc2d8 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1143,7 +1143,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool) defer wfNodesLock.Unlock() node, err := woc.wf.Status.Nodes.Get(nodeID) if err == nil { - if newState := woc.assessNodeStatus(pod, node); newState != nil { + if newState := woc.assessNodeStatus(ctx, pod, node); newState != nil { // Check whether its taskresult is in an incompleted state. if newState.Succeeded() && woc.wf.Status.IsTaskResultIncomplete(node.ID) { woc.log.WithFields(log.Fields{"nodeID": newState.ID}).Debug("Taskresult of the node not yet completed") @@ -1348,7 +1348,7 @@ func printPodSpecLog(pod *apiv1.Pod, wfName string) { // assessNodeStatus compares the current state of a pod with its corresponding node // and returns the new node status if something changed -func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus) *wfv1.NodeStatus { +func (woc *wfOperationCtx) assessNodeStatus(ctx context.Context, pod *apiv1.Pod, old *wfv1.NodeStatus) *wfv1.NodeStatus { new := old.DeepCopy() tmpl, err := woc.GetNodeTemplate(old) if err != nil { @@ -1402,6 +1402,9 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus new.Phase = wfv1.NodeError new.Message = fmt.Sprintf("Unexpected pod phase for %s: %s", pod.ObjectMeta.Name, pod.Status.Phase) } + if old.Phase != new.Phase { + woc.controller.metrics.ChangePodPhase(ctx, string(new.Phase), pod.ObjectMeta.Namespace) + } // if it's ContainerSetTemplate pod then the inner container names should match to some node names, // in this case need to update nodes according to container status diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index b3d37537dfea..2788b594ab6e 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1765,7 +1765,7 @@ func TestAssessNodeStatus(t *testing.T) { cancel, controller := newController() defer cancel() woc := newWorkflowOperationCtx(wf, controller) - got := woc.assessNodeStatus(tt.pod, tt.node) + got := woc.assessNodeStatus(context.TODO(), tt.pod, tt.node) assert.Equal(t, tt.want, got.Phase) }) } diff --git a/workflow/metrics/counter_pod_phase.go b/workflow/metrics/counter_pod_phase.go new file mode 100644 index 000000000000..530be09ad5ce --- /dev/null +++ b/workflow/metrics/counter_pod_phase.go @@ -0,0 +1,25 @@ +package metrics + +import ( + "context" +) + +const ( + namePodPhase = `pods_total_count` +) + +func addPodPhaseCounter(_ context.Context, m *Metrics) error { + return m.createInstrument(int64Counter, + namePodPhase, + "Total number of Pods that have entered each phase", + "{pod}", + withAsBuiltIn(), + ) +} + +func (m *Metrics) ChangePodPhase(ctx context.Context, phase, namespace string) { + m.addInt(ctx, namePodPhase, 1, instAttribs{ + {name: labelPodPhase, value: phase}, + {name: labelPodNamespace, value: namespace}, + }) +} diff --git a/workflow/metrics/labels.go b/workflow/metrics/labels.go index e3b457eee11a..9e33432d604f 100644 --- a/workflow/metrics/labels.go +++ b/workflow/metrics/labels.go @@ -16,7 +16,9 @@ const ( labelNodePhase string = `node_phase` - labelPodPhase string = `phase` + labelPodPhase string = `phase` + labelPodNamespace string = `namespace` + labelQueueName string = `queue_name` labelRecentlyStarted string = `recently_started` diff --git a/workflow/metrics/metrics.go b/workflow/metrics/metrics.go index e39ae19dbc2a..5361bda8388e 100644 --- a/workflow/metrics/metrics.go +++ b/workflow/metrics/metrics.go @@ -95,6 +95,7 @@ func New(ctx context.Context, serviceName string, config *Config, callbacks Call err = metrics.populate(ctx, addIsLeader, addPodPhaseGauge, + addPodPhaseCounter, addPodMissingCounter, addWorkflowPhaseGauge, addOperationDurationHistogram,