Skip to content

Commit

Permalink
Merge pull request #278 from uselagoon/task-qos
Browse files Browse the repository at this point in the history
feat: task queuing support
  • Loading branch information
shreddedbacon authored Feb 6, 2025
2 parents 0d80ebc + c9d9195 commit 347cd93
Show file tree
Hide file tree
Showing 30 changed files with 1,360 additions and 73 deletions.
10 changes: 7 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,10 @@ kind/clean:

# Utilize Kind or modify the e2e tests to load the image locally, enabling compatibility with other vendors.
.PHONY: test-e2e # Run the e2e tests against a Kind k8s instance that is spun up inside github action.
test-e2e:
test-e2e: build-task-image
export HARBOR_VERSION=$(HARBOR_VERSION) && \
export OVERRIDE_BUILD_DEPLOY_DIND_IMAGE=$(OVERRIDE_BUILD_DEPLOY_DIND_IMAGE) && \
go test ./test/e2e/ -v -ginkgo.v
go test ./test/e2e/ -v -ginkgo.v -timeout 20m

.PHONY: github/test-e2e
github/test-e2e: local-dev/tools install-lagoon-remote test-e2e
Expand Down Expand Up @@ -401,4 +401,8 @@ echo "Downloading $${package}" ;\
GOBIN=$(LOCALBIN) go install $${package} ;\
mv "$$(echo "$(1)" | sed "s/-$(3)$$//")" $(1) ;\
}
endef
endef

