diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index 0e6385c59..37bc3c92f 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -59,6 +59,7 @@ type Application struct {
 	placeholderTimeoutInSec int64
 	schedulingStyle         string
 	originatingTask         *Task // Original Pod which creates the requests
+	accepted                bool
 }
 
 const transitionErr = "no transition"
@@ -332,80 +333,6 @@ func (app *Application) TriggerAppSubmission() error {
 	return app.handle(NewSubmitApplicationEvent(app.applicationID))
 }
 
-// Schedule is called in every scheduling interval,
-// we are not using dispatcher here because we want to
-// make state transition in sync mode in order to prevent
-// generating too many duplicate events. However, it must
-// ensure non of these calls is expensive, usually, they
-// do nothing more than just triggering the state transition.
-// return true if the app needs scheduling or false if not
-func (app *Application) Schedule() bool {
-	switch app.GetApplicationState() {
-	case ApplicationStates().New:
-		ev := NewSubmitApplicationEvent(app.GetApplicationID())
-		if err := app.handle(ev); err != nil {
-			log.Log(log.ShimCacheApplication).Warn("failed to handle SUBMIT app event",
-				zap.Error(err))
-		}
-	case ApplicationStates().Accepted:
-		// once the app is accepted by the scheduler core,
-		// the next step is to send requests for scheduling
-		// the app state could be transited to Reserving or Running
-		// depends on if the app has gang members
-		app.postAppAccepted()
-	case ApplicationStates().Reserving:
-		// during the Reserving state, only the placeholders
-		// can be scheduled
-		app.scheduleTasks(func(t *Task) bool {
-			return t.placeholder
-		})
-		if len(app.GetNewTasks()) == 0 {
-			return false
-		}
-	case ApplicationStates().Running:
-		// during the Running state, only the regular pods
-		// can be scheduled
-		app.scheduleTasks(func(t *Task) bool {
-			return !t.placeholder
-		})
-		if len(app.GetNewTasks()) == 0 {
-			return false
-		}
-	default:
-		log.Log(log.ShimCacheApplication).Debug("skipping scheduling application",
-			zap.String("appState", app.GetApplicationState()),
-			zap.String("appID", app.GetApplicationID()),
-			zap.String("appState", app.GetApplicationState()))
-		return false
-	}
-	return true
-}
-
-func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) {
-	for _, task := range app.GetNewTasks() {
-		if taskScheduleCondition(task) {
-			// for each new task, we do a sanity check before moving the state to Pending_Schedule
-			if err := task.sanityCheckBeforeScheduling(); err == nil {
-				// note, if we directly trigger submit task event, it may spawn too many duplicate
-				// events, because a task might be submitted multiple times before its state transits to PENDING.
-				if handleErr := task.handle(
-					NewSimpleTaskEvent(task.applicationID, task.taskID, InitTask)); handleErr != nil {
-					// something goes wrong when transit task to PENDING state,
-					// this should not happen because we already checked the state
-					// before calling the transition. Nowhere to go, just log the error.
-					log.Log(log.ShimCacheApplication).Warn("init task failed", zap.Error(err))
-				}
-			} else {
-				events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
-				log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
-					zap.String("appID", task.applicationID),
-					zap.String("taskID", task.taskID),
-					zap.Error(err))
-			}
-		}
-	}
-}
-
 func (app *Application) handleSubmitApplicationEvent() error {
 	log.Log(log.ShimCacheApplication).Info("handle app submission",
 		zap.Stringer("app", app),
@@ -469,10 +396,11 @@ func (app *Application) postAppAccepted() {
 	// app could have allocated tasks upon a recovery, and in that case,
 	// the reserving phase has already passed, no need to trigger that again.
 	var ev events.SchedulingEvent
+	numAllocatedTasks := len(app.getTasks(TaskStates().Allocated))
 	log.Log(log.ShimCacheApplication).Debug("postAppAccepted on cached app",
 		zap.String("appID", app.applicationID),
 		zap.Int("numTaskGroups", len(app.taskGroups)),
-		zap.Int("numAllocatedTasks", len(app.GetAllocatedTasks())))
+		zap.Int("numAllocatedTasks", numAllocatedTasks))
 	if app.skipReservationStage() {
 		ev = NewRunApplicationEvent(app.applicationID)
 		log.Log(log.ShimCacheApplication).Info("Skip the reservation stage",
@@ -485,6 +413,24 @@
 	dispatcher.Dispatch(ev)
 }
 
+func (app *Application) postAppRunning() {
+	tasks := make([]*Task, 0, len(app.taskMap))
+	for _, task := range app.taskMap {
+		if !task.IsPlaceholder() {
+			tasks = append(tasks, task)
+		}
+	}
+	// sort tasks based on submission time & schedule them
+	sort.Slice(tasks, func(i, j int) bool {
+		l := tasks[i]
+		r := tasks[j]
+		return l.createTime.Before(r.createTime)
+	})
+	for _, task := range tasks {
+		task.Schedule()
+	}
+}
+
 func (app *Application) onReserving() {
 	// happens after recovery - if placeholders already exist, we need to send
 	// an event to trigger Application state change in the core
@@ -659,3 +605,55 @@ func (app *Application) SetPlaceholderTimeout(timeout int64) {
 	defer app.lock.Unlock()
 	app.placeholderTimeoutInSec = timeout
 }
+
+func (app *Application) addTaskAndSchedule(task *Task) {
+	app.lock.Lock()
+	defer app.lock.Unlock()
+	if _, ok := app.taskMap[task.taskID]; ok {
+		// skip adding duplicate task
+		return
+	}
+	app.taskMap[task.taskID] = task
+
+	if app.canScheduleTask(task) {
+		task.Schedule()
+	}
+}
+
+func (app *Application) canScheduleTask(task *Task) bool {
+	// skip - not yet accepted by the core
+	if !app.accepted {
+		return false
+	}
+
+	// can submit if gang scheduling is not used
+	if len(app.taskGroups) == 0 {
+		return true
+	}
+
+	// placeholder, or regular task and we're past reservation
+	ph := task.IsPlaceholder()
+	currentState := app.sm.Current()
+	return ph || (!ph && currentState != ApplicationStates().Reserving)
+}
+
+func (app *Application) GetNewTasksWithFailedAttempt() []*Task {
+	app.lock.RLock()
+	defer app.lock.RUnlock()
+
+	taskList := make([]*Task, 0, len(app.taskMap))
+	for _, task := range app.taskMap {
+		if task.GetTaskState() == TaskStates().New && task.IsFailedAttempt() {
+			taskList = append(taskList, task)
+		}
+	}
+
+	// sort the task based on creation time
+	sort.Slice(taskList, func(i, j int) bool {
+		l := taskList[i]
+		r := taskList[j]
+		return l.createTime.Before(r.createTime)
+	})
+
+	return taskList
+}
diff --git a/pkg/cache/application_state.go b/pkg/cache/application_state.go
index cb6293946..e420280cc 100644
--- a/pkg/cache/application_state.go
+++ b/pkg/cache/application_state.go
@@ -503,10 +503,19 @@ func newAppState() *fsm.FSM { //nolint:funlen
 				zap.String("destination", event.Dst),
 				zap.String("event", event.Event))
 		},
+		states.Accepted: func(_ context.Context, event *fsm.Event) {
+			app := event.Args[0].(*Application) //nolint:errcheck
+			app.accepted = true
+			app.postAppAccepted()
+		},
 		states.Reserving: func(_ context.Context, event *fsm.Event) {
 			app := event.Args[0].(*Application) //nolint:errcheck
 			app.onReserving()
 		},
+		states.Running: func(_ context.Context, event *fsm.Event) {
+			app := event.Args[0].(*Application) //nolint:errcheck
+			app.postAppRunning()
+		},
 		SubmitApplication.String(): func(_ context.Context, event *fsm.Event) {
 			app := event.Args[0].(*Application) //nolint:errcheck
 			event.Err = app.handleSubmitApplicationEvent()
diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go
index e0263c2e9..71e30c1ad 100644
--- a/pkg/cache/application_test.go
+++ b/pkg/cache/application_test.go
@@ -731,9 +731,6 @@ func TestTryReserve(t *testing.T) {
 	err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
 	assert.NilError(t, err)
 
-	// run app schedule
-	app.Schedule()
-
 	// since this app has taskGroups defined,
 	// once the app is accepted, it is expected to see this app goes to Reserving state
 	assertAppState(t, app, ApplicationStates().Reserving, 3*time.Second)
@@ -787,15 +784,6 @@ func TestTryReservePostRestart(t *testing.T) {
 		},
 	})
 
-	// submit the app
-	err := app.handle(NewSubmitApplicationEvent(app.applicationID))
-	assert.NilError(t, err)
-	assertAppState(t, app, ApplicationStates().Submitted, 3*time.Second)
-
-	// accepted the app
-	err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
-	assert.NilError(t, err)
-
 	// simulate some tasks are recovered during the restart
 	// create 3 pods, 1 of them is Allocated and the other 2 are New
 	resources := make(map[v1.ResourceName]resource.Quantity)
@@ -860,8 +848,11 @@ func TestTryReservePostRestart(t *testing.T) {
 	assert.Equal(t, len(app.getTasks(TaskStates().Allocated)), 1)
 	assert.Equal(t, len(app.getTasks(TaskStates().New)), 2)
 
-	// run app schedule
-	app.Schedule()
+	// submit app & trigger state transition to Accepted
+	err := app.TriggerAppSubmission()
+	assert.NilError(t, err)
+	err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
+	assert.NilError(t, err)
 
 	// since this app has Allocated tasks, the Reserving state will be skipped
 	assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
@@ -1160,7 +1151,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) {
 	app := context.GetApplication("app00001")
 	assert.Assert(t, app != nil)
 	assert.Equal(t, app.GetApplicationID(), "app00001")
-	assert.Equal(t, app.GetApplicationState(), ApplicationStates().New)
+	assert.Equal(t, app.GetApplicationState(), ApplicationStates().Submitted)
 	assert.Equal(t, app.GetQueue(), "root.a")
 	assert.Equal(t, len(app.GetNewTasks()), 1)
 
@@ -1284,6 +1275,91 @@ func TestApplication_onReservationStateChange(t *testing.T) {
 	assertAppState(t, app, ApplicationStates().Running, 1*time.Second)
 }
 
+func TestAddTaskAndSchedule(t *testing.T) {
+	context := initContextForTest()
+	dispatcher.RegisterEventHandler("TestAppHandler", dispatcher.EventTypeApp, context.ApplicationEventHandler())
+	dispatcher.Start()
+	defer dispatcher.Stop()
+
+	pod := &v1.Pod{
+		TypeMeta: apis.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: apis.ObjectMeta{
+			Name: "pod-test-00001",
+			UID:  "UID-00001",
+		},
+	}
+
+	// can't schedule - app is not accepted
+	app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
+	task := NewTask("task01", app, context, pod)
+	app.addTaskAndSchedule(task)
+	assert.Equal(t, task.sm.Current(), TaskStates().New)
+	assert.Assert(t, !task.failedAttempt)
+
+	// can schedule task - no gang scheduling
+	app = NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
+	app.accepted = true
+	task = NewTask("task01", app, context, pod)
+	app.addTaskAndSchedule(task)
+
+	// can schedule task - placeholder
+	app = NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
+	app.accepted = true
+	app.taskGroups = []TaskGroup{
+		{
+			Name:      "group1",
+			MinMember: 3,
+		},
+	}
+	task = NewTaskPlaceholder("task01", app, context, pod)
+	app.addTaskAndSchedule(task)
+	assert.Assert(t, !task.IsFailedAttempt())
+
+	// can schedule task - state is no longer Reserving
+	app = NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
+	app.accepted = true
+	app.taskGroups = []TaskGroup{
+		{
+			Name:      "group1",
+			MinMember: 3,
+		},
+	}
+	task = NewTask("task01", app, context, pod)
+	app.sm.SetState(ApplicationStates().Running)
+	app.addTaskAndSchedule(task)
+	assert.Assert(t, !task.IsFailedAttempt())
+}
+
+func TestGetNewTasksWithFailedAttempt(t *testing.T) {
+	context := initContextForTest()
+	app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
+
+	task1 := NewTask("task01", app, context, &v1.Pod{})
+	task1.setFailedAttempt(true)
+	task1.createTime = time.UnixMilli(100)
+	task2 := NewTask("task02", app, context, &v1.Pod{})
+	task3 := NewTask("task03", app, context, &v1.Pod{})
+	task3.setFailedAttempt(true)
+	task3.createTime = time.UnixMilli(50)
+	task4 := NewTask("task04", app, context, &v1.Pod{})
+	task4.setFailedAttempt(true)
+	task4.createTime = time.UnixMilli(10)
+
+	app.addTask(task1)
+	app.addTask(task2)
+	app.addTask(task3)
+	app.addTask(task4)
+
+	tasks := app.GetNewTasksWithFailedAttempt()
+	assert.Equal(t, 3, len(tasks))
+	assert.Equal(t, "task04", tasks[0].taskID)
+	assert.Equal(t, "task03", tasks[1].taskID)
+	assert.Equal(t, "task01", tasks[2].taskID)
+}
+
 func (ctx *Context) addApplicationToContext(app *Application) {
 	ctx.lock.Lock()
 	defer ctx.lock.Unlock()
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index dd8f49542..31e35f6eb 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -65,6 +65,7 @@ type Context struct {
 	configMaps []*v1.ConfigMap // cached yunikorn configmaps
 	lock       *sync.RWMutex   // lock
 	txnID      atomic.Uint64   // transaction ID counter
+	newApp     bool            // whether application has been added since the last time it was checked
 }
 
 // NewContext create a new context for the scheduler using a default (empty) configuration
@@ -322,6 +323,10 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {
 		app = ctx.addApplication(&AddApplicationRequest{
 			Metadata: appMeta,
 		})
+		err := app.TriggerAppSubmission()
+		if err != nil {
+			log.Log(log.ShimContext).Error("BUG: application submission failed", zap.Error(err))
+		}
 	}
 
 	// get task metadata
@@ -1017,6 +1022,7 @@ func (ctx *Context) addApplication(request *AddApplicationRequest) *Application
 
 	// add into cache
 	ctx.applications[app.applicationID] = app
+	ctx.newApp = true
 	log.Log(log.ShimContext).Info("app added",
 		zap.String("appID", app.applicationID))
 
@@ -1111,7 +1117,7 @@ func (ctx *Context) addTask(request *AddTaskRequest) *Task {
 		}
 	}
 	task := NewFromTaskMeta(request.Metadata.TaskID, app, ctx, request.Metadata, originator)
-	app.addTask(task)
+	app.addTaskAndSchedule(task)
 	log.Log(log.ShimContext).Info("task added",
 		zap.String("appID", app.applicationID),
 		zap.String("taskID", task.taskID),
@@ -1709,6 +1715,15 @@ func (ctx *Context) finalizePods(existingPods []*v1.Pod) error {
 	return nil
 }
 
+func (ctx *Context) HasNewApplication() bool {
+	ctx.lock.Lock()
+	defer ctx.lock.Unlock()
+	v := ctx.newApp
+	ctx.newApp = false
+
+	return v
+}
+
 // for a given pod, return an allocation if found
 func getExistingAllocation(pod *v1.Pod) *si.Allocation {
 	// skip terminated pods
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index dab395e3d..0cb4ac239 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -229,6 +229,8 @@ func TestAddApplications(t *testing.T) {
 	assert.Assert(t, context.applications["app00001"] != nil)
 	assert.Equal(t, context.applications["app00001"].GetApplicationState(), ApplicationStates().New)
 	assert.Equal(t, len(context.applications["app00001"].GetPendingTasks()), 0)
+	assert.Assert(t, context.HasNewApplication())
+	assert.Assert(t, !context.HasNewApplication()) // subsequent invocations should return "false"
 
 	// add an app but app already exists
 	app := context.AddApplication(&AddApplicationRequest{
@@ -903,9 +905,9 @@ func TestRecoverTask(t *testing.T) {
 	})
 	assert.Assert(t, task != nil)
 	assert.Equal(t, task.GetTaskID(), taskUID1)
+	task.Schedule()
 
 	app.SetState("Running")
-	app.Schedule()
 
 	// wait for task to transition to bound state
 	err := utils.WaitForCondition(func() bool {
@@ -1039,12 +1041,11 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
 			Pod: newPodHelper(pod1Name, namespace, pod1UID, fakeNodeName, appID, v1.PodRunning),
 		},
 	})
-
 	assert.Assert(t, task0 != nil)
 	assert.Equal(t, task0.GetTaskID(), pod1UID)
+	task0.Schedule()
 
 	app.SetState("Running")
-	app.Schedule()
 
 	// wait for task to transition to bound state
 	err := utils.WaitForCondition(func() bool {
@@ -1059,11 +1060,9 @@
 			Pod: newPodHelper(pod2Name, namespace, pod2UID, fakeNodeName, appID, v1.PodRunning),
 		},
 	})
-
 	assert.Assert(t, task1 != nil)
 	assert.Equal(t, task1.GetTaskID(), pod2UID)
-
-	app.Schedule()
+	task1.Schedule()
 
 	// wait for task to transition to bound state
 	err = utils.WaitForCondition(func() bool {
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index d485db810..767d1a131 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -56,6 +56,7 @@ type Task struct {
 	terminationType string
 	originator      bool
 	schedulingState TaskSchedulingState
+	failedAttempt   bool
 	sm              *fsm.FSM
 	lock            *sync.RWMutex
 }
@@ -580,3 +581,41 @@ func (task *Task) UpdatePodCondition(podCondition *v1.PodCondition) (bool, *v1.P
 
 	return false, pod
 }
+
+func (task *Task) Schedule() {
+	if task.sm.Current() != TaskStates().New {
+		return
+	}
+
+	if err := task.sanityCheckBeforeScheduling(); err != nil {
+		events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
+		log.Log(log.ShimCacheTask).Debug("task is not ready for scheduling",
+			zap.String("appID", task.applicationID),
+			zap.String("taskID", task.taskID),
+			zap.Error(err))
+		task.setFailedAttempt(true)
+		return
+	}
+
+	err := task.handle(NewSimpleTaskEvent(task.applicationID, task.taskID, InitTask))
+	if err != nil {
+		task.setFailedAttempt(true)
+		log.Log(log.ShimCacheTask).Error("could not submit task for scheduling",
scheduling", + zap.Error(err)) + return + } + + task.setFailedAttempt(false) +} + +func (task *Task) setFailedAttempt(b bool) { + task.lock.Lock() + defer task.lock.Unlock() + task.failedAttempt = b +} + +func (task *Task) IsFailedAttempt() bool { + task.lock.RLock() + defer task.lock.RUnlock() + return task.failedAttempt +} diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go index d31339c99..facd88374 100644 --- a/pkg/cache/task_test.go +++ b/pkg/cache/task_test.go @@ -24,7 +24,6 @@ import ( "time" "gotest.tools/v3/assert" - v1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -35,7 +34,9 @@ import ( "github.com/apache/yunikorn-k8shim/pkg/client" "github.com/apache/yunikorn-k8shim/pkg/common/constants" "github.com/apache/yunikorn-k8shim/pkg/common/events" + "github.com/apache/yunikorn-k8shim/pkg/common/test" "github.com/apache/yunikorn-k8shim/pkg/conf" + "github.com/apache/yunikorn-k8shim/pkg/dispatcher" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -740,3 +741,66 @@ func TestUpdatePodCondition(t *testing.T) { assert.Equal(t, v1.PodPending, podCopy.Status.Phase) assert.Equal(t, v1.PodReasonUnschedulable, podCopy.Status.Conditions[0].Reason) } + +func TestSchedule(t *testing.T) { + context := initContextForTest() + dispatcher.RegisterEventHandler("TestAppHandler", dispatcher.EventTypeApp, context.ApplicationEventHandler()) + dispatcher.RegisterEventHandler("TestTaskHandler", dispatcher.EventTypeTask, context.TaskEventHandler()) + dispatcher.Start() + defer dispatcher.Stop() + + pod := &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-test-00001", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "test", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "testPVC", + }, + }, + }, + }, + }, + } + apiProvider := context.apiProvider.(*client.MockedAPIProvider) //nolint:errcheck + pvcLister := apiProvider.GetAPIs().PVCInformer.Lister().(*test.MockedPersistentVolumeClaimLister) //nolint:errcheck + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPVC", + Namespace: "default", + }, + } + pvcLister.AddPVC(pvc) + + // normal case + app := NewApplication(appID, "root.default", "user", testGroups, map[string]string{}, nil) + task := NewTask("pod-1", app, context, pod) + task.Schedule() + assert.Equal(t, task.sm.Current(), TaskStates().Pending) + assert.Assert(t, !task.IsFailedAttempt()) + + // sanity check failed + pvc.DeletionTimestamp = &metav1.Time{} + app = NewApplication(appID, "root.default", "user", testGroups, map[string]string{}, nil) + task = NewTask("pod-1", app, context, pod) + task.Schedule() + assert.Equal(t, task.sm.Current(), TaskStates().New) + assert.Assert(t, task.IsFailedAttempt()) + + // state not New + pvc.DeletionTimestamp = &metav1.Time{} + app = NewApplication(appID, "root.default", "user", testGroups, map[string]string{}, nil) + task = NewTask("pod-1", app, context, pod) + task.sm.SetState(TaskStates().Scheduling) + task.Schedule() // if sanity check runs (which shouldn't happen), it fails due to DeletionTimeStamp + assert.Assert(t, !task.IsFailedAttempt()) +} diff --git a/pkg/client/apifactory_mock.go b/pkg/client/apifactory_mock.go index 827d2626b..e8d201d4a 100644 --- a/pkg/client/apifactory_mock.go +++ b/pkg/client/apifactory_mock.go @@ -84,8 +84,8 @@ func NewMockedAPIProvider(showError bool) 
*MockedAPIProvider { PodInformer: test.NewMockedPodInformer(), NodeInformer: test.NewMockedNodeInformer(), ConfigMapInformer: test.NewMockedConfigMapInformer(), + PVCInformer: test.NewMockedPVCInformer(), PVInformer: &MockedPersistentVolumeInformer{}, - PVCInformer: &MockedPersistentVolumeClaimInformer{}, StorageInformer: &MockedStorageClassInformer{}, VolumeBinder: nil, NamespaceInformer: test.NewMockNamespaceInformer(false), @@ -425,17 +425,6 @@ func (m *MockedPersistentVolumeInformer) Lister() corev1.PersistentVolumeLister return nil } -// MockedPersistentVolumeClaimInformer implements PersistentVolumeClaimInformer interface -type MockedPersistentVolumeClaimInformer struct{} - -func (m *MockedPersistentVolumeClaimInformer) Informer() cache.SharedIndexInformer { - return nil -} - -func (m *MockedPersistentVolumeClaimInformer) Lister() corev1.PersistentVolumeClaimLister { - return nil -} - // MockedStorageClassInformer implements StorageClassInformer interface type MockedStorageClassInformer struct{} diff --git a/pkg/common/test/pvcinformer_mock.go b/pkg/common/test/pvcinformer_mock.go new file mode 100644 index 000000000..712f26df3 --- /dev/null +++ b/pkg/common/test/pvcinformer_mock.go @@ -0,0 +1,46 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package test + +import ( + v1 "k8s.io/api/core/v1" + informerv1 "k8s.io/client-go/informers/core/v1" + listersv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" +) + +func NewMockedPVCInformer() informerv1.PersistentVolumeClaimInformer { + return &MockedPersistentVolumeClaimInformer{ + lister: &MockedPersistentVolumeClaimLister{ + pvClaims: make(map[*v1.PersistentVolumeClaim]struct{}), + }, + } +} + +type MockedPersistentVolumeClaimInformer struct { + lister listersv1.PersistentVolumeClaimLister +} + +func (m *MockedPersistentVolumeClaimInformer) Informer() cache.SharedIndexInformer { + return nil +} + +func (m *MockedPersistentVolumeClaimInformer) Lister() listersv1.PersistentVolumeClaimLister { + return m.lister +} diff --git a/pkg/common/test/pvclister_mock.go b/pkg/common/test/pvclister_mock.go new file mode 100644 index 000000000..bb885c12a --- /dev/null +++ b/pkg/common/test/pvclister_mock.go @@ -0,0 +1,103 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package test
+
+import (
+	"errors"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	listersv1 "k8s.io/client-go/listers/core/v1"
+)
+
+type MockedPersistentVolumeClaimLister struct {
+	pvClaims      map[*v1.PersistentVolumeClaim]struct{}
+	getPVCFailure bool
+
+	sync.RWMutex
+}
+
+func (p *MockedPersistentVolumeClaimLister) SetFailureOnGetPVC(b bool) {
+	p.Lock()
+	defer p.Unlock()
+	p.getPVCFailure = b
+}
+
+func (p *MockedPersistentVolumeClaimLister) AddPVC(pvc *v1.PersistentVolumeClaim) {
+	p.Lock()
+	defer p.Unlock()
+	p.pvClaims[pvc] = struct{}{}
+}
+
+func (p *MockedPersistentVolumeClaimLister) DeletePVC(pvc *v1.PersistentVolumeClaim) {
+	p.Lock()
+	defer p.Unlock()
+	delete(p.pvClaims, pvc)
+}
+
+func (p *MockedPersistentVolumeClaimLister) List(selector labels.Selector) (ret []*v1.PersistentVolumeClaim, err error) {
+	p.RLock()
+	defer p.RUnlock()
+	result := make([]*v1.PersistentVolumeClaim, 0)
+	for pvc := range p.pvClaims {
+		if selector.Matches(labels.Set(pvc.Labels)) {
+			result = append(result, pvc)
+		}
+	}
+	return result, nil
+}
+
+func (p *MockedPersistentVolumeClaimLister) PersistentVolumeClaims(namespace string) listersv1.PersistentVolumeClaimNamespaceLister {
+	p.RLock()
+	defer p.RUnlock()
+	filtered := make(map[string]*v1.PersistentVolumeClaim)
+	for k := range p.pvClaims {
+		if k.Namespace == namespace {
+			filtered[k.Name] = k
+		}
+	}
+	return &MockPersistentVolumeClaimNamespaceLister{
+		pvClaims:      filtered,
+		getPVCFailure: p.getPVCFailure,
+	}
+}
+
+type MockPersistentVolumeClaimNamespaceLister struct {
+	pvClaims      map[string]*v1.PersistentVolumeClaim
+	getPVCFailure bool
+}
+
+func (pns *MockPersistentVolumeClaimNamespaceLister) List(selector labels.Selector) ([]*v1.PersistentVolumeClaim, error) {
+	result := make([]*v1.PersistentVolumeClaim, 0)
+	for _, pvc := range pns.pvClaims {
+		if selector.Matches(labels.Set(pvc.Labels)) {
+			result = append(result, pvc)
+		}
+	}
+	return result, nil
+}
+
+func (pns *MockPersistentVolumeClaimNamespaceLister) Get(name string) (*v1.PersistentVolumeClaim, error) {
+	if pns.getPVCFailure {
+		return nil, errors.New("error while getting PVC")
+	}
+	pvc := pns.pvClaims[name]
+	return pvc, nil
+}
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index 76a2c717a..ab082e7da 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -111,10 +111,21 @@ func (ss *KubernetesShim) initSchedulerState() error {
 }
 
 func (ss *KubernetesShim) doScheduling() {
-	// run main scheduling loop
-	go wait.Until(ss.schedule, conf.GetSchedulerConf().GetSchedulingInterval(), ss.stopChan)
 	// log a message if no outstanding requests were found for a while
 	go wait.Until(ss.checkOutstandingApps, outstandingAppLogTimeout, ss.stopChan)
+
+	go wait.Until(ss.scheduleFailedAttempts, time.Second, ss.stopChan)
+}
+
+// we scan all apps and try to submit tasks where the submission failed previously
+func (ss *KubernetesShim) scheduleFailedAttempts() {
+	apps := ss.context.GetAllApplications()
+	for _, app := range apps {
+		tasks := app.GetNewTasksWithFailedAttempt()
+		for _, task := range tasks {
+			task.Schedule()
+		}
+	}
+}
 
 func (ss *KubernetesShim) registerShimLayer() error {
@@ -154,16 +165,6 @@
 	return nil
 }
 
-// each schedule iteration, we scan all apps and triggers app state transition
-func (ss *KubernetesShim) schedule() {
-	apps := ss.context.GetAllApplications()
-	for _, app := range apps {
-		if app.Schedule() {
-			ss.setOutstandingAppsFound(true)
-		}
-	}
-}
-
 func (ss *KubernetesShim) Run() error {
 	// NOTE: the order of starting these services matter,
 	// please look at the comments before modifying the orders
@@ -213,21 +214,8 @@ func (ss *KubernetesShim) Stop() {
 }
 
 func (ss *KubernetesShim) checkOutstandingApps() {
-	if !ss.getOutstandingAppsFound() {
+	if !ss.context.HasNewApplication() {
 		log.Log(log.ShimScheduler).Info("No outstanding apps found for a while",
 			zap.Duration("timeout", outstandingAppLogTimeout))
 		return
 	}
-	ss.setOutstandingAppsFound(false)
-}
-
-func (ss *KubernetesShim) getOutstandingAppsFound() bool {
-	ss.lock.RLock()
-	defer ss.lock.RUnlock()
-	return ss.outstandingAppsFound
-}
-
-func (ss *KubernetesShim) setOutstandingAppsFound(value bool) {
-	ss.lock.Lock()
-	defer ss.lock.Unlock()
-	ss.outstandingAppsFound = value
 }
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 54c2f75b4..38f28a1b8 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -158,6 +158,10 @@ func (fc *MockScheduler) removeApplication(appId string) error {
 	return fc.context.RemoveApplication(appId)
 }
 
+func (fc *MockScheduler) appExistsInCore(appId string) bool {
+	return fc.coreContext.Scheduler.GetClusterContext().GetApplication(appId, partitionName) != nil
+}
+
 func (fc *MockScheduler) waitAndAssertTaskState(t *testing.T, appID, taskID, expectedState string) {
 	app := fc.context.GetApplication(appID)
 	assert.Equal(t, app != nil, true)
diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go
index bbb1d9ad2..cc717654e 100644
--- a/pkg/shim/scheduler_test.go
+++ b/pkg/shim/scheduler_test.go
@@ -19,14 +19,18 @@
 package shim
 
 import (
+	"context"
 	"fmt"
 	"testing"
+	"time"
 
 	"gotest.tools/v3/assert"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	apis "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 
 	"github.com/apache/yunikorn-k8shim/pkg/cache"
 	"github.com/apache/yunikorn-k8shim/pkg/client"
@@ -148,7 +152,11 @@ partitions:
 	// remove task first or removeApplication will fail
 	cluster.context.RemoveTask(appID, "task0001")
 	err = cluster.removeApplication(appID)
-	assert.Assert(t, err == nil)
+	assert.NilError(t, err)
+	err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, time.Second, true, func(_ context.Context) (bool, error) {
+		return !cluster.appExistsInCore(appID), nil
+	})
+	assert.NilError(t, err, "application still exists in the core")
 
 	// submit again
 	task1 = createTestPod("root.a", appID, "task0001", taskResource)
@@ -282,3 +290,77 @@ func createTestPod(queue string, appID string, taskID string, taskResource *si.R
 		},
 	}
 }
+
+func TestScheduleFailedAttempts(t *testing.T) {
+	cluster := MockScheduler{}
+	cluster.init()
+	err := cluster.start()
+	assert.NilError(t, err, "cannot start cluster")
+	defer cluster.stop()
+
+	appID := "app-1"
+	podUID := "pod-uid-1"
+	pvcName := "testPVC"
+
+	pod := &v1.Pod{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "pod-test-00001",
+			Namespace: "default",
+			Labels: map[string]string{
+				constants.LabelApplicationID: appID,
+			},
+			UID: types.UID(podUID),
+		},
+		Spec: v1.PodSpec{
+			SchedulerName: constants.SchedulerName,
+			Volumes: []v1.Volume{
+				{
+					Name: "test",
+					VolumeSource: v1.VolumeSource{
+						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
+							ClaimName: pvcName,
+						},
+					},
+				},
+			},
+		},
+	}
+	pvcLister := cluster.apiProvider.GetAPIs().PVCInformer.Lister().(*test.MockedPersistentVolumeClaimLister) //nolint:errcheck
+	pvcLister.SetFailureOnGetPVC(true)
+	pvc := &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      pvcName,
+			Namespace: "default",
+		},
+	}
+	pvcLister.AddPVC(pvc)
+
+	// add pod & wait until application object becomes available
+	cluster.AddPod(pod)
+	err = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, time.Second, true, func(_ context.Context) (bool, error) {
+		return cluster.context.GetApplication(appID) != nil, nil
+	})
+	assert.NilError(t, err, "application not found inside the context")
+
+	// verify task state #1 - scheduling attempt is failed due to PVC error
+	app := cluster.context.GetApplication(appID)
+	task, err := app.GetTask(podUID)
+	assert.NilError(t, err, "could not get task")
+	// wait until the scheduling attempt of the task becomes failed
+	err = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, time.Second, true, func(_ context.Context) (bool, error) {
+		return task.IsFailedAttempt(), nil
+	})
+	assert.NilError(t, err, "scheduling of task hasn't failed")
+
+	// remove simulated error
+	pvcLister.SetFailureOnGetPVC(false)
+	// verify task state #2 - wait until the "failed" flag is cleared
+	err = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, time.Second, true, func(_ context.Context) (bool, error) {
+		return !task.IsFailedAttempt(), nil
+	})
+	assert.NilError(t, err, "scheduling of task hasn't succeeded")
+}