Skip to content

Commit

Permalink
Fix the loop variable scheduler issue
Browse files Browse the repository at this point in the history
Signed-off-by: pmahindrakar-oss <[email protected]>
  • Loading branch information
pmahindrakar-oss committed Nov 22, 2023
1 parent 568e686 commit 7769486
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 0 deletions.
2 changes: 2 additions & 0 deletions flyteadmin/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec bin/flyteadmin serve -- --config flyteadmin_config.yaml
#--server.kube-config ~/.kube/config
1 change: 1 addition & 0 deletions flyteadmin/scheduler/core/gocron_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (g *GoCronScheduler) GetTimedFuncWithSchedule() TimedFuncWithSchedule {
func (g *GoCronScheduler) BootStrapSchedulesFromSnapShot(ctx context.Context, schedules []models.SchedulableEntity,
snapshot snapshoter.Snapshot) {
for _, s := range schedules {
s := s
if *s.Active {
funcRef := g.GetTimedFuncWithSchedule()
nameOfSchedule := identifier.GetScheduleName(ctx, s)
Expand Down
96 changes: 96 additions & 0 deletions flyteadmin/scheduler/core/gocron_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,99 @@ func TestCatchUpAllSchedule(t *testing.T) {
catchupSuccess := g.CatchupAll(ctx, toTime)
assert.True(t, catchupSuccess)
}

func TestGoCronScheduler_BootStrapSchedulesFromSnapShot(t *testing.T) {
g := setupWithSchedules(t, "testing", []models.SchedulableEntity{}, true)
True := true
False := false
scheduleActive1 := models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: time.Date(1000, time.October, 19, 10, 0, 0, 0, time.UTC),
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "schedule_active_1",
Version: "version1",
},
CronExpression: "0 19 * * *",
Active: &True,
}
scheduleActive2 := models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: time.Date(2000, time.November, 19, 10, 0, 0, 0, time.UTC),
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "schedule_active_2",
Version: "version1",
},
CronExpression: "0 19 * * *",
Active: &True,
}
scheduleInactive := models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: time.Date(3000, time.December, 19, 10, 0, 0, 0, time.UTC),
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "cron3",
Version: "version1",
},
CronExpression: "0 19 * * *",
Active: &False,
}

schedule1SnapshotTime := time.Date(5000, time.December, 19, 10, 0, 0, 0, time.UTC)
schedule2SnapshotTime := time.Date(6000, time.December, 19, 10, 0, 0, 0, time.UTC)
tests := []struct {
name string
schedules []models.SchedulableEntity
snapshoter snapshoter.Snapshot
expectedCatchUpTimes map[string]*time.Time
}{
{
name: "two active",
schedules: []models.SchedulableEntity{scheduleActive1, scheduleActive2},
snapshoter: &snapshoter.SnapshotV1{},
expectedCatchUpTimes: map[string]*time.Time{"11407394263542327059": &scheduleActive1.UpdatedAt, "1420107156943834850": &scheduleActive2.UpdatedAt},
},
{
name: "two active one inactive",
schedules: []models.SchedulableEntity{scheduleActive1, scheduleActive2, scheduleInactive},
snapshoter: &snapshoter.SnapshotV1{},
expectedCatchUpTimes: map[string]*time.Time{"11407394263542327059": &scheduleActive1.UpdatedAt, "1420107156943834850": &scheduleActive2.UpdatedAt},
},
{
name: "two active one inactive with snapshot populated",
schedules: []models.SchedulableEntity{scheduleActive1, scheduleActive2, scheduleInactive},
snapshoter: &snapshoter.SnapshotV1{
LastTimes: map[string]*time.Time{
"11407394263542327059": &schedule1SnapshotTime,
"1420107156943834850": &schedule2SnapshotTime,
},
},
expectedCatchUpTimes: map[string]*time.Time{"11407394263542327059": &schedule1SnapshotTime, "1420107156943834850": &schedule2SnapshotTime},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g.BootStrapSchedulesFromSnapShot(context.Background(), tt.schedules, tt.snapshoter)
g.jobStore.Range(func(key, value interface{}) bool {
jobId := key.(string)
job := value.(*GoCronJob)
if !*job.schedule.Active {
return true
}
assert.Equal(t, job.catchupFromTime, tt.expectedCatchUpTimes[jobId])
return true
})
for _, schedule := range tt.schedules {
g.DeScheduleJob(context.TODO(), schedule)
}
})
}
}

0 comments on commit 7769486

Please sign in to comment.