Skip to content

Commit

Permalink
feat(propeller): Add new config to disable fallback to container task…
Browse files Browse the repository at this point in the history
… handler

Resolves: #5076
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Apr 9, 2024
1 parent 2528de7 commit 92f7076
Show file tree
Hide file tree
Showing 9 changed files with 674 additions and 24 deletions.
2 changes: 2 additions & 0 deletions flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ tasks:
default-for-task-types:
- container: container
- container_array: K8S-ARRAY
- sidecar: sidecar
fallback-to-container-handler: true

plugins:
logs:
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ require (
github.com/go-redis/redis v6.15.7+incompatible
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.3
github.com/google/martian v2.1.0+incompatible
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/imdario/mergo v0.3.13
github.com/jinzhu/copier v0.3.5
github.com/magiconair/properties v1.8.6
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/errors v0.9.1
Expand Down
11 changes: 8 additions & 3 deletions flytepropeller/pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ import (
"github.com/flyteorg/flyte/flytestdlib/logger"
)

//go:generate pflags Config
//go:generate pflags Config --default-var=defaultConfig

const SectionKey = "tasks"

var (
defaultConfig = &Config{
TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}, DefaultForTaskTypes: map[string]string{}},
TaskPlugins: TaskPluginConfig{
EnabledPlugins: []string{},
DefaultForTaskTypes: map[string]string{},
FallbackToContainerHandler: true,
},
MaxPluginPhaseVersions: 100000,
BackOffConfig: BackOffConfig{
BaseSecond: 2,
Expand All @@ -39,7 +43,8 @@ type Config struct {
type TaskPluginConfig struct {
EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"`
// Maps task types to their plugin handler (by ID).
DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"`
DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"`
FallbackToContainerHandler bool `json:"fallback-to-container-handler" pflag:",Fallback to container handler if a task does not have a registered plugin handler. Defaults to true"`
}

type BackOffConfig struct {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ func (t Handler) ResolvePlugin(ctx context.Context, ttype string, executionConfi
logger.Debugf(ctx, "Plugin [%s] resolved for Handler type [%s]", p.GetID(), ttype)
return p, nil
}
if !t.cfg.TaskPlugins.FallbackToContainerHandler {
return nil, fmt.Errorf("no plugin defined for Handler type [%s] and fallback-to-container-handler is set to false", ttype)
}
if t.defaultPlugin != nil {
logger.Warnf(ctx, "No plugin found for Handler-type [%s], defaulting to [%s]", ttype, t.defaultPlugin.GetID())
return t.defaultPlugin, nil
Expand Down
76 changes: 64 additions & 12 deletions flytepropeller/pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"

"github.com/golang/protobuf/proto"
"github.com/google/martian/log"
"github.com/jinzhu/copier"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -282,31 +284,70 @@ func Test_task_ResolvePlugin(t *testing.T) {
plugins map[pluginCore.TaskType]pluginCore.Plugin
defaultPlugin pluginCore.Plugin
pluginsForType map[pluginCore.TaskType]map[pluginID]pluginCore.Plugin
cfg *config.Config
}
type args struct {
ttype string
executionConfig v1alpha1.ExecutionConfig
}
cfg := config.GetConfig()
cfgNoFallbackContainerHandler := &config.Config{}
if err := copier.CopyWithOption(cfgNoFallbackContainerHandler, cfg, copier.Option{DeepCopy: true}); err != nil {
log.Errorf("Failed to copy config")
return
}
cfgNoFallbackContainerHandler.TaskPlugins.FallbackToContainerHandler = false

tests := []struct {
name string
fields fields
args args
want string
wantErr bool
}{
{"no-plugins", fields{}, args{}, "", true},
{"default",
{
"no-plugins",
fields{
cfg: cfg,
},
args{},
"",
true,
},
{
"no-plugins-no-fallback-container-handler",
fields{
cfg: cfgNoFallbackContainerHandler,
},
args{},
"",
true,
},
{
"default",
fields{
defaultPlugin: defaultPlugin,
}, args{ttype: someID}, defaultID, false},
{"actual",
cfg: cfg,
},
args{ttype: someID},
defaultID,
false,
},
{
"actual",
fields{
plugins: map[pluginCore.TaskType]pluginCore.Plugin{
someID: somePlugin,
},
defaultPlugin: defaultPlugin,
}, args{ttype: someID}, someID, false},
{"override",
cfg: cfg,
},
args{ttype: someID},
someID,
false,
},
{
"override",
fields{
plugins: make(map[pluginCore.TaskType]pluginCore.Plugin),
defaultPlugin: defaultPlugin,
Expand All @@ -315,21 +356,30 @@ func Test_task_ResolvePlugin(t *testing.T) {
someID: somePlugin,
},
},
}, args{ttype: someID, executionConfig: v1alpha1.ExecutionConfig{
TaskPluginImpls: map[string]v1alpha1.TaskPluginOverride{
someID: {
PluginIDs: []string{someID},
MissingPluginBehavior: admin.PluginOverride_FAIL,
cfg: cfg,
},
args{
ttype: someID,
executionConfig: v1alpha1.ExecutionConfig{
TaskPluginImpls: map[string]v1alpha1.TaskPluginOverride{
someID: {
PluginIDs: []string{someID},
MissingPluginBehavior: admin.PluginOverride_FAIL,
},
},
},
}}, someID, false},
},
someID,
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tk := Handler{
defaultPlugins: tt.fields.plugins,
defaultPlugin: tt.fields.defaultPlugin,
pluginsForType: tt.fields.pluginsForType,
cfg: tt.fields.cfg,
}
got, err := tk.ResolvePlugin(context.TODO(), tt.args.ttype, tt.args.executionConfig)
if (err != nil) != tt.wantErr {
Expand Down Expand Up @@ -889,6 +939,7 @@ func Test_task_Abort(t *testing.T) {
tk := Handler{
defaultPlugin: m,
resourceManager: noopRm,
cfg: config.GetConfig(),
}
nCtx := createNodeCtx(tt.args.ev)
if err := tk.Abort(context.TODO(), nCtx, "reason"); (err != nil) != tt.wantErr {
Expand Down Expand Up @@ -1051,6 +1102,7 @@ func Test_task_Abort_v1(t *testing.T) {
tk := Handler{
defaultPlugin: m,
resourceManager: noopRm,
cfg: config.GetConfig(),
}
nCtx := createNodeCtx(tt.args.ev)
if err := tk.Abort(context.TODO(), nCtx, "reason"); (err != nil) != tt.wantErr {
Expand Down
44 changes: 39 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ go 1.21
require (
github.com/flyteorg/flyte/datacatalog v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flyteadmin v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v1.9.12
github.com/flyteorg/flyte/flytestdlib v1.9.12
github.com/flyteorg/flytectl v0.8.14
github.com/golang/glog v1.2.0
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cobra v1.7.0
Expand All @@ -27,11 +28,16 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0 // indirect
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 // indirect
github.com/Microsoft/go-winio v0.5.0 // indirect
github.com/NYTimes/gizmo v1.3.6 // indirect
github.com/Shopify/sarama v1.26.4 // indirect
github.com/apoorvam/goterminal v0.0.0-20180523175556-614d345c47e5 // indirect
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/awalterschulze/gographviz v2.0.3+incompatible // indirect
github.com/aws/aws-sdk-go v1.44.2 // indirect
github.com/aws/aws-sdk-go-v2 v1.2.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.0.0 // indirect
Expand All @@ -50,21 +56,31 @@ require (
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 // indirect
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.8.0 // indirect
github.com/cloudevents/sdk-go/v2 v2.15.2 // indirect
github.com/containerd/containerd v1.5.10 // indirect
github.com/coocood/freecache v1.1.1 // indirect
github.com/coreos/go-oidc/v3 v3.6.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/danieljoos/wincred v1.1.0 // indirect
github.com/dask/dask-kubernetes/v2023 v2023.0.0-20230626103304-abd02cd17b26 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.0.3 // indirect
github.com/disiqueira/gotree v1.0.0 // indirect
github.com/docker/distribution v2.8.0+incompatible // indirect
github.com/docker/docker v20.10.7+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/enescakir/emoji v1.0.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000 // indirect
github.com/flyteorg/flyte/flyteidl v1.9.12 // indirect
github.com/flyteorg/flyte/flyteplugins v0.0.0-00010101000000-000000000000 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
Expand All @@ -76,9 +92,11 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-ozzo/ozzo-validation/v4 v4.3.0 // indirect
github.com/go-redis/redis v6.15.7+incompatible // indirect
github.com/go-test/deep v1.0.7 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
Expand All @@ -88,6 +106,8 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-github/v42 v42.0.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
Expand All @@ -101,8 +121,10 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/go-version v1.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
Expand All @@ -119,11 +141,14 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/klauspost/compress v1.9.8 // indirect
github.com/klauspost/compress v1.11.13 // indirect
github.com/kubeflow/common v0.4.3 // indirect
github.com/kubeflow/training-operator v1.5.0-rc.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/landoop/tableprinter v0.0.0-20180806200924-8bd8c2576d27 // indirect
github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
github.com/lestrrat-go/blackmagic v1.0.2 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
Expand All @@ -134,16 +159,21 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/mattn/goveralls v0.0.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.0.0-20221205130635-1aeaba878587 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mouuff/go-rocket-update v1.5.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/ory/fosite v0.42.2 // indirect
github.com/ory/go-acc v0.2.6 // indirect
github.com/ory/go-convenience v0.1.0 // indirect
Expand All @@ -161,7 +191,9 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/ray-project/kuberay/ray-operator v1.1.0-rc.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sendgrid/rest v2.6.8+incompatible // indirect
github.com/sendgrid/sendgrid-go v3.10.0+incompatible // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand All @@ -177,6 +209,8 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/wI2L/jsondiff v0.5.0 // indirect
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect
github.com/zalando/go-keyring v0.1.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
Expand Down
Loading

0 comments on commit 92f7076

Please sign in to comment.