diff --git a/cmd/serve/serve.go b/cmd/serve/serve.go index b51ae48..b3dfec8 100644 --- a/cmd/serve/serve.go +++ b/cmd/serve/serve.go @@ -35,14 +35,13 @@ func NewCmd(o *options) *cobra.Command { func run(o *options) error { o.Logger.Info("starting gardeners agent") - resourceGetter, err := watcher.NewForConfig(&watcher.Options{ + watcher := watcher.New(&watcher.Options{ Context: o.Context, Logger: o.Logger.WithField("component", "watcher"), ConfigPath: o.configPath, }) - if err != nil { - return err - } + + go watcher.Start() o.Logger.Debugf("configuring grpc server - network '%s', address '%s'", o.socketNetwork, o.socketAddress) lis, err := agent.NewSocket(o.socketNetwork, o.socketAddress) @@ -52,7 +51,7 @@ func run(o *options) error { grpcServer := googlerpc.NewServer(googlerpc.EmptyServerOption{}) agentServer := agent.NewServer(&agent.ServerOption{ - ResourceGetter: resourceGetter, + ResourceGetter: watcher, Logger: o.Logger.WithField("component", "server"), }) cloud_agent.RegisterAgentServer(grpcServer, agentServer) diff --git a/cmd/serve/serve_test.go b/cmd/serve/serve_test.go index 4a34888..00d5a7e 100644 --- a/cmd/serve/serve_test.go +++ b/cmd/serve/serve_test.go @@ -109,22 +109,4 @@ func Test_run(t *testing.T) { assert.Error(t, c.RunE(c, []string{})) }) - - t.Run("newWatcher error", func(t *testing.T) { - l := logrus.New() - l.Out = io.Discard - o := &options{ - Options: &command.Options{ - Logger: l, - Context: context.Background(), - }, - configPath: "/empty/path", - socketNetwork: testNetwork, - } - c := NewCmd(o) - - o.configPath = "/empty/path" - - assert.Error(t, c.RunE(c, []string{})) - }) } diff --git a/internal/watcher/cached.go b/internal/watcher/cached.go index 444c7de..62862fe 100644 --- a/internal/watcher/cached.go +++ b/internal/watcher/cached.go @@ -1,87 +1,57 @@ package watcher import ( - "github.com/gardener/gardener/pkg/apis/core/v1beta1" - "github.com/pPrecel/cloudagent/internal/gardener" - "github.com/pPrecel/cloudagent/internal/system" + "context" + "github.com/pPrecel/cloudagent/pkg/agent" "github.com/pPrecel/cloudagent/pkg/config" "github.com/sirupsen/logrus" ) -type watcher struct { - options *Options - cache *agent.ServerCache +type cached struct { + logger *logrus.Entry + configPath string + cache agent.ResourceGetter - getConfig func(string) (*config.Config, error) - notifyChange func(string) (*system.Notifier, error) + getConfig func(string) (*config.Config, error) } -func newCached(cache *agent.ServerCache, o *Options) *watcher { - return &watcher{ - options: o, - cache: cache, - getConfig: config.Read, - notifyChange: system.NotifyChange, +func newCached(cache agent.ResourceGetter, logger *logrus.Entry, configPath string) *cached { + return &cached{ + logger: logger, + configPath: configPath, + cache: cache, + getConfig: config.Read, } } -func (w *watcher) start() error { - w.options.Logger.Debug("starting cached watcher") - watcher, err := w.newWatcher() +func (w *cached) start(ctx context.Context) error { + w.logger.Debug("starting cached watcher") + watcher, err := w.newWatcher(ctx) if err != nil { return err } defer watcher.Stop() watcher.Start() - w.options.Logger.Info("starting config notifier") - n, err := w.notifyChange(w.options.ConfigPath) - if err != nil { - return err - } - defer n.Stop() - - select { - case err := <-n.Errors: - return err - case <-n.IsMotified: - return nil - } + <-ctx.Done() + return nil } -func (w *watcher) newWatcher() (*agent.Watcher, error) { - w.options.Logger.Infof("reading config from path: '%s'", w.options.ConfigPath) - config, err := w.getConfig(w.options.ConfigPath) +func (w *cached) newWatcher(ctx context.Context) (*agent.Watcher, error) { + w.logger.Infof("reading config from path: '%s'", w.configPath) + config, err := w.getConfig(w.configPath) if err != nil { return nil, err } - w.options.Logger.Infof("starting state watcher with spec: '%s'", config.PersistentSpec) + w.logger.Infof("starting state watcher with spec: '%s'", config.PersistentSpec) return agent.NewWatcher(agent.WatcherOptions{ Spec: config.PersistentSpec, - Context: w.options.Context, - Logger: w.options.Logger, - }, parseWatcherFns(w.options.Logger, w.cache.GardenerCache, config)...) -} - -func parseWatcherFns(l *logrus.Entry, gardenerCache agent.Cache[*v1beta1.ShootList], config *config.Config) []agent.WatchFn { - funcs := []agent.WatchFn{} - for i := range config.GardenerProjects { - p := config.GardenerProjects[i] - r := gardenerCache.Register(p.Namespace) - - l.Debugf("creating watcher func for namespace: '%s'", p.Namespace) - l := l.WithFields( - logrus.Fields{ - "provider": "gardener", - "project": p.Namespace, - }, - ) - funcs = append(funcs, - gardener.NewWatchFunc(l, r, p.Namespace, p.KubeconfigPath), - ) - } - - return funcs + Context: ctx, + Logger: w.logger, + }, parseWatcherFns( + w.logger, + w.cache.GetGardenerCache(), + config)...) } diff --git a/internal/watcher/cached_test.go b/internal/watcher/cached_test.go index f11e150..58dc807 100644 --- a/internal/watcher/cached_test.go +++ b/internal/watcher/cached_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/gardener/gardener/pkg/apis/core/v1beta1" - "github.com/pPrecel/cloudagent/internal/system" "github.com/pPrecel/cloudagent/pkg/agent" "github.com/pPrecel/cloudagent/pkg/config" "github.com/sirupsen/logrus" @@ -28,112 +27,69 @@ var ( } ) -func TestNewWatcher(t *testing.T) { - t.Run("new watcher", func(t *testing.T) { - assert.NotNil(t, newCached(nil, nil)) +func TestNewCached(t *testing.T) { + t.Run("new cached watcher", func(t *testing.T) { + assert.NotNil(t, newCached(nil, nil, "")) }) } -func Test_watcher_Start(t *testing.T) { +func Test_cached_Start(t *testing.T) { l := &logrus.Entry{ Logger: logrus.New(), } l.Logger.Out = io.Discard type fields struct { - getConfig func(string) (*config.Config, error) - notifyChange func(string) (*system.Notifier, error) + logger *logrus.Entry + configPath string + getConfig func(string) (*config.Config, error) + } + type args struct { + context context.Context } tests := []struct { name string fields fields - args *Options + args args cache *agent.ServerCache wantErr bool }{ { - name: "notify change", + name: "context done", fields: fields{ - getConfig: fixConfigFn, - notifyChange: func(s string) (*system.Notifier, error) { - n := &system.Notifier{ - IsMotified: make(chan interface{}), - Errors: make(chan error), - Stop: func() {}, - } - go func() { - n.IsMotified <- 1 - }() - - return n, nil + configPath: "", + logger: l, + getConfig: func(s string) (*config.Config, error) { + return &config.Config{ + PersistentSpec: "@every 120s", + GardenerProjects: []config.GardenerProject{ + { + Namespace: "test", + KubeconfigPath: "/test/path", + }, + }, + }, nil }, }, - args: &Options{ - Context: context.Background(), - Logger: l, - ConfigPath: "", + args: args{ + context: fixCanceledContext(), }, cache: &agent.ServerCache{ GardenerCache: agent.NewCache[*v1beta1.ShootList](), }, wantErr: false, }, - { - name: "notify change error", - fields: fields{ - getConfig: fixConfigFn, - notifyChange: func(s string) (*system.Notifier, error) { - n := &system.Notifier{ - IsMotified: make(chan interface{}), - Errors: make(chan error), - Stop: func() {}, - } - go func() { - n.Errors <- errors.New("test error") - }() - - return n, nil - }, - }, - args: &Options{ - Context: context.Background(), - Logger: l, - ConfigPath: "", - }, - cache: &agent.ServerCache{ - GardenerCache: agent.NewCache[*v1beta1.ShootList](), - }, - wantErr: true, - }, { name: "getConfig error", fields: fields{ + configPath: "", + logger: l, getConfig: func(s string) (*config.Config, error) { return nil, errors.New("test error") }, }, - args: &Options{ - Context: context.Background(), - Logger: l, - ConfigPath: "", - }, - cache: &agent.ServerCache{ - GardenerCache: agent.NewCache[*v1beta1.ShootList](), - }, - wantErr: true, - }, - { - name: "notifyChange error", - fields: fields{ - getConfig: fixConfigFn, - notifyChange: func(s string) (*system.Notifier, error) { - return nil, errors.New("test error") - }, - }, - args: &Options{ - Context: context.Background(), - Logger: l, - ConfigPath: "", + args: args{ + context: context.Background(), }, cache: &agent.ServerCache{ GardenerCache: agent.NewCache[*v1beta1.ShootList](), @@ -143,14 +99,14 @@ func Test_watcher_Start(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - w := &watcher{ - options: tt.args, - getConfig: tt.fields.getConfig, - notifyChange: tt.fields.notifyChange, - cache: tt.cache, + w := &cached{ + logger: tt.fields.logger, + configPath: tt.fields.configPath, + getConfig: tt.fields.getConfig, + cache: tt.cache, } - if err := w.start(); (err != nil) != tt.wantErr { + if err := w.start(tt.args.context); (err != nil) != tt.wantErr { t.Errorf("watcher.Start() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/internal/watcher/common.go b/internal/watcher/common.go new file mode 100644 index 0000000..ea96c62 --- /dev/null +++ b/internal/watcher/common.go @@ -0,0 +1,30 @@ +package watcher + +import ( + "github.com/gardener/gardener/pkg/apis/core/v1beta1" + "github.com/pPrecel/cloudagent/internal/gardener" + "github.com/pPrecel/cloudagent/pkg/agent" + "github.com/pPrecel/cloudagent/pkg/config" + "github.com/sirupsen/logrus" +) + +func parseWatcherFns(l *logrus.Entry, gardenerCache agent.Cache[*v1beta1.ShootList], config *config.Config) []agent.WatchFn { + funcs := []agent.WatchFn{} + for i := range config.GardenerProjects { + p := config.GardenerProjects[i] + r := gardenerCache.Register(p.Namespace) + + l.Debugf("creating watcher func for namespace: '%s'", p.Namespace) + l := l.WithFields( + logrus.Fields{ + "provider": "gardener", + "project": p.Namespace, + }, + ) + funcs = append(funcs, + gardener.NewWatchFunc(l, r, p.Namespace, p.KubeconfigPath), + ) + } + + return funcs +} diff --git a/internal/watcher/ondemand.go b/internal/watcher/ondemand.go index cb1f580..2d9b990 100644 --- a/internal/watcher/ondemand.go +++ b/internal/watcher/ondemand.go @@ -9,7 +9,7 @@ import ( "github.com/sirupsen/logrus" ) -type onDemandWatcher struct { +type ondemand struct { cache agent.Cache[*v1beta1.ShootList] fns []agent.WatchFn @@ -21,8 +21,8 @@ type onDemandWatcher struct { parseWatcherFns func(*logrus.Entry, agent.Cache[*v1beta1.ShootList], *config.Config) []agent.WatchFn } -func newOnDemand(o *Options) *onDemandWatcher { - return &onDemandWatcher{ +func newOnDemand(o *Options) *ondemand { + return &ondemand{ context: o.Context, logger: o.Logger, cache: agent.NewCache[*v1beta1.ShootList](), @@ -32,22 +32,28 @@ func newOnDemand(o *Options) *onDemandWatcher { } } -func (rw *onDemandWatcher) GetGardenerCache() agent.Cache[*v1beta1.ShootList] { - rw.cache.Clean() - for i := range rw.fns { - rw.fns[i](rw.context) +func (w *ondemand) start(ctx context.Context) error { + // wait for cotext Done only + <-ctx.Done() + return nil +} + +func (w *ondemand) GetGardenerCache() agent.Cache[*v1beta1.ShootList] { + for i := range w.fns { + w.fns[i](w.context) } - return rw.cache + return w.cache } -func (rw *onDemandWatcher) GetGeneralError() error { - cfg, err := rw.getConfig(rw.configPath) +func (w *ondemand) GetGeneralError() error { + w.cache.Clean() + cfg, err := w.getConfig(w.configPath) if err != nil { return err } - rw.fns = rw.parseWatcherFns(rw.logger, rw.cache, cfg) + w.fns = w.parseWatcherFns(w.logger, w.cache, cfg) return nil } diff --git a/internal/watcher/ondemand_test.go b/internal/watcher/ondemand_test.go index 3f11455..4c35040 100644 --- a/internal/watcher/ondemand_test.go +++ b/internal/watcher/ondemand_test.go @@ -41,7 +41,8 @@ func TestNewOnDemand(t *testing.T) { func Test_onDemandWatcher_GetGeneralError(t *testing.T) { t.Run("get nil", func(t *testing.T) { - w := onDemandWatcher{ + w := ondemand{ + cache: agent.NewCache[*v1beta1.ShootList](), getConfig: func(s string) (*config.Config, error) { return nil, nil }, @@ -54,7 +55,8 @@ func Test_onDemandWatcher_GetGeneralError(t *testing.T) { }) t.Run("get general error", func(t *testing.T) { - w := onDemandWatcher{ + w := ondemand{ + cache: agent.NewCache[*v1beta1.ShootList](), getConfig: func(s string) (*config.Config, error) { return nil, errors.New("test error") }, @@ -69,7 +71,7 @@ func Test_onDemandWatcher_GetGardenerCache(t *testing.T) { cache := agent.NewCache[*v1beta1.ShootList]() cache.Register("test-1").Set(nil, nil) - w := onDemandWatcher{ + w := ondemand{ cache: cache, fns: []agent.WatchFn{ func(ctx context.Context) { @@ -84,3 +86,13 @@ func Test_onDemandWatcher_GetGardenerCache(t *testing.T) { assert.NoError(t, cache.Resources()["test-1"].Get().Error) }) } + +func Test_ondemand_start(t *testing.T) { + t.Run("context done", func(t *testing.T) { + ctx := fixCanceledContext() + + w := ondemand{} + err := w.start(ctx) + assert.NoError(t, err) + }) +} diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index c40d713..1a7eccc 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -2,10 +2,13 @@ package watcher import ( "context" + "errors" "fmt" + "sync" "time" "github.com/gardener/gardener/pkg/apis/core/v1beta1" + "github.com/pPrecel/cloudagent/internal/system" "github.com/pPrecel/cloudagent/pkg/agent" "github.com/pPrecel/cloudagent/pkg/config" "github.com/sirupsen/logrus" @@ -17,39 +20,55 @@ type Options struct { ConfigPath string } -func NewForConfig(o *Options) (agent.ResourceGetter, error) { - return newForConfig(o, config.Read) +type watcher struct { + mu sync.Mutex + w agent.ResourceGetter + + o *Options + + getConfig func(string) (*config.Config, error) + notifyChange func(string) (*system.Notifier, error) } -func newForConfig(o *Options, getConfig func(string) (*config.Config, error)) (agent.ResourceGetter, error) { - cfg, err := getConfig(o.ConfigPath) - if err != nil { - return nil, fmt.Errorf("failed to read config: %s", err) +func New(o *Options) *watcher { + return &watcher{ + o: o, + getConfig: config.Read, + notifyChange: system.NotifyChange, } +} - if cfg.PersistentSpec == "on-demand" { - return newOnDemand(o), nil - } else { - cache := &agent.ServerCache{ - GardenerCache: agent.NewCache[*v1beta1.ShootList](), - } +func (w *watcher) GetGardenerCache() agent.Cache[*v1beta1.ShootList] { + w.mu.Lock() + defer w.mu.Unlock() - go setupWatcher(cache, o) + if w.w != nil { + return w.w.GetGardenerCache() + } + return nil +} - return cache, nil +func (w *watcher) GetGeneralError() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.w != nil { + return w.w.GetGeneralError() } + return errors.New("watcher not implemented") } -func setupWatcher(cache *agent.ServerCache, o *Options) { +func (w *watcher) Start() { for { - select { - case <-o.Context.Done(): - o.Logger.Warn("watcher context done. Exiting") + case <-w.o.Context.Done(): + w.o.Logger.Warn("watcher context done. Exiting") return default: - cache.GeneralError = nil - startWatcher(cache, o) + err := w.start() + if err != nil { + w.o.Logger.Warnf("runned watcher error: %s", err) + } // wait 1sec to avoid CPU throttling time.Sleep(time.Second * 1) @@ -57,18 +76,71 @@ func setupWatcher(cache *agent.ServerCache, o *Options) { } } -func startWatcher(cache *agent.ServerCache, o *Options) { - if err := newCached(cache, &Options{ - Context: o.Context, - Logger: o.Logger, - ConfigPath: o.ConfigPath, - }).start(); err != nil { - o.Logger.Warn(err) - cache.GeneralError = err +type watcherStarter interface { + start(context.Context) error +} + +func (w *watcher) start() error { + var err error + starter, err := w.buildWatcher() + if err != nil { + return fmt.Errorf("failed to build watcher: %f", err) } - o.Logger.Info("configuration midyfication detected") + ctx, cancel := context.WithCancel(w.o.Context) + defer cancel() + + go starter.start(ctx) + + w.o.Logger.Info("starting config notifier") + n, err := w.notifyChange(w.o.ConfigPath) + if err != nil { + return err + } + defer n.Stop() - o.Logger.Info("cleaning up cache") - cache.GardenerCache.Clean() + select { + case err := <-n.Errors: + return err + case <-n.IsMotified: + w.o.Logger.Info("configuration midyfication detected") + return nil + } +} + +func (w *watcher) buildWatcher() (watcherStarter, error) { + w.mu.Lock() + defer w.mu.Unlock() + + cfg, err := w.getConfig(w.o.ConfigPath) + if err != nil { + return nil, fmt.Errorf("failed to read config: %s", err) + } + + if cfg.PersistentSpec == "on-demand" { + ondemand := newOnDemand(w.o) + w.w = ondemand + return ondemand, nil + } + + w.w = newCache(w.w) + return newCached(w.w, w.o.Logger, w.o.ConfigPath), nil +} + +func newCache(actualCache agent.ResourceGetter) agent.ResourceGetter { + if actualCache == nil { + return &agent.ServerCache{ + GardenerCache: agent.NewCache[*v1beta1.ShootList](), + } + } + + switch actualCache.(type) { + case *agent.ServerCache: + c := actualCache.(*agent.ServerCache) + c.GardenerCache.Clean() + c.GeneralError = nil + return c + default: + return newCache(nil) + } } diff --git a/internal/watcher/watcher_test.go b/internal/watcher/watcher_test.go index 7cdf9f6..0eb544b 100644 --- a/internal/watcher/watcher_test.go +++ b/internal/watcher/watcher_test.go @@ -2,11 +2,12 @@ package watcher import ( "context" + "errors" "io" "testing" - "time" "github.com/gardener/gardener/pkg/apis/core/v1beta1" + "github.com/pPrecel/cloudagent/internal/system" "github.com/pPrecel/cloudagent/pkg/agent" "github.com/pPrecel/cloudagent/pkg/config" "github.com/sirupsen/logrus" @@ -27,56 +28,344 @@ var ( } ) -func TestNewForConfig(t *testing.T) { +func TestNew(t *testing.T) { t.Run("read config error", func(t *testing.T) { - rg, err := NewForConfig(&Options{ + rg := New(&Options{ ConfigPath: "", }) - assert.Error(t, err) - assert.Nil(t, rg) + assert.NotNil(t, rg) }) } -func Test_newForConfig(t *testing.T) { - l := logrus.New() - l.Out = io.Discard +type resourceGetterStub struct { + ch chan struct{} + cache agent.Cache[*v1beta1.ShootList] + err error +} + +func (stub *resourceGetterStub) GetGardenerCache() agent.Cache[*v1beta1.ShootList] { + <-stub.ch + return stub.cache +} - t.Run("new on demand", func(t *testing.T) { - rg, err := newForConfig(&Options{}, fixOndemandConfig) - assert.NoError(t, err) - assert.NotNil(t, rg) +func (stub *resourceGetterStub) GetGeneralError() error { + <-stub.ch + return stub.err +} + +func TestWatcher_GetGardenerCache(t *testing.T) { + t.Run("lock and return cache", func(t *testing.T) { + ch := make(chan struct{}) + resourceGetter := resourceGetterStub{ + ch: ch, + } + w := watcher{ + w: &resourceGetter, + } + + stop := make(chan struct{}) + go func() { + c := w.GetGardenerCache() + assert.Nil(t, c) + stop <- struct{}{} + }() + + ch <- struct{}{} + <-stop }) + t.Run("nil cache", func(t *testing.T) { + w := watcher{ + w: nil, + } + + assert.Nil(t, w.GetGardenerCache()) + }) +} + +func Test_watcher_GetGeneralError(t *testing.T) { + t.Run("lock and return cache", func(t *testing.T) { + ch := make(chan struct{}) + resourceGetter := resourceGetterStub{ + ch: ch, + } + w := watcher{ + w: &resourceGetter, + } + + stop := make(chan struct{}) + go func() { + c := w.GetGeneralError() + assert.Nil(t, c) + stop <- struct{}{} + }() + + ch <- struct{}{} + <-stop + }) + t.Run("nil cache", func(t *testing.T) { + w := watcher{ + w: nil, + } - t.Run("watcher context done", func(t *testing.T) { + assert.Error(t, w.GetGeneralError()) + }) +} + +func Test_watcher_Start(t *testing.T) { + l := &logrus.Entry{ + Logger: logrus.New(), + } + l.Logger.Out = io.Discard + + t.Run("context done", func(t *testing.T) { + w := watcher{ + o: &Options{ + Context: fixCanceledContext(), + Logger: l, + }, + } + + w.Start() + }) + + t.Run("do not panic on error", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - cancel() - rg, err := newForConfig(&Options{ - Context: ctx, - Logger: l.WithField("test", "test"), - }, fixConfig) - assert.NoError(t, err) - assert.NotNil(t, rg) + stop := make(chan struct{}) + w := watcher{ + getConfig: func(s string) (*config.Config, error) { + stop <- struct{}{} + return nil, errors.New("test error") + }, + o: &Options{ + Context: ctx, + Logger: l, + }, + } + + go w.Start() + + <-stop + cancel() }) } -func Test_setupWatcher(t *testing.T) { - l := logrus.New() - l.Out = io.Discard +func Test_watcher_start(t *testing.T) { + l := &logrus.Entry{ + Logger: logrus.New(), + } + l.Logger.Out = io.Discard - t.Run("lib watcher error", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Run("start on-demand", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := &agent.ServerCache{ - GardenerCache: agent.NewCache[*v1beta1.ShootList](), + modified := make(chan interface{}) + notifier := &system.Notifier{ + IsMotified: modified, + Stop: func() {}, } - setupWatcher(c, &Options{ - Context: ctx, - Logger: l.WithField("test", "test"), - }) + stop := make(chan interface{}) + w := watcher{ + getConfig: func(s string) (*config.Config, error) { + return &config.Config{ + PersistentSpec: "on-demand", + }, nil + }, + notifyChange: func(s string) (*system.Notifier, error) { + return notifier, nil + }, + o: &Options{ + Context: ctx, + Logger: l, + }, + } - assert.Len(t, c.GardenerCache.Resources(), 0) + go func() { + err := w.start() + assert.Nil(t, err) + stop <- struct{}{} + }() + + modified <- struct{}{} + <-stop }) + + t.Run("start cached", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + modified := make(chan interface{}) + notifier := &system.Notifier{ + IsMotified: modified, + Stop: func() {}, + } + + stop := make(chan interface{}) + w := watcher{ + getConfig: func(s string) (*config.Config, error) { + return &config.Config{ + PersistentSpec: "@every 120s", + }, nil + }, + notifyChange: func(s string) (*system.Notifier, error) { + return notifier, nil + }, + o: &Options{ + Context: ctx, + Logger: l, + }, + } + + go func() { + err := w.start() + assert.Nil(t, err) + stop <- struct{}{} + }() + + modified <- struct{}{} + <-stop + }) + + t.Run("start cached when cache is not nil", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + modified := make(chan interface{}) + notifier := &system.Notifier{ + IsMotified: modified, + Stop: func() {}, + } + + stop := make(chan interface{}) + w := watcher{ + w: &agent.ServerCache{ + GardenerCache: agent.NewCache[*v1beta1.ShootList](), + }, + getConfig: func(s string) (*config.Config, error) { + return &config.Config{ + PersistentSpec: "@every 120s", + }, nil + }, + notifyChange: func(s string) (*system.Notifier, error) { + return notifier, nil + }, + o: &Options{ + Context: ctx, + Logger: l, + }, + } + + go func() { + err := w.start() + assert.Nil(t, err) + stop <- struct{}{} + }() + + modified <- struct{}{} + <-stop + }) + + t.Run("start cached when cache is on-demand watcher", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + modified := make(chan interface{}) + notifier := &system.Notifier{ + IsMotified: modified, + Stop: func() {}, + } + + stop := make(chan interface{}) + w := watcher{ + w: &ondemand{}, + getConfig: func(s string) (*config.Config, error) { + return &config.Config{ + PersistentSpec: "@every 120s", + }, nil + }, + notifyChange: func(s string) (*system.Notifier, error) { + return notifier, nil + }, + o: &Options{ + Context: ctx, + Logger: l, + }, + } + + go func() { + err := w.start() + assert.Nil(t, err) + stop <- struct{}{} + }() + + modified <- struct{}{} + <-stop + }) + + t.Run("handle notify error channel", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + chanErr := make(chan error) + notifier := &system.Notifier{ + Errors: chanErr, + Stop: func() {}, + } + + stop := make(chan interface{}) + w := watcher{ + getConfig: func(s string) (*config.Config, error) { + return &config.Config{ + PersistentSpec: "on-demand", + }, nil + }, + notifyChange: func(s string) (*system.Notifier, error) { + return notifier, nil + }, + o: &Options{ + Context: ctx, + Logger: l, + }, + } + + go func() { + err := w.start() + assert.Error(t, err) + stop <- struct{}{} + }() + + chanErr <- errors.New("test error") + <-stop + }) + + t.Run("notify error", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + w := watcher{ + getConfig: func(s string) (*config.Config, error) { + return &config.Config{ + PersistentSpec: "on-demand", + }, nil + }, + notifyChange: func(s string) (*system.Notifier, error) { + return nil, errors.New("test error") + }, + o: &Options{ + Context: ctx, + Logger: l, + }, + } + + err := w.start() + assert.Error(t, err) + }) +} + +func fixCanceledContext() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx }