Skip to content

Commit

Permalink
storing delta timestamps to set subnode lastattemptstartedat
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Nov 27, 2024
1 parent ab04192 commit 7907274
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 15 deletions.
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ type ExecutableArrayNodeStatus interface {
GetSubNodeTaskPhases() bitarray.CompactArray
GetSubNodeRetryAttempts() bitarray.CompactArray
GetSubNodeSystemFailures() bitarray.CompactArray
GetSubNodeDeltaTimestamps() bitarray.CompactArray
GetTaskPhaseVersion() uint32
}

Expand All @@ -302,6 +303,7 @@ type MutableArrayNodeStatus interface {
SetSubNodeTaskPhases(subNodeTaskPhases bitarray.CompactArray)
SetSubNodeRetryAttempts(subNodeRetryAttempts bitarray.CompactArray)
SetSubNodeSystemFailures(subNodeSystemFailures bitarray.CompactArray)
SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray)
SetTaskPhaseVersion(taskPhaseVersion uint32)
}

Expand Down
26 changes: 19 additions & 7 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,14 @@ const (

// ArrayNodeStatus is the JSON-serialized status of an ArrayNode. Besides the
// node-level phase, execution error and task phase version, it keeps one
// bitarray.CompactArray per tracked per-subnode attribute (phases, task phases,
// retry attempts, system failures and delta timestamps), each under a short
// JSON key.
type ArrayNodeStatus struct {
MutableStruct
Phase ArrayNodePhase `json:"phase,omitempty"`
ExecutionError *core.ExecutionError `json:"executionError,omitempty"`
SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"`
SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"`
SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"`
SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"`
TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"`
Phase ArrayNodePhase `json:"phase,omitempty"`
ExecutionError *core.ExecutionError `json:"executionError,omitempty"`
SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"`
SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"`
SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"`
SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"`
// The tag has to be a single well-formed key:"value" pair (no space after the
// colon, one pair of quotes); otherwise reflect.StructTag cannot find the json
// key and the field is serialized under its Go name without omitempty.
SubNodeDeltaTimestamps bitarray.CompactArray `json:"subtimestamps,omitempty"`
TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"`
}

func (in *ArrayNodeStatus) GetArrayNodePhase() ArrayNodePhase {
Expand Down Expand Up @@ -305,6 +306,17 @@ func (in *ArrayNodeStatus) SetSubNodeSystemFailures(subNodeSystemFailures bitarr
}
}

// GetSubNodeDeltaTimestamps returns the SubNodeDeltaTimestamps compact array
// currently stored on this status.
func (in *ArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray {
	deltaTimestamps := in.SubNodeDeltaTimestamps
	return deltaTimestamps
}

// SetSubNodeDeltaTimestamps stores subNodeDeltaTimestamps on this status. The
// status is marked dirty (via SetDirty) only when the incoming value differs
// from the one already held; an equal value is a no-op.
func (in *ArrayNodeStatus) SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) {
	if in.SubNodeDeltaTimestamps == subNodeDeltaTimestamps {
		// unchanged: leave the dirty flag untouched
		return
	}

	in.SetDirty()
	in.SubNodeDeltaTimestamps = subNodeDeltaTimestamps
}

// GetTaskPhaseVersion returns the TaskPhaseVersion recorded on this status.
func (in *ArrayNodeStatus) GetTaskPhaseVersion() uint32 {
	version := in.TaskPhaseVersion
	return version
}
Expand Down
24 changes: 24 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"math"
"strconv"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
Expand Down Expand Up @@ -254,6 +257,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: maxAttemptsValue},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: maxSystemFailuresValue},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 259200}, // max value is 3 days of seconds which is covered by 18 bits (262144)

Check failure on line 260 in flytepropeller/pkg/controller/nodes/array/handler.go

View workflow job for this annotation

GitHub Actions / Check for spelling errors

coverd ==> covered
} {

*item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115
Expand Down Expand Up @@ -380,6 +384,18 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
arrayNodeState.SubNodeRetryAttempts.SetItem(index, uint64(subNodeStatus.GetAttempts()))
arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures()))

startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt()
subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt()
if subNodeStartedAt == nil {
// subNodeStartedAt == nil indicates either (1) node has not started or (2) node status has
// been reset (ex. retryable failure). in both cases we set the delta timestamp to 0
arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, 0)
} else if startedAt != nil && arrayNodeState.SubNodeDeltaTimestamps.GetItem(index) == 0 {
// otherwise if `SubNodeDeltaTimestamps` is unset, we compute the delta and set it
deltaDuration := uint64(subNodeStartedAt.Time.Sub(startedAt.Time).Seconds())
arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, deltaDuration)
}

// increment task phase version if subNode phase or task phase changed
if subNodeStatus.GetPhase() != nodeExecutionRequest.nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != nodeExecutionRequest.taskPhase {
incrementTaskPhaseVersion = true
Expand Down Expand Up @@ -767,6 +783,13 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter
return nil, nil, nil, nil, nil, nil, err
}

// compute start time for subNode using delta timestamp from ArrayNode NodeStatus
var startedAt *v1.Time
fmt.Printf("HAMERSAW - retrieving subNodeIndex %d/%d\n", subNodeIndex, arrayNodeState.SubNodeDeltaTimestamps.ItemsCount)
if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 {
startedAt = &v1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)}
}

subNodeStatus := &v1alpha1.NodeStatus{
Phase: nodePhase,
DataDir: subDataDir,
Expand All @@ -777,6 +800,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter
Phase: taskPhase,
PluginState: pluginStateBytes,
},
LastAttemptStartedAt: startedAt,
}

// initialize mocks
Expand Down
15 changes: 8 additions & 7 deletions flytepropeller/pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ type GateNodeState struct {
}

// ArrayNodeState is the handler-level state for an array node: its phase, task
// phase version and execution error, plus one bitarray.CompactArray per
// tracked per-subnode attribute (phases, task phases, retry attempts, system
// failures and delta timestamps).
type ArrayNodeState struct {
Phase v1alpha1.ArrayNodePhase
TaskPhaseVersion uint32
Error *core.ExecutionError
SubNodePhases bitarray.CompactArray
SubNodeTaskPhases bitarray.CompactArray
SubNodeRetryAttempts bitarray.CompactArray
SubNodeSystemFailures bitarray.CompactArray
Phase v1alpha1.ArrayNodePhase
TaskPhaseVersion uint32
Error *core.ExecutionError
SubNodePhases bitarray.CompactArray
SubNodeTaskPhases bitarray.CompactArray
SubNodeRetryAttempts bitarray.CompactArray
SubNodeSystemFailures bitarray.CompactArray
SubNodeDeltaTimestamps bitarray.CompactArray
}
7 changes: 6 additions & 1 deletion flytepropeller/pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (n *nodeStateManager) HasWorkflowNodeState() bool {
}

// HasGateNodeState reports whether the manager's gate node state (n.g) is
// non-nil.
func (n *nodeStateManager) HasGateNodeState() bool {
return n.g != nil
return n.g != nil
}

func (n *nodeStateManager) HasArrayNodeState() bool {
Expand Down Expand Up @@ -181,6 +181,11 @@ func (n nodeStateManager) GetArrayNodeState() handler.ArrayNodeState {
if subNodeSystemFailuresCopy := subNodeSystemFailures.DeepCopy(); subNodeSystemFailuresCopy != nil {
as.SubNodeSystemFailures = *subNodeSystemFailuresCopy
}

subNodeDeltaTimestamps := an.GetSubNodeDeltaTimestamps()
if subNodeDeltaTimestampsCopy := subNodeDeltaTimestamps.DeepCopy(); subNodeDeltaTimestampsCopy != nil {
as.SubNodeDeltaTimestamps = *subNodeDeltaTimestampsCopy
}
}
return as
}
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.N
t.SetSubNodeTaskPhases(na.SubNodeTaskPhases)
t.SetSubNodeRetryAttempts(na.SubNodeRetryAttempts)
t.SetSubNodeSystemFailures(na.SubNodeSystemFailures)
t.SetSubNodeDeltaTimestamps(na.SubNodeDeltaTimestamps)
t.SetTaskPhaseVersion(na.TaskPhaseVersion)
}
}

0 comments on commit 7907274

Please sign in to comment.