diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index b3d744bd77..6590aaa04a 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -318,7 +318,7 @@ type MutableNodeStatus interface { SetOutputDir(d DataReference) SetParentNodeID(n *NodeID) SetParentTaskID(t *core.TaskExecutionIdentifier) - UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, err *core.ExecutionError) + UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) IncrementAttempts() uint32 IncrementSystemFailures() uint32 SetCached() diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go index cb447e06fc..cdf3f1b6ab 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go @@ -1187,9 +1187,9 @@ func (_m *ExecutableNodeStatus) SetParentTaskID(t *core.TaskExecutionIdentifier) _m.Called(t) } -// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, err -func (_m *ExecutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, err *core.ExecutionError) { - _m.Called(phase, occurredAt, reason, err) +// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, enableCRDebugMetadata, err +func (_m *ExecutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) { + _m.Called(phase, occurredAt, reason, enableCRDebugMetadata, err) } // VisitNodeStatuses provides a mock function with given fields: visitor diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go index b8c97f6be7..3f103bc2ec 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go @@ -587,7 +587,7 @@ func (_m *MutableNodeStatus) SetParentTaskID(t *core.TaskExecutionIdentifier) { _m.Called(t) } -// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, err -func (_m *MutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, err *core.ExecutionError) { - _m.Called(phase, occurredAt, reason, err) +// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, enableCRDebugMetadata, err +func (_m *MutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) { + _m.Called(phase, occurredAt, reason, enableCRDebugMetadata, err) } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 9fb891d01a..aab034224d 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -594,7 +594,7 @@ func (in *NodeStatus) GetOrCreateArrayNodeStatus() MutableArrayNodeStatus { return in.ArrayNodeStatus } -func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, err *core.ExecutionError) { +func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) { if in.Phase == p && in.Message == reason { // We will not update the phase multiple times. This prevents the comparison from returning false positive return @@ -607,6 +607,7 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st } n := occurredAt + in.LastUpdatedAt = &n if occurredAt.IsZero() { n = metav1.Now() } @@ -625,35 +626,31 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st in.LastAttemptStartedAt = &n } } else if IsPhaseTerminal(p) { - // If we are in terminal phase then we will clear out all our fields as they are not required anymore - // Only thing required is stopped at and lastupdatedat time if in.StoppedAt == nil { in.StoppedAt = &n } - if in.StartedAt == nil { - in.StartedAt = &n - } - if in.LastAttemptStartedAt == nil { - in.LastAttemptStartedAt = &n + if p == NodePhaseSucceeded || p == NodePhaseSkipped || !enableCRDebugMetadata { + // Clear most status related fields after reaching a terminal state. This keeps the CR state small to avoid + // etcd size limits. Importantly we keep Phase, StoppedAt and Error which will be needed further. + in.Message = "" + in.QueuedAt = nil + in.StartedAt = nil + in.LastUpdatedAt = nil + in.LastAttemptStartedAt = nil + in.DynamicNodeStatus = nil + in.BranchStatus = nil + in.SubNodeStatus = nil + in.TaskNodeStatus = nil + in.WorkflowNodeStatus = nil + } else { + if in.StartedAt == nil { + in.StartedAt = &n + } + if in.LastAttemptStartedAt == nil { + in.LastAttemptStartedAt = &n + } } } - in.LastUpdatedAt = &n - - // For cases in which the node is either Succeeded or Skipped we clear most fields from the status - // except for StoppedAt and Phase. StoppedAt is used to calculate transition latency between this node and - // any downstream nodes and Phase is required for propeller to continue to downstream nodes. - if p == NodePhaseSucceeded || p == NodePhaseSkipped { - in.Message = "" - in.QueuedAt = nil - in.StartedAt = nil - in.LastAttemptStartedAt = nil - in.DynamicNodeStatus = nil - in.BranchStatus = nil - in.SubNodeStatus = nil - in.TaskNodeStatus = nil - in.WorkflowNodeStatus = nil - in.LastUpdatedAt = nil - } in.SetDirty() } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go index 45c299687a..0278d62f55 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go @@ -259,71 +259,202 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { n := metav1.NewTime(time.Now()) const queued = "queued" - t.Run("identical-phase", func(t *testing.T) { - p := NodePhaseQueued - ns := NodeStatus{ - Phase: p, - Message: queued, - } - msg := queued - ns.UpdatePhase(p, n, msg, nil) - assert.Nil(t, ns.QueuedAt) - }) + const success = "success" + for _, enableCRDebugMetadata := range []bool{false, true} { + t.Run("identical-phase", func(t *testing.T) { + p := NodePhaseQueued + ns := NodeStatus{ + Phase: p, + Message: queued, + } + msg := queued + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) + assert.Nil(t, ns.QueuedAt) + }) - t.Run("zero", func(t *testing.T) { - p := NodePhaseQueued - ns := NodeStatus{} - msg := queued - ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, nil) - assert.NotNil(t, ns.QueuedAt) - }) + t.Run("zero", func(t *testing.T) { + p := NodePhaseQueued + ns := NodeStatus{} + msg := queued + ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, enableCRDebugMetadata, nil) + assert.NotNil(t, ns.QueuedAt) + }) - t.Run("non-terminal", func(t *testing.T) { - ns := NodeStatus{} - p := NodePhaseQueued - msg := queued - ns.UpdatePhase(p, n, msg, nil) + t.Run("non-terminal", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseQueued + msg := queued + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) + + assert.Equal(t, *ns.LastUpdatedAt, n) + assert.Equal(t, *ns.QueuedAt, n) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + assert.Nil(t, ns.StoppedAt) + assert.Equal(t, p, ns.Phase) + assert.Equal(t, msg, ns.Message) + assert.Nil(t, ns.Error) + }) - assert.Equal(t, *ns.LastUpdatedAt, n) - assert.Equal(t, *ns.QueuedAt, n) - assert.Nil(t, ns.LastAttemptStartedAt) - assert.Nil(t, ns.StartedAt) - assert.Nil(t, ns.StoppedAt) - assert.Equal(t, p, ns.Phase) - assert.Equal(t, msg, ns.Message) - assert.Nil(t, ns.Error) - }) + t.Run("non-terminal-running", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseRunning + msg := "running" + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) + + assert.Equal(t, *ns.LastUpdatedAt, n) + assert.Nil(t, ns.QueuedAt) + assert.Equal(t, *ns.LastAttemptStartedAt, n) + assert.Equal(t, *ns.StartedAt, n) + assert.Nil(t, ns.StoppedAt) + assert.Equal(t, p, ns.Phase) + assert.Equal(t, msg, ns.Message) + assert.Nil(t, ns.Error) + }) + + t.Run("non-terminal-timing-out", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseTimingOut + msg := "timing-out" + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) + + assert.Equal(t, *ns.LastUpdatedAt, n) + assert.Nil(t, ns.QueuedAt) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + assert.Nil(t, ns.StoppedAt) + assert.Equal(t, p, ns.Phase) + assert.Equal(t, msg, ns.Message) + assert.Nil(t, ns.Error) + }) - t.Run("non-terminal-running", func(t *testing.T) { + t.Run("terminal-success", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseSucceeded + msg := success + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) + + assert.Nil(t, ns.LastUpdatedAt) + assert.Nil(t, ns.QueuedAt) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + assert.Equal(t, *ns.StoppedAt, n) + assert.Equal(t, p, ns.Phase) + assert.Empty(t, ns.Message) + assert.Nil(t, ns.Error) + }) + + t.Run("terminal-skipped", func(t *testing.T) { + ns := NodeStatus{} + p := NodePhaseSucceeded + msg := success + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) + + assert.Nil(t, ns.LastUpdatedAt) + assert.Nil(t, ns.QueuedAt) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + assert.Equal(t, *ns.StoppedAt, n) + assert.Equal(t, p, ns.Phase) + assert.Empty(t, ns.Message) + assert.Nil(t, ns.Error) + }) + + t.Run("terminal-success-preset", func(t *testing.T) { + ns := NodeStatus{ + QueuedAt: &n, + StartedAt: &n, + LastUpdatedAt: &n, + LastAttemptStartedAt: &n, + WorkflowNodeStatus: &WorkflowNodeStatus{}, + BranchStatus: &BranchNodeStatus{}, + DynamicNodeStatus: &DynamicNodeStatus{}, + TaskNodeStatus: &TaskNodeStatus{}, + SubNodeStatus: map[NodeID]*NodeStatus{}, + } + p := NodePhaseSucceeded + msg := success + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) + + assert.Nil(t, ns.LastUpdatedAt) + assert.Nil(t, ns.QueuedAt) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) + assert.Equal(t, *ns.StoppedAt, n) + assert.Equal(t, p, ns.Phase) + assert.Empty(t, ns.Message) + assert.Nil(t, ns.Error) + assert.Nil(t, ns.SubNodeStatus) + assert.Nil(t, ns.DynamicNodeStatus) + assert.Nil(t, ns.WorkflowNodeStatus) + assert.Nil(t, ns.BranchStatus) + assert.Nil(t, ns.TaskNodeStatus) + }) + + t.Run("non-terminal-preset", func(t *testing.T) { + ns := NodeStatus{ + QueuedAt: &n, + StartedAt: &n, + LastUpdatedAt: &n, + LastAttemptStartedAt: &n, + WorkflowNodeStatus: &WorkflowNodeStatus{}, + BranchStatus: &BranchNodeStatus{}, + DynamicNodeStatus: &DynamicNodeStatus{}, + TaskNodeStatus: &TaskNodeStatus{}, + SubNodeStatus: map[NodeID]*NodeStatus{}, + } + n2 := metav1.NewTime(time.Now()) + p := NodePhaseRunning + msg := "running" + ns.UpdatePhase(p, n2, msg, enableCRDebugMetadata, nil) + + assert.Equal(t, *ns.LastUpdatedAt, n2) + assert.Equal(t, *ns.QueuedAt, n) + assert.Equal(t, *ns.LastAttemptStartedAt, n) + assert.Equal(t, *ns.StartedAt, n) + assert.Nil(t, ns.StoppedAt) + assert.Equal(t, p, ns.Phase) + assert.Equal(t, msg, ns.Message) + assert.Nil(t, ns.Error) + assert.NotNil(t, ns.SubNodeStatus) + assert.NotNil(t, ns.DynamicNodeStatus) + assert.NotNil(t, ns.WorkflowNodeStatus) + assert.NotNil(t, ns.BranchStatus) + assert.NotNil(t, ns.TaskNodeStatus) + }) + } + + t.Run("terminal-fail", func(t *testing.T) { ns := NodeStatus{} - p := NodePhaseRunning - msg := "running" - ns.UpdatePhase(p, n, msg, nil) + p := NodePhaseFailed + msg := "failed" + err := &core.ExecutionError{} + ns.UpdatePhase(p, n, msg, true, err) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) assert.Equal(t, *ns.LastAttemptStartedAt, n) assert.Equal(t, *ns.StartedAt, n) - assert.Nil(t, ns.StoppedAt) + assert.Equal(t, *ns.StoppedAt, n) assert.Equal(t, p, ns.Phase) assert.Equal(t, msg, ns.Message) - assert.Nil(t, ns.Error) + assert.Equal(t, ns.Error.ExecutionError, err) }) - t.Run("terminal-fail", func(t *testing.T) { + t.Run("terminal-fail-clear-state-on-any-termination", func(t *testing.T) { ns := NodeStatus{} p := NodePhaseFailed msg := "failed" err := &core.ExecutionError{} - ns.UpdatePhase(p, n, msg, err) + ns.UpdatePhase(p, n, msg, false, err) - assert.Equal(t, *ns.LastUpdatedAt, n) + assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) - assert.Equal(t, *ns.LastAttemptStartedAt, n) - assert.Equal(t, *ns.StartedAt, n) + assert.Nil(t, ns.LastAttemptStartedAt) + assert.Nil(t, ns.StartedAt) assert.Equal(t, *ns.StoppedAt, n) assert.Equal(t, p, ns.Phase) - assert.Equal(t, msg, ns.Message) + assert.Equal(t, ns.Message, "") assert.Equal(t, ns.Error.ExecutionError, err) }) @@ -332,7 +463,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { p := NodePhaseTimedOut msg := "tm" err := &core.ExecutionError{} - ns.UpdatePhase(p, n, msg, err) + ns.UpdatePhase(p, n, msg, true, err) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) @@ -344,54 +475,12 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { assert.Equal(t, ns.Error.ExecutionError, err) }) - const success = "success" - t.Run("terminal-success", func(t *testing.T) { - ns := NodeStatus{} - p := NodePhaseSucceeded - msg := success - ns.UpdatePhase(p, n, msg, nil) - - assert.Nil(t, ns.LastUpdatedAt) - assert.Nil(t, ns.QueuedAt) - assert.Nil(t, ns.LastAttemptStartedAt) - assert.Nil(t, ns.StartedAt) - assert.Equal(t, *ns.StoppedAt, n) - assert.Equal(t, p, ns.Phase) - assert.Empty(t, ns.Message) - assert.Nil(t, ns.Error) - }) - - t.Run("terminal-skipped", func(t *testing.T) { + t.Run("terminal-timeout-clear-state-on-any-termination", func(t *testing.T) { ns := NodeStatus{} - p := NodePhaseSucceeded - msg := success - ns.UpdatePhase(p, n, msg, nil) - - assert.Nil(t, ns.LastUpdatedAt) - assert.Nil(t, ns.QueuedAt) - assert.Nil(t, ns.LastAttemptStartedAt) - assert.Nil(t, ns.StartedAt) - assert.Equal(t, *ns.StoppedAt, n) - assert.Equal(t, p, ns.Phase) - assert.Empty(t, ns.Message) - assert.Nil(t, ns.Error) - }) - - t.Run("terminal-success-preset", func(t *testing.T) { - ns := NodeStatus{ - QueuedAt: &n, - StartedAt: &n, - LastUpdatedAt: &n, - LastAttemptStartedAt: &n, - WorkflowNodeStatus: &WorkflowNodeStatus{}, - BranchStatus: &BranchNodeStatus{}, - DynamicNodeStatus: &DynamicNodeStatus{}, - TaskNodeStatus: &TaskNodeStatus{}, - SubNodeStatus: map[NodeID]*NodeStatus{}, - } - p := NodePhaseSucceeded - msg := success - ns.UpdatePhase(p, n, msg, nil) + p := NodePhaseTimedOut + msg := "tm" + err := &core.ExecutionError{} + ns.UpdatePhase(p, n, msg, false, err) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -399,44 +488,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { assert.Nil(t, ns.StartedAt) assert.Equal(t, *ns.StoppedAt, n) assert.Equal(t, p, ns.Phase) - assert.Empty(t, ns.Message) - assert.Nil(t, ns.Error) - assert.Nil(t, ns.SubNodeStatus) - assert.Nil(t, ns.DynamicNodeStatus) - assert.Nil(t, ns.WorkflowNodeStatus) - assert.Nil(t, ns.BranchStatus) - assert.Nil(t, ns.TaskNodeStatus) - }) - - t.Run("non-terminal-preset", func(t *testing.T) { - ns := NodeStatus{ - QueuedAt: &n, - StartedAt: &n, - LastUpdatedAt: &n, - LastAttemptStartedAt: &n, - WorkflowNodeStatus: &WorkflowNodeStatus{}, - BranchStatus: &BranchNodeStatus{}, - DynamicNodeStatus: &DynamicNodeStatus{}, - TaskNodeStatus: &TaskNodeStatus{}, - SubNodeStatus: map[NodeID]*NodeStatus{}, - } - n2 := metav1.NewTime(time.Now()) - p := NodePhaseRunning - msg := "running" - ns.UpdatePhase(p, n2, msg, nil) - - assert.Equal(t, *ns.LastUpdatedAt, n2) - assert.Equal(t, *ns.QueuedAt, n) - assert.Equal(t, *ns.LastAttemptStartedAt, n) - assert.Equal(t, *ns.StartedAt, n) - assert.Nil(t, ns.StoppedAt) - assert.Equal(t, p, ns.Phase) - assert.Equal(t, msg, ns.Message) - assert.Nil(t, ns.Error) - assert.NotNil(t, ns.SubNodeStatus) - assert.NotNil(t, ns.DynamicNodeStatus) - assert.NotNil(t, ns.WorkflowNodeStatus) - assert.NotNil(t, ns.BranchStatus) - assert.NotNil(t, ns.TaskNodeStatus) + assert.Equal(t, ns.Message, "") + assert.Equal(t, ns.Error.ExecutionError, err) }) } diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 1afc986287..698883fb48 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -98,6 +98,7 @@ var ( InterruptibleFailureThreshold: -1, DefaultMaxAttempts: 1, IgnoreRetryCause: false, + EnableCRDebugMetadata: false, }, MaxStreakLength: 8, // Turbo mode is enabled by default ProfilerPort: config.Port{ @@ -213,6 +214,7 @@ type NodeConfig struct { InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'"` DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"` IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"` + EnableCRDebugMetadata bool `json:"enable-cr-debug-metadata" pflag:",Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd."` } // DefaultDeadlines contains default values for timeouts diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index 07a4fba742..c60f724ee2 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -96,6 +96,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'") cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.default-max-attempts"), defaultConfig.NodeConfig.DefaultMaxAttempts, "Default maximum number of attempts for a node") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.ignore-retry-cause"), defaultConfig.NodeConfig.IgnoreRetryCause, "Ignore retry cause and count all attempts toward a node's max attempts") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.enable-cr-debug-metadata"), defaultConfig.NodeConfig.EnableCRDebugMetadata, "Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd.") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-streak-length"), defaultConfig.MaxStreakLength, "Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "event-config.raw-output-policy"), defaultConfig.EventConfig.RawOutputPolicy, "How output data should be passed along in execution events.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.fallback-to-output-reference"), defaultConfig.EventConfig.FallbackToOutputReference, "Whether output data should be sent by reference when it is too large to be sent inline in execution events.") diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index 54da9e9fe1..6f3c67b652 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -743,6 +743,20 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_node-config.enable-cr-debug-metadata", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("node-config.enable-cr-debug-metadata", testValue) + if vBool, err := cmdFlags.GetBool("node-config.enable-cr-debug-metadata"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.NodeConfig.EnableCRDebugMetadata) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_max-streak-length", func(t *testing.T) { t.Run("Override", func(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/branch/evaluator.go b/flytepropeller/pkg/controller/nodes/branch/evaluator.go index 44dc8711e8..4bc1676745 100644 --- a/flytepropeller/pkg/controller/nodes/branch/evaluator.go +++ b/flytepropeller/pkg/controller/nodes/branch/evaluator.go @@ -129,7 +129,9 @@ func DecideBranch(ctx context.Context, nl executors.NodeLookup, nodeID v1alpha1. } nStatus := nl.GetNodeExecutionStatus(ctx, n.GetID()) logger.Infof(ctx, "Branch Setting Node[%v] status to Skipped!", skippedNodeID) - nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", nil) + // We hard code enableCRDebugMetadata=true because it has no effect when setting phase to + // NodePhaseSkipped. This saves us passing the config all the way down from the nodeExecutor. + nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", true, nil) } if selectedNodeID == nil { diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 8e96ee9645..6ddde14c71 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -475,6 +475,7 @@ func (c *recursiveNodeExecutor) WithNodeExecutionContextBuilder(nCtxBuilder inte type nodeExecutor struct { catalog catalog.Client clusterID string + enableCRDebugMetadata bool defaultActiveDeadline time.Duration defaultDataSandbox storage.DataReference defaultExecutionDeadline time.Duration @@ -1005,7 +1006,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error()) return interfaces.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event") } - UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus) + UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.enableCRDebugMetadata) c.RecordTransitionLatency(ctx, dag, nCtx.ContextualNodeLookup(), nCtx.Node(), nodeStatus) } @@ -1271,7 +1272,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter } } - UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus) + UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.enableCRDebugMetadata) return finalStatus, nil } @@ -1285,7 +1286,7 @@ func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx interfac // NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state // Attempt is used throughout the system to determine the idempotent resource version. nodeStatus.IncrementAttempts() - nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", nil) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", c.enableCRDebugMetadata, nil) // We are going to retry in the next round, so we should clear all current state nodeStatus.ClearSubNodeStatus() nodeStatus.ClearTaskStatus() @@ -1324,8 +1325,14 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur if err := c.Abort(ctx, h, nCtx, "node failing", false); err != nil { return interfaces.NodeStatusUndefined, err } - nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) - c.metrics.FailureDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time) + t := metav1.Now() + + startedAt := nodeStatus.GetStartedAt() + if startedAt == nil { + startedAt = &t + } + nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), c.enableCRDebugMetadata, nodeStatus.GetExecutionError()) + c.metrics.FailureDuration.Observe(ctx, startedAt.Time, nodeStatus.GetStoppedAt().Time) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) } @@ -1338,8 +1345,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur return interfaces.NodeStatusUndefined, err } - nodeStatus.ClearSubNodeStatus() - nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), c.enableCRDebugMetadata, nodeStatus.GetExecutionError()) c.metrics.TimedOutFailure.Inc(ctx) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) @@ -1363,8 +1369,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur stopped = &t } c.metrics.SuccessDuration.Observe(ctx, started.Time, stopped.Time) - nodeStatus.ClearSubNodeStatus() - nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", nil) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", c.enableCRDebugMetadata, nil) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) } @@ -1431,6 +1436,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora nodeExecutor := &nodeExecutor{ catalog: catalogClient, clusterID: clusterID, + enableCRDebugMetadata: nodeConfig.EnableCRDebugMetadata, defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, defaultDataSandbox: defaultRawOutputPrefix, defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration, diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 222e0a05d5..2bc552bab0 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -565,7 +565,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { }, } - setupNodePhase := func(n0Phase, n2Phase, expectedN2Phase v1alpha1.NodePhase) (*mocks.ExecutableWorkflow, *mocks.ExecutableNodeStatus) { + setupNodePhase := func(n0Phase, n2Phase, expectedN2Phase v1alpha1.NodePhase, expectedClearStateOnAnyTermination bool) (*mocks.ExecutableWorkflow, *mocks.ExecutableNodeStatus) { taskID := "id" taskID0 := "id1" // Setup @@ -582,7 +582,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { mockN2Status.OnGetStoppedAt().Return(nil) var ee *core.ExecutionError - mockN2Status.On("UpdatePhase", expectedN2Phase, mock.Anything, mock.AnythingOfType("string"), ee) + mockN2Status.On("UpdatePhase", expectedN2Phase, mock.Anything, mock.AnythingOfType("string"), expectedClearStateOnAnyTermination, ee) mockN2Status.OnIsDirty().Return(false) mockN2Status.OnGetTaskNodeStatus().Return(nil) mockN2Status.On("ClearDynamicNodeStatus").Return(nil) @@ -659,17 +659,21 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { } tests := []struct { - name string - currentNodePhase v1alpha1.NodePhase - parentNodePhase v1alpha1.NodePhase - expectedNodePhase v1alpha1.NodePhase - expectedPhase interfaces.NodePhase - expectedError bool - updateCalled bool + name string + currentNodePhase v1alpha1.NodePhase + parentNodePhase v1alpha1.NodePhase + enableCRDebugMetadata bool + expectedNodePhase v1alpha1.NodePhase + expectedPhase interfaces.NodePhase + expectedError bool + updateCalled bool }{ - {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseFailed, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseFailed, false, false}, - {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSkipped, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseSuccess, false, true}, - {"notYetStarted->queued", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseQueued, interfaces.NodePhasePending, false, true}, + {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseFailed, false, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseFailed, false, false}, + {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSkipped, false, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseSuccess, false, true}, + {"notYetStarted->queued", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSucceeded, false, v1alpha1.NodePhaseQueued, interfaces.NodePhasePending, false, true}, + {"notYetStarted->skipped enableCRDebugMetadata", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseFailed, true, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseFailed, false, false}, + {"notYetStarted->skipped enableCRDebugMetadata", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSkipped, true, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseSuccess, false, true}, + {"notYetStarted->queued enableCRDebugMetadata", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSucceeded, true, v1alpha1.NodePhaseQueued, interfaces.NodePhasePending, false, true}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -684,12 +688,14 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { h.OnFinalizeRequired().Return(false) hf.OnGetHandler(v1alpha1.NodeKindTask).Return(h, nil) - mockWf, _ := setupNodePhase(test.parentNodePhase, test.currentNodePhase, test.expectedNodePhase) + mockWf, _ := setupNodePhase(test.parentNodePhase, test.currentNodePhase, test.expectedNodePhase, test.enableCRDebugMetadata) startNode := mockWf.StartNode() store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, + nodeConfig := config.GetConfig().NodeConfig + nodeConfig.EnableCRDebugMetadata = test.enableCRDebugMetadata + execIface, err := NewExecutor(ctx, nodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -1329,7 +1335,7 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) { if test.phaseUpdateExpected { var ee *core.ExecutionError - branchTakeNodeStatus.On("UpdatePhase", v1alpha1.NodePhaseQueued, mock.Anything, mock.Anything, ee).Return() + branchTakeNodeStatus.On("UpdatePhase", v1alpha1.NodePhaseQueued, mock.Anything, mock.Anything, false, ee).Return() } leafDag := executors.NewLeafNodeDAGStructure(branchTakenNodeID, parentBranchNodeID) diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 8c0db1e57a..e341a0d2a1 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -228,10 +228,10 @@ func ToK8sTime(t time.Time) v1.Time { return v1.Time{Time: t} } -func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus) { +func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus, enableCRDebugMetadata bool) { // We update the phase and / or reason only if they are not already updated if np != s.GetPhase() || p.GetReason() != s.GetMessage() { - s.UpdatePhase(np, ToK8sTime(p.GetOccurredAt()), p.GetReason(), p.GetErr()) + s.UpdatePhase(np, ToK8sTime(p.GetOccurredAt()), p.GetReason(), enableCRDebugMetadata, p.GetErr()) } // Update TaskStatus if n.HasTaskNodeState() {