From 0bafb4742e3ac92740529d3262f46e81a8c8854f Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 3 Apr 2024 17:12:05 -0700 Subject: [PATCH] unit test Signed-off-by: Paul Dittamo --- .../controller/nodes/array/handler_test.go | 90 ++++++++++++++++--- 1 file changed, 80 insertions(+), 10 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index 1e816782d9..62993ea321 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -42,6 +42,7 @@ var ( }, }, } + workflowMaxParallelism = uint32(10) ) func createArrayNodeHandler(ctx context.Context, t *testing.T, nodeHandler interfaces.NodeHandler, dataStore *storage.DataStore, scope promutils.Scope) (interfaces.NodeHandler, error) { @@ -73,7 +74,8 @@ func createArrayNodeHandler(ctx context.Context, t *testing.T, nodeHandler inter } func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder interfaces.EventRecorder, outputVariables []string, - inputLiteralMap *idlcore.LiteralMap, arrayNodeSpec *v1alpha1.NodeSpec, arrayNodeState *handler.ArrayNodeState) interfaces.NodeExecutionContext { + inputLiteralMap *idlcore.LiteralMap, arrayNodeSpec *v1alpha1.NodeSpec, arrayNodeState *handler.ArrayNodeState, + currentParallelism uint32, maxParallelism uint32) interfaces.NodeExecutionContext { nCtx := &mocks.NodeExecutionContext{} nCtx.OnMaxDatasetSizeBytes().Return(9999999) @@ -90,7 +92,9 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte // ExecutionContext executionContext := &execmocks.ExecutionContext{} executionContext.OnGetEventVersion().Return(1) - executionContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{}) + executionContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{ + MaxParallelism: maxParallelism, + }) executionContext.OnGetExecutionID().Return( v1alpha1.ExecutionID{ WorkflowExecutionIdentifier: &idlcore.WorkflowExecutionIdentifier{ @@ -119,6 +123,8 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte }, nil, ) + executionContext.OnCurrentParallelism().Return(currentParallelism) + executionContext.On("IncrementParallelism").Run(func(args mock.Arguments) {}).Return(currentParallelism) nCtx.OnExecutionContext().Return(executionContext) // EventsRecorder @@ -253,7 +259,7 @@ func TestAbort(t *testing.T) { // create NodeExecutionContext eventRecorder := newBufferedEventRecorder() - nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) + nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism) // evaluate node err := arrayNodeHandler.Abort(ctx, nCtx, "foo") @@ -349,7 +355,7 @@ func TestFinalize(t *testing.T) { // create NodeExecutionContext eventRecorder := newBufferedEventRecorder() - nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) + nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism) // evaluate node err := arrayNodeHandler.Finalize(ctx, nCtx) @@ -420,7 +426,7 @@ func TestHandleArrayNodePhaseNone(t *testing.T) { arrayNodeState := &handler.ArrayNodeState{ Phase: v1alpha1.ArrayNodePhaseNone, } - nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) + nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism) // evaluate node transition, err := arrayNodeHandler.Handle(ctx, nCtx) @@ -475,6 +481,9 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectedArrayNodePhase v1alpha1.ArrayNodePhase expectedTransitionPhase handler.EPhase expectedExternalResourcePhases []idlcore.TaskExecution_Phase + currentWfParallelism uint32 + maxWfParallelism uint32 + incrementParallelismCount uint32 }{ { name: "StartAllSubNodes", @@ -512,6 +521,65 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING}, }, + { + name: "UtilizeWfParallelismAllSubNodes", + parallelism: -1, + currentWfParallelism: 0, + incrementParallelismCount: 2, + subNodePhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseQueued, + }, + subNodeTaskPhases: []core.Phase{ + core.PhaseUndefined, + core.PhaseUndefined, + }, + subNodeTransitions: []handler.Transition{ + handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), + handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), + }, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedTransitionPhase: handler.EPhaseRunning, + expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, + }, + { + name: "UtilizeWfParallelismSomeSubNodes", + parallelism: -1, + currentWfParallelism: workflowMaxParallelism - 1, + incrementParallelismCount: 1, + subNodePhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseQueued, + }, + subNodeTaskPhases: []core.Phase{ + core.PhaseUndefined, + core.PhaseUndefined, + }, + subNodeTransitions: []handler.Transition{ + handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), + }, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedTransitionPhase: handler.EPhaseRunning, + expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING}, + }, + { + name: "UtilizeWfParallelismNoSubNodes", + parallelism: -1, + currentWfParallelism: workflowMaxParallelism, + incrementParallelismCount: 0, + subNodePhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseQueued, + }, + subNodeTaskPhases: []core.Phase{ + core.PhaseUndefined, + core.PhaseUndefined, + }, + subNodeTransitions: []handler.Transition{}, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedTransitionPhase: handler.EPhaseRunning, + expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{}, + }, { name: "StartSubNodesNewAttempts", subNodePhases: []v1alpha1.NodePhase{ @@ -627,7 +695,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { nodeSpec.ArrayNode.Parallelism = int64(test.parallelism) nodeSpec.ArrayNode.MinSuccessRatio = test.minSuccessRatio - nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) + nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, test.currentWfParallelism, workflowMaxParallelism) // initialize ArrayNodeHandler nodeHandler := &mocks.NodeHandler{} @@ -673,6 +741,8 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { } else { assert.Equal(t, 0, len(eventRecorder.taskExecutionEvents)) } + + nCtx.ExecutionContext().(*execmocks.ExecutionContext).AssertNumberOfCalls(t, "IncrementParallelism", int(test.incrementParallelismCount)) }) } } @@ -748,7 +818,7 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { arrayNodeState := &handler.ArrayNodeState{ Phase: v1alpha1.ArrayNodePhaseNone, } - nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) + nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism) // initialize ArrayNodeHandler nodeHandler := &mocks.NodeHandler{} @@ -776,7 +846,7 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { // evaluate node until failure attempts := 1 for { - nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) + nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism) _, err = arrayNodeHandler.Handle(ctx, nCtx) assert.NoError(t, err) @@ -853,7 +923,7 @@ func TestHandleArrayNodePhaseSucceeding(t *testing.T) { // create NodeExecutionContext eventRecorder := newBufferedEventRecorder() literalMap := &idlcore.LiteralMap{} - nCtx := createNodeExecutionContext(dataStore, eventRecorder, []string{test.outputVariable}, literalMap, &arrayNodeSpec, arrayNodeState) + nCtx := createNodeExecutionContext(dataStore, eventRecorder, []string{test.outputVariable}, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism) // write mocked output files for i, outputValue := range test.outputValues { @@ -979,7 +1049,7 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) { // create NodeExecutionContext eventRecorder := newBufferedEventRecorder() literalMap := &idlcore.LiteralMap{} - nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) + nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism) // evaluate node transition, err := arrayNodeHandler.Handle(ctx, nCtx)