Skip to content

Commit

Permalink
fix: address Alan's feedback
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Aug 13, 2024
1 parent ff2cd99 commit 1cd3811
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 49 deletions.
13 changes: 13 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,19 @@ func (woc *wfOperationCtx) getAllWorkflowPods() ([]*apiv1.Pod, error) {
return pods, nil
}

func (woc *wfOperationCtx) getAllWorkflowPodsMap() (map[string]*apiv1.Pod, error) {
podList, err := woc.getAllWorkflowPods()
if err != nil {
return nil, err
}
podMap := make(map[string]*apiv1.Pod)
for _, pod := range podList {
nodeID := woc.nodeID(pod)
podMap[nodeID] = pod
}
return podMap, nil
}

func printPodSpecLog(pod *apiv1.Pod, wfName string) {
podSpecByte, err := json.Marshal(pod)
log := log.WithField("workflow", wfName).
Expand Down
83 changes: 34 additions & 49 deletions workflow/controller/taskresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -62,7 +61,7 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex
return informer
}

func shouldErrorPodTimeout(node *wfv1.NodeStatus) bool {
func podAbsentTimeout(node *wfv1.NodeStatus) bool {
return time.Since(node.StartedAt.Time) <= envutil.LookupEnvDurationOr("POD_ABSENT_TIMEOUT", 2*time.Minute)
}

Expand All @@ -71,80 +70,66 @@ func (woc *wfOperationCtx) taskResultReconciliation() error {
objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name)
woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")

taskresultCli := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskResults(woc.wf.Namespace)
// We will be hitting the cache here, should we bypass the cache?
podList, err := woc.getAllWorkflowPods()
taskResultClient := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskResults(woc.wf.Namespace)
podMap, err := woc.getAllWorkflowPodsMap()
if err != nil {
woc.log.Warnf("couldn't retrieve pods")
return err
}
podMap := make(map[string]*apiv1.Pod)

// can use woc.nodePodExit technically
for _, pod := range podList {
pod := pod
nodeID := woc.nodeID(pod)
podMap[nodeID] = pod
}

for _, obj := range objs {
result := obj.(*wfv1.WorkflowTaskResult)
resultName := result.GetName()

woc.log.Debugf("task result:\n%+v", result)
woc.log.Debugf("task result name:\n%+v", resultName)

label, foundLabel := result.Labels[common.LabelKeyReportOutputsCompleted]
label := result.Labels[common.LabelKeyReportOutputsCompleted]

pod, foundPod := podMap[result.Name]
_, foundPod := podMap[result.Name]
node, err := woc.wf.Status.Nodes.Get(result.Name)
if err != nil {
if foundPod {
// how does this path make any sense?
// pod created but informed not yet updated
// pod created but informer not yet updated
woc.log.Errorf("couldn't obtain node for %s, but found pod, this is not expected, doing nothing", result.Name)
}
continue
}

if foundPod {
woc.log.Debugf("Got pod %s with phase %s for task result %s and node id %s with label %s", pod.Name, pod.Status.Phase, resultName, result.Name, label)
woc.log.Debugf("The node phase was %s for node named %s", node.Phase, node.Name)
} else if !foundPod && !node.Completed() {
woc.log.Debugf("couldn't find pod")
timeout := shouldErrorPodTimeout(node)
if timeout {
if !foundPod && !node.Completed() {
if podAbsentTimeout(node) {
woc.log.Infof("Determined controller should timeout for %s", result.Name)
result.Labels[common.LabelKeyReportOutputsCompleted] = "true"

// patch the label
err := retryutil.OnError(wait.Backoff{
Duration: time.Second,
Factor: 2,
Jitter: 0.1,
Steps: 5,
Cap: 30 * time.Second,
}, errorsutil.IsTransientErr, func() error {
data, err := json.Marshal(&wfv1.WorkflowTaskResult{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
common.LabelKeyReportOutputsCompleted: "true",
if label != "true" {
err := retryutil.OnError(wait.Backoff{
Duration: time.Second,
Factor: 2,
Jitter: 0.1,
Steps: 5,
Cap: 30 * time.Second,
}, errorsutil.IsTransientErr, func() error {
data, err := json.Marshal(&wfv1.WorkflowTaskResult{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
common.LabelKeyReportOutputsCompleted: "true",
},
},
},
})
if err != nil {
return err
}
_, err = taskResultClient.Patch(context.Background(),
result.Name,
types.MergePatchType,
data,
metav1.PatchOptions{},
)
return err
})
if err != nil {
return err
woc.log.Errorf("couldn't patch taskresult, got error: %s", err)
}
_, err = taskresultCli.Patch(context.Background(),
result.Name,
types.MergePatchType,
data,
metav1.PatchOptions{},
)
return err
})
if err != nil {
woc.log.Errorf("couldn't patch taskresult, got error: %s", err)
}
woc.markNodePhase(node.Name, wfv1.NodeFailed, "pod was absent")
} else {
Expand All @@ -155,7 +140,7 @@ func (woc *wfOperationCtx) taskResultReconciliation() error {
if label == "true" {
woc.log.Debugf("Marking task result complete %s", resultName)
woc.wf.Status.MarkTaskResultComplete(resultName)
} else if label != "true" && foundLabel {
} else if label == "false" {
woc.log.Debugf("Marking task result incomplete %s", resultName)
woc.wf.Status.MarkTaskResultIncomplete(resultName)
}
Expand Down

0 comments on commit 1cd3811

Please sign in to comment.