From 2f8ff942e86c13653e0a570cc6c6a82593643451 Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Fri, 25 Oct 2024 15:23:31 +0200 Subject: [PATCH] set minimum pod worker size to 1 (#153) * set minimum workers to 1 for pod batch tasks --- internal/actions/approve_csr_handler.go | 2 +- internal/actions/chart_rollback_handler.go | 9 ++++---- internal/actions/chart_uninstall_handler.go | 7 +++---- internal/actions/chart_upsert_handler.go | 6 +++--- internal/actions/check_node_deleted.go | 8 ++++--- internal/actions/check_node_status.go | 7 +++---- internal/actions/check_node_status_test.go | 21 ++++++++++--------- internal/actions/create_event_handler.go | 2 +- internal/actions/create_handler.go | 9 ++++---- internal/actions/create_handler_test.go | 6 +++--- internal/actions/csr/csr.go | 4 +++- internal/actions/csr/svc.go | 4 +++- internal/actions/delete_node_handler.go | 2 +- internal/actions/drain_node_handler.go | 2 +- internal/actions/kubernetes_helpers.go | 2 +- internal/actions/patch_node_handler.go | 10 +++++---- .../actions/send_aks_init_data_handler.go | 7 +++++-- internal/actions/types.go | 2 +- internal/castai/types.go | 8 ++++--- 19 files changed, 64 insertions(+), 54 deletions(-) diff --git a/internal/actions/approve_csr_handler.go b/internal/actions/approve_csr_handler.go index 28b32bc..7fc7098 100644 --- a/internal/actions/approve_csr_handler.go +++ b/internal/actions/approve_csr_handler.go @@ -41,7 +41,7 @@ type ApproveCSRHandler struct { func (h *ApproveCSRHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionApproveCSR) if !ok { - return fmt.Errorf("unexpected type %T for approve csr handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } log := h.log.WithFields(logrus.Fields{ "node_name": req.NodeName, diff --git a/internal/actions/chart_rollback_handler.go b/internal/actions/chart_rollback_handler.go index 8ef36c2..e7df695 100644 --- a/internal/actions/chart_rollback_handler.go +++ b/internal/actions/chart_rollback_handler.go @@ -2,7 +2,6 @@ package actions import ( "context" - "errors" "fmt" "github.com/sirupsen/logrus" @@ -30,7 +29,7 @@ type ChartRollbackHandler struct { func (c *ChartRollbackHandler) Handle(_ context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionChartRollback) if !ok { - return fmt.Errorf("unexpected type %T for chart rollback handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } if err := c.validateRequest(req); err != nil { @@ -50,13 +49,13 @@ func (c *ChartRollbackHandler) Handle(_ context.Context, action *castai.ClusterA func (c *ChartRollbackHandler) validateRequest(req *castai.ActionChartRollback) error { if req.ReleaseName == "" { - return errors.New("bad request: releaseName not provided") + return fmt.Errorf("release name not provided %w", errAction) } if req.Namespace == "" { - return errors.New("bad request: namespace not provided") + return fmt.Errorf("namespace not provided %w", errAction) } if req.Version == "" { - return errors.New("bad request: version not provided") + return fmt.Errorf("version not provided %w", errAction) } return nil } diff --git a/internal/actions/chart_uninstall_handler.go b/internal/actions/chart_uninstall_handler.go index 97cd36c..e1c7c1a 100644 --- a/internal/actions/chart_uninstall_handler.go +++ b/internal/actions/chart_uninstall_handler.go @@ -2,7 +2,6 @@ package actions import ( "context" - "errors" "fmt" "github.com/sirupsen/logrus" @@ -28,7 +27,7 @@ type ChartUninstallHandler struct { func (c *ChartUninstallHandler) Handle(_ context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionChartUninstall) if !ok { - return fmt.Errorf("unexpected type %T for upsert uninstall handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } if err := c.validateRequest(req); err != nil { @@ -43,10 +42,10 @@ func (c *ChartUninstallHandler) Handle(_ context.Context, action *castai.Cluster func (c *ChartUninstallHandler) validateRequest(req *castai.ActionChartUninstall) error { if req.ReleaseName == "" { - return errors.New("bad request: releaseName not provided") + return fmt.Errorf("release name not provided %w", errAction) } if req.Namespace == "" { - return errors.New("bad request: namespace not provided") + return fmt.Errorf("namespace not provided %w", errAction) } return nil } diff --git a/internal/actions/chart_upsert_handler.go b/internal/actions/chart_upsert_handler.go index b68716c..99bd389 100644 --- a/internal/actions/chart_upsert_handler.go +++ b/internal/actions/chart_upsert_handler.go @@ -30,7 +30,7 @@ type ChartUpsertHandler struct { func (c *ChartUpsertHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionChartUpsert) if !ok { - return fmt.Errorf("unexpected type %T for upsert chart handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } if err := c.validateRequest(req); err != nil { @@ -79,10 +79,10 @@ func (c *ChartUpsertHandler) Handle(ctx context.Context, action *castai.ClusterA func (c *ChartUpsertHandler) validateRequest(req *castai.ActionChartUpsert) error { if req.ReleaseName == "" { - return errors.New("bad request: releaseName not provided") + return fmt.Errorf("release name not provided %w", errAction) } if req.Namespace == "" { - return errors.New("bad request: namespace not provided") + return fmt.Errorf("namespace not provided %w", errAction) } if err := req.ChartSource.Validate(); err != nil { return fmt.Errorf("validating chart source: %w", err) diff --git a/internal/actions/check_node_deleted.go b/internal/actions/check_node_deleted.go index 229792e..81fd1f8 100644 --- a/internal/actions/check_node_deleted.go +++ b/internal/actions/check_node_deleted.go @@ -40,10 +40,12 @@ type CheckNodeDeletedHandler struct { cfg checkNodeDeletedConfig } +var errNodeNotDeleted = errors.New("node is not deleted") + func (h *CheckNodeDeletedHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionCheckNodeDeleted) if !ok { - return fmt.Errorf("unexpected type %T for check node deleted handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } log := h.log.WithFields(logrus.Fields{ @@ -80,12 +82,12 @@ func (h *CheckNodeDeletedHandler) Handle(ctx context.Context, action *castai.Clu return false, nil } if currentNodeID == req.NodeID { - return false, errors.New("node is not deleted") + return false, fmt.Errorf("current node id = request node ID %w", errNodeNotDeleted) } } if n != nil { - return false, errors.New("node is not deleted") + return false, errNodeNotDeleted } return true, err diff --git a/internal/actions/check_node_status.go b/internal/actions/check_node_status.go index 849a9b8..044ac4f 100644 --- a/internal/actions/check_node_status.go +++ b/internal/actions/check_node_status.go @@ -2,7 +2,6 @@ package actions import ( "context" - "errors" "fmt" "reflect" "time" @@ -34,7 +33,7 @@ type CheckNodeStatusHandler struct { func (h *CheckNodeStatusHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionCheckNodeStatus) if !ok { - return fmt.Errorf("unexpected type %T for check node status handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } log := h.log.WithFields(logrus.Fields{ @@ -96,12 +95,12 @@ func (h *CheckNodeStatusHandler) checkNodeDeleted(ctx context.Context, log *logr return false, nil } if currentNodeID == req.NodeID { - return false, errors.New("node is not deleted") + return false, fmt.Errorf("current node id is equal to requested node id: %v %w", req.NodeID, errNodeNotDeleted) } } if n != nil { - return false, errors.New("node is not deleted") + return false, errNodeNotDeleted } return true, err diff --git a/internal/actions/check_node_status_test.go b/internal/actions/check_node_status_test.go index 09b56dd..643ee66 100644 --- a/internal/actions/check_node_status_test.go +++ b/internal/actions/check_node_status_test.go @@ -2,6 +2,7 @@ package actions import ( "context" + "errors" "sync" "testing" "time" @@ -33,7 +34,7 @@ func TestCheckStatus_Deleted(t *testing.T) { }, }, } - clientset := fake.NewSimpleClientset(node) + clientset := fake.NewClientset(node) h := CheckNodeStatusHandler{ log: log, @@ -50,7 +51,7 @@ func TestCheckStatus_Deleted(t *testing.T) { } err := h.Handle(context.Background(), action) - r.EqualError(err, "node is not deleted") + r.True(errors.Is(err, errNodeNotDeleted)) }) t.Run("return error when node is not deleted with no label (backwards compatibility)", func(t *testing.T) { @@ -61,7 +62,7 @@ func TestCheckStatus_Deleted(t *testing.T) { Name: nodeName, }, } - clientset := fake.NewSimpleClientset(node) + clientset := fake.NewClientset(node) h := CheckNodeStatusHandler{ log: log, @@ -83,7 +84,7 @@ func TestCheckStatus_Deleted(t *testing.T) { t.Run("handle check successfully when node is not found", func(t *testing.T) { r := require.New(t) - clientset := fake.NewSimpleClientset() + clientset := fake.NewClientset() h := CheckNodeStatusHandler{ log: log, @@ -113,7 +114,7 @@ func TestCheckStatus_Deleted(t *testing.T) { }, }, } - clientset := fake.NewSimpleClientset(node) + clientset := fake.NewClientset(node) h := CheckNodeStatusHandler{ log: log, @@ -140,7 +141,7 @@ func TestCheckStatus_Ready(t *testing.T) { t.Run("return error when node is not found", func(t *testing.T) { r := require.New(t) - clientset := fake.NewSimpleClientset() + clientset := fake.NewClientset() h := CheckNodeStatusHandler{ log: log, @@ -185,7 +186,7 @@ func TestCheckStatus_Ready(t *testing.T) { }, }, } - clientset := fake.NewSimpleClientset(node) + clientset := fake.NewClientset(node) h := CheckNodeStatusHandler{ log: log, @@ -240,7 +241,7 @@ func TestCheckStatus_Ready(t *testing.T) { Taints: []v1.Taint{taintCloudProviderUninitialized}, }, } - clientset := fake.NewSimpleClientset(node) + clientset := fake.NewClientset(node) h := CheckNodeStatusHandler{ log: log, @@ -287,7 +288,7 @@ func TestCheckStatus_Ready(t *testing.T) { Conditions: []v1.NodeCondition{}, }, } - clientset := fake.NewSimpleClientset(node) + clientset := fake.NewClientset(node) watcher := watch.NewFake() clientset.PrependWatchReactor("nodes", k8stest.DefaultWatchReactor(watcher, nil)) @@ -333,7 +334,7 @@ func TestCheckStatus_Ready(t *testing.T) { }, }, } - clientset := fake.NewSimpleClientset(node) + clientset := fake.NewClientset(node) h := CheckNodeStatusHandler{ log: log, diff --git a/internal/actions/create_event_handler.go b/internal/actions/create_event_handler.go index a7269db..3788c66 100644 --- a/internal/actions/create_event_handler.go +++ b/internal/actions/create_event_handler.go @@ -49,7 +49,7 @@ type CreateEventHandler struct { func (h *CreateEventHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionCreateEvent) if !ok { - return fmt.Errorf("unexpected type %T for create event handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } namespace := req.ObjectRef.Namespace if namespace == "" { diff --git a/internal/actions/create_handler.go b/internal/actions/create_handler.go index 0551f01..f6d0cb1 100644 --- a/internal/actions/create_handler.go +++ b/internal/actions/create_handler.go @@ -2,7 +2,6 @@ package actions import ( "context" - "errors" "fmt" "reflect" @@ -39,12 +38,12 @@ func (h *CreateHandler) Handle(ctx context.Context, action *castai.ClusterAction } if req.Object == nil { - return errors.New("no object provided") + return fmt.Errorf("object not provided %w", errAction) } newObj := &unstructured.Unstructured{Object: req.Object} if newObj.GetNamespace() == "" { - return errors.New("object namespace is missing") + return fmt.Errorf("namespace not provided %w", errAction) } log := h.log.WithFields(logrus.Fields{ @@ -87,12 +86,12 @@ func (h *CreateHandler) Handle(ctx context.Context, action *castai.ClusterAction original, err := obj.MarshalJSON() if err != nil { - return err + return fmt.Errorf("marshaling original resource: %w", err) } modified, err := newObj.MarshalJSON() if err != nil { - return err + return fmt.Errorf("marshaling modified resource: %w", err) } patch, err := jsonpatch.CreateMergePatch(original, modified) diff --git a/internal/actions/create_handler_test.go b/internal/actions/create_handler_test.go index dfa3f40..b97eb29 100644 --- a/internal/actions/create_handler_test.go +++ b/internal/actions/create_handler_test.go @@ -36,7 +36,7 @@ func Test_newCreateHandler(t *testing.T) { action: &castai.ClusterAction{ ActionDeleteNode: &castai.ActionDeleteNode{}, }, - err: newUnexpectedTypeErr(&castai.ActionDeleteNode{}, &castai.ActionCreate{}), + err: errAction, }, "should return error when object is not provided": { action: &castai.ClusterAction{ @@ -44,7 +44,7 @@ func Test_newCreateHandler(t *testing.T) { GroupVersionResource: castai.GroupVersionResource{}, }, }, - err: errors.New("no object provided"), + err: errAction, }, "should create new deployment": { action: &castai.ClusterAction{ @@ -129,7 +129,7 @@ func Test_newCreateHandler(t *testing.T) { err := handler.Handle(ctx, test.action) if test.err != nil { r.Error(err) - r.Equal(test.err, err) + r.True(errors.Is(err, test.err), "expected error %v, got %v", test.err, err) return } diff --git a/internal/actions/csr/csr.go b/internal/actions/csr/csr.go index 3e73f9c..e05e7d2 100644 --- a/internal/actions/csr/csr.go +++ b/internal/actions/csr/csr.go @@ -39,9 +39,11 @@ type Certificate struct { RequestingUser string } +var errCSRNotFound = errors.New("v1 or v1beta csr should be set") + func (c *Certificate) Validate() error { if c.v1 == nil && c.v1Beta1 == nil { - return errors.New("v1 or v1beta csr should be set") + return errCSRNotFound } return nil } diff --git a/internal/actions/csr/svc.go b/internal/actions/csr/svc.go index bf6e611..bb13609 100644 --- a/internal/actions/csr/svc.go +++ b/internal/actions/csr/svc.go @@ -61,6 +61,8 @@ func (h *ApprovalManager) handleWithRetry(ctx context.Context, log *logrus.Entry ) } +var errCSRNotApproved = errors.New("certificate signing request was not approved") + func (h *ApprovalManager) handle(ctx context.Context, log logrus.FieldLogger, cert *Certificate) (reterr error) { if cert.Approved() { return nil @@ -94,7 +96,7 @@ func (h *ApprovalManager) handle(ctx context.Context, log logrus.FieldLogger, ce } } - return errors.New("certificate signing request was not approved") + return errCSRNotApproved } func (h *ApprovalManager) runAutoApproveForCastAINodes(ctx context.Context) { diff --git a/internal/actions/delete_node_handler.go b/internal/actions/delete_node_handler.go index 5f143ad..b0954ae 100644 --- a/internal/actions/delete_node_handler.go +++ b/internal/actions/delete_node_handler.go @@ -54,7 +54,7 @@ type DeleteNodeHandler struct { func (h *DeleteNodeHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionDeleteNode) if !ok { - return fmt.Errorf("unexpected type %T for delete node handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } log := h.log.WithFields(logrus.Fields{ diff --git a/internal/actions/drain_node_handler.go b/internal/actions/drain_node_handler.go index ec5d09a..0b621e0 100644 --- a/internal/actions/drain_node_handler.go +++ b/internal/actions/drain_node_handler.go @@ -76,7 +76,7 @@ type DrainNodeHandler struct { func (h *DrainNodeHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionDrainNode) if !ok { - return fmt.Errorf("unexpected type %T for drain handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } drainTimeout := h.getDrainTimeout(action) diff --git a/internal/actions/kubernetes_helpers.go b/internal/actions/kubernetes_helpers.go index bc47107..c75bb06 100644 --- a/internal/actions/kubernetes_helpers.go +++ b/internal/actions/kubernetes_helpers.go @@ -140,7 +140,7 @@ func executeBatchPodActions( } var ( - parallelTasks = int(lo.Clamp(float64(len(pods)), 30, 100)) + parallelTasks = lo.Clamp(len(pods), 1, 50) taskChan = make(chan v1.Pod, len(pods)) successfulPodsChan = make(chan *v1.Pod, len(pods)) failedPodsChan = make(chan podActionFailure, len(pods)) diff --git a/internal/actions/patch_node_handler.go b/internal/actions/patch_node_handler.go index 7e840be..b616bf7 100644 --- a/internal/actions/patch_node_handler.go +++ b/internal/actions/patch_node_handler.go @@ -30,24 +30,26 @@ type PatchNodeHandler struct { clientset kubernetes.Interface } +var errAction = errors.New("not valid action") + func (h *PatchNodeHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { req, ok := action.Data().(*castai.ActionPatchNode) if !ok { - return fmt.Errorf("unexpected type %T for delete patch handler", action.Data()) + return newUnexpectedTypeErr(action.Data(), req) } for k := range req.Labels { if k == "" { - return errors.New("labels contain entry with empty key") + return fmt.Errorf("labels contain entry with empty key %w", errAction) } } for k := range req.Annotations { if k == "" { - return errors.New("annotations contain entry with empty key") + return fmt.Errorf("annotations contain entry with empty key %w", errAction) } } for _, t := range req.Taints { if t.Key == "" { - return errors.New("taint contain entry with empty key") + return fmt.Errorf("taints contain entry with empty key %w", errAction) } } diff --git a/internal/actions/send_aks_init_data_handler.go b/internal/actions/send_aks_init_data_handler.go index ba81e44..280c91a 100644 --- a/internal/actions/send_aks_init_data_handler.go +++ b/internal/actions/send_aks_init_data_handler.go @@ -64,7 +64,10 @@ func (s *SendAKSInitDataHandler) Handle(ctx context.Context, _ *castai.ClusterAc }) } -var customDataRegex = regexp.MustCompile(`(.*?)<\/ns1:CustomData>`) +var ( + customDataRegex = regexp.MustCompile(`(.*?)<\/ns1:CustomData>`) + errNoXML = errors.New("no custom data xml tag found") +) // readCloudConfigBase64 extracts base64 encoded cloud config content from XML file. func (s *SendAKSInitDataHandler) readCloudConfigBase64(cloudConfigPath string) ([]byte, error) { @@ -74,7 +77,7 @@ func (s *SendAKSInitDataHandler) readCloudConfigBase64(cloudConfigPath string) ( } matches := customDataRegex.FindSubmatch(xmlContent) if len(matches) < 2 { - return nil, errors.New("no custom data xml tag found") + return nil, errNoXML } return matches[1], nil } diff --git a/internal/actions/types.go b/internal/actions/types.go index 2274bfd..a3a3487 100644 --- a/internal/actions/types.go +++ b/internal/actions/types.go @@ -17,7 +17,7 @@ const ( ) func newUnexpectedTypeErr(value, expectedType interface{}) error { - return fmt.Errorf("unexpected type %T, expected %T", value, expectedType) + return fmt.Errorf("unexpected type %T, expected %T %w", value, expectedType, errAction) } type ActionHandler interface { diff --git a/internal/castai/types.go b/internal/castai/types.go index 08ae5d8..87fa940 100644 --- a/internal/castai/types.go +++ b/internal/castai/types.go @@ -227,15 +227,17 @@ type ChartSource struct { Version string `json:"version"` } +var errFieldNotSet = errors.New("chart field is not set") + func (c *ChartSource) Validate() error { if c.Name == "" { - return errors.New("chart name is not set") + return fmt.Errorf("name: %w", errFieldNotSet) } if c.RepoURL == "" { - return errors.New("chart repoURL is not set") + return fmt.Errorf("repoUrl: %w", errFieldNotSet) } if c.Version == "" { - return errors.New("chart version is not set") + return fmt.Errorf("version: %w", errFieldNotSet) } return nil }