Skip to content

Commit

Permalink
Supporting parallized workers in ArrayNode subNodes (#4567)
Browse files Browse the repository at this point in the history
* detecting subNode or task phase updates to increment TaskPhaseVersion on ArrayNode state

Signed-off-by: Daniel Rammer <[email protected]>

* not writting empty file on no inputs

Signed-off-by: Daniel Rammer <[email protected]>

* parallelizing subNode evaluations and output retrievals

Signed-off-by: Daniel Rammer <[email protected]>

* refactored workers out to ArrayHandler level

Signed-off-by: Daniel Rammer <[email protected]>

* handling worker errors

Signed-off-by: Daniel Rammer <[email protected]>

* implemented parallelism controls with workers

Signed-off-by: Daniel Rammer <[email protected]>

* added configuration for node execution worker count

Signed-off-by: Daniel Rammer <[email protected]>

* docs

Signed-off-by: Daniel Rammer <[email protected]>

* spaces instead of tabs ...

Signed-off-by: Daniel Rammer <[email protected]>

* removed dead code

Signed-off-by: Daniel Rammer <[email protected]>

* added panic handling on worker execution

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Dec 15, 2023
1 parent 1699094 commit 398e5cb
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 103 deletions.
8 changes: 5 additions & 3 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ var (
EventConfig: EventConfig{
RawOutputPolicy: RawOutputPolicyReference,
},
ClusterID: "propeller",
CreateFlyteWorkflowCRD: false,
ArrayNodeEventVersion: 0,
ClusterID: "propeller",
CreateFlyteWorkflowCRD: false,
ArrayNodeEventVersion: 0,
NodeExecutionWorkerCount: 8,
}
)

Expand Down Expand Up @@ -155,6 +156,7 @@ type Config struct {
ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"`
CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 5 additions & 16 deletions flytepropeller/pkg/controller/nodes/array/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,24 @@ const (

type arrayExecutionContext struct {
executors.ExecutionContext
executionConfig v1alpha1.ExecutionConfig
currentParallelism *uint32
executionConfig v1alpha1.ExecutionConfig
}

func (a *arrayExecutionContext) GetExecutionConfig() v1alpha1.ExecutionConfig {
return a.executionConfig
}

func (a *arrayExecutionContext) CurrentParallelism() uint32 {
return *a.currentParallelism
}

func (a *arrayExecutionContext) IncrementParallelism() uint32 {
*a.currentParallelism = *a.currentParallelism + 1
return *a.currentParallelism
}

func newArrayExecutionContext(executionContext executors.ExecutionContext, subNodeIndex int, currentParallelism *uint32, maxParallelism uint32) *arrayExecutionContext {
func newArrayExecutionContext(executionContext executors.ExecutionContext, subNodeIndex int) *arrayExecutionContext {
executionConfig := executionContext.GetExecutionConfig()
if executionConfig.EnvironmentVariables == nil {
executionConfig.EnvironmentVariables = make(map[string]string)
}
executionConfig.EnvironmentVariables[JobIndexVarName] = FlyteK8sArrayIndexVarName
executionConfig.EnvironmentVariables[FlyteK8sArrayIndexVarName] = strconv.Itoa(subNodeIndex)
executionConfig.MaxParallelism = maxParallelism
executionConfig.MaxParallelism = 0 // hardcoded to 0 because parallelism is handled by the array node

return &arrayExecutionContext{
ExecutionContext: executionContext,
executionConfig: executionConfig,
currentParallelism: currentParallelism,
ExecutionContext: executionContext,
executionConfig: executionConfig,
}
}
Loading

0 comments on commit 398e5cb

Please sign in to comment.