Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added configuration for arraynode default parallelism behavior #5268

Merged
merged 7 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ var (
},
ClusterID: "propeller",
CreateFlyteWorkflowCRD: false,
ArrayNodeEventVersion: 0,
NodeExecutionWorkerCount: 8,
ArrayNode: ArrayNodeConfig{
EventVersion: 0,
DefaultParallelismBehavior: ParallelismBehaviorUnlimited,
},
}
)

Expand Down Expand Up @@ -156,8 +159,8 @@ type Config struct {
ExcludeDomainLabel []string `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"`
ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"`
CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down Expand Up @@ -258,6 +261,31 @@ type EventConfig struct {
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
}

// ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default
type ParallelismBehavior = string

const (
// ParallelismBehaviorHybrid means that ArrayNode will adhere to the parallelism defined in the
// ArrayNode exactly. This means `nil` will use the workflow parallelism, and 0 will have
// unlimited parallelism.
ParallelismBehaviorHybrid ParallelismBehavior = "hybrid"

// ParallelismBehaviorUnlimited means that ArrayNode subNodes will be evaluated with unlimited
// parallelism for both nil and 0. If a non-default (ie. nil / zero) parallelism is set, then
// ArrayNode will adhere to that value.
ParallelismBehaviorUnlimited ParallelismBehavior = "unlimited"

// ParallelismBehaviorWorkflow means that ArrayNode subNodes will be evaluated using the
// configured workflow parallelism for both nil and 0. If a non-default (ie. nil / zero)
// parallelism is set, then ArrayNode will adhere to that value.
ParallelismBehaviorWorkflow ParallelismBehavior = "workflow"
)

type ArrayNodeConfig struct {
EventVersion int `json:"event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
DefaultParallelismBehavior ParallelismBehavior `json:"default-parallelism-behavior" pflag:",Default parallelism behavior for array nodes"`
}

// GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object.
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
Expand Down
3 changes: 2 additions & 1 deletion flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 22 additions & 8 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (*passThroughEventRecorder) finalizeRequired(ctx context.Context) bool {
}

func newArrayEventRecorder(eventRecorder interfaces.EventRecorder) arrayEventRecorder {
if config.GetConfig().ArrayNodeEventVersion == 0 {
if config.GetConfig().ArrayNode.EventVersion == 0 {
return &externalResourcesEventRecorder{
EventRecorder: eventRecorder,
}
Expand Down
34 changes: 14 additions & 20 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,26 +252,14 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting
case v1alpha1.ArrayNodePhaseExecuting:
// process array node subNodes
remainingWorkflowParallelism := int(nCtx.ExecutionContext().GetExecutionConfig().MaxParallelism - nCtx.ExecutionContext().CurrentParallelism())
incrementWorkflowParallelism, maxParallelism := inferParallelism(ctx, arrayNode.GetParallelism(),
config.GetConfig().ArrayNode.DefaultParallelismBehavior, remainingWorkflowParallelism, len(arrayNodeState.SubNodePhases.GetItems()))

availableParallelism := 0
// using the workflow's parallelism if the array node parallelism is not set
useWorkflowParallelism := arrayNode.GetParallelism() == nil
if useWorkflowParallelism {
// greedily take all available slots
// TODO: This will need to be re-evaluated if we want to support dynamics & sub_workflows
currentParallelism := nCtx.ExecutionContext().CurrentParallelism()
maxParallelism := nCtx.ExecutionContext().GetExecutionConfig().MaxParallelism
availableParallelism = int(maxParallelism - currentParallelism)
} else {
availableParallelism = int(*arrayNode.GetParallelism())
if availableParallelism == 0 {
availableParallelism = len(arrayNodeState.SubNodePhases.GetItems())
}
}

nodeExecutionRequests := make([]*nodeExecutionRequest, 0, availableParallelism)
nodeExecutionRequests := make([]*nodeExecutionRequest, 0, maxParallelism)
currentParallelism := 0
for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() {
if availableParallelism == 0 {
if currentParallelism >= maxParallelism {
break
}

Expand Down Expand Up @@ -315,10 +303,10 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
// TODO - this is a naive implementation of parallelism, if we want to support more
// complex subNodes (ie. dynamics / subworkflows) we need to revisit this so that
// parallelism is handled during subNode evaluations + avoid deadlocks
if useWorkflowParallelism {
if incrementWorkflowParallelism {
nCtx.ExecutionContext().IncrementParallelism()
}
availableParallelism--
currentParallelism++
}

workerErrorCollector := errorcollector.NewErrorMessageCollector()
Expand Down Expand Up @@ -418,6 +406,12 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
// wait until all tasks have completed before declaring success
arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding
}

// if incrementWorkflowParallelism is not set then we need to increment the parallelism by one
// to indicate that the overall ArrayNode is still running
if !incrementWorkflowParallelism && arrayNodeState.Phase == v1alpha1.ArrayNodePhaseExecuting {
pvditt marked this conversation as resolved.
Show resolved Hide resolved
nCtx.ExecutionContext().IncrementParallelism()
}
case v1alpha1.ArrayNodePhaseFailing:
if err := a.Abort(ctx, nCtx, "ArrayNodeFailing"); err != nil {
return handler.UnknownTransition, err
Expand Down
32 changes: 20 additions & 12 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@ func uint32Ptr(v uint32) *uint32 {

func TestHandleArrayNodePhaseExecuting(t *testing.T) {
ctx := context.Background()

// setting default parallelism behavior on ArrayNode to "hybrid" to test the largest scope of functionality
flyteConfig := config.GetConfig()
flyteConfig.ArrayNode.DefaultParallelismBehavior = config.ParallelismBehaviorHybrid

minSuccessRatio := float32(0.5)

// initialize universal variables
Expand Down Expand Up @@ -511,6 +516,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
incrementParallelismCount: 1,
},
{
name: "StartOneSubNodeParallelism",
Expand All @@ -529,12 +535,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
incrementParallelismCount: 1,
},
{
name: "UtilizeWfParallelismAllSubNodes",
parallelism: nil,
currentWfParallelism: 0,
incrementParallelismCount: 2,
name: "UtilizeWfParallelismAllSubNodes",
parallelism: nil,
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
Expand All @@ -550,12 +555,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
currentWfParallelism: 0,
incrementParallelismCount: 2,
},
{
name: "UtilizeWfParallelismSomeSubNodes",
parallelism: nil,
currentWfParallelism: workflowMaxParallelism - 1,
incrementParallelismCount: 1,
name: "UtilizeWfParallelismSomeSubNodes",
parallelism: nil,
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
Expand All @@ -570,12 +575,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
currentWfParallelism: workflowMaxParallelism - 1,
incrementParallelismCount: 1,
},
{
name: "UtilizeWfParallelismNoSubNodes",
parallelism: nil,
currentWfParallelism: workflowMaxParallelism,
incrementParallelismCount: 0,
name: "UtilizeWfParallelismNoSubNodes",
parallelism: nil,
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
Expand All @@ -588,6 +593,8 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{},
currentWfParallelism: workflowMaxParallelism,
incrementParallelismCount: 0,
},
{
name: "StartSubNodesNewAttempts",
Expand All @@ -607,6 +614,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
incrementParallelismCount: 1,
},
{
name: "AllSubNodesSuccedeed",
Expand Down
20 changes: 20 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/codex"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

Expand Down Expand Up @@ -62,6 +64,24 @@ func constructOutputReferences(ctx context.Context, nCtx interfaces.NodeExecutio
return subDataDir, subOutputDir, nil
}

func inferParallelism(ctx context.Context, parallelism *uint32, parallelismBehavior string, remainingWorkflowParallelism, arrayNodeSize int) (bool, int) {
if parallelism != nil && *parallelism > 0 {
// if parallelism is not defaulted - use it
return false, int(*parallelism)
} else if parallelismBehavior == config.ParallelismBehaviorWorkflow || (parallelism == nil && parallelismBehavior == config.ParallelismBehaviorHybrid) {
// if workflow level parallelism
return true, remainingWorkflowParallelism
} else if parallelismBehavior == config.ParallelismBehaviorUnlimited ||
(parallelism != nil && *parallelism == 0 && parallelismBehavior == config.ParallelismBehaviorHybrid) {
// if unlimited parallelism
return false, arrayNodeSize
}

logger.Warnf(ctx, "unable to infer ArrayNode parallelism configuration for parallelism:%v behavior:%v, defaulting to unlimited parallelism",
parallelism, parallelismBehavior)
return false, arrayNodeSize
}

func isTerminalNodePhase(nodePhase v1alpha1.NodePhase) bool {
return nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseFailed || nodePhase == v1alpha1.NodePhaseTimedOut ||
nodePhase == v1alpha1.NodePhaseSkipped || nodePhase == v1alpha1.NodePhaseRecovered
Expand Down
Loading
Loading