.PHONY: build-task-image
build-task-image:
docker build . -f test-resources/Dockerfile.task -t example.com/test-task-image:v0.0.1
51 changes: 49 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/cheshir/go-mq/v2"
str2duration "github.com/xhit/go-str2duration/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
Expand Down Expand Up @@ -169,6 +170,13 @@ func main() {
var qosMaxBuilds int
var qosDefaultValue int

var lffTaskQoSEnabled bool
var qosMaxTasks int
var qosMaxNamespaceTasks int

var taskImagePullPolicy string
var buildImagePullPolicy string

var lffRouterURL bool

var enableDeprecatedAPIs bool
Expand Down Expand Up @@ -363,11 +371,24 @@ func main() {
flag.IntVar(&timeoutForLongRunningBuildPods, "timeout-longrunning-build-pod-cleanup", 6, "How many hours a build pod should run before forcefully closed.")
flag.IntVar(&timeoutForLongRunningTaskPods, "timeout-longrunning-task-pod-cleanup", 6, "How many hours a task pod should run before forcefully closed.")

// QoS configuration
// Build QoS configuration
flag.BoolVar(&lffQoSEnabled, "enable-qos", false, "Flag to enable this controller with QoS for builds.")
flag.IntVar(&qosMaxBuilds, "qos-max-builds", 20, "The number of builds that can run at any one time.")
flag.IntVar(&qosMaxBuilds, "qos-max-builds", 20, "The total number of builds that can run at any one time.")
flag.IntVar(&qosDefaultValue, "qos-default", 5, "The default qos value to apply if one is not provided.")

// Task QoS configuration
flag.BoolVar(&lffTaskQoSEnabled, "enable-task-qos", false, "Flag to enable this controller with QoS for tasks.")
flag.IntVar(&qosMaxTasks, "qos-max-tasks", 200, "The total number of tasks that can run at any one time.")
flag.IntVar(&qosMaxNamespaceTasks, "qos-max-namespace-tasks", 20, "The total number of tasks that can run at any one time.")

// flags to change the image pull policy used for tasks and builds
// defaults to Always, can change to another option as required. tests use IfNotPresent
// these flags are used for stability in testing, in actual production use you should never change these.
flag.StringVar(&taskImagePullPolicy, "task-image-pull-policy", "Always",
"The image pull policy to use for tasks. Changing this can have a negative impact and result in tasks failing.")
flag.StringVar(&buildImagePullPolicy, "build-image-pull-policy", "Always",
"The image pull policy to use for builds. Changing this can have a negative impact and result in builds failing.")

// If installing this controller from scratch, deprecated APIs should not be configured
flag.BoolVar(&enableDeprecatedAPIs, "enable-deprecated-apis", false, "Flag to have this controller enable support for deprecated APIs.")

Expand Down Expand Up @@ -698,6 +719,26 @@ func main() {
DefaultValue: qosDefaultValue,
}

taskQoSConfigv1beta2 := lagoonv1beta2ctrl.TaskQoS{
MaxTasks: qosMaxTasks,
MaxNamespaceTasks: qosMaxNamespaceTasks,
}

tipp := corev1.PullAlways
switch taskImagePullPolicy {
case "IfNotPresent":
tipp = corev1.PullIfNotPresent
case "Never":
tipp = corev1.PullNever
}
bipp := corev1.PullAlways
switch buildImagePullPolicy {
case "IfNotPresent":
bipp = corev1.PullIfNotPresent
case "Never":
bipp = corev1.PullNever
}

resourceCleanup := pruner.New(mgr.GetClient(),
buildsToKeep,
buildPodsToKeep,
Expand Down Expand Up @@ -834,6 +875,7 @@ func main() {
NoProxy: noProxy,
},
UnauthenticatedRegistry: unauthenticatedRegistry,
ImagePullPolicy: bipp,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "LagoonBuild")
os.Exit(1)
Expand All @@ -860,6 +902,8 @@ func main() {
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("v1beta2").WithName("LagoonTask"),
Scheme: mgr.GetScheme(),
EnableMQ: enableMQ,
Messaging: messaging,
ControllerNamespace: controllerNamespace,
NamespacePrefix: namespacePrefix,
RandomNamespacePrefix: randomPrefix,
Expand All @@ -877,6 +921,9 @@ func main() {
HTTPSProxy: httpsProxy,
NoProxy: noProxy,
},
LFFTaskQoSEnabled: lffTaskQoSEnabled,
TaskQoS: taskQoSConfigv1beta2,
ImagePullPolicy: tipp,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "LagoonTask")
os.Exit(1)
Expand Down
5 changes: 5 additions & 0 deletions config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ spec:
- "--enable-deprecated-apis"
- "--lagoon-feature-flag-support-k8upv2"
- "--skip-tls-verify"
- "--enable-task-qos"
- "--qos-max-tasks=3"
- "--qos-max-namespace-tasks=3"
- "--task-image-pull-policy=IfNotPresent"
- "--build-image-pull-policy=IfNotPresent"
ports:
- containerPort: 8443
name: https
1 change: 1 addition & 0 deletions internal/controllers/v1beta2/build_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type LagoonBuildReconciler struct {
LagoonAPIConfiguration helpers.LagoonAPIConfiguration
ProxyConfig ProxyConfig
UnauthenticatedRegistry string
ImagePullPolicy corev1.PullPolicy
}

// BackupConfig holds all the backup configuration settings
Expand Down
2 changes: 1 addition & 1 deletion internal/controllers/v1beta2/build_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func (r *LagoonBuildReconciler) processBuild(ctx context.Context, opLog logr.Log
{
Name: "lagoon-build",
Image: buildImage,
ImagePullPolicy: "Always",
ImagePullPolicy: r.ImagePullPolicy,
Env: podEnvs,
VolumeMounts: volumeMounts,
},
Expand Down
38 changes: 32 additions & 6 deletions internal/controllers/v1beta2/podmonitor_taskhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *LagoonMonitorReconciler) taskLogsToLagoonLogs(opLog logr.Logger,
Environment: lagoonTask.Spec.Environment.Name,
JobName: lagoonTask.ObjectMeta.Name,
JobStatus: condition,
RemoteID: string(jobPod.ObjectMeta.UID),
RemoteID: string(lagoonTask.ObjectMeta.UID),
Key: lagoonTask.Spec.Key,
Cluster: r.LagoonTargetName,
},
Expand All @@ -152,6 +152,15 @@ Logs on pod %s, assigned to cluster %s
// really matter if we don't smootly transition in what we send back to lagoon
return err
}
if r.EnableDebug {
opLog.Info(
fmt.Sprintf(
"Published event %s for %s to lagoon-logs exchange",
fmt.Sprintf("task-logs:job-kubernetes:%s", jobPod.ObjectMeta.Name),
jobPod.ObjectMeta.Name,
),
)
}
// if we are able to publish the message, then we need to remove any pending messages from the resource
// and make sure we don't try and publish again
}
Expand Down Expand Up @@ -186,7 +195,7 @@ func (r *LagoonMonitorReconciler) updateLagoonTask(opLog logr.Logger,
ProjectID: lagoonTask.Spec.Project.ID,
JobName: lagoonTask.ObjectMeta.Name,
JobStatus: condition,
RemoteID: string(jobPod.ObjectMeta.UID),
RemoteID: string(lagoonTask.ObjectMeta.UID),
Key: lagoonTask.Spec.Key,
Cluster: r.LagoonTargetName,
},
Expand Down Expand Up @@ -217,6 +226,14 @@ func (r *LagoonMonitorReconciler) updateLagoonTask(opLog logr.Logger,
// really matter if we don't smootly transition in what we send back to lagoon
return err
}
if r.EnableDebug {
opLog.Info(
fmt.Sprintf(
"Published task update message for %s to lagoon-tasks:controller queue",
jobPod.ObjectMeta.Name,
),
)
}
// if we are able to publish the message, then we need to remove any pending messages from the resource
// and make sure we don't try and publish again
}
Expand All @@ -226,7 +243,6 @@ func (r *LagoonMonitorReconciler) updateLagoonTask(opLog logr.Logger,
// taskStatusLogsToLagoonLogs sends the logs to lagoon-logs message queue, used for general messaging
func (r *LagoonMonitorReconciler) taskStatusLogsToLagoonLogs(opLog logr.Logger,
lagoonTask *lagooncrd.LagoonTask,
jobPod *corev1.Pod,
condition string,
) error {
if r.EnableMQ && lagoonTask != nil {
Expand All @@ -242,7 +258,7 @@ func (r *LagoonMonitorReconciler) taskStatusLogsToLagoonLogs(opLog logr.Logger,
ProjectID: lagoonTask.Spec.Project.ID,
JobName: lagoonTask.ObjectMeta.Name,
JobStatus: condition,
RemoteID: string(jobPod.ObjectMeta.UID),
RemoteID: string(lagoonTask.ObjectMeta.UID),
Key: lagoonTask.Spec.Key,
Cluster: r.LagoonTargetName,
},
Expand All @@ -263,6 +279,15 @@ func (r *LagoonMonitorReconciler) taskStatusLogsToLagoonLogs(opLog logr.Logger,
// really matter if we don't smootly transition in what we send back to lagoon
return err
}
if r.EnableDebug {
opLog.Info(
fmt.Sprintf(
"Published event %s for %s to lagoon-logs exchange",
fmt.Sprintf("task:job-kubernetes:%s", condition),
lagoonTask.ObjectMeta.Name,
),
)
}
// if we are able to publish the message, then we need to remove any pending messages from the resource
// and make sure we don't try and publish again
}
Expand Down Expand Up @@ -332,7 +357,8 @@ Task %s
mergeMap := map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]interface{}{
"lagoon.sh/taskStatus": taskCondition.String(),
"lagoon.sh/taskStatus": taskCondition.String(),
"lagoon.sh/taskStarted": "true",
},
},
}
Expand All @@ -346,7 +372,7 @@ Task %s

// send any messages to lagoon message queues
// update the deployment with the status
if err = r.taskStatusLogsToLagoonLogs(opLog, &lagoonTask, &jobPod, taskCondition.ToLower()); err != nil {
if err = r.taskStatusLogsToLagoonLogs(opLog, &lagoonTask, taskCondition.ToLower()); err != nil {
opLog.Error(err, "unable to publish task status logs")
}
if err = r.updateLagoonTask(opLog, &lagoonTask, &jobPod, taskCondition.ToLower()); err != nil {
Expand Down
49 changes: 40 additions & 9 deletions internal/controllers/v1beta2/task_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

lagooncrd "github.com/uselagoon/remote-controller/api/lagoon/v1beta2"
"github.com/uselagoon/remote-controller/internal/helpers"
"github.com/uselagoon/remote-controller/internal/messenger"
"github.com/uselagoon/remote-controller/internal/metrics"
)

Expand All @@ -42,13 +43,18 @@ type LagoonTaskReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
EnableMQ bool
Messaging *messenger.Messenger
ControllerNamespace string
NamespacePrefix string
RandomNamespacePrefix bool
LagoonAPIConfiguration helpers.LagoonAPIConfiguration
EnableDebug bool
LagoonTargetName string
ProxyConfig ProxyConfig
LFFTaskQoSEnabled bool
TaskQoS TaskQoS
ImagePullPolicy corev1.PullPolicy
}

