Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring: move types to common package and define interface on consumer's side #114

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 48 additions & 39 deletions actions/actions.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Package actions polls, handles and acknowledges actions from mothership for a given cluster.
//
//go:generate mockgen -destination ./mock/client.go . Client
package actions

import (
Expand All @@ -14,22 +17,24 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/castai"
"github.com/castai/cluster-controller/health"
"github.com/castai/cluster-controller/helm"
"github.com/castai/cluster-controller/types"
"github.com/castai/cluster-controller/waitext"
)

const (
// actionIDLogField is the log field name for action ID.
// This field is used in backend to detect actions ID in logs.
actionIDLogField = "id"
labelNodeID = "provisioner.cast.ai/node-id"
)

func newUnexpectedTypeErr(value interface{}, expectedType interface{}) error {
return fmt.Errorf("unexpected type %T, expected %T", value, expectedType)
}

// Config contains parameters to modify actions handling frequency and values required to poll/ack actions.
type Config struct {
PollWaitInterval time.Duration // How long to wait unit next long polling request.
PollTimeout time.Duration // hard timeout. Normally server should return empty result before this timeout.
Expand All @@ -41,67 +46,73 @@ type Config struct {
Namespace string
}

type Service interface {
Run(ctx context.Context)
// Client abstracts communication means.
type Client interface {
GetActions(ctx context.Context, k8sVersion string) ([]*types.ClusterAction, error)
AckAction(ctx context.Context, actionID string, errMessage *string) error
SendAKSInitData(ctx context.Context, cloudConfigBase64, protectedSettingsBase64, architecture string) error
}

type ActionHandler interface {
Handle(ctx context.Context, action *castai.ClusterAction) error
type actionHandler interface {
Handle(ctx context.Context, action *types.ClusterAction) error
}

// NewService returns new Service that can continuously handle actions once started.
func NewService(
log logrus.FieldLogger,
cfg Config,
k8sVersion string,
clientset *kubernetes.Clientset,
dynamicClient dynamic.Interface,
castaiClient castai.ActionsClient,
castaiClient Client,
helmClient helm.Client,
healthCheck *health.HealthzProvider,
) Service {
return &service{
) *Service {
return &Service{
log: log,
cfg: cfg,
k8sVersion: k8sVersion,
castAIClient: castaiClient,
startedActions: map[string]struct{}{},
actionHandlers: map[reflect.Type]ActionHandler{
reflect.TypeOf(&castai.ActionDeleteNode{}): newDeleteNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionDrainNode{}): newDrainNodeHandler(log, clientset, cfg.Namespace),
reflect.TypeOf(&castai.ActionPatchNode{}): newPatchNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionCreateEvent{}): newCreateEventHandler(log, clientset),
reflect.TypeOf(&castai.ActionApproveCSR{}): newApproveCSRHandler(log, clientset),
reflect.TypeOf(&castai.ActionChartUpsert{}): newChartUpsertHandler(log, helmClient),
reflect.TypeOf(&castai.ActionChartUninstall{}): newChartUninstallHandler(log, helmClient),
reflect.TypeOf(&castai.ActionChartRollback{}): newChartRollbackHandler(log, helmClient, cfg.Version),
reflect.TypeOf(&castai.ActionDisconnectCluster{}): newDisconnectClusterHandler(log, clientset),
reflect.TypeOf(&castai.ActionSendAKSInitData{}): newSendAKSInitDataHandler(log, castaiClient),
reflect.TypeOf(&castai.ActionCheckNodeDeleted{}): newCheckNodeDeletedHandler(log, clientset),
reflect.TypeOf(&castai.ActionCheckNodeStatus{}): newCheckNodeStatusHandler(log, clientset),
reflect.TypeOf(&castai.ActionPatch{}): newPatchHandler(log, dynamicClient),
reflect.TypeOf(&castai.ActionCreate{}): newCreateHandler(log, dynamicClient),
reflect.TypeOf(&castai.ActionDelete{}): newDeleteHandler(log, dynamicClient),
actionHandlers: map[reflect.Type]actionHandler{
reflect.TypeOf(&types.ActionDeleteNode{}): newDeleteNodeHandler(log, clientset),
reflect.TypeOf(&types.ActionDrainNode{}): newDrainNodeHandler(log, clientset, cfg.Namespace),
reflect.TypeOf(&types.ActionPatchNode{}): newPatchNodeHandler(log, clientset),
reflect.TypeOf(&types.ActionCreateEvent{}): newCreateEventHandler(log, clientset),
reflect.TypeOf(&types.ActionApproveCSR{}): newApproveCSRHandler(log, clientset),
reflect.TypeOf(&types.ActionChartUpsert{}): newChartUpsertHandler(log, helmClient),
reflect.TypeOf(&types.ActionChartUninstall{}): newChartUninstallHandler(log, helmClient),
reflect.TypeOf(&types.ActionChartRollback{}): newChartRollbackHandler(log, helmClient, cfg.Version),
reflect.TypeOf(&types.ActionDisconnectCluster{}): newDisconnectClusterHandler(log, clientset),
reflect.TypeOf(&types.ActionSendAKSInitData{}): newSendAKSInitDataHandler(log, castaiClient),
reflect.TypeOf(&types.ActionCheckNodeDeleted{}): newCheckNodeDeletedHandler(log, clientset),
reflect.TypeOf(&types.ActionCheckNodeStatus{}): newCheckNodeStatusHandler(log, clientset),
reflect.TypeOf(&types.ActionPatch{}): newPatchHandler(log, dynamicClient),
reflect.TypeOf(&types.ActionCreate{}): newCreateHandler(log, dynamicClient),
reflect.TypeOf(&types.ActionDelete{}): newDeleteHandler(log, dynamicClient),
},
healthCheck: healthCheck,
}
}

type service struct {
// Service can continuously poll and handle actions.
type Service struct {
log logrus.FieldLogger
cfg Config
castAIClient castai.ActionsClient
castAIClient Client

k8sVersion string

actionHandlers map[reflect.Type]ActionHandler
actionHandlers map[reflect.Type]actionHandler

startedActionsWg sync.WaitGroup
startedActions map[string]struct{}
startedActionsMu sync.Mutex
healthCheck *health.HealthzProvider
}

func (s *service) Run(ctx context.Context) {
// Run starts polling and handling actions.
func (s *Service) Run(ctx context.Context) {
s.healthCheck.Initializing()
for {
select {
Expand All @@ -123,11 +134,11 @@ func (s *service) Run(ctx context.Context) {
}
}

func (s *service) doWork(ctx context.Context) error {
func (s *Service) doWork(ctx context.Context) error {
s.log.Info("polling actions")
start := time.Now()
var (
actions []*castai.ClusterAction
actions []*types.ClusterAction
err error
iteration int
)
Expand Down Expand Up @@ -161,13 +172,13 @@ func (s *service) doWork(ctx context.Context) error {
return nil
}

func (s *service) handleActions(ctx context.Context, actions []*castai.ClusterAction) {
func (s *Service) handleActions(ctx context.Context, actions []*types.ClusterAction) {
for _, action := range actions {
if !s.startProcessing(action.ID) {
continue
}

go func(action *castai.ClusterAction) {
go func(action *types.ClusterAction) {
defer s.finishProcessing(action.ID)

var err error
Expand All @@ -193,15 +204,15 @@ func (s *service) handleActions(ctx context.Context, actions []*castai.ClusterAc
}
}

func (s *service) finishProcessing(actionID string) {
func (s *Service) finishProcessing(actionID string) {
s.startedActionsMu.Lock()
defer s.startedActionsMu.Unlock()

s.startedActionsWg.Done()
delete(s.startedActions, actionID)
}

func (s *service) startProcessing(actionID string) bool {
func (s *Service) startProcessing(actionID string) bool {
s.startedActionsMu.Lock()
defer s.startedActionsMu.Unlock()

Expand All @@ -214,7 +225,7 @@ func (s *service) startProcessing(actionID string) bool {
return true
}

func (s *service) handleAction(ctx context.Context, action *castai.ClusterAction) (err error) {
func (s *Service) handleAction(ctx context.Context, action *types.ClusterAction) (err error) {
actionType := reflect.TypeOf(action.Data())

defer func() {
Expand All @@ -238,7 +249,7 @@ func (s *service) handleAction(ctx context.Context, action *castai.ClusterAction
return nil
}

func (s *service) ackAction(ctx context.Context, action *castai.ClusterAction, handleErr error) error {
func (s *Service) ackAction(ctx context.Context, action *types.ClusterAction, handleErr error) error {
actionType := reflect.TypeOf(action.Data())
s.log.WithFields(logrus.Fields{
actionIDLogField: action.ID,
Expand All @@ -250,9 +261,7 @@ func (s *service) ackAction(ctx context.Context, action *castai.ClusterAction, h
return waitext.Retry(ctx, boff, s.cfg.AckRetriesCount, func(ctx context.Context) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, s.cfg.AckTimeout)
defer cancel()
return true, s.castAIClient.AckAction(ctx, action.ID, &castai.AckClusterActionRequest{
Error: getHandlerError(handleErr),
})
return true, s.castAIClient.AckAction(ctx, action.ID, getHandlerError(handleErr))
}, func(err error) {
s.log.Debugf("ack failed, will retry: %v", err)
})
Expand Down
Loading
Loading