Skip to content

Commit

Permalink
fix: process metrics later in executeTemplate. Fixes #13162 (#13163)
Browse files Browse the repository at this point in the history
Signed-off-by: Dillen Padhiar <[email protected]>
  • Loading branch information
dpadhiar authored Jun 18, 2024
1 parent 9c57c37 commit 6201d75
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 23 deletions.
17 changes: 17 additions & 0 deletions test/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,23 @@ func (s *MetricsSuite) TestDAGMetrics() {
})
}

// TestFailedMetric submits the template-status-failed-conditional-metric
// workflow, waits for it to fail, and then checks that the metrics endpoint
// reports the conditional failure counter.
func (s *MetricsSuite) TestFailedMetric() {
	// verifyFailureCounter asserts the workflow phase and then scrapes the
	// endpoint exposed through s.e, expecting the counter to have value 1.
	verifyFailureCounter := func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
		assert.Equal(t, wfv1.WorkflowFailed, status.Phase)

		scrape := s.e(s.T()).GET("").Expect()
		scrape.
			Status(200).
			Body().
			Contains(`argo_workflows_task_failure 1`)
	}

	submitted := s.Given().
		Workflow(`@testdata/template-status-failed-conditional-metric.yaml`).
		When().
		SubmitWorkflow()

	submitted.
		WaitForWorkflow(fixtures.ToBeFailed).
		Then().
		ExpectWorkflow(verifyFailureCounter)
}

func TestMetricsSuite(t *testing.T) {
suite.Run(t, new(MetricsSuite))
}
31 changes: 31 additions & 0 deletions test/e2e/testdata/template-status-failed-conditional-metric.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Workflow fixture for the e2e metrics test: a DAG template that emits a
# Prometheus counter only when the template itself ends in the Failed status.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: template-status-failed-conditional-metric-
spec:
  entrypoint: dag
  templates:
    - name: dag
      metrics:
        prometheus:
          # Counter incremented by 1, guarded by the template's final status.
          # The e2e test looks for it as `argo_workflows_task_failure 1`.
          - counter:
              value: "1"
            help: Task failed
            name: task_failure
            when: '{{status}} == Failed'
      dag:
        tasks:
          - name: test
            template: echo
            arguments:
              parameters:
                - name: message
                  value: "test"

    - name: echo
      inputs:
        parameters:
          - name: message
      container:
        image: alpine:3.7
        # The message parameter is used as the command itself (here: `test`).
        # NOTE(review): presumably `test` with no arguments exits non-zero,
        # which is what makes the task (and therefore the DAG) fail - confirm.
        command: ["{{inputs.parameters.message}}"]
46 changes: 23 additions & 23 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2218,29 +2218,6 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)
}

if processedTmpl.Metrics != nil {
// Check if the node was just created, if it was emit realtime metrics.
// If the node did not previously exist, we can infer that it was created during the current operation, emit real time metrics.
if _, ok := woc.preExecutionNodePhases[node.ID]; !ok {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, true)
}
// Check if the node completed during this execution, if it did emit metrics
//
// This check is necessary because sometimes a node will be marked completed during the current execution and will
// not be considered again. The best example of this is the entrypoint steps/dag template (once completed, the
// workflow ends and it's not reconsidered). This check makes sure that its metrics also get emitted.
//
// In this check, a completed node may or may not have existed prior to this execution. If it did exist, ensure that it wasn't
// completed before this execution. If it did not exist prior, then we can infer that it was completed during this execution.
// The statement "(!ok || !prevNodeStatus.Fulfilled())" checks for this behavior and represents the material conditional
// "ok -> !prevNodeStatus.Fulfilled()" (https://en.wikipedia.org/wiki/Material_conditional)
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; (!ok || !prevNodeStatus.Fulfilled()) && node.Fulfilled() {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
}

retrieveNode, err := woc.wf.GetNodeByName(node.Name)
if err != nil {
err := fmt.Errorf("no Node found by the name of %s; wf.Status.Nodes=%+v", node.Name, woc.wf.Status.Nodes)
Expand Down Expand Up @@ -2270,6 +2247,29 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
node = retryNode
}

if processedTmpl.Metrics != nil {
// Check if the node was just created, if it was emit realtime metrics.
// If the node did not previously exist, we can infer that it was created during the current operation, emit real time metrics.
if _, ok := woc.preExecutionNodePhases[node.ID]; !ok {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, true)
}
// Check if the node completed during this execution, if it did emit metrics
//
// This check is necessary because sometimes a node will be marked completed during the current execution and will
// not be considered again. The best example of this is the entrypoint steps/dag template (once completed, the
// workflow ends and it's not reconsidered). This check makes sure that its metrics also get emitted.
//
// In this check, a completed node may or may not have existed prior to this execution. If it did exist, ensure that it wasn't
// completed before this execution. If it did not exist prior, then we can infer that it was completed during this execution.
// The statement "(!ok || !prevNodeStatus.Fulfilled())" checks for this behavior and represents the material conditional
// "ok -> !prevNodeStatus.Fulfilled()" (https://en.wikipedia.org/wiki/Material_conditional)
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; (!ok || !prevNodeStatus.Fulfilled()) && node.Fulfilled() {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
}

return node, nil
}

Expand Down

0 comments on commit 6201d75

Please sign in to comment.