Skip to content

Commit

Permalink
feat: new pod phase counter metric
Browse files Browse the repository at this point in the history
From #12589

This is a new metric counting how many pods have gone into each pod
phase as observed by the controller.

This is like pods_gauge, but as a counter rather than a gauge.

The gauge is useful at telling you what is happening right now in
the cluster, but is not useful for long term statistics such as "How
many pods has workflows run" because it may never report some pods at
all. This counter can answer that question.

Note to reviewers: this is part of a stack of reviews for metrics
changes. Please don't merge until the rest of the stack is also ready.

Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel committed Aug 15, 2024
1 parent e825f72 commit ed07b58
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 4 deletions.
10 changes: 10 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,16 @@ You should only see this under high load.

`recently_started` is controlled by the [environment variable](environment-variables.md) `RECENTLY_STARTED_POD_DURATION` and defaults to 10 seconds.

#### `pods_total_count`

A gauge of the number of pods which have entered each phase and then observed by the controller.
This is not directly controlled by the workflow controller, so it is possible for some pod phases to be missed.

| attribute | explanation |
|-------------|-------------------------------------------|
| `phase` | The phase that the pod is in |
| `namespace` | The namespace in which the pod is running |

#### `queue_adds_count`

A counter of additions to the work queues inside the controller.
Expand Down
1 change: 1 addition & 0 deletions docs/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The following are new metrics:

* `is_leader`
* `k8s_request_duration`
* `pods_total_count`
* `queue_duration`
* `queue_longest_running`
* `queue_retries`
Expand Down
7 changes: 5 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool)
defer wfNodesLock.Unlock()
node, err := woc.wf.Status.Nodes.Get(nodeID)
if err == nil {
if newState := woc.assessNodeStatus(pod, node); newState != nil {
if newState := woc.assessNodeStatus(ctx, pod, node); newState != nil {
// Check whether its taskresult is in an incompleted state.
if newState.Succeeded() && woc.wf.Status.IsTaskResultIncomplete(node.ID) {
woc.log.WithFields(log.Fields{"nodeID": newState.ID}).Debug("Taskresult of the node not yet completed")
Expand Down Expand Up @@ -1332,7 +1332,7 @@ func printPodSpecLog(pod *apiv1.Pod, wfName string) {

// assessNodeStatus compares the current state of a pod with its corresponding node
// and returns the new node status if something changed
func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus) *wfv1.NodeStatus {
func (woc *wfOperationCtx) assessNodeStatus(ctx context.Context, pod *apiv1.Pod, old *wfv1.NodeStatus) *wfv1.NodeStatus {
new := old.DeepCopy()
tmpl, err := woc.GetNodeTemplate(old)
if err != nil {
Expand Down Expand Up @@ -1386,6 +1386,9 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
new.Phase = wfv1.NodeError
new.Message = fmt.Sprintf("Unexpected pod phase for %s: %s", pod.ObjectMeta.Name, pod.Status.Phase)
}
if old.Phase != new.Phase {
woc.controller.metrics.ChangePodPhase(ctx, string(new.Phase), pod.ObjectMeta.Namespace)
}

// if it's ContainerSetTemplate pod then the inner container names should match to some node names,
// in this case need to update nodes according to container status
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1765,7 +1765,7 @@ func TestAssessNodeStatus(t *testing.T) {
cancel, controller := newController()
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)
got := woc.assessNodeStatus(tt.pod, tt.node)
got := woc.assessNodeStatus(context.TODO(), tt.pod, tt.node)
assert.Equal(t, tt.want, got.Phase)
})
}
Expand Down
25 changes: 25 additions & 0 deletions workflow/metrics/counter_pod_phase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package metrics

import (
"context"
)

const (
namePodPhase = `pods_total_count`
)

func addPodPhaseCounter(_ context.Context, m *Metrics) error {
return m.createInstrument(int64Counter,
namePodPhase,
"Total number of Pods that have entered each phase",
"{pod}",
withAsBuiltIn(),
)
}

func (m *Metrics) ChangePodPhase(ctx context.Context, phase, namespace string) {
m.addInt(ctx, namePodPhase, 1, instAttribs{
{name: labelPodPhase, value: phase},
{name: labelPodNamespace, value: namespace},
})
}
4 changes: 3 additions & 1 deletion workflow/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ const (

labelNodePhase string = `node_phase`

labelPodPhase string = `phase`
labelPodPhase string = `phase`
labelPodNamespace string = `namespace`

labelQueueName string = `queue_name`

labelRecentlyStarted string = `recently_started`
Expand Down
1 change: 1 addition & 0 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func New(ctx context.Context, serviceName string, config *Config, callbacks Call
err = metrics.populate(ctx,
addIsLeader,
addPodPhaseGauge,
addPodPhaseCounter,
addPodMissingCounter,
addWorkflowPhaseGauge,
addOperationDurationHistogram,
Expand Down

0 comments on commit ed07b58

Please sign in to comment.