var (
Expand All @@ -70,15 +76,40 @@ func (r *LagoonTaskReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// examine DeletionTimestamp to determine if object is under deletion
if lagoonTask.ObjectMeta.DeletionTimestamp.IsZero() {
// check if the task that has been recieved is a standard or advanced task
if lagoonTask.ObjectMeta.Labels["lagoon.sh/taskStatus"] == lagooncrd.TaskStatusPending.String() &&
lagoonTask.ObjectMeta.Labels["lagoon.sh/taskType"] == lagooncrd.TaskTypeStandard.String() {
return ctrl.Result{}, r.createStandardTask(ctx, &lagoonTask, opLog)
}
if lagoonTask.ObjectMeta.Labels["lagoon.sh/taskStatus"] == lagooncrd.TaskStatusPending.String() &&
lagoonTask.ObjectMeta.Labels["lagoon.sh/taskType"] == lagooncrd.TaskTypeAdvanced.String() {
return ctrl.Result{}, r.createAdvancedTask(ctx, &lagoonTask, opLog)
if r.LFFTaskQoSEnabled {
// handle QoS tasks here
// if we do have a `lagoon.sh/taskStatus` set as running, then process it
runningNSTasks := &lagooncrd.LagoonTaskList{}
listOption := (&client.ListOptions{}).ApplyOptions([]client.ListOption{
client.InNamespace(req.Namespace),
client.MatchingLabels(map[string]string{
"lagoon.sh/taskStatus": lagooncrd.TaskStatusRunning.String(),
"lagoon.sh/controller": r.ControllerNamespace,
}),
})
// // list any tasks that are running
if err := r.List(ctx, runningNSTasks, listOption); err != nil {
return ctrl.Result{}, fmt.Errorf("unable to list tasks in the namespace, there may be none or something went wrong: %v", err)
}
for _, runningTask := range runningNSTasks.Items {
// if the running task is the one from this request then process it
if lagoonTask.ObjectMeta.Name == runningTask.ObjectMeta.Name {
// actually process the task here
if _, ok := lagoonTask.ObjectMeta.Labels["lagoon.sh/taskStarted"]; !ok {
if lagoonTask.ObjectMeta.Labels["lagoon.sh/taskType"] == lagooncrd.TaskTypeStandard.String() {
return ctrl.Result{}, r.createStandardTask(ctx, &lagoonTask, opLog)
}
if lagoonTask.ObjectMeta.Labels["lagoon.sh/taskType"] == lagooncrd.TaskTypeAdvanced.String() {
return ctrl.Result{}, r.createAdvancedTask(ctx, &lagoonTask, opLog)
}
}
} // end check if running task is current LagoonTask
} // end loop for running tasks
// // once running tasks are processed, run the qos handler
return r.qosTaskProcessor(ctx, opLog, lagoonTask)
}
// if qos is not enabled, just process it as a standard task
return r.standardTaskProcessor(ctx, opLog, lagoonTask)
} else {
// The object is being deleted
if helpers.ContainsString(lagoonTask.ObjectMeta.Finalizers, taskFinalizer) {
Expand Down Expand Up @@ -567,7 +598,7 @@ func (r *LagoonTaskReconciler) createAdvancedTask(ctx context.Context, lagoonTas
{
Name: "lagoon-task",
Image: lagoonTask.Spec.AdvancedTask.RunnerImage,
ImagePullPolicy: "Always",
ImagePullPolicy: r.ImagePullPolicy,
EnvFrom: []corev1.EnvFromSource{
{
ConfigMapRef: &corev1.ConfigMapEnvSource{
Expand Down
Loading

0 comments on commit 347cd93

Please sign in to comment.