Skip to content

Commit

Permalink
unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt committed Apr 4, 2024
1 parent 65ab929 commit 0bafb47
Showing 1 changed file with 80 additions and 10 deletions.
90 changes: 80 additions & 10 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
},
},
}
workflowMaxParallelism = uint32(10)
)

func createArrayNodeHandler(ctx context.Context, t *testing.T, nodeHandler interfaces.NodeHandler, dataStore *storage.DataStore, scope promutils.Scope) (interfaces.NodeHandler, error) {
Expand Down Expand Up @@ -73,7 +74,8 @@ func createArrayNodeHandler(ctx context.Context, t *testing.T, nodeHandler inter
}

func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder interfaces.EventRecorder, outputVariables []string,
inputLiteralMap *idlcore.LiteralMap, arrayNodeSpec *v1alpha1.NodeSpec, arrayNodeState *handler.ArrayNodeState) interfaces.NodeExecutionContext {
inputLiteralMap *idlcore.LiteralMap, arrayNodeSpec *v1alpha1.NodeSpec, arrayNodeState *handler.ArrayNodeState,
currentParallelism uint32, maxParallelism uint32) interfaces.NodeExecutionContext {

nCtx := &mocks.NodeExecutionContext{}
nCtx.OnMaxDatasetSizeBytes().Return(9999999)
Expand All @@ -90,7 +92,9 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
// ExecutionContext
executionContext := &execmocks.ExecutionContext{}
executionContext.OnGetEventVersion().Return(1)
executionContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{})
executionContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{
MaxParallelism: maxParallelism,
})
executionContext.OnGetExecutionID().Return(
v1alpha1.ExecutionID{
WorkflowExecutionIdentifier: &idlcore.WorkflowExecutionIdentifier{
Expand Down Expand Up @@ -119,6 +123,8 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
},
nil,
)
executionContext.OnCurrentParallelism().Return(currentParallelism)
executionContext.On("IncrementParallelism").Run(func(args mock.Arguments) {}).Return(currentParallelism)
nCtx.OnExecutionContext().Return(executionContext)

// EventsRecorder
Expand Down Expand Up @@ -253,7 +259,7 @@ func TestAbort(t *testing.T) {

// create NodeExecutionContext
eventRecorder := newBufferedEventRecorder()
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism)

// evaluate node
err := arrayNodeHandler.Abort(ctx, nCtx, "foo")
Expand Down Expand Up @@ -349,7 +355,7 @@ func TestFinalize(t *testing.T) {

// create NodeExecutionContext
eventRecorder := newBufferedEventRecorder()
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism)

// evaluate node
err := arrayNodeHandler.Finalize(ctx, nCtx)
Expand Down Expand Up @@ -420,7 +426,7 @@ func TestHandleArrayNodePhaseNone(t *testing.T) {
arrayNodeState := &handler.ArrayNodeState{
Phase: v1alpha1.ArrayNodePhaseNone,
}
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism)

// evaluate node
transition, err := arrayNodeHandler.Handle(ctx, nCtx)
Expand Down Expand Up @@ -475,6 +481,9 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase v1alpha1.ArrayNodePhase
expectedTransitionPhase handler.EPhase
expectedExternalResourcePhases []idlcore.TaskExecution_Phase
currentWfParallelism uint32
maxWfParallelism uint32
incrementParallelismCount uint32
}{
{
name: "StartAllSubNodes",
Expand Down Expand Up @@ -512,6 +521,65 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
},
{
name: "UtilizeWfParallelismAllSubNodes",
parallelism: -1,
currentWfParallelism: 0,
incrementParallelismCount: 2,
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
},
subNodeTaskPhases: []core.Phase{
core.PhaseUndefined,
core.PhaseUndefined,
},
subNodeTransitions: []handler.Transition{
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
},
{
name: "UtilizeWfParallelismSomeSubNodes",
parallelism: -1,
currentWfParallelism: workflowMaxParallelism - 1,
incrementParallelismCount: 1,
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
},
subNodeTaskPhases: []core.Phase{
core.PhaseUndefined,
core.PhaseUndefined,
},
subNodeTransitions: []handler.Transition{
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
},
{
name: "UtilizeWfParallelismNoSubNodes",
parallelism: -1,
currentWfParallelism: workflowMaxParallelism,
incrementParallelismCount: 0,
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
},
subNodeTaskPhases: []core.Phase{
core.PhaseUndefined,
core.PhaseUndefined,
},
subNodeTransitions: []handler.Transition{},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{},
},
{
name: "StartSubNodesNewAttempts",
subNodePhases: []v1alpha1.NodePhase{
Expand Down Expand Up @@ -627,7 +695,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
nodeSpec.ArrayNode.Parallelism = int64(test.parallelism)
nodeSpec.ArrayNode.MinSuccessRatio = test.minSuccessRatio

nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, test.currentWfParallelism, workflowMaxParallelism)

// initialize ArrayNodeHandler
nodeHandler := &mocks.NodeHandler{}
Expand Down Expand Up @@ -673,6 +741,8 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
} else {
assert.Equal(t, 0, len(eventRecorder.taskExecutionEvents))
}

nCtx.ExecutionContext().(*execmocks.ExecutionContext).AssertNumberOfCalls(t, "IncrementParallelism", int(test.incrementParallelismCount))
})
}
}
Expand Down Expand Up @@ -748,7 +818,7 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) {
arrayNodeState := &handler.ArrayNodeState{
Phase: v1alpha1.ArrayNodePhaseNone,
}
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism)

// initialize ArrayNodeHandler
nodeHandler := &mocks.NodeHandler{}
Expand Down Expand Up @@ -776,7 +846,7 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) {
// evaluate node until failure
attempts := 1
for {
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism)
_, err = arrayNodeHandler.Handle(ctx, nCtx)
assert.NoError(t, err)

Expand Down Expand Up @@ -853,7 +923,7 @@ func TestHandleArrayNodePhaseSucceeding(t *testing.T) {
// create NodeExecutionContext
eventRecorder := newBufferedEventRecorder()
literalMap := &idlcore.LiteralMap{}
nCtx := createNodeExecutionContext(dataStore, eventRecorder, []string{test.outputVariable}, literalMap, &arrayNodeSpec, arrayNodeState)
nCtx := createNodeExecutionContext(dataStore, eventRecorder, []string{test.outputVariable}, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism)

// write mocked output files
for i, outputValue := range test.outputValues {
Expand Down Expand Up @@ -979,7 +1049,7 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) {
// create NodeExecutionContext
eventRecorder := newBufferedEventRecorder()
literalMap := &idlcore.LiteralMap{}
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism)

// evaluate node
transition, err := arrayNodeHandler.Handle(ctx, nCtx)
Expand Down

0 comments on commit 0bafb47

Please sign in to comment.