Skip to content

Commit

Permalink
revert(LMES): Remove LMES support and the controller level
Browse files Browse the repository at this point in the history
  • Loading branch information
ruivieira committed Nov 25, 2024
1 parent a3b783e commit 7281636
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 89 deletions.
31 changes: 12 additions & 19 deletions controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package controllers

import (
"errors"
"fmt"
"slices"
"strings"
Expand Down Expand Up @@ -46,29 +45,23 @@ func registerService(name string, setupf ControllerSetupFunc) {
}

func SetupControllers(enabledServices []string, mgr manager.Manager, ns, configmap string, recorder record.EventRecorder) error {
var errs []error
for _, service := range enabledServices {
errs = append(errs, TasServices[service](mgr, ns, configmap, recorder))
if len(enabledServices) == 0 || enabledServices[0] != "TAS" {
return fmt.Errorf("only TAS is supported")
}
return errors.Join(errs...)
if setupFunc, ok := TasServices["TAS"]; ok {
return setupFunc(mgr, ns, configmap, recorder)
}
return fmt.Errorf("TAS service is not registered")
}

func (es *EnabledServices) Set(services string) error {
for _, service := range strings.Split(services, ",") {
if slices.Contains(*es, service) {
return fmt.Errorf("specify the same service twice: %s", service)
}
if _, ok := TasServices[service]; ok {
*es = append(*es, service)
} else {
return fmt.Errorf(
"service %s is not supported. available services: %s",
service,
strings.Join(AllTasServices, ","),
)
}
if services != "TAS" {
return fmt.Errorf("only TAS is supported, but %s was provided", services)
}

if slices.Contains(*es, services) {
return fmt.Errorf("TAS is already enabled")
}
*es = append(*es, services)
return nil
}

Expand Down
74 changes: 4 additions & 70 deletions controllers/lmes/lmevaljob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,78 +171,12 @@ func (q *syncedMap4Reconciler) remove(key string) {
func (r *LMEvalJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)

job := &lmesv1alpha1.LMEvalJob{}
if err := r.Get(ctx, req.NamespacedName, job); err != nil {
log.Info("unable to fetch LMEvalJob. could be from a deletion request")
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if !job.ObjectMeta.DeletionTimestamp.IsZero() {
// Handle deletion here
return r.handleDeletion(ctx, job, log)
}

// Treat this as NewJobState
if job.Status.LastScheduleTime == nil && job.Status.CompleteTime == nil {
job.Status.State = lmesv1alpha1.NewJobState
}
r.Recorder.Eventf(&lmesv1alpha1.LMEvalJob{}, corev1.EventTypeWarning, "NotSupported",
"LMEvalJob CRD is not currently supported")

if JobMgrEnabled && job.Status.State != lmesv1alpha1.CompleteJobState {
//the job requires kueue.x-k8s.io/queue-name label if Job Manager is enabled
if _, ok := job.ObjectMeta.GetLabels()["kueue.x-k8s.io/queue-name"]; !ok {
job.Status.State = lmesv1alpha1.CompleteJobState
job.Status.Reason = lmesv1alpha1.FailedReason
job.Status.Message = "job requires kueue.x-k8s.io/queue-name label"
log.Error(fmt.Errorf("job %s requires kueue.x-k8s.io/queue-name label", job.Name), "LMevalJob requires kueue.x-k8s.io/queue-name label when Job Manager is enabled")
return r.handleComplete(ctx, log, job)
} else if job.Spec.Suspend {
return r.handleSuspend(ctx, log, job)
}
}

// If outputs have been explicitly set
if job.Spec.HasCustomOutput() {
// If managed PVC is set
if job.Spec.Outputs.HasManagedPVC() {
if job.Spec.Outputs.HasExistingPVC() {
log.Info("LMEvalJob has both managed and existing PVCs defined. Existing PVC configuration will be ignored.")
}
err := r.handleManagedPVC(ctx, log, job)
if err != nil {
return ctrl.Result{}, err
}
} else if job.Spec.Outputs.HasExistingPVC() {
err := r.handleExistingPVC(ctx, log, job)
if err != nil {
return ctrl.Result{}, err
}
}
}
log.Info("Continuing after PVC")

// Handle the job based on its state
switch job.Status.State {
case lmesv1alpha1.NewJobState:
// Handle newly created job
return r.handleNewCR(ctx, log, job)
case lmesv1alpha1.ScheduledJobState:
// the job's pod has been created and the driver hasn't updated the state yet
// let's check the pod status and detect pod failure if there is
// TODO: need a timeout/retry mechanism here to transit to other states
return r.checkScheduledPod(ctx, log, job)
case lmesv1alpha1.RunningJobState:
// TODO: need a timeout/retry mechanism here to transit to other states
return r.checkScheduledPod(ctx, log, job)
case lmesv1alpha1.CompleteJobState:
return r.handleComplete(ctx, log, job)
case lmesv1alpha1.CancelledJobState:
return r.handleCancel(ctx, log, job)
case lmesv1alpha1.SuspendedJobState:
if !job.Spec.Suspend {
return r.handleResume(ctx, log, job)
}
}
log.Info("LMEvalJob CRD is not supported. Ignoring reconciliation request.")

// Do nothing and return
return ctrl.Result{}, nil
}

Expand Down

0 comments on commit 7281636

Please sign in to comment.