fix: Mark task result as completed if pod has been deleted for a while. Fixes #13533 (#13537)

Merged 1 commit on Sep 2, 2024
docs/environment-variables.md (1 addition, 1 deletion)
@@ -29,7 +29,6 @@ This document outlines environment variables that can be used to customize behav
| `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. |
| `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. |
| `DISABLE_MAX_RECURSION` | `bool` | `false` | Set to true to disable the recursion preventer, which will stop a workflow running which has called into a child template 100 times |
-| `POD_ABSENT_TIMEOUT` | `time.Duration` | `2m` | The time used to determine if pod absence should imply node failure |
| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. |
| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. |
| `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. |
@@ -46,6 +45,7 @@ This document outlines environment variables that can be used to customize behav
| `OPERATION_DURATION_METRIC_BUCKET_COUNT` | `int` | `6` | The number of buckets to collect the metric for the operation duration. |
| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1) - should be set the same for Argo Server. |
| `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. |
+| `RECENTLY_DELETED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently deleted. |
| `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry back-off duration when retrying API calls. |
| `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry back-off factor when retrying API calls. |
| `RETRY_BACKOFF_STEPS` | `int` | `5` | The retry back-off steps when retrying API calls. |
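For background on how the controller consumes these settings: the taskresult.go hunk further down reads the new variable through envutil.LookupEnvDurationOr. Below is a minimal, stand-alone sketch of that pattern using only the Go standard library; the helper name and fallback behaviour here are illustrative, not the actual Argo implementation, which may treat parse errors differently.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// lookupEnvDurationOr mirrors the pattern used by the controller: read an
// environment variable as a time.Duration, falling back to a default when
// the variable is unset or cannot be parsed.
func lookupEnvDurationOr(key string, def time.Duration) time.Duration {
	v, ok := os.LookupEnv(key)
	if !ok || v == "" {
		return def
	}
	d, err := time.ParseDuration(v)
	if err != nil {
		return def
	}
	return d
}

func main() {
	// e.g. RECENTLY_DELETED_POD_DURATION=30s workflow-controller ...
	d := lookupEnvDurationOr("RECENTLY_DELETED_POD_DURATION", 10*time.Second)
	fmt.Println("recently-deleted window:", d)
}
```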
pkg/apis/workflow/v1alpha1/workflow_types.go (5 additions, 0 deletions)
@@ -2402,6 +2402,11 @@ func (n NodeStatus) IsExitNode() bool {
return strings.HasSuffix(n.DisplayName, ".onExit")
}

+// IsPodDeleted returns whether node is error with pod deleted.
+func (n NodeStatus) IsPodDeleted() bool {
+return n.Phase == NodeError && n.Message == "pod deleted"
+}

func (n NodeStatus) Succeeded() bool {
return n.Phase == NodeSucceeded
}
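A quick sanity check of the new predicate as a table-driven test. This is a sketch only and not part of the PR; it assumes the usual v3 module import path for the wfv1 package.

```go
package v1alpha1_test

import (
	"testing"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func TestIsPodDeleted(t *testing.T) {
	cases := []struct {
		node wfv1.NodeStatus
		want bool
	}{
		// Only the exact "errored with pod deleted" combination counts.
		{wfv1.NodeStatus{Phase: wfv1.NodeError, Message: "pod deleted"}, true},
		{wfv1.NodeStatus{Phase: wfv1.NodeError, Message: "some other error"}, false},
		{wfv1.NodeStatus{Phase: wfv1.NodeFailed, Message: "pod deleted"}, false},
	}
	for _, c := range cases {
		if got := c.node.IsPodDeleted(); got != c.want {
			t.Errorf("IsPodDeleted(%v/%q) = %v, want %v", c.node.Phase, c.node.Message, got, c.want)
		}
	}
}
```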
workflow/controller/operator.go (2 additions, 18 deletions)
@@ -234,10 +234,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.addArtifactGCFinalizer()

// Reconciliation of Outputs (Artifacts). See ReportOutputs() of executor.go.
-err = woc.taskResultReconciliation()
-if err != nil {
-woc.markWorkflowError(ctx, fmt.Errorf("failed to reconcile: %v", err))
-}
+woc.taskResultReconciliation()

// Do artifact GC if task result reconciliation is complete.
if woc.wf.Status.Fulfilled() {
Expand Down Expand Up @@ -1227,7 +1224,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool)
}

if recentlyStarted {
-// If the pod was deleted, then we it is possible that the controller never get another informer message about it.
+// If the pod was deleted, then it is possible that the controller never get another informer message about it.
// In this case, the workflow will only be requeued after the resync period (20m). This means
// workflow will not update for 20m. Requeuing here prevents that happening.
woc.requeue()
Expand Down Expand Up @@ -1317,19 +1314,6 @@ func (woc *wfOperationCtx) getAllWorkflowPods() ([]*apiv1.Pod, error) {
return pods, nil
}

-func (woc *wfOperationCtx) getAllWorkflowPodsMap() (map[string]*apiv1.Pod, error) {
-podList, err := woc.getAllWorkflowPods()
-if err != nil {
-return nil, err
-}
-podMap := make(map[string]*apiv1.Pod)
-for _, pod := range podList {
-nodeID := woc.nodeID(pod)
-podMap[nodeID] = pod
-}
-return podMap, nil
-}

func printPodSpecLog(pod *apiv1.Pod, wfName string) {
podSpecByte, err := json.Marshal(pod)
log := log.WithField("workflow", wfName).
workflow/controller/taskresult.go (16 additions, 31 deletions)
@@ -54,19 +54,14 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex
return informer
}

-func podAbsentTimeout(node *wfv1.NodeStatus) bool {
-return time.Since(node.StartedAt.Time) <= envutil.LookupEnvDurationOr("POD_ABSENT_TIMEOUT", 2*time.Minute)
+func recentlyDeleted(node *wfv1.NodeStatus) bool {
+return time.Since(node.FinishedAt.Time) <= envutil.LookupEnvDurationOr("RECENTLY_DELETED_POD_DURATION", 10*time.Second)

Inline review comments on this change:

If I'm not mistaken, the reason for using startedAt was because the finishedAt time may never have been recorded.

Member:

When I reviewed this I'd read this as being a finished rather than a start time - so the intention here is correct.

I think the current IsPodDeleted() guards this against not having a FinishedAt, but that is problematic. @isubasinghe and I did discuss this briefly before he disappeared for the weekend, and we'd like to have a proper look as this almost certainly breaks the part of #13454 which was to ensure 3.4->3.5 upgrade worked as hoped for in flight workflows.

Member Author (@jswxstw), Sep 2, 2024:

> this almost certainly breaks the part of #13454 which was to ensure 3.4->3.5 upgrade worked as hoped for in flight workflows.

@Joibel Do you mean this problem #12103 (comment)?

Member:

That is correct. @jswxstw do you think you would be able to make time for a chat about this issue?

Member:

Actually this doesn't break the 3.4 -> 3.5 upgrade path, the else if check is present meaning we never falsely update the TaskCompletionStatus.

Member Author (@jswxstw):

> That is correct. @jswxstw do you think you would be able to make time for a chat about this issue?

Sure, I have time now.

> Actually this doesn't break the 3.4 -> 3.5 upgrade path, the else if check is present meaning we never falsely update the TaskCompletionStatus.

Yes, I kept this fix.

Member (@isubasinghe), Sep 2, 2024:

> Sure, I have time now.

It's alright, I have enough confidence now that this is a better approach than #13454.

Related: #13798 (comment)

}

-func (woc *wfOperationCtx) taskResultReconciliation() error {
+func (woc *wfOperationCtx) taskResultReconciliation() {
objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name)
woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")

-podMap, err := woc.getAllWorkflowPodsMap()
-if err != nil {
-return err
-}
for _, obj := range objs {
result := obj.(*wfv1.WorkflowTaskResult)
resultName := result.GetName()
@@ -75,7 +70,6 @@ func (woc *wfOperationCtx) taskResultReconciliation() error {
woc.log.Debugf("task result name:\n%+v", resultName)

label := result.Labels[common.LabelKeyReportOutputsCompleted]

// If the task result is completed, set the state to true.
if label == "true" {
woc.log.Debugf("Marking task result complete %s", resultName)
@@ -85,33 +79,25 @@ func (woc *wfOperationCtx) taskResultReconciliation() error {
woc.wf.Status.MarkTaskResultIncomplete(resultName)
}

-_, foundPod := podMap[result.Name]
-node, err := woc.wf.Status.Nodes.Get(result.Name)
+nodeID := result.Name
+old, err := woc.wf.Status.Nodes.Get(nodeID)
if err != nil {
-if foundPod {
-// how does this path make any sense?
-// pod created but informer not yet updated
-woc.log.Errorf("couldn't obtain node for %s, but found pod, this is not expected, doing nothing", result.Name)
-}
continue
}

-if !foundPod && !node.Completed() {
-if podAbsentTimeout(node) {
-woc.log.Infof("Determined controller should timeout for %s", result.Name)
-woc.wf.Status.MarkTaskResultComplete(resultName)
-
-woc.markNodePhase(node.Name, wfv1.NodeFailed, "pod was absent")
+// Mark task result as completed if it has no chance to be completed.
+if label == "false" && old.IsPodDeleted() {
+if recentlyDeleted(old) {
+woc.log.WithField("nodeID", nodeID).Debug("Wait for marking task result as completed because pod is recently deleted.")
+// If the pod was deleted, then it is possible that the controller never get another informer message about it.
+// In this case, the workflow will only be requeued after the resync period (20m). This means
+// workflow will not update for 20m. Requeuing here prevents that happening.
+woc.requeue()
continue
-} else {
-woc.log.Debugf("Determined controller shouldn't timeout %s", result.Name)
+woc.log.WithField("nodeID", nodeID).Info("Marking task result as completed because pod has been deleted for a while.")
+woc.wf.Status.MarkTaskResultComplete(nodeID)
}
}

-nodeID := result.Name
-old, err := woc.wf.Status.Nodes.Get(nodeID)
-if err != nil {
-continue
-}
newNode := old.DeepCopy()
if result.Outputs.HasOutputs() {
if newNode.Outputs == nil {
@@ -133,5 +119,4 @@ func (woc *wfOperationCtx) taskResultReconciliation() error {
woc.updated = true
}
}
-return nil
}
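Putting the pieces together, the decision the reconciler now makes for a task result whose report-outputs-completed label is "false" can be sketched as a self-contained simulation. This is illustrative only; the real code operates on wfv1.NodeStatus and WorkflowTaskResult objects inside taskResultReconciliation above, and the type and function names here are invented for the sketch.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed-down stand-in for the node status fields the guard looks at.
type node struct {
	phase      string
	message    string
	finishedAt time.Time
}

// isPodDeleted mirrors the new NodeStatus.IsPodDeleted helper.
func (n node) isPodDeleted() bool { return n.phase == "Error" && n.message == "pod deleted" }

// decide returns what the reconciler would do for a task result whose
// report-outputs-completed label is "false".
func decide(n node, recentlyDeletedWindow time.Duration, now time.Time) string {
	if !n.isPodDeleted() {
		return "leave incomplete (the pod may still report outputs)"
	}
	if now.Sub(n.finishedAt) <= recentlyDeletedWindow {
		return "requeue and wait (pod was only just deleted)"
	}
	return "mark task result complete (pod gone for a while, nothing more will arrive)"
}

func main() {
	now := time.Now()
	window := 10 * time.Second // RECENTLY_DELETED_POD_DURATION default
	fmt.Println(decide(node{"Error", "pod deleted", now.Add(-2 * time.Second)}, window, now))
	fmt.Println(decide(node{"Error", "pod deleted", now.Add(-1 * time.Minute)}, window, now))
	fmt.Println(decide(node{"Running", "", time.Time{}}, window, now))
}
```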