Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak #151

Merged
merged 4 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
"k8s.io/client-go/util/retry"
)

// CompletedRadixBatches Completed RadixBatch lists
type CompletedRadixBatches struct {
SucceededRadixBatches []*modelsv2.RadixBatch
NotSucceededRadixBatches []*modelsv2.RadixBatch
SucceededSingleJobs []*modelsv2.RadixBatch
NotSucceededSingleJobs []*modelsv2.RadixBatch
// CompletedRadixBatchNames groups the names of completed RadixBatch objects
// into four lists, split by kind (batch or single job) and by outcome
// (succeeded or not succeeded). Only names are kept, not the RadixBatch
// objects themselves.
type CompletedRadixBatchNames struct {
// SucceededRadixBatches lists names of batch-type RadixBatches that succeeded.
SucceededRadixBatches []string
// NotSucceededRadixBatches lists names of batch-type RadixBatches that did not succeed.
NotSucceededRadixBatches []string
// SucceededSingleJobs lists names of job-type RadixBatches that succeeded.
SucceededSingleJobs []string
// NotSucceededSingleJobs lists names of job-type RadixBatches that did not succeed.
NotSucceededSingleJobs []string
}

// GetRadixBatchStatus Get radix batch
Expand Down
94 changes: 53 additions & 41 deletions pkg/batch/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/equinor/radix-common/utils/slice"
"github.com/equinor/radix-job-scheduler/internal"
"github.com/equinor/radix-job-scheduler/models"
modelsv2 "github.com/equinor/radix-job-scheduler/models/v2"
"github.com/equinor/radix-operator/pkg/apis/kube"
radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1"
radixLabels "github.com/equinor/radix-operator/pkg/apis/utils/labels"
Expand All @@ -21,13 +20,14 @@ import (
// History Interface for job History
type History interface {
// Cleanup the pipeline job history for the Radix application
Cleanup(ctx context.Context) error
Cleanup(ctx context.Context, existingRadixBatchNamesMap map[string]struct{}) error
}

// history holds the state used by the Cleanup method of the job history.
type history struct {
// kubeUtil provides the Radix client used to list and delete RadixBatches.
kubeUtil *kube.Kube
// env supplies the deployment namespace, component name and history limit.
env *models.Env
radixDeployJobComponent *radixv1.RadixDeployJobComponent
// lastCleanupTime is set when a Cleanup run starts; Cleanup compares it
// against the current time to skip runs that follow too closely.
// NOTE(review): accessed without synchronization — confirm Cleanup is never
// invoked concurrently.
lastCleanupTime time.Time
}

// NewHistory Constructor for job History
Expand All @@ -40,10 +40,18 @@ func NewHistory(kubeUtil *kube.Kube, env *models.Env, radixDeployJobComponent *r
}

// Cleanup the pipeline job history
func (h *history) Cleanup(ctx context.Context) error {
func (h *history) Cleanup(ctx context.Context, existingRadixBatchNamesMap map[string]struct{}) error {
logger := log.Ctx(ctx)
const minimumAge = 3600 // TODO add as default env-var and/or job-component property
completedBefore := time.Now().Add(-time.Second * minimumAge)
const (
minimumAgeSeconds = 3600 // TODO add as default env-var and/or job-component property
cleanupPeriodSeconds = 60
)
if h.lastCleanupTime.After(time.Now().Add(-time.Second * cleanupPeriodSeconds)) {
logger.Debug().Msg("skip cleanup RadixBatch history")
return nil
}
h.lastCleanupTime = time.Now()
completedBefore := time.Now().Add(-time.Second * minimumAgeSeconds)
completedRadixBatches, err := h.getCompletedRadixBatchesSortedByCompletionTimeAsc(ctx, completedBefore)
if err != nil {
return err
Expand All @@ -52,19 +60,19 @@ func (h *history) Cleanup(ctx context.Context) error {
logger.Debug().Msg("cleanup RadixBatch history for succeeded batches")
var errs []error
historyLimit := h.env.RadixJobSchedulersPerEnvironmentHistoryLimit
if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.SucceededRadixBatches, historyLimit); err != nil {
if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.SucceededRadixBatches, historyLimit, existingRadixBatchNamesMap); err != nil {
errs = append(errs, err)
}
logger.Debug().Msg("cleanup RadixBatch history for not succeeded batches")
if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.NotSucceededRadixBatches, historyLimit); err != nil {
if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.NotSucceededRadixBatches, historyLimit, existingRadixBatchNamesMap); err != nil {
errs = append(errs, err)
}
logger.Debug().Msg("cleanup RadixBatch history for succeeded single jobs")
if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.SucceededSingleJobs, historyLimit); err != nil {
if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.SucceededSingleJobs, historyLimit, existingRadixBatchNamesMap); err != nil {
errs = append(errs, err)
}
logger.Debug().Msg("cleanup RadixBatch history for not succeeded single jobs")
if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.NotSucceededSingleJobs, historyLimit); err != nil {
if err := h.cleanupRadixBatchHistory(ctx, completedRadixBatches.NotSucceededSingleJobs, historyLimit, existingRadixBatchNamesMap); err != nil {
errs = append(errs, err)
}
logger.Debug().Msg("delete orphaned payload secrets")
Expand All @@ -74,58 +82,69 @@ func (h *history) Cleanup(ctx context.Context) error {
return errors.Join(errs...)
}

func (h *history) getCompletedRadixBatchesSortedByCompletionTimeAsc(ctx context.Context, completedBefore time.Time) (*CompletedRadixBatches, error) {
func (h *history) getCompletedRadixBatchesSortedByCompletionTimeAsc(ctx context.Context, completedBefore time.Time) (*CompletedRadixBatchNames, error) {
radixBatches, err := internal.GetRadixBatches(ctx, h.env.RadixDeploymentNamespace, h.kubeUtil.RadixClient(), radixLabels.ForComponentName(h.env.RadixComponentName))
if err != nil {
return nil, err
}
radixBatches = sortRJSchByCompletionTimeAsc(radixBatches)
return &CompletedRadixBatches{
return &CompletedRadixBatchNames{
SucceededRadixBatches: h.getSucceededRadixBatches(radixBatches, completedBefore),
NotSucceededRadixBatches: h.getNotSucceededRadixBatches(radixBatches, completedBefore),
SucceededSingleJobs: h.getSucceededSingleJobs(radixBatches, completedBefore),
NotSucceededSingleJobs: h.getNotSucceededSingleJobs(radixBatches, completedBefore),
}, nil
}

func (h *history) getNotSucceededRadixBatches(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []*modelsv2.RadixBatch {
return convertToRadixBatchStatuses(slice.FindAll(radixBatches, func(radixBatch *radixv1.RadixBatch) bool {
return radixBatchHasType(radixBatch, kube.RadixBatchTypeBatch) && internal.IsRadixBatchNotSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch)
}), h.radixDeployJobComponent)
// getNotSucceededRadixBatches returns the names of the batch-type RadixBatches
// that did not succeed and whose completion time is before completedBefore.
// Names are returned in the order of the input slice; the result is never nil.
func (h *history) getNotSucceededRadixBatches(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []string {
	names := []string{}
	for _, candidate := range radixBatches {
		if !radixBatchHasType(candidate, kube.RadixBatchTypeBatch) {
			continue
		}
		if !internal.IsRadixBatchNotSucceeded(candidate) {
			continue
		}
		if !radixBatchIsCompletedBefore(completedBefore, candidate) {
			continue
		}
		names = append(names, candidate.Name)
	}
	return names
}

func (h *history) getSucceededRadixBatches(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []*modelsv2.RadixBatch {
radixBatches = slice.FindAll(radixBatches, func(radixBatch *radixv1.RadixBatch) bool {
return radixBatchHasType(radixBatch, kube.RadixBatchTypeBatch) && internal.IsRadixBatchSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch)
func (h *history) getSucceededRadixBatches(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []string {
return slice.Reduce(radixBatches, []string{}, func(acc []string, radixBatch *radixv1.RadixBatch) []string {
if radixBatchHasType(radixBatch, kube.RadixBatchTypeBatch) && internal.IsRadixBatchSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch) {
acc = append(acc, radixBatch.Name)
}
return acc
})
return convertToRadixBatchStatuses(radixBatches, h.radixDeployJobComponent)
}

// radixBatchIsCompletedBefore reports whether radixBatch has a completion time
// set and that time is earlier than completedBefore.
func radixBatchIsCompletedBefore(completedBefore time.Time, radixBatch *radixv1.RadixBatch) bool {
	completionTime := radixBatch.Status.Condition.CompletionTime
	if completionTime == nil {
		// A batch without a completion time is not treated as completed.
		return false
	}
	cutoff := metav1.Time{Time: completedBefore}
	return completionTime.Before(&cutoff)
}

func (h *history) getNotSucceededSingleJobs(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []*modelsv2.RadixBatch {
return convertToRadixBatchStatuses(slice.FindAll(radixBatches, func(radixBatch *radixv1.RadixBatch) bool {
return radixBatchHasType(radixBatch, kube.RadixBatchTypeJob) && internal.IsRadixBatchNotSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch)
}), h.radixDeployJobComponent)
// getNotSucceededSingleJobs returns the names of the job-type RadixBatches
// that did not succeed and whose completion time is before completedBefore.
// Names are returned in the order of the input slice; the result is never nil.
func (h *history) getNotSucceededSingleJobs(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []string {
	names := []string{}
	for _, candidate := range radixBatches {
		if !radixBatchHasType(candidate, kube.RadixBatchTypeJob) {
			continue
		}
		if !internal.IsRadixBatchNotSucceeded(candidate) {
			continue
		}
		if !radixBatchIsCompletedBefore(completedBefore, candidate) {
			continue
		}
		names = append(names, candidate.Name)
	}
	return names
}

func (h *history) getSucceededSingleJobs(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []*modelsv2.RadixBatch {
return convertToRadixBatchStatuses(slice.FindAll(radixBatches, func(radixBatch *radixv1.RadixBatch) bool {
return radixBatchHasType(radixBatch, kube.RadixBatchTypeJob) && internal.IsRadixBatchSucceeded(radixBatch) && radixBatchIsCompletedBefore(completedBefore, radixBatch)
}), h.radixDeployJobComponent)
// getSucceededSingleJobs returns the names of the job-type RadixBatches that
// succeeded and whose completion time is before completedBefore.
// Names are returned in the order of the input slice; the result is never nil.
func (h *history) getSucceededSingleJobs(radixBatches []*radixv1.RadixBatch, completedBefore time.Time) []string {
	names := []string{}
	for _, candidate := range radixBatches {
		if !radixBatchHasType(candidate, kube.RadixBatchTypeJob) {
			continue
		}
		if !internal.IsRadixBatchSucceeded(candidate) {
			continue
		}
		if !radixBatchIsCompletedBefore(completedBefore, candidate) {
			continue
		}
		names = append(names, candidate.Name)
	}
	return names
}

// radixBatchHasType reports whether the batch-type label on radixBatch equals
// radixBatchType. A missing label reads as the empty string.
func radixBatchHasType(radixBatch *radixv1.RadixBatch, radixBatchType kube.RadixBatchType) bool {
	labelValue := radixBatch.GetLabels()[kube.RadixBatchTypeLabel]
	return labelValue == string(radixBatchType)
}

func (h *history) cleanupRadixBatchHistory(ctx context.Context, radixBatchesSortedByCompletionTimeAsc []*modelsv2.RadixBatch, historyLimit int) error {
func (h *history) cleanupRadixBatchHistory(ctx context.Context, radixBatchNamesSortedByCompletionTimeAsc []string, historyLimit int, existingRadixBatchNamesMap map[string]struct{}) error {
logger := log.Ctx(ctx)
numToDelete := len(radixBatchesSortedByCompletionTimeAsc) - historyLimit
numToDelete := len(radixBatchNamesSortedByCompletionTimeAsc) - historyLimit
if numToDelete <= 0 {
logger.Debug().Msgf("no history batches to delete: %d batches, %d history limit", len(radixBatchesSortedByCompletionTimeAsc), historyLimit)
logger.Debug().Msgf("no history batches to delete: %d batches, %d history limit", len(radixBatchNamesSortedByCompletionTimeAsc), historyLimit)
return nil
}
logger.Debug().Msgf("history batches to delete: %v", numToDelete)
Expand All @@ -134,11 +153,12 @@ func (h *history) cleanupRadixBatchHistory(ctx context.Context, radixBatchesSort
if ctx.Err() != nil {
return nil
}
radixBatch := radixBatchesSortedByCompletionTimeAsc[i]
logger.Debug().Msgf("deleting batch %s", radixBatch.Name)
if err := DeleteRadixBatchByName(ctx, h.kubeUtil.RadixClient(), h.env.RadixDeploymentNamespace, radixBatch.Name); err != nil {
radixBatchName := radixBatchNamesSortedByCompletionTimeAsc[i]
logger.Debug().Msgf("deleting batch %s", radixBatchName)
if err := DeleteRadixBatchByName(ctx, h.kubeUtil.RadixClient(), h.env.RadixDeploymentNamespace, radixBatchName); err != nil {
return err
}
delete(existingRadixBatchNamesMap, radixBatchName)
}
return nil
}
Expand All @@ -165,11 +185,3 @@ func getCompletionTimeFrom(radixBatch *radixv1.RadixBatch) *metav1.Time {
}
return radixBatch.Status.Condition.CompletionTime
}

// convertToRadixBatchStatuses builds one status model per input RadixBatch by
// calling GetRadixBatchStatus, keeping the input order. The result always has
// the same length as radixBatches and is never nil.
func convertToRadixBatchStatuses(radixBatches []*radixv1.RadixBatch, radixDeployJobComponent *radixv1.RadixDeployJobComponent) []*modelsv2.RadixBatch {
	statuses := make([]*modelsv2.RadixBatch, len(radixBatches))
	for i := range radixBatches {
		statuses[i] = pointers.Ptr(GetRadixBatchStatus(radixBatches[i], radixDeployJobComponent))
	}
	return statuses
}
8 changes: 4 additions & 4 deletions pkg/batch/history_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 9 additions & 10 deletions pkg/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewRadixBatchWatcher(ctx context.Context, radixClient radixclient.Interface
jobHistory: jobHistory,
}

existingRadixBatchMap, err := getRadixBatchMap(radixClient, namespace)
existingRadixBatchNamesMap, err := getExistingRadixBatchNamesMap(radixClient, namespace)
if err != nil {
return nil, fmt.Errorf("failed to get list of RadixBatches %w", err)
}
Expand All @@ -68,7 +68,7 @@ func NewRadixBatchWatcher(ctx context.Context, radixClient radixclient.Interface
if radixBatch.Status.Condition.Type != "" {
return // skip existing batch added to the cache
}
if _, ok := existingRadixBatchMap[radixBatch.GetName()]; ok {
if _, ok := existingRadixBatchNamesMap[radixBatch.GetName()]; ok {
watcher.logger.Debug().Msgf("skip existing RadixBatch object %s", radixBatch.GetName())
return
}
Expand All @@ -78,7 +78,7 @@ func NewRadixBatchWatcher(ctx context.Context, radixClient radixclient.Interface
jobStatuses = make([]radixv1.RadixBatchJobStatus, 0)
}
notify(ctx, notifier, events.Create, radixBatch, jobStatuses)
watcher.cleanupJobHistory(ctx)
watcher.cleanupJobHistory(ctx, existingRadixBatchNamesMap)
},
UpdateFunc: func(old, cur interface{}) {
oldRadixBatch := old.(*radixv1.RadixBatch)
Expand All @@ -104,7 +104,7 @@ func NewRadixBatchWatcher(ctx context.Context, radixClient radixclient.Interface
jobStatuses = make([]radixv1.RadixBatchJobStatus, 0)
}
notify(ctx, notifier, events.Delete, radixBatch, jobStatuses)
delete(existingRadixBatchMap, radixBatch.GetName())
delete(existingRadixBatchNamesMap, radixBatch.GetName())
},
})
if err != nil {
Expand All @@ -127,11 +127,11 @@ func notify(ctx context.Context, notifier notifications.Notifier, ev events.Even
}()
}

func (w *watcher) cleanupJobHistory(ctx context.Context) {
func (w *watcher) cleanupJobHistory(ctx context.Context, existingRadixBatchNamesMap map[string]struct{}) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Minute*5)
go func() {
defer cancel()
if err := w.jobHistory.Cleanup(ctxWithTimeout); err != nil {
if err := w.jobHistory.Cleanup(ctxWithTimeout, existingRadixBatchNamesMap); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to cleanup job history")
}
}()
Expand Down Expand Up @@ -166,15 +166,14 @@ func equalBatchStatuses(status1, status2 *radixv1.RadixBatchStatus) bool {
status1.Condition.Message == status2.Condition.Message
}

func getRadixBatchMap(radixClient radixclient.Interface, namespace string) (map[string]*radixv1.RadixBatch, error) {
func getExistingRadixBatchNamesMap(radixClient radixclient.Interface, namespace string) (map[string]struct{}, error) {
radixBatchList, err := radixClient.RadixV1().RadixBatches(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
radixBatchMap := make(map[string]*radixv1.RadixBatch, len(radixBatchList.Items))
radixBatchMap := make(map[string]struct{}, len(radixBatchList.Items))
for _, radixBatch := range radixBatchList.Items {
radixBatch := radixBatch
radixBatchMap[radixBatch.GetName()] = &radixBatch
radixBatchMap[radixBatch.GetName()] = struct{}{}
}
return radixBatchMap, nil
}
2 changes: 1 addition & 1 deletion pkg/watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func Test_RadixBatchWatcher(t *testing.T) {
assert.False(t, commonUtils.IsNil(batchWatcher))

if tt.fields.newRadixBatch != nil && tt.fields.event == events.Create {
history.EXPECT().Cleanup(gomock.Any()).Times(1)
history.EXPECT().Cleanup(gomock.Any(), make(map[string]struct{})).Times(1)
// when radix batch exists and during test it will be updated
_, err := radixClient.RadixV1().RadixBatches(namespace).Create(context.TODO(), tt.fields.newRadixBatch, metav1.CreateOptions{})
if err != nil {
Expand Down