Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Set initial progress from pod metadata if exists. Fixes #13057 #13260

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: Set initial progress from pod metadata if exists. Fixes #13057
Signed-off-by: jswxstw <jswxstw@gmail.com>
jswxstw committed Aug 11, 2024
commit a68f3fc47c39353c2a960ed65e4e002e5ffbfab3
28 changes: 13 additions & 15 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
@@ -1429,16 +1429,21 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
} else {
woc.wf.Status.MarkTaskResultIncomplete(resultName)
}
}

if x, ok := pod.Annotations[common.AnnotationKeyOutputs]; ok {
woc.log.Warn("workflow uses legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
if new.Outputs == nil {
new.Outputs = &wfv1.Outputs{}
if x, ok = pod.Annotations[common.AnnotationKeyOutputs]; ok {
if new.Outputs == nil {
new.Outputs = &wfv1.Outputs{}
}
if err := json.Unmarshal([]byte(x), new.Outputs); err != nil {
new.Phase = wfv1.NodeError
new.Message = err.Error()
}
}
if err := json.Unmarshal([]byte(x), new.Outputs); err != nil {
new.Phase = wfv1.NodeError
new.Message = err.Error()

if x, ok = pod.Annotations[common.AnnotationKeyProgress]; ok {
if p, ok := wfv1.ParseProgress(x); ok {
new.Progress = p
}
}
}

@@ -1448,13 +1453,6 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
new.Progress = wfv1.ProgressDefault
}

if x, ok := pod.Annotations[common.AnnotationKeyProgress]; ok {
woc.log.Warn("workflow uses legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
if p, ok := wfv1.ParseProgress(x); ok {
new.Progress = p
}
}

// We capture the exit-code after we look for the task-result.
// All other outputs are set by the executor, only the exit-code is set by the controller.
// By waiting, we avoid breaking the race-condition check.
21 changes: 16 additions & 5 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
@@ -281,6 +281,9 @@ spec:
- name: pod
template: pod
- name: pod
metadata:
annotations:
workflows.argoproj.io/progress: 0/100
container:
image: my-image
`)
@@ -291,17 +294,25 @@ spec:
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

makePodsPhase(ctx, woc, apiv1.PodRunning, withProgress("50/100"))
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
assert.Equal(t, wfv1.Progress("0/100"), woc.wf.Status.Progress)
assert.Equal(t, wfv1.Progress("0/100"), woc.wf.Status.Nodes[woc.wf.Name].Progress)
pod := woc.wf.Status.Nodes.FindByDisplayName("pod")
assert.Equal(t, wfv1.Progress("0/100"), pod.Progress)

// mock workflow uses legacy/insecure pod patch
makePodsPhase(ctx, woc, apiv1.PodRunning, withAnnotation(common.AnnotationKeyReportOutputsCompleted, "false"), withProgress("50/100"))
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)

assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
assert.Equal(t, wfv1.Progress("50/100"), woc.wf.Status.Progress)
assert.Equal(t, wfv1.Progress("50/100"), woc.wf.Status.Nodes[woc.wf.Name].Progress)
pod := woc.wf.Status.Nodes.FindByDisplayName("pod")
pod = woc.wf.Status.Nodes.FindByDisplayName("pod")
assert.Equal(t, wfv1.Progress("50/100"), pod.Progress)

makePodsPhase(ctx, woc, apiv1.PodSucceeded, withProgress("100/100"))
// mock workflow uses legacy/insecure pod patch
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withAnnotation(common.AnnotationKeyReportOutputsCompleted, "true"), withProgress("100/100"))
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)

@@ -6271,7 +6282,7 @@ func TestConfigMapCacheSaveOperate(t *testing.T) {

ctx := context.Background()
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withExitCode(0), withOutputs(wfv1.MustMarshallJSON(sampleOutputs)))
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withExitCode(0), withAnnotation(common.AnnotationKeyReportOutputsCompleted, "true"), withOutputs(wfv1.MustMarshallJSON(sampleOutputs)))
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)

@@ -6555,7 +6566,7 @@ spec:
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)

// make all created pods as successful
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withOutputs(`{"parameters": [{"name": "my-param"}]}`))
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withAnnotation(common.AnnotationKeyReportOutputsCompleted, "true"), withOutputs(`{"parameters": [{"name": "my-param"}]}`))

// reconcile
woc = newWorkflowOperationCtx(woc.wf, controller)
12 changes: 12 additions & 0 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
@@ -258,6 +258,18 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
woc.addSchedulingConstraints(pod, wfSpec, tmpl, nodeName)
woc.addMetadata(pod, tmpl)

// Set initial progress from pod metadata if exists.
if x, ok := pod.ObjectMeta.Annotations[common.AnnotationKeyProgress]; ok {
if p, ok := wfv1.ParseProgress(x); ok {
node, err := woc.wf.Status.Nodes.Get(nodeID)
if err != nil {
woc.log.Panicf("was unable to obtain node for %s", nodeID)
}
node.Progress = p
woc.wf.Status.Nodes.Set(nodeID, *node)
}
}

err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims)
if err != nil {
return nil, err