From 7769486c4c607be6de3454dc65df830a6e4ee335 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Tue, 21 Nov 2023 16:45:38 -0800 Subject: [PATCH] Fix the loop variable scheduler issue Signed-off-by: pmahindrakar-oss --- flyteadmin/run.sh | 2 + flyteadmin/scheduler/core/gocron_scheduler.go | 1 + .../scheduler/core/gocron_scheduler_test.go | 96 +++++++++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 flyteadmin/run.sh diff --git a/flyteadmin/run.sh b/flyteadmin/run.sh new file mode 100644 index 00000000000..7208998d04d --- /dev/null +++ b/flyteadmin/run.sh @@ -0,0 +1,2 @@ +dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec bin/flyteadmin serve -- --config flyteadmin_config.yaml +#--server.kube-config ~/.kube/config diff --git a/flyteadmin/scheduler/core/gocron_scheduler.go b/flyteadmin/scheduler/core/gocron_scheduler.go index f569b7c335e..a6286f9bbfa 100644 --- a/flyteadmin/scheduler/core/gocron_scheduler.go +++ b/flyteadmin/scheduler/core/gocron_scheduler.go @@ -54,6 +54,7 @@ func (g *GoCronScheduler) GetTimedFuncWithSchedule() TimedFuncWithSchedule { func (g *GoCronScheduler) BootStrapSchedulesFromSnapShot(ctx context.Context, schedules []models.SchedulableEntity, snapshot snapshoter.Snapshot) { for _, s := range schedules { + s := s if *s.Active { funcRef := g.GetTimedFuncWithSchedule() nameOfSchedule := identifier.GetScheduleName(ctx, s) diff --git a/flyteadmin/scheduler/core/gocron_scheduler_test.go b/flyteadmin/scheduler/core/gocron_scheduler_test.go index d19436ddd54..7ec9009424d 100644 --- a/flyteadmin/scheduler/core/gocron_scheduler_test.go +++ b/flyteadmin/scheduler/core/gocron_scheduler_test.go @@ -296,3 +296,99 @@ func TestCatchUpAllSchedule(t *testing.T) { catchupSuccess := g.CatchupAll(ctx, toTime) assert.True(t, catchupSuccess) } + +func TestGoCronScheduler_BootStrapSchedulesFromSnapShot(t *testing.T) { + g := setupWithSchedules(t, "testing", []models.SchedulableEntity{}, true) + True := true + False := false + scheduleActive1 := models.SchedulableEntity{ + BaseModel: adminModels.BaseModel{ + UpdatedAt: time.Date(1000, time.October, 19, 10, 0, 0, 0, time.UTC), + }, + SchedulableEntityKey: models.SchedulableEntityKey{ + Project: "project", + Domain: "domain", + Name: "schedule_active_1", + Version: "version1", + }, + CronExpression: "0 19 * * *", + Active: &True, + } + scheduleActive2 := models.SchedulableEntity{ + BaseModel: adminModels.BaseModel{ + UpdatedAt: time.Date(2000, time.November, 19, 10, 0, 0, 0, time.UTC), + }, + SchedulableEntityKey: models.SchedulableEntityKey{ + Project: "project", + Domain: "domain", + Name: "schedule_active_2", + Version: "version1", + }, + CronExpression: "0 19 * * *", + Active: &True, + } + scheduleInactive := models.SchedulableEntity{ + BaseModel: adminModels.BaseModel{ + UpdatedAt: time.Date(3000, time.December, 19, 10, 0, 0, 0, time.UTC), + }, + SchedulableEntityKey: models.SchedulableEntityKey{ + Project: "project", + Domain: "domain", + Name: "cron3", + Version: "version1", + }, + CronExpression: "0 19 * * *", + Active: &False, + } + + schedule1SnapshotTime := time.Date(5000, time.December, 19, 10, 0, 0, 0, time.UTC) + schedule2SnapshotTime := time.Date(6000, time.December, 19, 10, 0, 0, 0, time.UTC) + tests := []struct { + name string + schedules []models.SchedulableEntity + snapshoter snapshoter.Snapshot + expectedCatchUpTimes map[string]*time.Time + }{ + { + name: "two active", + schedules: []models.SchedulableEntity{scheduleActive1, scheduleActive2}, + snapshoter: &snapshoter.SnapshotV1{}, + expectedCatchUpTimes: map[string]*time.Time{"11407394263542327059": &scheduleActive1.UpdatedAt, "1420107156943834850": &scheduleActive2.UpdatedAt}, + }, + { + name: "two active one inactive", + schedules: []models.SchedulableEntity{scheduleActive1, scheduleActive2, scheduleInactive}, + snapshoter: &snapshoter.SnapshotV1{}, + expectedCatchUpTimes: map[string]*time.Time{"11407394263542327059": &scheduleActive1.UpdatedAt, "1420107156943834850": &scheduleActive2.UpdatedAt}, + }, + { + name: "two active one inactive with snapshot populated", + schedules: []models.SchedulableEntity{scheduleActive1, scheduleActive2, scheduleInactive}, + snapshoter: &snapshoter.SnapshotV1{ + LastTimes: map[string]*time.Time{ + "11407394263542327059": &schedule1SnapshotTime, + "1420107156943834850": &schedule2SnapshotTime, + }, + }, + expectedCatchUpTimes: map[string]*time.Time{"11407394263542327059": &schedule1SnapshotTime, "1420107156943834850": &schedule2SnapshotTime}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g.BootStrapSchedulesFromSnapShot(context.Background(), tt.schedules, tt.snapshoter) + g.jobStore.Range(func(key, value interface{}) bool { + jobId := key.(string) + job := value.(*GoCronJob) + if !*job.schedule.Active { + return true + } + assert.Equal(t, job.catchupFromTime, tt.expectedCatchUpTimes[jobId]) + return true + }) + for _, schedule := range tt.schedules { + g.DeScheduleJob(context.TODO(), schedule) + } + }) + } +}