Skip to content

Commit

Permalink
set minimum pod worker size to 1 (#153)
Browse files Browse the repository at this point in the history
* set minimum workers to 1 for pod batch tasks
  • Loading branch information
ValyaB authored Oct 25, 2024
1 parent 117875a commit 2f8ff94
Show file tree
Hide file tree
Showing 19 changed files with 64 additions and 54 deletions.
2 changes: 1 addition & 1 deletion internal/actions/approve_csr_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ApproveCSRHandler struct {
func (h *ApproveCSRHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionApproveCSR)
if !ok {
return fmt.Errorf("unexpected type %T for approve csr handler", action.Data())
return newUnexpectedTypeErr(action.Data(), req)
}
log := h.log.WithFields(logrus.Fields{
"node_name": req.NodeName,
Expand Down
9 changes: 4 additions & 5 deletions internal/actions/chart_rollback_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package actions

import (
"context"
"errors"
"fmt"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -30,7 +29,7 @@ type ChartRollbackHandler struct {
func (c *ChartRollbackHandler) Handle(_ context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionChartRollback)
if !ok {
return fmt.Errorf("unexpected type %T for chart rollback handler", action.Data())
return newUnexpectedTypeErr(action.Data(), req)
}

if err := c.validateRequest(req); err != nil {
Expand All @@ -50,13 +49,13 @@ func (c *ChartRollbackHandler) Handle(_ context.Context, action *castai.ClusterA

func (c *ChartRollbackHandler) validateRequest(req *castai.ActionChartRollback) error {
if req.ReleaseName == "" {
return errors.New("bad request: releaseName not provided")
return fmt.Errorf("release name not provided %w", errAction)
}
if req.Namespace == "" {
return errors.New("bad request: namespace not provided")
return fmt.Errorf("namespace not provided %w", errAction)
}
if req.Version == "" {
return errors.New("bad request: version not provided")
return fmt.Errorf("version not provided %w", errAction)
}
return nil
}
7 changes: 3 additions & 4 deletions internal/actions/chart_uninstall_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package actions

import (
"context"
"errors"
"fmt"

"github.com/sirupsen/logrus"
Expand All @@ -28,7 +27,7 @@ type ChartUninstallHandler struct {
func (c *ChartUninstallHandler) Handle(_ context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionChartUninstall)
if !ok {
return fmt.Errorf("unexpected type %T for upsert uninstall handler", action.Data())
return newUnexpectedTypeErr(action.Data(), req)
}

if err := c.validateRequest(req); err != nil {
Expand All @@ -43,10 +42,10 @@ func (c *ChartUninstallHandler) Handle(_ context.Context, action *castai.Cluster

func (c *ChartUninstallHandler) validateRequest(req *castai.ActionChartUninstall) error {
if req.ReleaseName == "" {
return errors.New("bad request: releaseName not provided")
return fmt.Errorf("release name not provided %w", errAction)
}
if req.Namespace == "" {
return errors.New("bad request: namespace not provided")
return fmt.Errorf("namespace not provided %w", errAction)
}
return nil
}
6 changes: 3 additions & 3 deletions internal/actions/chart_upsert_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ChartUpsertHandler struct {
func (c *ChartUpsertHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionChartUpsert)
if !ok {
return fmt.Errorf("unexpected type %T for upsert chart handler", action.Data())
return newUnexpectedTypeErr(action.Data(), req)
}

if err := c.validateRequest(req); err != nil {
Expand Down Expand Up @@ -79,10 +79,10 @@ func (c *ChartUpsertHandler) Handle(ctx context.Context, action *castai.ClusterA

func (c *ChartUpsertHandler) validateRequest(req *castai.ActionChartUpsert) error {
if req.ReleaseName == "" {
return errors.New("bad request: releaseName not provided")
return fmt.Errorf("release name not provided %w", errAction)
}
if req.Namespace == "" {
return errors.New("bad request: namespace not provided")
return fmt.Errorf("namespace not provided %w", errAction)
}
if err := req.ChartSource.Validate(); err != nil {
return fmt.Errorf("validating chart source: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions internal/actions/check_node_deleted.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ type CheckNodeDeletedHandler struct {
cfg checkNodeDeletedConfig
}

var errNodeNotDeleted = errors.New("node is not deleted")

func (h *CheckNodeDeletedHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionCheckNodeDeleted)
if !ok {
return fmt.Errorf("unexpected type %T for check node deleted handler", action.Data())
return newUnexpectedTypeErr(action.Data(), req)
}

log := h.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -80,12 +82,12 @@ func (h *CheckNodeDeletedHandler) Handle(ctx context.Context, action *castai.Clu
return false, nil
}
if currentNodeID == req.NodeID {
return false, errors.New("node is not deleted")
return false, fmt.Errorf("current node id = request node ID %w", errNodeNotDeleted)
}
}

if n != nil {
return false, errors.New("node is not deleted")
return false, errNodeNotDeleted
}

return true, err
Expand Down
7 changes: 3 additions & 4 deletions internal/actions/check_node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package actions

import (
"context"
"errors"
"fmt"
"reflect"
"time"
Expand Down Expand Up @@ -34,7 +33,7 @@ type CheckNodeStatusHandler struct {
func (h *CheckNodeStatusHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionCheckNodeStatus)
if !ok {
return fmt.Errorf("unexpected type %T for check node status handler", action.Data())
return newUnexpectedTypeErr(action.Data(), req)
}

log := h.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -96,12 +95,12 @@ func (h *CheckNodeStatusHandler) checkNodeDeleted(ctx context.Context, log *logr
return false, nil
}
if currentNodeID == req.NodeID {
return false, errors.New("node is not deleted")
return false, fmt.Errorf("current node id is equal to requested node id: %v %w", req.NodeID, errNodeNotDeleted)
}
}

if n != nil {
return false, errors.New("node is not deleted")
return false, errNodeNotDeleted
}

return true, err
Expand Down
21 changes: 11 additions & 10 deletions internal/actions/check_node_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package actions

import (
"context"
"errors"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -33,7 +34,7 @@ func TestCheckStatus_Deleted(t *testing.T) {
},
},
}
clientset := fake.NewSimpleClientset(node)
clientset := fake.NewClientset(node)

h := CheckNodeStatusHandler{
log: log,
Expand All @@ -50,7 +51,7 @@ func TestCheckStatus_Deleted(t *testing.T) {
}

err := h.Handle(context.Background(), action)
r.EqualError(err, "node is not deleted")
r.True(errors.Is(err, errNodeNotDeleted))
})

t.Run("return error when node is not deleted with no label (backwards compatibility)", func(t *testing.T) {
Expand All @@ -61,7 +62,7 @@ func TestCheckStatus_Deleted(t *testing.T) {
Name: nodeName,
},
}
clientset := fake.NewSimpleClientset(node)
clientset := fake.NewClientset(node)

h := CheckNodeStatusHandler{
log: log,
Expand All @@ -83,7 +84,7 @@ func TestCheckStatus_Deleted(t *testing.T) {

t.Run("handle check successfully when node is not found", func(t *testing.T) {
r := require.New(t)
clientset := fake.NewSimpleClientset()
clientset := fake.NewClientset()

h := CheckNodeStatusHandler{
log: log,
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestCheckStatus_Deleted(t *testing.T) {
},
},
}
clientset := fake.NewSimpleClientset(node)
clientset := fake.NewClientset(node)

h := CheckNodeStatusHandler{
log: log,
Expand All @@ -140,7 +141,7 @@ func TestCheckStatus_Ready(t *testing.T) {

t.Run("return error when node is not found", func(t *testing.T) {
r := require.New(t)
clientset := fake.NewSimpleClientset()
clientset := fake.NewClientset()

h := CheckNodeStatusHandler{
log: log,
Expand Down Expand Up @@ -185,7 +186,7 @@ func TestCheckStatus_Ready(t *testing.T) {
},
},
}
clientset := fake.NewSimpleClientset(node)
clientset := fake.NewClientset(node)

h := CheckNodeStatusHandler{
log: log,
Expand Down Expand Up @@ -240,7 +241,7 @@ func TestCheckStatus_Ready(t *testing.T) {
Taints: []v1.Taint{taintCloudProviderUninitialized},
},
}
clientset := fake.NewSimpleClientset(node)
clientset := fake.NewClientset(node)

h := CheckNodeStatusHandler{
log: log,
Expand Down Expand Up @@ -287,7 +288,7 @@ func TestCheckStatus_Ready(t *testing.T) {
Conditions: []v1.NodeCondition{},
},
}
clientset := fake.NewSimpleClientset(node)
clientset := fake.NewClientset(node)
watcher := watch.NewFake()

clientset.PrependWatchReactor("nodes", k8stest.DefaultWatchReactor(watcher, nil))
Expand Down Expand Up @@ -333,7 +334,7 @@ func TestCheckStatus_Ready(t *testing.T) {
},
},
}
clientset := fake.NewSimpleClientset(node)
clientset := fake.NewClientset(node)

h := CheckNodeStatusHandler{
log: log,
Expand Down
2 changes: 1 addition & 1 deletion internal/actions/create_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type CreateEventHandler struct {
func (h *CreateEventHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionCreateEvent)
if !ok {
return fmt.Errorf("unexpected type %T for create event handler", action.Data())
return newUnexpectedTypeErr(action.Data(), req)
}
namespace := req.ObjectRef.Namespace
if namespace == "" {
Expand Down
9 changes: 4 additions & 5 deletions internal/actions/create_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package actions

import (
"context"
"errors"
"fmt"
"reflect"

Expand Down Expand Up @@ -39,12 +38,12 @@ func (h *CreateHandler) Handle(ctx context.Context, action *castai.ClusterAction
}

if req.Object == nil {
return errors.New("no object provided")
return fmt.Errorf("object not provided %w", errAction)
}

newObj := &unstructured.Unstructured{Object: req.Object}
if newObj.GetNamespace() == "" {
return errors.New("object namespace is missing")
return fmt.Errorf("namespace not provided %w", errAction)
}

log := h.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -87,12 +86,12 @@ func (h *CreateHandler) Handle(ctx context.Context, action *castai.ClusterAction

original, err := obj.MarshalJSON()
if err != nil {
return err
return fmt.Errorf("marshaling original resource: %w", err)
}

modified, err := newObj.MarshalJSON()
if err != nil {
return err
return fmt.Errorf("marshaling modified resource: %w", err)
}

patch, err := jsonpatch.CreateMergePatch(original, modified)
Expand Down
6 changes: 3 additions & 3 deletions internal/actions/create_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ func Test_newCreateHandler(t *testing.T) {
action: &castai.ClusterAction{
ActionDeleteNode: &castai.ActionDeleteNode{},
},
err: newUnexpectedTypeErr(&castai.ActionDeleteNode{}, &castai.ActionCreate{}),
err: errAction,
},
"should return error when object is not provided": {
action: &castai.ClusterAction{
ActionCreate: &castai.ActionCreate{
GroupVersionResource: castai.GroupVersionResource{},
},
},
err: errors.New("no object provided"),
err: errAction,
},
"should create new deployment": {
action: &castai.ClusterAction{
Expand Down Expand Up @@ -129,7 +129,7 @@ func Test_newCreateHandler(t *testing.T) {
err := handler.Handle(ctx, test.action)
if test.err != nil {
r.Error(err)
r.Equal(test.err, err)
r.True(errors.Is(err, test.err), "expected error %v, got %v", test.err, err)
return
}

Expand Down
4 changes: 3 additions & 1 deletion internal/actions/csr/csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ type Certificate struct {
RequestingUser string
}

var errCSRNotFound = errors.New("v1 or v1beta csr should be set")

func (c *Certificate) Validate() error {
if c.v1 == nil && c.v1Beta1 == nil {
return errors.New("v1 or v1beta csr should be set")
return errCSRNotFound
}
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion internal/actions/csr/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (h *ApprovalManager) handleWithRetry(ctx context.Context, log *logrus.Entry
)
}

var errCSRNotApproved = errors.New("certificate signing request was not approved")

func (h *ApprovalManager) handle(ctx context.Context, log logrus.FieldLogger, cert *Certificate) (reterr error) {
if cert.Approved() {
return nil
Expand Down Expand Up @@ -94,7 +96,7 @@ func (h *ApprovalManager) handle(ctx context.Context, log logrus.FieldLogger, ce
}
}

return errors.New("certificate signing request was not approved")
return errCSRNotApproved
}

func (h *ApprovalManager) runAutoApproveForCastAINodes(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion internal/actions/delete_node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type DeleteNodeHandler struct {
func (h *DeleteNodeHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionDeleteNode)
if !ok {
return fmt.Errorf("unexpected type %T for delete node handler", action.Data())
return newUnexpectedTypeErr(action.Data(), req)
}

log := h.log.WithFields(logrus.Fields{
Expand Down
2 changes: 1 addition & 1 deletion internal/actions/drain_node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type DrainNodeHandler struct {
func (h *DrainNodeHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
req, ok := action.Data().(*castai.ActionDrainNode)
if !ok {
return fmt.Errorf("unexpected type %T for drain handler", action.Data())
return newUnexpectedTypeErr(action.Data(), req)
}
drainTimeout := h.getDrainTimeout(action)

Expand Down
2 changes: 1 addition & 1 deletion internal/actions/kubernetes_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func executeBatchPodActions(
}

var (
parallelTasks = int(lo.Clamp(float64(len(pods)), 30, 100))
parallelTasks = lo.Clamp(len(pods), 1, 50)
taskChan = make(chan v1.Pod, len(pods))
successfulPodsChan = make(chan *v1.Pod, len(pods))
failedPodsChan = make(chan podActionFailure, len(pods))
Expand Down
Loading

0 comments on commit 2f8ff94

Please sign in to comment.