From 1cd38117c86a4410c8219c2cb19cc7071fa4748b Mon Sep 17 00:00:00 2001 From: isubasinghe Date: Tue, 13 Aug 2024 14:49:24 +1000 Subject: [PATCH] fix: address Alan's feedback Signed-off-by: isubasinghe --- workflow/controller/operator.go | 13 +++++ workflow/controller/taskresult.go | 83 +++++++++++++------------------ 2 files changed, 47 insertions(+), 49 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 880939df8bcb..54b944a5e0da 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1317,6 +1317,19 @@ func (woc *wfOperationCtx) getAllWorkflowPods() ([]*apiv1.Pod, error) { return pods, nil } +func (woc *wfOperationCtx) getAllWorkflowPodsMap() (map[string]*apiv1.Pod, error) { + podList, err := woc.getAllWorkflowPods() + if err != nil { + return nil, err + } + podMap := make(map[string]*apiv1.Pod) + for _, pod := range podList { + nodeID := woc.nodeID(pod) + podMap[nodeID] = pod + } + return podMap, nil +} + func printPodSpecLog(pod *apiv1.Pod, wfName string) { podSpecByte, err := json.Marshal(pod) log := log.WithField("workflow", wfName). diff --git a/workflow/controller/taskresult.go b/workflow/controller/taskresult.go index 471713ea6b1e..438935df5799 100644 --- a/workflow/controller/taskresult.go +++ b/workflow/controller/taskresult.go @@ -7,7 +7,6 @@ import ( "time" log "github.com/sirupsen/logrus" - apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -62,7 +61,7 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex return informer } -func shouldErrorPodTimeout(node *wfv1.NodeStatus) bool { +func podAbsentTimeout(node *wfv1.NodeStatus) bool { return time.Since(node.StartedAt.Time) <= envutil.LookupEnvDurationOr("POD_ABSENT_TIMEOUT", 2*time.Minute) } @@ -71,22 +70,11 @@ func (woc *wfOperationCtx) taskResultReconciliation() error { objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name) woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation") - taskresultCli := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskResults(woc.wf.Namespace) - // We will be hitting the cache here, should we bypass the cache? - podList, err := woc.getAllWorkflowPods() + taskResultClient := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskResults(woc.wf.Namespace) + podMap, err := woc.getAllWorkflowPodsMap() if err != nil { - woc.log.Warnf("couldn't retrieve pods") return err } - podMap := make(map[string]*apiv1.Pod) - - // can use woc.nodePodExit technically - for _, pod := range podList { - pod := pod - nodeID := woc.nodeID(pod) - podMap[nodeID] = pod - } - for _, obj := range objs { result := obj.(*wfv1.WorkflowTaskResult) resultName := result.GetName() @@ -94,57 +82,54 @@ func (woc *wfOperationCtx) taskResultReconciliation() error { woc.log.Debugf("task result:\n%+v", result) woc.log.Debugf("task result name:\n%+v", resultName) - label, foundLabel := result.Labels[common.LabelKeyReportOutputsCompleted] + label := result.Labels[common.LabelKeyReportOutputsCompleted] - pod, foundPod := podMap[result.Name] + _, foundPod := podMap[result.Name] node, err := woc.wf.Status.Nodes.Get(result.Name) if err != nil { if foundPod { // how does this path make any sense? - // pod created but informed not yet updated + // pod created but informer not yet updated woc.log.Errorf("couldn't obtain node for %s, but found pod, this is not expected, doing nothing", result.Name) } continue } - if foundPod { - woc.log.Debugf("Got pod %s with phase %s for task result %s and node id %s with label %s", pod.Name, pod.Status.Phase, resultName, result.Name, label) - woc.log.Debugf("The node phase was %s for node named %s", node.Phase, node.Name) - } else if !foundPod && !node.Completed() { - woc.log.Debugf("couldn't find pod") - timeout := shouldErrorPodTimeout(node) - if timeout { + if !foundPod && !node.Completed() { + if podAbsentTimeout(node) { woc.log.Infof("Determined controller should timeout for %s", result.Name) result.Labels[common.LabelKeyReportOutputsCompleted] = "true" // patch the label - err := retryutil.OnError(wait.Backoff{ - Duration: time.Second, - Factor: 2, - Jitter: 0.1, - Steps: 5, - Cap: 30 * time.Second, - }, errorsutil.IsTransientErr, func() error { - data, err := json.Marshal(&wfv1.WorkflowTaskResult{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - common.LabelKeyReportOutputsCompleted: "true", + if label != "true" { + err := retryutil.OnError(wait.Backoff{ + Duration: time.Second, + Factor: 2, + Jitter: 0.1, + Steps: 5, + Cap: 30 * time.Second, + }, errorsutil.IsTransientErr, func() error { + data, err := json.Marshal(&wfv1.WorkflowTaskResult{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + common.LabelKeyReportOutputsCompleted: "true", + }, }, - }, + }) + if err != nil { + return err + } + _, err = taskResultClient.Patch(context.Background(), + result.Name, + types.MergePatchType, + data, + metav1.PatchOptions{}, + ) + return err }) if err != nil { - return err + woc.log.Errorf("couldn't patch taskresult, got error: %s", err) } - _, err = taskresultCli.Patch(context.Background(), - result.Name, - types.MergePatchType, - data, - metav1.PatchOptions{}, - ) - return err - }) - if err != nil { - woc.log.Errorf("couldn't patch taskresult, got error: %s", err) } woc.markNodePhase(node.Name, wfv1.NodeFailed, "pod was absent") } else { @@ -155,7 +140,7 @@ func (woc *wfOperationCtx) taskResultReconciliation() error { if label == "true" { woc.log.Debugf("Marking task result complete %s", resultName) woc.wf.Status.MarkTaskResultComplete(resultName) - } else if label != "true" && foundLabel { + } else if label == "false" { woc.log.Debugf("Marking task result incomplete %s", resultName) woc.wf.Status.MarkTaskResultIncomplete(resultName) }