Skip to content

Commit

Permalink
refactor watcher package (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
pPrecel authored Jan 30, 2023
1 parent 16e64fd commit 1099d14
Show file tree
Hide file tree
Showing 9 changed files with 554 additions and 238 deletions.
9 changes: 4 additions & 5 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ func NewCmd(o *options) *cobra.Command {
func run(o *options) error {
o.Logger.Info("starting gardeners agent")

resourceGetter, err := watcher.NewForConfig(&watcher.Options{
watcher := watcher.New(&watcher.Options{
Context: o.Context,
Logger: o.Logger.WithField("component", "watcher"),
ConfigPath: o.configPath,
})
if err != nil {
return err
}

go watcher.Start()

o.Logger.Debugf("configuring grpc server - network '%s', address '%s'", o.socketNetwork, o.socketAddress)
lis, err := agent.NewSocket(o.socketNetwork, o.socketAddress)
Expand All @@ -52,7 +51,7 @@ func run(o *options) error {

grpcServer := googlerpc.NewServer(googlerpc.EmptyServerOption{})
agentServer := agent.NewServer(&agent.ServerOption{
ResourceGetter: resourceGetter,
ResourceGetter: watcher,
Logger: o.Logger.WithField("component", "server"),
})
cloud_agent.RegisterAgentServer(grpcServer, agentServer)
Expand Down
18 changes: 0 additions & 18 deletions cmd/serve/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,22 +109,4 @@ func Test_run(t *testing.T) {

assert.Error(t, c.RunE(c, []string{}))
})

t.Run("newWatcher error", func(t *testing.T) {
l := logrus.New()
l.Out = io.Discard
o := &options{
Options: &command.Options{
Logger: l,
Context: context.Background(),
},
configPath: "/empty/path",
socketNetwork: testNetwork,
}
c := NewCmd(o)

o.configPath = "/empty/path"

assert.Error(t, c.RunE(c, []string{}))
})
}
86 changes: 28 additions & 58 deletions internal/watcher/cached.go
Original file line number Diff line number Diff line change
@@ -1,87 +1,57 @@
package watcher

import (
"github.com/gardener/gardener/pkg/apis/core/v1beta1"
"github.com/pPrecel/cloudagent/internal/gardener"
"github.com/pPrecel/cloudagent/internal/system"
"context"

"github.com/pPrecel/cloudagent/pkg/agent"
"github.com/pPrecel/cloudagent/pkg/config"
"github.com/sirupsen/logrus"
)

type watcher struct {
options *Options
cache *agent.ServerCache
type cached struct {
logger *logrus.Entry
configPath string
cache agent.ResourceGetter

getConfig func(string) (*config.Config, error)
notifyChange func(string) (*system.Notifier, error)
getConfig func(string) (*config.Config, error)
}

func newCached(cache *agent.ServerCache, o *Options) *watcher {
return &watcher{
options: o,
cache: cache,
getConfig: config.Read,
notifyChange: system.NotifyChange,
func newCached(cache agent.ResourceGetter, logger *logrus.Entry, configPath string) *cached {
return &cached{
logger: logger,
configPath: configPath,
cache: cache,
getConfig: config.Read,
}
}

func (w *watcher) start() error {
w.options.Logger.Debug("starting cached watcher")
watcher, err := w.newWatcher()
func (w *cached) start(ctx context.Context) error {
w.logger.Debug("starting cached watcher")
watcher, err := w.newWatcher(ctx)
if err != nil {
return err
}
defer watcher.Stop()
watcher.Start()

w.options.Logger.Info("starting config notifier")
n, err := w.notifyChange(w.options.ConfigPath)
if err != nil {
return err
}
defer n.Stop()

select {
case err := <-n.Errors:
return err
case <-n.IsMotified:
return nil
}
<-ctx.Done()
return nil
}

func (w *watcher) newWatcher() (*agent.Watcher, error) {
w.options.Logger.Infof("reading config from path: '%s'", w.options.ConfigPath)
config, err := w.getConfig(w.options.ConfigPath)
func (w *cached) newWatcher(ctx context.Context) (*agent.Watcher, error) {
w.logger.Infof("reading config from path: '%s'", w.configPath)
config, err := w.getConfig(w.configPath)
if err != nil {
return nil, err
}

w.options.Logger.Infof("starting state watcher with spec: '%s'", config.PersistentSpec)
w.logger.Infof("starting state watcher with spec: '%s'", config.PersistentSpec)
return agent.NewWatcher(agent.WatcherOptions{
Spec: config.PersistentSpec,
Context: w.options.Context,
Logger: w.options.Logger,
}, parseWatcherFns(w.options.Logger, w.cache.GardenerCache, config)...)
}

func parseWatcherFns(l *logrus.Entry, gardenerCache agent.Cache[*v1beta1.ShootList], config *config.Config) []agent.WatchFn {
funcs := []agent.WatchFn{}
for i := range config.GardenerProjects {
p := config.GardenerProjects[i]
r := gardenerCache.Register(p.Namespace)

l.Debugf("creating watcher func for namespace: '%s'", p.Namespace)
l := l.WithFields(
logrus.Fields{
"provider": "gardener",
"project": p.Namespace,
},
)
funcs = append(funcs,
gardener.NewWatchFunc(l, r, p.Namespace, p.KubeconfigPath),
)
}

return funcs
Context: ctx,
Logger: w.logger,
}, parseWatcherFns(
w.logger,
w.cache.GetGardenerCache(),
config)...)
}
116 changes: 36 additions & 80 deletions internal/watcher/cached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/gardener/gardener/pkg/apis/core/v1beta1"
"github.com/pPrecel/cloudagent/internal/system"
"github.com/pPrecel/cloudagent/pkg/agent"
"github.com/pPrecel/cloudagent/pkg/config"
"github.com/sirupsen/logrus"
Expand All @@ -28,112 +27,69 @@ var (
}
)

func TestNewWatcher(t *testing.T) {
t.Run("new watcher", func(t *testing.T) {
assert.NotNil(t, newCached(nil, nil))
func TestNewCached(t *testing.T) {
t.Run("new cached watcher", func(t *testing.T) {
assert.NotNil(t, newCached(nil, nil, ""))
})
}

func Test_watcher_Start(t *testing.T) {
func Test_cached_Start(t *testing.T) {
l := &logrus.Entry{
Logger: logrus.New(),
}
l.Logger.Out = io.Discard

type fields struct {
getConfig func(string) (*config.Config, error)
notifyChange func(string) (*system.Notifier, error)
logger *logrus.Entry
configPath string
getConfig func(string) (*config.Config, error)
}
type args struct {
context context.Context
}
tests := []struct {
name string
fields fields
args *Options
args args
cache *agent.ServerCache
wantErr bool
}{
{
name: "notify change",
name: "context done",
fields: fields{
getConfig: fixConfigFn,
notifyChange: func(s string) (*system.Notifier, error) {
n := &system.Notifier{
IsMotified: make(chan interface{}),
Errors: make(chan error),
Stop: func() {},
}
go func() {
n.IsMotified <- 1
}()

return n, nil
configPath: "",
logger: l,
getConfig: func(s string) (*config.Config, error) {
return &config.Config{
PersistentSpec: "@every 120s",
GardenerProjects: []config.GardenerProject{
{
Namespace: "test",
KubeconfigPath: "/test/path",
},
},
}, nil
},
},
args: &Options{
Context: context.Background(),
Logger: l,
ConfigPath: "",
args: args{
context: fixCanceledContext(),
},
cache: &agent.ServerCache{
GardenerCache: agent.NewCache[*v1beta1.ShootList](),
},
wantErr: false,
},
{
name: "notify change error",
fields: fields{
getConfig: fixConfigFn,
notifyChange: func(s string) (*system.Notifier, error) {
n := &system.Notifier{
IsMotified: make(chan interface{}),
Errors: make(chan error),
Stop: func() {},
}
go func() {
n.Errors <- errors.New("test error")
}()

return n, nil
},
},
args: &Options{
Context: context.Background(),
Logger: l,
ConfigPath: "",
},
cache: &agent.ServerCache{
GardenerCache: agent.NewCache[*v1beta1.ShootList](),
},
wantErr: true,
},
{
name: "getConfig error",
fields: fields{
configPath: "",
logger: l,
getConfig: func(s string) (*config.Config, error) {
return nil, errors.New("test error")
},
},
args: &Options{
Context: context.Background(),
Logger: l,
ConfigPath: "",
},
cache: &agent.ServerCache{
GardenerCache: agent.NewCache[*v1beta1.ShootList](),
},
wantErr: true,
},
{
name: "notifyChange error",
fields: fields{
getConfig: fixConfigFn,
notifyChange: func(s string) (*system.Notifier, error) {
return nil, errors.New("test error")
},
},
args: &Options{
Context: context.Background(),
Logger: l,
ConfigPath: "",
args: args{
context: context.Background(),
},
cache: &agent.ServerCache{
GardenerCache: agent.NewCache[*v1beta1.ShootList](),
Expand All @@ -143,14 +99,14 @@ func Test_watcher_Start(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &watcher{
options: tt.args,
getConfig: tt.fields.getConfig,
notifyChange: tt.fields.notifyChange,
cache: tt.cache,
w := &cached{
logger: tt.fields.logger,
configPath: tt.fields.configPath,
getConfig: tt.fields.getConfig,
cache: tt.cache,
}

if err := w.start(); (err != nil) != tt.wantErr {
if err := w.start(tt.args.context); (err != nil) != tt.wantErr {
t.Errorf("watcher.Start() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
30 changes: 30 additions & 0 deletions internal/watcher/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package watcher

import (
"github.com/gardener/gardener/pkg/apis/core/v1beta1"
"github.com/pPrecel/cloudagent/internal/gardener"
"github.com/pPrecel/cloudagent/pkg/agent"
"github.com/pPrecel/cloudagent/pkg/config"
"github.com/sirupsen/logrus"
)

func parseWatcherFns(l *logrus.Entry, gardenerCache agent.Cache[*v1beta1.ShootList], config *config.Config) []agent.WatchFn {
funcs := []agent.WatchFn{}
for i := range config.GardenerProjects {
p := config.GardenerProjects[i]
r := gardenerCache.Register(p.Namespace)

l.Debugf("creating watcher func for namespace: '%s'", p.Namespace)
l := l.WithFields(
logrus.Fields{
"provider": "gardener",
"project": p.Namespace,
},
)
funcs = append(funcs,
gardener.NewWatchFunc(l, r, p.Namespace, p.KubeconfigPath),
)
}

return funcs
}
Loading

0 comments on commit 1099d14

Please sign in to comment.