fix: Mark task result as completed if pod has been deleted for a while.
Fixes #13533

Signed-off-by: oninowang <[email protected]>
jswxstw authored and oninowang committed Aug 30, 2024
1 parent 3d41fb2 commit ddb6ab1
Showing 4 changed files with 22 additions and 44 deletions.
2 changes: 1 addition & 1 deletion docs/environment-variables.md
@@ -29,7 +29,6 @@ This document outlines environment variables that can be used to customize behav
| `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. |
| `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. |
| `DISABLE_MAX_RECURSION` | `bool` | `false` | Set to true to disable the recursion preventer, which stops a workflow from running once it has called into a child template 100 times |
| `POD_ABSENT_TIMEOUT` | `time.Duration` | `2m` | The time used to determine if pod absence should imply node failure |
| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. |
| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. |
| `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. |
@@ -46,6 +45,7 @@ This document outlines environment variables that can be used to customize behav
| `OPERATION_DURATION_METRIC_BUCKET_COUNT` | `int` | `6` | The number of buckets to collect the metric for the operation duration. |
| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1) - should be set the same for Argo Server. |
| `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. |
| `RECENTLY_DELETED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently deleted. |
| `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry back-off duration when retrying API calls. |
| `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry back-off factor when retrying API calls. |
| `RETRY_BACKOFF_STEPS` | `int` | `5` | The retry back-off steps when retrying API calls. |
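For context, duration-typed variables such as `RECENTLY_DELETED_POD_DURATION` are read as Go `time.Duration` strings with a fallback default (the controller's own lookup helper appears in the taskresult.go hunk further below). The snippet here is a minimal, self-contained sketch of that pattern; `lookupEnvDurationOr` is an illustrative stand-in, not the controller's real helper.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// lookupEnvDurationOr parses the named environment variable as a duration,
// falling back to def when it is unset or malformed.
func lookupEnvDurationOr(name string, def time.Duration) time.Duration {
	raw, ok := os.LookupEnv(name)
	if !ok {
		return def
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		return def
	}
	return d
}

func main() {
	// Defaults to 10s, matching the table entry above.
	window := lookupEnvDurationOr("RECENTLY_DELETED_POD_DURATION", 10*time.Second)
	fmt.Println("recently-deleted window:", window)
}
```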
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
@@ -2402,6 +2402,11 @@ func (n NodeStatus) IsExitNode() bool {
return strings.HasSuffix(n.DisplayName, ".onExit")
}

// IsPodDeleted returns whether the node errored because its pod was deleted.
func (n NodeStatus) IsPodDeleted() bool {
return n.Phase == NodeError && n.Message == "pod deleted"
}

func (n NodeStatus) Succeeded() bool {
return n.Phase == NodeSucceeded
}
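The new `IsPodDeleted` helper is just a phase-and-message check. A minimal usage sketch, assuming the Argo Workflows v3 module is available as a dependency:

```go
package main

import (
	"fmt"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func main() {
	// A node that errored because its pod was removed out from under it.
	node := wfv1.NodeStatus{Phase: wfv1.NodeError, Message: "pod deleted"}

	// IsPodDeleted (added in this commit) reports exactly that condition.
	fmt.Println(node.IsPodDeleted()) // true
}
```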
15 changes: 1 addition & 14 deletions workflow/controller/operator.go
@@ -1227,7 +1227,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool)
}

if recentlyStarted {
// If the pod was deleted, then we it is possible that the controller never get another informer message about it.
// If the pod was deleted, then it is possible that the controller never gets another informer message about it.
// In this case, the workflow will only be requeued after the resync period (20m). This means
// workflow will not update for 20m. Requeuing here prevents that happening.
woc.requeue()
@@ -1317,19 +1317,6 @@ func (woc *wfOperationCtx) getAllWorkflowPods() ([]*apiv1.Pod, error) {
return pods, nil
}

func (woc *wfOperationCtx) getAllWorkflowPodsMap() (map[string]*apiv1.Pod, error) {
podList, err := woc.getAllWorkflowPods()
if err != nil {
return nil, err
}
podMap := make(map[string]*apiv1.Pod)
for _, pod := range podList {
nodeID := woc.nodeID(pod)
podMap[nodeID] = pod
}
return podMap, nil
}

func printPodSpecLog(pod *apiv1.Pod, wfName string) {
podSpecByte, err := json.Marshal(pod)
log := log.WithField("workflow", wfName).
44 changes: 15 additions & 29 deletions workflow/controller/taskresult.go
@@ -54,19 +54,14 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex
return informer
}

func podAbsentTimeout(node *wfv1.NodeStatus) bool {
return time.Since(node.StartedAt.Time) <= envutil.LookupEnvDurationOr("POD_ABSENT_TIMEOUT", 2*time.Minute)
func recentlyDeleted(node *wfv1.NodeStatus) bool {
return time.Since(node.FinishedAt.Time) <= envutil.LookupEnvDurationOr("RECENTLY_DELETED_POD_DURATION", 10*time.Second)
}

func (woc *wfOperationCtx) taskResultReconciliation() error {

objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name)
woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")

podMap, err := woc.getAllWorkflowPodsMap()
if err != nil {
return err
}
for _, obj := range objs {
result := obj.(*wfv1.WorkflowTaskResult)
resultName := result.GetName()
@@ -75,7 +70,6 @@ func (woc *wfOperationCtx) taskResultReconciliation() error {
woc.log.Debugf("task result name:\n%+v", resultName)

label := result.Labels[common.LabelKeyReportOutputsCompleted]

// If the task result is completed, set the state to true.
if label == "true" {
woc.log.Debugf("Marking task result complete %s", resultName)
@@ -85,33 +79,25 @@ func (woc *wfOperationCtx) taskResultReconciliation() error {
woc.wf.Status.MarkTaskResultIncomplete(resultName)
}

_, foundPod := podMap[result.Name]
node, err := woc.wf.Status.Nodes.Get(result.Name)
nodeID := result.Name
old, err := woc.wf.Status.Nodes.Get(nodeID)
if err != nil {
if foundPod {
// how does this path make any sense?
// pod created but informer not yet updated
woc.log.Errorf("couldn't obtain node for %s, but found pod, this is not expected, doing nothing", result.Name)
}
continue
}

if !foundPod && !node.Completed() {
if podAbsentTimeout(node) {
woc.log.Infof("Determined controller should timeout for %s", result.Name)
woc.wf.Status.MarkTaskResultComplete(resultName)

woc.markNodePhase(node.Name, wfv1.NodeFailed, "pod was absent")
// Mark task result as completed if it has no chance to be completed.
if label == "false" && old.IsPodDeleted() {
if recentlyDeleted(old) {
woc.log.WithField("nodeID", nodeID).Debug("Wait for marking task result as completed because pod is recently deleted.")
// If the pod was deleted, then it is possible that the controller never get another informer message about it.
// In this case, the workflow will only be requeued after the resync period (20m). This means
// workflow will not update for 20m. Requeuing here prevents that happening.
woc.requeue()
continue
} else {
woc.log.Debugf("Determined controller shouldn't timeout %s", result.Name)
woc.log.WithField("nodeID", nodeID).Info("Marking task result as completed because pod has been deleted for a while.")
woc.wf.Status.MarkTaskResultComplete(nodeID)
}
}

nodeID := result.Name
old, err := woc.wf.Status.Nodes.Get(nodeID)
if err != nil {
continue
}
newNode := old.DeepCopy()
if result.Outputs.HasOutputs() {
if newNode.Outputs == nil {
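Condensed from the taskresult.go hunk above, the new behavior for an incomplete task result whose node errored with a deleted pod is: requeue while the deletion is recent, otherwise mark the result complete so the workflow can finish. A standalone sketch, where `decide` and `recentlyDeletedWindow` are illustrative names rather than controller APIs:

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

// decide mirrors the branch added to taskResultReconciliation, simplified.
func decide(node wfv1.NodeStatus, completedLabel string, recentlyDeletedWindow time.Duration) string {
	if completedLabel == "false" && node.IsPodDeleted() {
		if time.Since(node.FinishedAt.Time) <= recentlyDeletedWindow {
			return "requeue" // give the informer a little more time
		}
		return "mark task result complete" // outputs will never arrive
	}
	return "leave as is"
}

func main() {
	node := wfv1.NodeStatus{
		Phase:      wfv1.NodeError,
		Message:    "pod deleted",
		FinishedAt: metav1.NewTime(time.Now().Add(-time.Minute)),
	}
	fmt.Println(decide(node, "false", 10*time.Second)) // mark task result complete
}
```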
