diff --git a/pkg/batch/batch.go b/pkg/batch/batch.go index 79897f9..d04460c 100644 --- a/pkg/batch/batch.go +++ b/pkg/batch/batch.go @@ -22,12 +22,12 @@ import ( "k8s.io/client-go/util/retry" ) -// CompletedRadixBatches Completed RadixBatch lists -type CompletedRadixBatches struct { - SucceededRadixBatches []*modelsv2.RadixBatch - NotSucceededRadixBatches []*modelsv2.RadixBatch - SucceededSingleJobs []*modelsv2.RadixBatch - NotSucceededSingleJobs []*modelsv2.RadixBatch +// CompletedRadixBatchNames Completed RadixBatch lists +type CompletedRadixBatchNames struct { + SucceededRadixBatches []string + NotSucceededRadixBatches []string + SucceededSingleJobs []string + NotSucceededSingleJobs []string } // GetRadixBatchStatus Get radix batch diff --git a/pkg/batch/history.go b/pkg/batch/history.go index a067350..d32095c 100644 --- a/pkg/batch/history.go +++ b/pkg/batch/history.go @@ -10,7 +10,6 @@ import ( "github.com/equinor/radix-common/utils/slice" "github.com/equinor/radix-job-scheduler/internal" "github.com/equinor/radix-job-scheduler/models" - modelsv2 "github.com/equinor/radix-job-scheduler/models/v2" "github.com/equinor/radix-operator/pkg/apis/kube" radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1" radixLabels "github.com/equinor/radix-operator/pkg/apis/utils/labels" @@ -21,13 +20,14 @@ import ( // History Interface for job History type History interface { // Cleanup the pipeline job history for the Radix application - Cleanup(ctx context.Context) error + Cleanup(ctx context.Context, existingRadixBatchNamesMap map[string]struct{}) error } type history struct { kubeUtil *kube.Kube env *models.Env radixDeployJobComponent *radixv1.RadixDeployJobComponent + lastCleanupTime time.Time } // NewHistory Constructor for job History @@ -40,10 +40,18 @@ func NewHistory(kubeUtil *kube.Kube, env *models.Env, radixDeployJobComponent *r } // Cleanup the pipeline job history -func (h *history) Cleanup(ctx context.Context) error { +func (h *history) Cleanup(ctx context.Context, existingRadixBatchNamesMap map[string]struct{}) error { logger := log.Ctx(ctx) - const minimumAge = 3600 // TODO add as default env-var and/or job-component property - completedBefore := time.Now().Add(-time.Second * minimumAge) + const ( + minimumAgeSeconds = 3600 // TODO add as default env-var and/or job-component property + cleanupPeriodSeconds = 60 + ) + if h.lastCleanupTime.After(time.Now().Add(-time.Second * cleanupPeriodSeconds)) { + logger.Debug().Msg("skip cleanup RadixBatch history") + return nil + } + h.lastCleanupTime = time.Now() + completedBefore := time.Now().Add(-time.Second * minimumAgeSeconds) completedRadixBatches, err := h.getCompletedRadixBatchesSortedByCompletionTimeAsc(ctx, completedBefore) if err != nil { return err @@ -52,19 +60,19 @@ func (h *history) Cleanup(ctx context.Context) error { logger.Debug().Msg("cleanup RadixBatch history for succeeded batches") var errs []error historyLimit := h.env.RadixJobSchedulersPerEnvironmentHistoryLimit - if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.SucceededRadixBatches, historyLimit); err != nil { + if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.SucceededRadixBatches, historyLimit, existingRadixBatchNamesMap); err != nil { errs = append(errs, err) } logger.Debug().Msg("cleanup RadixBatch history for not succeeded batches") - if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.NotSucceededRadixBatches, historyLimit); err != nil { + if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.NotSucceededRadixBatches, historyLimit, existingRadixBatchNamesMap); err != nil { errs = append(errs, err) } logger.Debug().Msg("cleanup RadixBatch history for succeeded single jobs") - if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.SucceededSingleJobs, historyLimit); err != nil { + if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.SucceededSingleJobs, historyLimit, existingRadixBatchNamesMap); err != nil { errs = append(errs, err) } logger.Debug().Msg("cleanup RadixBatch history for not succeeded single jobs") - if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.NotSucceededSingleJobs, historyLimit); err != nil { + if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.NotSucceededSingleJobs, historyLimit, existingRadixBatchNamesMap); err != nil { errs = append(errs, err) } logger.Debug().Msg("delete orphaned payload secrets") @@ -74,13 +82,13 @@ func (h *history) Cleanup(ctx context.Context) error { return errors.Join(errs...) } -func (h *history) getCompletedRadixBatchesSortedByCompletionTimeAsc(ctx context.Context, completedBefore time.Time) (*CompletedRadixBatches, error) { +func (h *history) getCompletedRadixBatchesSortedByCompletionTimeAsc(ctx context.Context, completedBefore time.Time) (*CompletedRadixBatchNames, error) { radixBatches, err := internal.GetRadixBatches(ctx, h.env.RadixDeploymentNamespace, h.kubeUtil.RadixClient(), radixLabels.ForComponentName(h.env.RadixComponentName)) if err != nil { return nil, err } radixBatches = sortRJSchByCompletionTimeAsc(radixBatches) - return &CompletedRadixBatches{ + return &CompletedRadixBatchNames{ SucceededRadixBatches: h.getSucceededRadixBatches(radixBatches, completedBefore), NotSucceededRadixBatches: h.getNotSucceededRadixBatches(radixBatches, completedBefore), SucceededSingleJobs: h.getSucceededSingleJobs(radixBatches, completedBefore), @@ -88,44 +96,55 @@ func (h *history) getCompletedRadixBatchesSortedByCompletionTimeAsc(ctx context. }, nil } -func (h *history) getNotSucceededRadixBatches(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []*modelsv2.RadixBatch { - return convertToRadixBatchStatuses(slice.FindAll(radixBatches, func(radixBatch *radixv1.RadixBatch) bool { - return radixBatchHasType(radixBatch, kube.RadixBatchTypeBatch) && internal.IsRadixBatchNotSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch) - }), h.radixDeployJobComponent) +func (h *history) getNotSucceededRadixBatches(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []string { + return slice.Reduce(radixBatches, []string{}, func(acc []string, radixBatch *radixv1.RadixBatch) []string { + if radixBatchHasType(radixBatch, kube.RadixBatchTypeBatch) && internal.IsRadixBatchNotSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch) { + acc = append(acc, radixBatch.Name) + } + return acc + }) } -func (h *history) getSucceededRadixBatches(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []*modelsv2.RadixBatch { - radixBatches = slice.FindAll(radixBatches, func(radixBatch *radixv1.RadixBatch) bool { - return radixBatchHasType(radixBatch, kube.RadixBatchTypeBatch) && internal.IsRadixBatchSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch) +func (h *history) getSucceededRadixBatches(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []string { + return slice.Reduce(radixBatches, []string{}, func(acc []string, radixBatch *radixv1.RadixBatch) []string { + if radixBatchHasType(radixBatch, kube.RadixBatchTypeBatch) && internal.IsRadixBatchSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch) { + acc = append(acc, radixBatch.Name) + } + return acc }) - return convertToRadixBatchStatuses(radixBatches, h.radixDeployJobComponent) } func radixBatchIsCompletedBefore(completedBefore time.Time, radixBatch *radixv1.RadixBatch) bool { return radixBatch.Status.Condition.CompletionTime != nil && (*radixBatch.Status.Condition.CompletionTime).Before(&metav1.Time{Time: completedBefore}) } -func (h *history) getNotSucceededSingleJobs(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []*modelsv2.RadixBatch { - return convertToRadixBatchStatuses(slice.FindAll(radixBatches, func(radixBatch *radixv1.RadixBatch) bool { - return radixBatchHasType(radixBatch, kube.RadixBatchTypeJob) && internal.IsRadixBatchNotSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch) - }), h.radixDeployJobComponent) +func (h *history) getNotSucceededSingleJobs(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []string { + return slice.Reduce(radixBatches, []string{}, func(acc []string, radixBatch *radixv1.RadixBatch) []string { + if radixBatchHasType(radixBatch, kube.RadixBatchTypeJob) && internal.IsRadixBatchNotSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch) { + acc = append(acc, radixBatch.Name) + } + return acc + }) } -func (h *history) getSucceededSingleJobs(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []*modelsv2.RadixBatch { - return convertToRadixBatchStatuses(slice.FindAll(radixBatches, func(radixBatch *radixv1.RadixBatch) bool { - return radixBatchHasType(radixBatch, kube.RadixBatchTypeJob) && internal.IsRadixBatchSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch) - }), h.radixDeployJobComponent) +func (h *history) getSucceededSingleJobs(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []string { + return slice.Reduce(radixBatches, []string{}, func(acc []string, radixBatch *radixv1.RadixBatch) []string { + if radixBatchHasType(radixBatch, kube.RadixBatchTypeJob) && internal.IsRadixBatchSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch) { + acc = append(acc, radixBatch.Name) + } + return acc + }) } func radixBatchHasType(radixBatch *radixv1.RadixBatch, radixBatchType kube.RadixBatchType) bool { return radixBatch.GetLabels()[kube.RadixBatchTypeLabel] == string(radixBatchType) } -func (h *history) cleanupRadixBatchHistory(ctx context.Context, radixBatchesSortedByCompletionTimeAsc []*modelsv2.RadixBatch, historyLimit int) error { +func (h *history) cleanupRadixBatchHistory(ctx context.Context, radixBatchNamesSortedByCompletionTimeAsc []string, historyLimit int, existingRadixBatchNamesMap map[string]struct{}) error { logger := log.Ctx(ctx) - numToDelete := len(radixBatchesSortedByCompletionTimeAsc) - historyLimit + numToDelete := len(radixBatchNamesSortedByCompletionTimeAsc) - historyLimit if numToDelete <= 0 { - logger.Debug().Msgf("no history batches to delete: %d batches, %d history limit", len(radixBatchesSortedByCompletionTimeAsc), historyLimit) + logger.Debug().Msgf("no history batches to delete: %d batches, %d history limit", len(radixBatchNamesSortedByCompletionTimeAsc), historyLimit) return nil } logger.Debug().Msgf("history batches to delete: %v", numToDelete) @@ -134,11 +153,12 @@ func (h *history) cleanupRadixBatchHistory(ctx context.Context, radixBatchesSort if ctx.Err() != nil { return nil } - radixBatch := radixBatchesSortedByCompletionTimeAsc[i] - logger.Debug().Msgf("deleting batch %s", radixBatch.Name) - if err := DeleteRadixBatchByName(ctx, h.kubeUtil.RadixClient(), h.env.RadixDeploymentNamespace, radixBatch.Name); err != nil { + radixBatchName := radixBatchNamesSortedByCompletionTimeAsc[i] + logger.Debug().Msgf("deleting batch %s", radixBatchName) + if err := DeleteRadixBatchByName(ctx, h.kubeUtil.RadixClient(), h.env.RadixDeploymentNamespace, radixBatchName); err != nil { return err } + delete(existingRadixBatchNamesMap, radixBatchName) } return nil } @@ -165,11 +185,3 @@ func getCompletionTimeFrom(radixBatch *radixv1.RadixBatch) *metav1.Time { } return radixBatch.Status.Condition.CompletionTime } - -func convertToRadixBatchStatuses(radixBatches []*radixv1.RadixBatch, radixDeployJobComponent *radixv1.RadixDeployJobComponent) []*modelsv2.RadixBatch { - batches := make([]*modelsv2.RadixBatch, 0, len(radixBatches)) - for _, radixBatch := range radixBatches { - batches = append(batches, pointers.Ptr(GetRadixBatchStatus(radixBatch, radixDeployJobComponent))) - } - return batches -} diff --git a/pkg/batch/history_mock.go b/pkg/batch/history_mock.go index 3121fa2..3c26f1d 100644 --- a/pkg/batch/history_mock.go +++ b/pkg/batch/history_mock.go @@ -35,15 +35,15 @@ func (m *MockHistory) EXPECT() *MockHistoryMockRecorder { } // Cleanup mocks base method. -func (m *MockHistory) Cleanup(ctx context.Context) error { +func (m *MockHistory) Cleanup(ctx context.Context, existingRadixBatchNamesMap map[string]struct{}) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Cleanup", ctx) + ret := m.ctrl.Call(m, "Cleanup", ctx, existingRadixBatchNamesMap) ret0, _ := ret[0].(error) return ret0 } // Cleanup indicates an expected call of Cleanup. -func (mr *MockHistoryMockRecorder) Cleanup(ctx interface{}) *gomock.Call { +func (mr *MockHistoryMockRecorder) Cleanup(ctx, existingRadixBatchNamesMap interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockHistory)(nil).Cleanup), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockHistory)(nil).Cleanup), ctx, existingRadixBatchNamesMap) } diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 7522bdd..be17d24 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -49,7 +49,7 @@ func NewRadixBatchWatcher(ctx context.Context, radixClient radixclient.Interface jobHistory: jobHistory, } - existingRadixBatchMap, err := getRadixBatchMap(radixClient, namespace) + existingRadixBatchNamesMap, err := getExistingRadixBatchNamesMap(radixClient, namespace) if err != nil { return nil, fmt.Errorf("failed to get list of RadixBatches %w", err) } @@ -68,7 +68,7 @@ func NewRadixBatchWatcher(ctx context.Context, radixClient radixclient.Interface if radixBatch.Status.Condition.Type != "" { return // skip existing batch added to the cache } - if _, ok := existingRadixBatchMap[radixBatch.GetName()]; ok { + if _, ok := existingRadixBatchNamesMap[radixBatch.GetName()]; ok { watcher.logger.Debug().Msgf("skip existing RadixBatch object %s", radixBatch.GetName()) return } @@ -78,7 +78,7 @@ func NewRadixBatchWatcher(ctx context.Context, radixClient radixclient.Interface jobStatuses = make([]radixv1.RadixBatchJobStatus, 0) } notify(ctx, notifier, events.Create, radixBatch, jobStatuses) - watcher.cleanupJobHistory(ctx) + watcher.cleanupJobHistory(ctx, existingRadixBatchNamesMap) }, UpdateFunc: func(old, cur interface{}) { oldRadixBatch := old.(*radixv1.RadixBatch) @@ -104,7 +104,7 @@ func NewRadixBatchWatcher(ctx context.Context, radixClient radixclient.Interface jobStatuses = make([]radixv1.RadixBatchJobStatus, 0) } notify(ctx, notifier, events.Delete, radixBatch, jobStatuses) - delete(existingRadixBatchMap, radixBatch.GetName()) + delete(existingRadixBatchNamesMap, radixBatch.GetName()) }, }) if err != nil { @@ -127,11 +127,11 @@ func notify(ctx context.Context, notifier notifications.Notifier, ev events.Even }() } -func (w *watcher) cleanupJobHistory(ctx context.Context) { +func (w *watcher) cleanupJobHistory(ctx context.Context, existingRadixBatchNamesMap map[string]struct{}) { ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Minute*5) go func() { defer cancel() - if err := w.jobHistory.Cleanup(ctxWithTimeout); err != nil { + if err := w.jobHistory.Cleanup(ctxWithTimeout, existingRadixBatchNamesMap); err != nil { log.Ctx(ctx).Error().Err(err).Msg("failed to cleanup job history") } }() @@ -166,15 +166,14 @@ func equalBatchStatuses(status1, status2 *radixv1.RadixBatchStatus) bool { status1.Condition.Message == status2.Condition.Message } -func getRadixBatchMap(radixClient radixclient.Interface, namespace string) (map[string]*radixv1.RadixBatch, error) { +func getExistingRadixBatchNamesMap(radixClient radixclient.Interface, namespace string) (map[string]struct{}, error) { radixBatchList, err := radixClient.RadixV1().RadixBatches(namespace).List(context.TODO(), metav1.ListOptions{}) if err != nil { return nil, err } - radixBatchMap := make(map[string]*radixv1.RadixBatch, len(radixBatchList.Items)) + radixBatchMap := make(map[string]struct{}, len(radixBatchList.Items)) for _, radixBatch := range radixBatchList.Items { - radixBatch := radixBatch - radixBatchMap[radixBatch.GetName()] = &radixBatch + radixBatchMap[radixBatch.GetName()] = struct{}{} } return radixBatchMap, nil } diff --git a/pkg/watcher/watcher_test.go b/pkg/watcher/watcher_test.go index fbcc3f4..e711ba0 100644 --- a/pkg/watcher/watcher_test.go +++ b/pkg/watcher/watcher_test.go @@ -216,7 +216,7 @@ func Test_RadixBatchWatcher(t *testing.T) { assert.False(t, commonUtils.IsNil(batchWatcher)) if tt.fields.newRadixBatch != nil && tt.fields.event == events.Create { - history.EXPECT().Cleanup(gomock.Any()).Times(1) + history.EXPECT().Cleanup(gomock.Any(), make(map[string]struct{})).Times(1) // when radix batch exists and during test it will be updated _, err := radixClient.RadixV1().RadixBatches(namespace).Create(context.TODO(), tt.fields.newRadixBatch, metav1.CreateOptions{}) if err != nil {