Skip to content

Commit

Permalink
chore: add reconcile job for deleted crds (#2326)
Browse files Browse the repository at this point in the history
* chore: add reconcile job for deleted crds

* chore: use client list

* chore: update pkg/jobs/topology/topology_jobs.go

Co-authored-by: Moshe Immerman <[email protected]>

* chore: update pkg/jobs/topology/topology_jobs.go

Co-authored-by: Moshe Immerman <[email protected]>

* chore: update length check

* chore: return if no topologies in k8s

---------

Co-authored-by: Moshe Immerman <[email protected]>
  • Loading branch information
yashmehrotra and moshloop authored Nov 11, 2024
1 parent 818b1a9 commit cfeed36
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ func Start() {
}
}

for _, j := range []*job.Job{topologyJobs.CleanupDeletedTopologyComponents, topologyJobs.SyncTopology, canaryJobs.SyncCanaryJobs, canaryJobs.CleanupDeletedCanaryChecks, dutyQuery.SyncComponentCacheJob} {
miscJobs := []*job.Job{
topologyJobs.CleanupDeletedTopologyComponents, topologyJobs.SyncTopology,
topologyJobs.TopologyCRDReconcile, canaryJobs.SyncCanaryJobs,
canaryJobs.CleanupDeletedCanaryChecks, dutyQuery.SyncComponentCacheJob,
}
for _, j := range miscJobs {
job := j
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
Expand Down
52 changes: 52 additions & 0 deletions pkg/jobs/topology/topology_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package topology
import (
"fmt"
"reflect"
"slices"
"sync"

canaryCtx "github.com/flanksource/canary-checker/api/context"
Expand All @@ -17,6 +18,9 @@ import (
"github.com/flanksource/duty/job"
"github.com/flanksource/duty/models"
"github.com/robfig/cron/v3"
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

var TopologyScheduler = cron.New()
Expand Down Expand Up @@ -157,3 +161,51 @@ var CleanupDeletedTopologyComponents = &job.Job{
return nil
},
}

var TopologyCRDReconcile = &job.Job{
Name: "TopologyCRDReconcile",
Schedule: "@every 8h",
Singleton: true,
JobHistory: true,
Retention: job.RetentionBalanced,
Fn: func(ctx job.JobRuntime) error {
var topologies []models.Topology

if err := ctx.DB().
Select("id", "name", "namespace").
Where("source IN (?, ?)", models.SourceCRD, models.SourceTopology).
Where(duty.LocalFilter).
Find(&topologies).Error; err != nil {
return err
}

client, err := ctx.KubernetesDynamicClient().GetClientByKind("Topology")
if err != nil {
return fmt.Errorf("error creating dynamic client for Topology: %w", err)
}

objs, err := client.List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("error listing Topology kind: %w", err)
}
k8sIDs := lo.Map(objs.Items, func(obj unstructured.Unstructured, _ int) string { return string(obj.GetUID()) })
if len(k8sIDs) == 0 {
// a misconfiguration or intermittent error could cause unnecessary deletion of all topologies
ctx.Warnf("Skipping topology CRD cleanup due to zero topologies returned")
return nil
}
for _, t := range topologies {
if !slices.Contains(k8sIDs, t.ID.String()) {
// id mismatch or object not found in k8s, delete current topology
if err := db.DeleteTopology(ctx.DB(), t.ID.String()); err != nil {
ctx.History.AddErrorf("error deleting topology[%s]: %v", t.ID, err)
continue
}
DeleteTopologyJob(t.ID.String())
}

ctx.History.IncrSuccess()
}
return nil
},
}

0 comments on commit cfeed36

Please sign in to comment.