From 5a506d391194891b8adbb6e057b0817cbe7ab3f0 Mon Sep 17 00:00:00 2001 From: shreddedbacon Date: Mon, 18 Dec 2023 07:47:00 +1100 Subject: [PATCH] feat: support idling messages from core --- api/lagoon/v1beta2/lagoontask_types.go | 2 - cmd/main.go | 13 ++ .../crd/bases/crd.lagoon.sh_lagoontasks.yaml | 3 - controllers/namespace/namespace.go | 113 +++++++++++++++++ controllers/namespace/predicates.go | 38 ++++++ internal/messenger/consumer.go | 32 +++++ internal/messenger/tasks_handler.go | 114 ++++++++++++++++++ 7 files changed, 310 insertions(+), 5 deletions(-) create mode 100644 controllers/namespace/namespace.go create mode 100644 controllers/namespace/predicates.go diff --git a/api/lagoon/v1beta2/lagoontask_types.go b/api/lagoon/v1beta2/lagoontask_types.go index 5c685625..9bfe477d 100644 --- a/api/lagoon/v1beta2/lagoontask_types.go +++ b/api/lagoon/v1beta2/lagoontask_types.go @@ -96,8 +96,6 @@ func (b TaskType) String() string { // LagoonTaskSpec defines the desired state of LagoonTask type LagoonTaskSpec struct { - // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster - // Important: Run "make" to regenerate code after modifying this file Key string `json:"key,omitempty"` Task schema.LagoonTaskInfo `json:"task,omitempty"` Project LagoonTaskProject `json:"project,omitempty"` diff --git a/cmd/main.go b/cmd/main.go index 09831d0a..ff5f250b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -47,6 +47,7 @@ import ( k8upv1 "github.com/k8up-io/k8up/v2/api/v1" lagoonv1beta1 "github.com/uselagoon/remote-controller/api/lagoon/v1beta1" lagoonv1beta2 "github.com/uselagoon/remote-controller/api/lagoon/v1beta2" + "github.com/uselagoon/remote-controller/controllers/namespace" harborctrl "github.com/uselagoon/remote-controller/internal/controllers/harbor" lagoonv1beta1ctrl "github.com/uselagoon/remote-controller/internal/controllers/v1beta1" lagoonv1beta2ctrl "github.com/uselagoon/remote-controller/internal/controllers/v1beta2" @@ -893,6 +894,18 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "LagoonTask") os.Exit(1) } + // start the namespace reconciler + if err = (&namespace.NamespaceReconciler{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("namespace").WithName("Namespace"), + Scheme: mgr.GetScheme(), + EnableMQ: enableMQ, + Messaging: messaging, + LagoonTargetName: lagoonTargetName, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Namespace") + os.Exit(1) + } // v1beta2 is the latest version if err = (&lagoonv1beta2ctrl.LagoonBuildReconciler{ diff --git a/config/crd/bases/crd.lagoon.sh_lagoontasks.yaml b/config/crd/bases/crd.lagoon.sh_lagoontasks.yaml index bc08f7a3..ceacd703 100644 --- a/config/crd/bases/crd.lagoon.sh_lagoontasks.yaml +++ b/config/crd/bases/crd.lagoon.sh_lagoontasks.yaml @@ -738,9 +738,6 @@ spec: - project type: object key: - description: |- - INSERT ADDITIONAL SPEC FIELDS - desired state of cluster - Important: Run "make" to regenerate code after modifying this file type: string misc: description: LagoonMiscInfo defines the resource or backup information diff --git a/controllers/namespace/namespace.go b/controllers/namespace/namespace.go new file mode 100644 index 00000000..ed43e6ce --- /dev/null +++ b/controllers/namespace/namespace.go @@ -0,0 +1,113 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package namespace + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "strconv" + + "github.com/go-logr/logr" + "github.com/uselagoon/machinery/api/schema" + "github.com/uselagoon/remote-controller/internal/messenger" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" +) + +// NamespaceReconciler reconciles idling +type NamespaceReconciler struct { + client.Client + Log logr.Logger + Scheme *runtime.Scheme + EnableMQ bool + Messaging *messenger.Messenger + LagoonTargetName string +} + +type Idled struct { + Idled bool `json:"idled"` +} + +func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + opLog := r.Log.WithValues("namespace", req.NamespacedName) + + var namespace corev1.Namespace + if err := r.Get(ctx, req.NamespacedName, &namespace); err != nil { + return ctrl.Result{}, ignoreNotFound(err) + } + + // this would be nice to be a lagoon label :) + if val, ok := namespace.ObjectMeta.Labels["idling.amazee.io/idled"]; ok { + idled, _ := strconv.ParseBool(val) + opLog.Info(fmt.Sprintf("environment %s idle state %t", namespace.Name, idled)) + if r.EnableMQ { + var projectName, environmentName string + if p, ok := namespace.ObjectMeta.Labels["lagoon.sh/project"]; ok { + projectName = p + } + if e, ok := namespace.ObjectMeta.Labels["lagoon.sh/environment"]; ok { + environmentName = e + } + idling := Idled{ + Idled: idled, + } + idlingJSON, _ := json.Marshal(idling) + msg := schema.LagoonMessage{ + Type: "idling", + Namespace: namespace.Name, + Meta: &schema.LagoonLogMeta{ + Environment: environmentName, + Project: projectName, + Cluster: r.LagoonTargetName, + AdvancedData: base64.StdEncoding.EncodeToString(idlingJSON), + }, + } + msgBytes, err := json.Marshal(msg) + if err != nil { + opLog.Error(err, "Unable to encode message as JSON") + } + // @TODO: if we can't publish the message because for some reason, log the error and move on + // this may result in the state being out of sync in lagoon but eventually will be consistent + if err := r.Messaging.Publish("lagoon-tasks:controller", msgBytes); err != nil { + return ctrl.Result{}, nil + } + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the watch on the namespace resource with an event filter (see predicates.go) +func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Namespace{}). + WithEventFilter(NamespacePredicates{}). + Complete(r) +} + +// will ignore not found errors +func ignoreNotFound(err error) error { + if apierrors.IsNotFound(err) { + return nil + } + return err +} diff --git a/controllers/namespace/predicates.go b/controllers/namespace/predicates.go new file mode 100644 index 00000000..b0f3c426 --- /dev/null +++ b/controllers/namespace/predicates.go @@ -0,0 +1,38 @@ +package namespace + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// NamespacePredicates defines the funcs for predicates +type NamespacePredicates struct { + predicate.Funcs +} + +// Create is used when a creation event is received by the controller. +func (n NamespacePredicates) Create(e event.CreateEvent) bool { + return false +} + +// Delete is used when a deletion event is received by the controller. +func (n NamespacePredicates) Delete(e event.DeleteEvent) bool { + return false +} + +// Update is used when an update event is received by the controller. +func (n NamespacePredicates) Update(e event.UpdateEvent) bool { + if oldIdled, ok := e.ObjectOld.GetLabels()["idling.amazee.io/idled"]; ok { + if newIdled, ok := e.ObjectNew.GetLabels()["idling.amazee.io/idled"]; ok { + if oldIdled != newIdled { + return true + } + } + } + return false +} + +// Generic is used when any other event is received by the controller. +func (n NamespacePredicates) Generic(e event.GenericEvent) bool { + return false +} diff --git a/internal/messenger/consumer.go b/internal/messenger/consumer.go index 7a3e0b02..204503d8 100644 --- a/internal/messenger/consumer.go +++ b/internal/messenger/consumer.go @@ -486,6 +486,38 @@ func (m *Messenger) Consumer(targetName string) { //error { message.Ack(false) // ack to remove from queue return } + case "deploytarget:environment:idling": + opLog.Info( + fmt.Sprintf( + "Received environment idling request for project %s, environment %s - %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + namespace, + ), + ) + // idle or unidle an environment, optionally forcible scale it so it can't be unidled by the ingress + err := m.ScaleOrIdleEnvironment(ctx, opLog, namespace, jobSpec) + if err != nil { + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return + } + case "deploytarget:environment:service": + opLog.Info( + fmt.Sprintf( + "Received environment service request for project %s, environment %s service - %s", + jobSpec.Project.Name, + jobSpec.Environment.Name, + namespace, + ), + ) + // idle an environment, optionally forcible scale it so it can't be unidled by the ingress + err := m.EnvironmentServiceState(ctx, opLog, namespace, jobSpec) + if err != nil { + //@TODO: send msg back to lagoon and update task to failed? + message.Ack(false) // ack to remove from queue + return + } default: // if we get something that we don't know about, spit out the entire message opLog.Info( diff --git a/internal/messenger/tasks_handler.go b/internal/messenger/tasks_handler.go index d4da44f6..9c242d65 100644 --- a/internal/messenger/tasks_handler.go +++ b/internal/messenger/tasks_handler.go @@ -5,10 +5,16 @@ import ( "encoding/base64" "encoding/json" "fmt" + "strconv" + "time" + "github.com/go-logr/logr" lagoonv1beta2 "github.com/uselagoon/remote-controller/api/lagoon/v1beta2" "github.com/uselagoon/remote-controller/internal/helpers" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" ) @@ -92,3 +98,111 @@ func createAdvancedTask(namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec, } return nil } + +type Idling struct { + Idle bool `json:"idle"` + ForceScale bool `json:"forceScale"` +} + +type Service struct { + Name string `json:"name"` + State string `json:"state"` +} + +func (m *Messenger) ScaleOrIdleEnvironment(ctx context.Context, opLog logr.Logger, ns string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error { + namespace := &corev1.Namespace{} + err := m.Client.Get(ctx, types.NamespacedName{ + Name: ns, + }, namespace) + if err != nil { + return err + } + idling := Idling{} + if err := json.Unmarshal(jobSpec.Misc.MiscResource, &idling); err != nil { + opLog.Error(err, + "Unable to unmarshal the idling json.", + ) + return err + } + if idling.Idle { + if idling.ForceScale { + // this would be nice to be a lagoon label :) + namespace.ObjectMeta.Labels["idling.amazee.io/force-scaled"] = "true" + } else { + // this would be nice to be a lagoon label :) + namespace.ObjectMeta.Labels["idling.amazee.io/force-idled"] = "true" + } + } else { + // this would be nice to be a lagoon label :) + namespace.ObjectMeta.Labels["idling.amazee.io/unidle"] = "true" + } + if err := m.Client.Update(context.Background(), namespace); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to update namespace %s to set idle state.", + ns, + ), + ) + return err + } + return nil +} + +func (m *Messenger) EnvironmentServiceState(ctx context.Context, opLog logr.Logger, ns string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error { + deployment := &appsv1.Deployment{} + service := Service{} + if err := json.Unmarshal(jobSpec.Misc.MiscResource, &service); err != nil { + opLog.Error(err, + "Unable to unmarshal the service json.", + ) + return err + } + err := m.Client.Get(ctx, types.NamespacedName{ + Name: service.Name, + Namespace: ns, + }, deployment) + if err != nil { + return err + } + update := false + switch service.State { + case "restart": + deployment.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339) + update = true + case "stop": + if *deployment.Spec.Replicas > 0 { + // if the service has replicas, then save the replica count and scale it to 0 + deployment.ObjectMeta.Annotations["service.lagoon.sh/replicas"] = strconv.FormatInt(int64(*deployment.Spec.Replicas), 10) + replicas := int32(0) + deployment.Spec.Replicas = &replicas + update = true + } + case "start": + if *deployment.Spec.Replicas == 0 { + // if the service has no replicas, set it back to what the previous replica value was + prevReplicas, err := strconv.Atoi(deployment.ObjectMeta.Annotations["service.lagoon.sh/replicas"]) + if err != nil { + return err + } + replicas := int32(prevReplicas) + deployment.Spec.Replicas = &replicas + delete(deployment.ObjectMeta.Annotations, "service.lagoon.sh/replicas") + update = true + } + default: + // nothing to do + return nil + } + if update { + if err := m.Client.Update(ctx, deployment); err != nil { + opLog.Error(err, + fmt.Sprintf( + "Unable to update deployment %s to change its state.", + ns, + ), + ) + return err + } + } + return nil +}