Skip to content

Commit

Permalink
feat(propeller): Add new config to disable fallback to container task…
Browse files Browse the repository at this point in the history
… handler

Resolves: #5076
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Mar 28, 2024
1 parent c81133b commit 2f7b1cd
Show file tree
Hide file tree
Showing 7 changed files with 859 additions and 1,447 deletions.
2 changes: 2 additions & 0 deletions flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ tasks:
default-for-task-types:
- container: container
- container_array: K8S-ARRAY
- sidecar: sidecar
fallback-to-container-handler: true

plugins:
logs:
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ require (
github.com/go-redis/redis v6.15.7+incompatible
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.3
github.com/google/martian v2.1.0+incompatible
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/imdario/mergo v0.3.13
github.com/jinzhu/copier v0.3.5
github.com/magiconair/properties v1.8.6
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/errors v0.9.1
Expand Down
11 changes: 8 additions & 3 deletions flytepropeller/pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ import (
"github.com/flyteorg/flyte/flytestdlib/logger"
)

//go:generate pflags Config
//go:generate pflags Config --default-var=defaultConfig

const SectionKey = "tasks"

var (
defaultConfig = &Config{
TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}, DefaultForTaskTypes: map[string]string{}},
TaskPlugins: TaskPluginConfig{
EnabledPlugins: []string{},
DefaultForTaskTypes: map[string]string{},
FallbackToContainerHandler: true,
},
MaxPluginPhaseVersions: 100000,
BackOffConfig: BackOffConfig{
BaseSecond: 2,
Expand All @@ -39,7 +43,8 @@ type Config struct {
type TaskPluginConfig struct {
EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"`
// Maps task types to their plugin handler (by ID).
DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"`
DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"`
FallbackToContainerHandler bool `json:"fallback-to-container-handler" pflag:",Fallback to container handler if a task does not have a registered plugin handler. Defaults to true"`
}

type BackOffConfig struct {
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ func (t Handler) ResolvePlugin(ctx context.Context, ttype string, executionConfi
logger.Debugf(ctx, "Plugin [%s] resolved for Handler type [%s]", p.GetID(), ttype)
return p, nil
}
if !t.cfg.TaskPlugins.FallbackToContainerHandler {
return nil, fmt.Errorf("no plugin defined for Handler type [%s] and fallback-to-container-handler is set to false", ttype)
}
if t.defaultPlugin != nil {
logger.Warnf(ctx, "No plugin found for Handler-type [%s], defaulting to [%s]", ttype, t.defaultPlugin.GetID())
return t.defaultPlugin, nil
Expand Down
74 changes: 62 additions & 12 deletions flytepropeller/pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"context"
"fmt"
"github.com/google/martian/log"
"github.com/jinzhu/copier"
"testing"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -282,31 +284,70 @@ func Test_task_ResolvePlugin(t *testing.T) {
plugins map[pluginCore.TaskType]pluginCore.Plugin
defaultPlugin pluginCore.Plugin
pluginsForType map[pluginCore.TaskType]map[pluginID]pluginCore.Plugin
cfg *config.Config
}
type args struct {
ttype string
executionConfig v1alpha1.ExecutionConfig
}
cfg := config.GetConfig()
cfgNoFallbackContainerHandler := &config.Config{}
if err := copier.CopyWithOption(cfgNoFallbackContainerHandler, cfg, copier.Option{DeepCopy: true}); err != nil {
log.Errorf("Failed to copy config")
return
}
cfgNoFallbackContainerHandler.TaskPlugins.FallbackToContainerHandler = false

tests := []struct {
name string
fields fields
args args
want string
wantErr bool
}{
{"no-plugins", fields{}, args{}, "", true},
{"default",
{
"no-plugins",
fields{
cfg: cfg,
},
args{},
"",
true,
},
{
"no-plugins-no-fallback-container-handler",
fields{
cfg: cfgNoFallbackContainerHandler,
},
args{},
"",
true,
},
{
"default",
fields{
defaultPlugin: defaultPlugin,
}, args{ttype: someID}, defaultID, false},
{"actual",
cfg: cfg,
},
args{ttype: someID},
defaultID,
false,
},
{
"actual",
fields{
plugins: map[pluginCore.TaskType]pluginCore.Plugin{
someID: somePlugin,
},
defaultPlugin: defaultPlugin,
}, args{ttype: someID}, someID, false},
{"override",
cfg: cfg,
},
args{ttype: someID},
someID,
false,
},
{
"override",
fields{
plugins: make(map[pluginCore.TaskType]pluginCore.Plugin),
defaultPlugin: defaultPlugin,
Expand All @@ -315,21 +356,30 @@ func Test_task_ResolvePlugin(t *testing.T) {
someID: somePlugin,
},
},
}, args{ttype: someID, executionConfig: v1alpha1.ExecutionConfig{
TaskPluginImpls: map[string]v1alpha1.TaskPluginOverride{
someID: {
PluginIDs: []string{someID},
MissingPluginBehavior: admin.PluginOverride_FAIL,
cfg: cfg,
},
args{
ttype: someID,
executionConfig: v1alpha1.ExecutionConfig{
TaskPluginImpls: map[string]v1alpha1.TaskPluginOverride{
someID: {
PluginIDs: []string{someID},
MissingPluginBehavior: admin.PluginOverride_FAIL,
},
},
},
}}, someID, false},
},
someID,
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tk := Handler{
defaultPlugins: tt.fields.plugins,
defaultPlugin: tt.fields.defaultPlugin,
pluginsForType: tt.fields.pluginsForType,
cfg: tt.fields.cfg,
}
got, err := tk.ResolvePlugin(context.TODO(), tt.args.ttype, tt.args.executionConfig)
if (err != nil) != tt.wantErr {
Expand Down
Loading

0 comments on commit 2f7b1cd

Please sign in to comment.