From 8aee9f9ba49c59bd9a2f25c8be2a0e4bcfb86327 Mon Sep 17 00:00:00 2001 From: marcosQuesada <marcos.quesadas@gmail.com> Date: Sun, 10 Apr 2022 16:26:01 +0200 Subject: [PATCH 1/2] use k8s exec to get workload config from workers, this will feed up swarm state --- go.mod | 5 +- go.sum | 1 + pkg/config/config.go | 17 ++++ pkg/operator/configmap/provider_test.go | 8 +- pkg/operator/pod/exec.go | 54 ++++++++++ pkg/operator/pod/exec_test.go | 34 +++++++ .../swarm-pool-controller/cmd/external.go | 12 ++- .../internal/app/controller.go | 98 +++++++++++++------ .../internal/app/event.go | 28 ++++-- .../internal/app/pool_test.go | 2 +- .../internal/infra/k8s/pod/provider.go | 53 ++++++++++ .../internal/infra/k8s/pod/provider_test.go | 29 ++++++ 12 files changed, 298 insertions(+), 43 deletions(-) create mode 100644 pkg/operator/pod/exec.go create mode 100644 pkg/operator/pod/exec_test.go create mode 100644 services/swarm-pool-controller/internal/infra/k8s/pod/provider.go create mode 100644 services/swarm-pool-controller/internal/infra/k8s/pod/provider_test.go diff --git a/go.mod b/go.mod index 9f34bb5..68575ba 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,7 @@ go 1.17 require ( github.com/davecgh/go-spew v1.1.1 github.com/fsnotify/fsnotify v1.5.1 - github.com/google/go-cmp v0.5.7 github.com/gorilla/mux v1.8.0 - github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/mapstructure v1.4.3 github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.4.0 @@ -33,6 +31,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-cmp v0.5.7 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -42,6 +41,8 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mailru/easyjson v0.7.6 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml v1.9.4 // indirect diff --git a/go.sum b/go.sum index 86a19fe..ebc2b89 100644 --- a/go.sum +++ b/go.sum @@ -398,6 +398,7 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/moby/term v0.0.0-20210610120745-9d4ed1856297/go.mod h1:vgPCkQMyxTZ7IDy8SXRufE172gr8+K/JE/7hHFxHW3A= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/pkg/config/config.go b/pkg/config/config.go index 138c988..28807a8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,8 +3,11 @@ package config import ( "fmt" logger "github.com/marcosQuesada/k8s-lab/pkg/log" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "gopkg.in/yaml.v3" "os" ) @@ -118,6 +121,20 @@ func (a *Workload) Difference(newWorkload *Workload) (included, excluded []Job) return } +func Decode(raw []byte) (*Workloads, error) { + data := make(map[interface{}]interface{}) + if err := yaml.Unmarshal(raw, &data); err != nil { + return nil, errors.Wrap(err, "unable to unmarshall") + } + + c := &Workloads{} + if err := mapstructure.Decode(data, c); err != nil { + return nil, errors.Wrap(err, "unable to decode") + } + + return c, nil +} + func toMap(set []Job) map[string]struct{} { res := map[string]struct{}{} for _, s := range set { diff --git a/pkg/operator/configmap/provider_test.go b/pkg/operator/configmap/provider_test.go index f3ab655..6c0fa46 100644 --- a/pkg/operator/configmap/provider_test.go +++ b/pkg/operator/configmap/provider_test.go @@ -23,7 +23,7 @@ func TestNewProvider_ItUpdatesConfigMapOnAssignWorkload(t *testing.T) { } spew.Dump(cm.Data) - p := NewProvider(clientset, namespace, configMapName) + p := NewProvider(clientset) w := &config.Workloads{ Version: 1, Workloads: map[string]*config.Workload{ @@ -33,7 +33,7 @@ func TestNewProvider_ItUpdatesConfigMapOnAssignWorkload(t *testing.T) { }, } - if err := p.Set(context.Background(), w); err != nil { + if err := p.Set(context.Background(), namespace, configMapName, w); err != nil { t.Fatalf("unexepcted error setting workload %v, got %v", w, err) } } @@ -52,8 +52,8 @@ func TestNewProvider_ItGetsWorkloadsFromConfigMap(t *testing.T) { } spew.Dump(cm.Data) - p := NewProvider(clientset, namespace, configMapName) - w, err := p.Get(context.Background()) + p := NewProvider(clientset) + w, err := p.Get(context.Background(), namespace, configMapName) if err != nil { t.Fatalf("unexepcted error setting workload %v, got %v", w, err) } diff --git a/pkg/operator/pod/exec.go b/pkg/operator/pod/exec.go new file mode 100644 index 0000000..46b1fb7 --- /dev/null +++ b/pkg/operator/pod/exec.go @@ -0,0 +1,54 @@ +package pod + +import ( + "bytes" + "fmt" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + + "k8s.io/client-go/kubernetes/scheme" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +// ExecCmd exec command on specific pod and wait the command's output. +func ExecCmd(client kubernetes.Interface, config *restclient.Config, namespace, name string, + command string) ([]byte, error) { + cmd := []string{ + "sh", + "-c", + command, + } + req := client.CoreV1().RESTClient().Post().Resource("pods").Name(name). + Namespace(namespace).SubResource("exec") + option := &v1.PodExecOptions{ + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + } + req.VersionedParams( + option, + scheme.ParameterCodec, + ) + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + return nil, err + } + + outBuf := &bytes.Buffer{} + errBuf := &bytes.Buffer{} + err = exec.Stream(remotecommand.StreamOptions{ + Stdout: outBuf, + Stderr: errBuf, + }) + if err != nil { + return nil, err + } + + if errBuf.Len() != 0 { + return nil, fmt.Errorf("StdErr error %s", errBuf.String()) + } + return outBuf.Bytes(), nil +} diff --git a/pkg/operator/pod/exec_test.go b/pkg/operator/pod/exec_test.go new file mode 100644 index 0000000..c7b7308 --- /dev/null +++ b/pkg/operator/pod/exec_test.go @@ -0,0 +1,34 @@ +package pod + +import ( + "github.com/marcosQuesada/k8s-lab/pkg/operator" + "k8s.io/client-go/tools/clientcmd" + "os" + "testing" +) + +// @TODO: Think on how to test it +func TestExecCmdExample(t *testing.T) { + t.Skip() + clientSet := operator.BuildExternalClient() + kubeConfigPath := os.Getenv("HOME") + "/.kube/config" + config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) + if err != nil { + t.Fatalf("unable to get cluster config from flags, error %v", err) + } + + namespace := "swarm" + name := "swarm-worker-0" + cmd := "cat /app/config/config.yml" + r, err := ExecCmd(clientSet, config, namespace, name, cmd) + if err != nil { + t.Fatalf("unable to execute command, error %v", err) + } + + //spew.Dump(outBuf.String(), errBuf.String()) + + if err := os.WriteFile("foo.yaml", r, 0644); err != nil { + t.Fatalf("unable to write fil, error %v", err) + } + +} diff --git a/services/swarm-pool-controller/cmd/external.go b/services/swarm-pool-controller/cmd/external.go index 6846133..9aa269f 100644 --- a/services/swarm-pool-controller/cmd/external.go +++ b/services/swarm-pool-controller/cmd/external.go @@ -13,11 +13,13 @@ import ( "github.com/marcosQuesada/k8s-lab/services/swarm-pool-controller/internal/infra/k8s/crd" "github.com/marcosQuesada/k8s-lab/services/swarm-pool-controller/internal/infra/k8s/crd/apis/swarm/v1alpha1" crdinformers "github.com/marcosQuesada/k8s-lab/services/swarm-pool-controller/internal/infra/k8s/crd/generated/informers/externalversions" + wp "github.com/marcosQuesada/k8s-lab/services/swarm-pool-controller/internal/infra/k8s/pod" statefulset "github.com/marcosQuesada/k8s-lab/services/swarm-pool-controller/internal/infra/k8s/statefulset" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" "net/http" "os" "os/signal" @@ -66,7 +68,15 @@ var externalCmd = &cobra.Command{ appm := app.NewManager(ex, swl) selSt := statefulset.NewSelectorStore() pr := app.NewProvider(swl, stsl, podl) - ctl := app.NewSwarmController(swarmClientSet, selSt, appm, pr, operator.NewRunner()) + + kubeConfigPath := os.Getenv("HOME") + "/.kube/config" + restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) + if err != nil { + log.Fatalf("unable to get cluster config from flags, error %v", err) + } + + wklpr := wp.NewProvider(clientSet, restConfig) + ctl := app.NewSwarmController(swarmClientSet, selSt, appm, pr, wklpr, operator.NewRunner()) go ctl.Run(ctx) crdh := crd.NewHandler(ctl) diff --git a/services/swarm-pool-controller/internal/app/controller.go b/services/swarm-pool-controller/internal/app/controller.go index d5acf5e..21e208c 100644 --- a/services/swarm-pool-controller/internal/app/controller.go +++ b/services/swarm-pool-controller/internal/app/controller.go @@ -3,10 +3,12 @@ package app import ( "context" "fmt" + "github.com/marcosQuesada/k8s-lab/pkg/config" "github.com/marcosQuesada/k8s-lab/pkg/operator" swapi "github.com/marcosQuesada/k8s-lab/services/swarm-pool-controller/internal/infra/k8s/crd/apis/swarm/v1alpha1" "github.com/marcosQuesada/k8s-lab/services/swarm-pool-controller/internal/infra/k8s/crd/generated/clientset/versioned" "github.com/marcosQuesada/k8s-lab/services/swarm-pool-controller/internal/infra/k8s/statefulset" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" api "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -25,34 +27,40 @@ type Provider interface { SwarmNameFromStatefulSetName(namespace, name string) (string, error) } +type WorkloadProvider interface { + Workload(ctx context.Context, namespace, name string) (*config.Workloads, error) +} + // swarmController linearize incoming commands, concurrent processing wouldn't make sense type swarmController struct { - swarmClient versioned.Interface - selectorStore statefulset.SelectorStore - manager Manager - provider Provider - runner operator.Runner + swarmClient versioned.Interface + selectorStore statefulset.SelectorStore + manager Manager + provider Provider + workloadProvider WorkloadProvider + runner operator.Runner } -func NewSwarmController(cl versioned.Interface, ss statefulset.SelectorStore, m Manager, p Provider, r operator.Runner) *swarmController { +func NewSwarmController(cl versioned.Interface, ss statefulset.SelectorStore, m Manager, p Provider, wp WorkloadProvider, r operator.Runner) *swarmController { return &swarmController{ - swarmClient: cl, - selectorStore: ss, - manager: m, - provider: p, - runner: r, + swarmClient: cl, + selectorStore: ss, + manager: m, + provider: p, + workloadProvider: wp, + runner: r, } } // Create swarm entry happens on swarm creation func (c *swarmController) Create(ctx context.Context, namespace, name string) error { - c.runner.Process(newProcessSwarm(namespace, name)) + c.runner.Process(newCreateSwarm(namespace, name)) return nil } -// Update swarm entry happens on swarm update +// Update swarm entry happens on swarm process func (c *swarmController) Update(ctx context.Context, namespace, name string) error { - c.runner.Process(newProcessSwarm(namespace, name)) // @TODO: Improve this! + c.runner.Process(newUpdateSwarm(namespace, name)) return nil } @@ -75,23 +83,51 @@ func (c *swarmController) Run(ctx context.Context) { func (c *swarmController) handle(ctx context.Context, e interface{}) error { ev := e.(Event) switch e := ev.(type) { - case processSwarm: - return c.process(ctx, e.namespace, e.name) - case updateSwarmSize: - return c.updatePool(ctx, e.namespace, e.name, e.size) + case createSwarm: + n, err := c.process(ctx, e.namespace, e.name) + if err != nil { + return errors.Wrap(err, "cannot process") + } + if err := c.checkWorkerVersion(ctx, e.namespace, n); err != nil { + return errors.Wrap(err, "unable to check version") + } + return err + case updateSwarm: + n, err := c.process(ctx, e.namespace, e.name) // @TODO: Temporal + if err != nil { + return errors.Wrap(err, "cannot process") + } + if err := c.checkWorkerVersion(ctx, e.namespace, n); err != nil { + return errors.Wrap(err, "unable to check version") + } + return err case deleteSwarm: return c.delete(ctx, e.namespace, e.name) + case updateSwarmSize: + return c.updatePool(ctx, e.namespace, e.name, e.size) } return fmt.Errorf("action %T not handled", ev) } -// process happens on swarm create/update event -func (c *swarmController) process(ctx context.Context, namespace, name string) error { +func (c *swarmController) checkWorkerVersion(ctx context.Context, namespace string, names []string) error { + for _, s := range names { + w, err := c.workloadProvider.Workload(ctx, namespace, s) + if err != nil { + log.Errorf("unable to get workload, error %v", err) + continue + } + + log.Infof("Worker pod %s on version %d", s, w.Version) + } + return nil +} + +func (c *swarmController) process(ctx context.Context, namespace, name string) ([]string, error) { log.Infof("Create swarm %s %s", namespace, name) sw, err := c.provider.Swarm(namespace, name) if err != nil { - return err + return nil, err } log.Infof("Processing swarm %s namespace %s statefulset name %s configmap name %s version %d total workloads %d", @@ -99,24 +135,30 @@ func (c *swarmController) process(ctx context.Context, namespace, name string) e sts, err := c.provider.StatefulSet(sw.Namespace, sw.Spec.StatefulSetName) if err != nil { - return fmt.Errorf("unable to get statefulset from swarm %s on namespace %s error %v", name, namespace, err) + return nil, errors.Wrap(err, fmt.Sprintf("unable to get statefulset from swarm %s on namespace %s", name, namespace)) } // idempotent call if err := c.selectorStore.Register(namespace, sts.Name, sts.Spec.Selector); err != nil { - return fmt.Errorf("unable to register key %s %s error %v", namespace, sts.Name, err) + return nil, fmt.Errorf("unable to register key %s %s error %v", namespace, sts.Name, err) } names, err := c.provider.PodNamesFromSelector(namespace, sts.Spec.Selector) if err != nil { - return fmt.Errorf("unable to get pods from selector, error %v", err) + return nil, fmt.Errorf("unable to get pods from selector, error %v", err) } log.Infof("Controller found size %d worker pods %s", len(names), names) + if len(names) != int(*sts.Spec.Replicas) { + log.Errorf("unexpected statefulset pods, size %s names %v", int(*sts.Spec.Replicas), names) + } c.manager.Process(ctx, namespace, name, sw.Spec.Version, sw.Spec.Workload) - return c.updatePool(ctx, namespace, sts.Name, int(*sts.Spec.Replicas)) + if err := c.updatePool(ctx, namespace, sts.Name, int(*sts.Spec.Replicas)); err != nil { + return nil, errors.Wrap(err, "cannot update pool") + } + return names, nil } func (c *swarmController) updatePool(ctx context.Context, namespace, name string, size int) error { @@ -129,12 +171,12 @@ func (c *swarmController) updatePool(ctx context.Context, namespace, name string version, err := c.manager.UpdateSize(ctx, namespace, swarmName, size) if err != nil { - return fmt.Errorf("unable to update swarm %s size error %v", swarmName, err) + return fmt.Errorf("unable to process swarm %s size error %v", swarmName, err) } _, err = c.updateSwarm(ctx, namespace, swarmName, version, size) if err != nil { - return fmt.Errorf("unable to update swarm %s error %v", name, err) + return fmt.Errorf("unable to process swarm %s error %v", name, err) } return nil } @@ -160,7 +202,7 @@ func (c *swarmController) updateSwarm(ctx context.Context, namespace, name strin updatedSwarm.Spec.Version = version swu, err := c.swarmClient.K8slabV1alpha1().Swarms(namespace).Update(ctx, updatedSwarm, metav1.UpdateOptions{}) if err != nil { - return nil, fmt.Errorf("unable to update swarm %s error %v", sw.Name, err) + return nil, fmt.Errorf("unable to process swarm %s error %v", sw.Name, err) } return swu, nil diff --git a/services/swarm-pool-controller/internal/app/event.go b/services/swarm-pool-controller/internal/app/event.go index a631d69..75a9355 100644 --- a/services/swarm-pool-controller/internal/app/event.go +++ b/services/swarm-pool-controller/internal/app/event.go @@ -2,25 +2,39 @@ package app type action string -const processSwarmAction = action("processSwarmAction") +const createSwarmAction = action("createSwarmAction") const updateSwarmAction = action("updateSwarmAction") +const updateSwarmSizeAction = action("updateSwarmSizeAction") const deleteSwarmAction = action("deleteSwarmAction") type Event interface { Type() action } -type processSwarm struct { +type createSwarm struct { namespace string name string } -func newProcessSwarm(namespace, name string) processSwarm { - return processSwarm{namespace: namespace, name: name} +func newCreateSwarm(namespace, name string) createSwarm { + return createSwarm{namespace: namespace, name: name} } -func (e processSwarm) Type() action { - return processSwarmAction +func (e createSwarm) Type() action { + return createSwarmAction +} + +type updateSwarm struct { + namespace string + name string +} + +func newUpdateSwarm(namespace, name string) updateSwarm { + return updateSwarm{namespace: namespace, name: name} +} + +func (e updateSwarm) Type() action { + return updateSwarmAction } type updateSwarmSize struct { @@ -34,7 +48,7 @@ func newUpdateSwarmSize(namespace, name string, size int) updateSwarmSize { } func (e updateSwarmSize) Type() action { - return updateSwarmAction + return updateSwarmSizeAction } type deleteSwarm struct { diff --git a/services/swarm-pool-controller/internal/app/pool_test.go b/services/swarm-pool-controller/internal/app/pool_test.go index 842482d..8a3675e 100644 --- a/services/swarm-pool-controller/internal/app/pool_test.go +++ b/services/swarm-pool-controller/internal/app/pool_test.go @@ -18,7 +18,7 @@ func TestOnWorkerPoolUpdateStateGetsBalancedAndVersionUpdated(t *testing.T) { size := 2 v, err := p.UpdateSize(context.Background(), size) if err != nil { - t.Fatalf("unable to update size, error %v", err) + t.Fatalf("unable to process size, error %v", err) } newVersion := 2 diff --git a/services/swarm-pool-controller/internal/infra/k8s/pod/provider.go b/services/swarm-pool-controller/internal/infra/k8s/pod/provider.go new file mode 100644 index 0000000..24f42a8 --- /dev/null +++ b/services/swarm-pool-controller/internal/infra/k8s/pod/provider.go @@ -0,0 +1,53 @@ +package pod + +import ( + "context" + "fmt" + "github.com/marcosQuesada/k8s-lab/pkg/config" + "github.com/marcosQuesada/k8s-lab/pkg/operator/pod" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" +) + +const catConfigCommand = "cat /app/config/config.yml" + +// Provider deletes pod to force pod fresh recreation +type Provider struct { + client kubernetes.Interface + config *restclient.Config +} + +// NewProvider instantiates pod refresher provider +func NewProvider(cl kubernetes.Interface, config *restclient.Config) *Provider { + return &Provider{ + client: cl, + config: config, + } +} + +// Refresh deletes pod to force restart on the latest version +func (p *Provider) Refresh(ctx context.Context, namespace, name string) error { + err := p.client.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("unable to delete pod %s error %v", name, err) + } + + return nil +} + +// Workload gets pod config workload +func (p *Provider) Workload(ctx context.Context, namespace, name string) (*config.Workloads, error) { + r, err := pod.ExecCmd(p.client, p.config, namespace, name, catConfigCommand) + if err != nil { + return nil, errors.Wrap(err, "unable to execute command") + } + + w, err := config.Decode(r) + if err != nil { + return nil, errors.Wrap(err, "unable to decode") + } + + return w, nil +} diff --git a/services/swarm-pool-controller/internal/infra/k8s/pod/provider_test.go b/services/swarm-pool-controller/internal/infra/k8s/pod/provider_test.go new file mode 100644 index 0000000..8cc0d96 --- /dev/null +++ b/services/swarm-pool-controller/internal/infra/k8s/pod/provider_test.go @@ -0,0 +1,29 @@ +package pod + +import ( + "context" + "github.com/davecgh/go-spew/spew" + "github.com/marcosQuesada/k8s-lab/pkg/operator" + "k8s.io/client-go/tools/clientcmd" + "os" + "testing" +) + +func TestItGetsWorkloadConfigFromPodUsingExec(t *testing.T) { + clientSet := operator.BuildExternalClient() + kubeConfigPath := os.Getenv("HOME") + "/.kube/config" + restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) + if err != nil { + t.Fatalf("unable to get cluster config from flags, error %v", err) + } + + p := NewProvider(clientSet, restConfig) + namespace := "swarm" + name := "swarm-worker-0" + w, err := p.Workload(context.Background(), namespace, name) + if err != nil { + t.Fatalf("unable to decode workload, error %v", err) + } + + spew.Dump(w) +} From 14853da0b625823353a03e329b9e6d6d557196e9 Mon Sep 17 00:00:00 2001 From: marcosQuesada <marcos.quesadas@gmail.com> Date: Fri, 29 Apr 2022 18:23:31 +0200 Subject: [PATCH 2/2] consolidate refactor and cleanout --- pkg/operator/controller_test.go | 26 ++++++++++++++++--- pkg/operator/resource_event_handler_test.go | 18 ++++++++----- pkg/operator/runner_test.go | 4 +-- services/fake-worker/k8s/statefulset.yaml | 5 ++-- .../internal/app/controller.go | 2 +- 5 files changed, 40 insertions(+), 15 deletions(-) diff --git a/pkg/operator/controller_test.go b/pkg/operator/controller_test.go index a132e7a..2e3d850 100644 --- a/pkg/operator/controller_test.go +++ b/pkg/operator/controller_test.go @@ -23,7 +23,8 @@ func TestController_ItGetsCreatedOnListeningPodsWithPodAddition(t *testing.T) { cl := fake.NewSimpleClientset(p) i := informers.NewSharedInformerFactory(cl, 0) pi := i.Core().V1().Pods() - ctl := New(eh, pi.Informer(), "Pod") + fr := &fakeRunner{} + ctl := New(eh, pi.Informer(), fr, "Pod") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -89,7 +90,8 @@ func TestController_ItGetsDeletedOnListeningPodsWithPodAdditionWithoutBeingPrelo cl := fake.NewSimpleClientset() i := informers.NewSharedInformerFactory(cl, 0) pi := i.Core().V1().Pods() - ctl := New(eh, pi.Informer(), "Pod") + fr := &fakeRunner{} + ctl := New(eh, pi.Informer(), fr, "Pod") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -121,15 +123,21 @@ type fakeHandler struct { totalDeleted int32 } -func (f *fakeHandler) Handle(ctx context.Context, o runtime.Object) error { +func (f *fakeHandler) Create(ctx context.Context, o runtime.Object) error { atomic.AddInt32(&f.totalCreated, 1) return nil } -func (f *fakeHandler) Delete(ctx context.Context, namespace, name string) error { +func (f *fakeHandler) Update(ctx context.Context, o, n runtime.Object) error { + atomic.AddInt32(&f.totalCreated, 1) + return nil +} + +func (f *fakeHandler) Delete(ctx context.Context, o runtime.Object) error { atomic.AddInt32(&f.totalDeleted, 1) return nil } + func (f *fakeHandler) created() int32 { return atomic.LoadInt32(&f.totalCreated) } @@ -160,3 +168,13 @@ func getFakePod(namespace, name string) *apiv1.Pod { }, } } + +type fakeRunner struct{} + +func (f fakeRunner) Process(e interface{}) { + +} + +func (f fakeRunner) Run(ctx context.Context, h func(context.Context, interface{}) error) { + +} diff --git a/pkg/operator/resource_event_handler_test.go b/pkg/operator/resource_event_handler_test.go index a67a05c..844d0ef 100644 --- a/pkg/operator/resource_event_handler_test.go +++ b/pkg/operator/resource_event_handler_test.go @@ -13,9 +13,9 @@ func TestResourceEventHandler_AddPodIncludesItemOnQueue(t *testing.T) { name := "swarm-worker-0" namespace := "swarm" q := newFakeQueue() - reh := NewResourceEventHandler(q) + reh := NewResourceEventHandler() p := getFakePod(namespace, name) - reh.Add(&p) + reh.Create(&p) if expected, got := 1, q.Len(); expected != got { t.Fatalf("unexpected queue size, expected %d got %d", expected, got) @@ -38,9 +38,12 @@ func TestResourceEventHandler_UpdatePodIncludesItemOnQueue(t *testing.T) { name := "swarm-worker-0" namespace := "swarm" q := newFakeQueue() - reh := NewResourceEventHandler(q) + reh := NewResourceEventHandler() p := getFakePod(namespace, name) - reh.Update(&p, &p) + _, err := reh.Update(&p, &p) // @TODO: check event + if err != nil { + t.Fatalf("unexpected error updating resource %v", err) + } if expected, got := 1, q.Len(); expected != got { t.Fatalf("unexpected queue size, expected %d got %d", expected, got) @@ -63,9 +66,12 @@ func TestResourceEventHandler_DeletePodIncludesItemOnQueue(t *testing.T) { name := "swarm-worker-0" namespace := "swarm" q := newFakeQueue() - reh := NewResourceEventHandler(q) + reh := NewResourceEventHandler() p := getFakePod(namespace, name) - reh.Delete(&p) + _, err := reh.Delete(&p) // @TODO: validate event + if err != nil { + t.Fatalf("unexpected error deleting resource %v", err) + } if expected, got := 1, q.Len(); expected != got { t.Fatalf("unexpected queue size, expected %d got %d", expected, got) diff --git a/pkg/operator/runner_test.go b/pkg/operator/runner_test.go index f8ffff8..2926b3c 100644 --- a/pkg/operator/runner_test.go +++ b/pkg/operator/runner_test.go @@ -14,7 +14,7 @@ func TestItConsumesProducedEntriesWithSuccess(t *testing.T) { atomic.AddInt32(&totalCalls, 1) return nil } - r := NewRunner() + r := NewRunner().(*runner) r.workerFrequency = time.Millisecond * 50 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel() @@ -34,7 +34,7 @@ func TestItRetriesConsumedEntriesOnHandlingErrorUntilMaxRetries(t *testing.T) { atomic.AddInt32(&totalCalls, 1) return errors.New("foo error") } - r := NewRunner() + r := NewRunner().(*runner) r.workerFrequency = time.Millisecond * 50 ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/services/fake-worker/k8s/statefulset.yaml b/services/fake-worker/k8s/statefulset.yaml index 2b2deb7..651667e 100644 --- a/services/fake-worker/k8s/statefulset.yaml +++ b/services/fake-worker/k8s/statefulset.yaml @@ -9,7 +9,7 @@ spec: selector: matchLabels: app: swarm-worker - serviceName: swarm-worker + serviceName: swarm-worker-headless replicas: 2 template: metadata: @@ -61,6 +61,7 @@ apiVersion: v1 kind: Service metadata: name: swarm-worker-headless + namespace: swarm labels: app: swarm-worker spec: @@ -69,4 +70,4 @@ spec: name: http clusterIP: None selector: - app: swarm-worker \ No newline at end of file + app: swarm-worker diff --git a/services/swarm-pool-controller/internal/app/controller.go b/services/swarm-pool-controller/internal/app/controller.go index 21e208c..55652da 100644 --- a/services/swarm-pool-controller/internal/app/controller.go +++ b/services/swarm-pool-controller/internal/app/controller.go @@ -88,7 +88,7 @@ func (c *swarmController) handle(ctx context.Context, e interface{}) error { if err != nil { return errors.Wrap(err, "cannot process") } - if err := c.checkWorkerVersion(ctx, e.namespace, n); err != nil { + if err := c.checkWorkerVersion(ctx, e.namespace, n); err != nil { // @TODO: Move them to conciliation loop return errors.Wrap(err, "unable to check version") } return err