From 7d0d111e565258e0a800cb9663739dce7734c834 Mon Sep 17 00:00:00 2001 From: "anton.aleksandrov" Date: Mon, 2 Dec 2024 14:34:45 +0100 Subject: [PATCH 1/4] added a func to read int env variables --- main.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/main.go b/main.go index 4827b988..6f6967c2 100644 --- a/main.go +++ b/main.go @@ -456,3 +456,15 @@ func getBoolEnv(envName string) bool { } return boolVar } + +func getIntEnv(envName string) int { + var intVar int + if initStr, ok := os.LookupEnv(envName); ok { + var err error + if intVar, err = strconv.Atoi(initStr); err != nil { + log.Error(err, fmt.Sprintf("unable to parse provided '%s'", envName)) + os.Exit(1) + } + } + return intVar +} From 8b70c02bb7033e2fd973df1405a00247712a05ad Mon Sep 17 00:00:00 2001 From: "anton.aleksandrov" Date: Mon, 2 Dec 2024 14:54:44 +0100 Subject: [PATCH 2/4] reading MAX_CONCURRENT_RECONCILES env var + passing it to reconcilers --- controllers/super_stream_controller.go | 3 +++ controllers/topology_controller.go | 7 ++++++- main.go | 29 +++++++++++++++++++++----- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/controllers/super_stream_controller.go b/controllers/super_stream_controller.go index 7bedae4b..6fb1b3bf 100644 --- a/controllers/super_stream_controller.go +++ b/controllers/super_stream_controller.go @@ -12,6 +12,7 @@ package controllers import ( "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/controller" "strconv" "github.com/go-logr/logr" @@ -39,6 +40,7 @@ type SuperStreamReconciler struct { Recorder record.EventRecorder RabbitmqClientFactory rabbitmqclient.Factory KubernetesClusterDomain string + MaxConcurrentReconciles int } // +kubebuilder:rbac:groups=rabbitmq.com,resources=exchanges,verbs=get;create;update;patch;delete @@ -205,5 +207,6 @@ func (r *SuperStreamReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&topology.Exchange{}). Owns(&topology.Binding{}). Owns(&topology.Queue{}). + WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). Complete(r) } diff --git a/controllers/topology_controller.go b/controllers/topology_controller.go index 64c56cb7..be523384 100644 --- a/controllers/topology_controller.go +++ b/controllers/topology_controller.go @@ -17,6 +17,7 @@ import ( "reflect" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" "strings" "time" @@ -34,6 +35,7 @@ type TopologyReconciler struct { RabbitmqClientFactory rabbitmqclient.Factory KubernetesClusterDomain string ConnectUsingPlainHTTP bool + MaxConcurrentReconciles int } func (r *TopologyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -222,9 +224,12 @@ func (r *TopologyReconciler) SetupWithManager(mgr ctrl.Manager) error { if len(r.WatchTypes) == 0 { return ctrl.NewControllerManagedBy(mgr). For(r.Type). + WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). Complete(r) } - builder := ctrl.NewControllerManagedBy(mgr).For(r.Type) + builder := ctrl.NewControllerManagedBy(mgr). + For(r.Type). + WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}) for _, t := range r.WatchTypes { if err := mgr.GetFieldIndexer().IndexField(context.Background(), t, ownerKey, addResourceToIndex); err != nil { return err diff --git a/main.go b/main.go index 6f6967c2..753bef67 100644 --- a/main.go +++ b/main.go @@ -160,6 +160,12 @@ func main() { managerOpts.RetryPeriod = &retryPeriod } + var maxConcurrentReconciles int + if maxConcurrentReconcilesEnvValue := getIntEnv("MAX_CONCURRENT_RECONCILES"); maxConcurrentReconcilesEnvValue > 0 { + maxConcurrentReconciles = maxConcurrentReconcilesEnvValue + log.Info("maxConcurrentReconciles set to", maxConcurrentReconciles) + } + if enableDebugPprof, ok := os.LookupEnv("ENABLE_DEBUG_PPROF"); ok { pprofEnabled, err := strconv.ParseBool(enableDebugPprof) if err == nil && pprofEnabled { @@ -187,6 +193,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.QueueReconciler{}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.QueueControllerName) os.Exit(1) @@ -202,6 +209,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.ExchangeReconciler{}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.ExchangeControllerName) os.Exit(1) @@ -217,6 +225,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.BindingReconciler{}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.BindingControllerName) os.Exit(1) @@ -233,6 +242,7 @@ func main() { WatchTypes: []client.Object{}, ReconcileFunc: &controllers.UserReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.UserControllerName) os.Exit(1) @@ -248,6 +258,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.VhostReconciler{Client: mgr.GetClient()}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.VhostControllerName) os.Exit(1) @@ -263,6 +274,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.PolicyReconciler{}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.PolicyControllerName) os.Exit(1) @@ -278,6 +290,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.OperatorPolicyReconciler{}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.OperatorPolicyControllerName) os.Exit(1) @@ -293,6 +306,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.PermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.PermissionControllerName) os.Exit(1) @@ -308,6 +322,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.SchemaReplicationReconciler{Client: mgr.GetClient()}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.SchemaReplicationControllerName) os.Exit(1) @@ -323,6 +338,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.FederationReconciler{Client: mgr.GetClient()}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.FederationControllerName) os.Exit(1) @@ -338,6 +354,7 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.ShovelReconciler{Client: mgr.GetClient()}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.ShovelControllerName) os.Exit(1) @@ -353,17 +370,19 @@ func main() { KubernetesClusterDomain: clusterDomain, ReconcileFunc: &controllers.TopicPermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}, ConnectUsingPlainHTTP: usePlainHTTP, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.TopicPermissionControllerName) os.Exit(1) } if err = (&controllers.SuperStreamReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName(controllers.SuperStreamControllerName), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor(controllers.SuperStreamControllerName), - RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory, + Client: mgr.GetClient(), + Log: ctrl.Log.WithName(controllers.SuperStreamControllerName), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor(controllers.SuperStreamControllerName), + RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory, + MaxConcurrentReconciles: maxConcurrentReconciles, }).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create controller", "controller", controllers.SuperStreamControllerName) os.Exit(1) From 4643dc7c32cd040c18bace856c69b8a7435a7bac Mon Sep 17 00:00:00 2001 From: "anton.aleksandrov" Date: Mon, 2 Dec 2024 16:32:27 +0100 Subject: [PATCH 3/4] fix log message when setting maxConcurrentReconciles --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 753bef67..10417de5 100644 --- a/main.go +++ b/main.go @@ -163,7 +163,7 @@ func main() { var maxConcurrentReconciles int if maxConcurrentReconcilesEnvValue := getIntEnv("MAX_CONCURRENT_RECONCILES"); maxConcurrentReconcilesEnvValue > 0 { maxConcurrentReconciles = maxConcurrentReconcilesEnvValue - log.Info("maxConcurrentReconciles set to", maxConcurrentReconciles) + log.Info(fmt.Sprintf("maxConcurrentReconciles set to %d", maxConcurrentReconciles)) } if enableDebugPprof, ok := os.LookupEnv("ENABLE_DEBUG_PPROF"); ok { From a2df60e1fbd869815de759d36ee94b858cda7121 Mon Sep 17 00:00:00 2001 From: "anton.aleksandrov" Date: Mon, 2 Dec 2024 16:46:57 +0100 Subject: [PATCH 4/4] using const name instead of string --- controllers/common.go | 1 + main.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/controllers/common.go b/controllers/common.go index 106c9897..79822ed8 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -34,4 +34,5 @@ const ( EnableWebhooksEnvVar = "ENABLE_WEBHOOKS" ControllerSyncPeriodEnvVar = "SYNC_PERIOD" ConnectUsingPlainHTTPEnvVar = "CONNECT_USING_PLAIN_HTTP" + MaxConcurrentReconciles = "MAX_CONCURRENT_RECONCILES" ) diff --git a/main.go b/main.go index 10417de5..955660dc 100644 --- a/main.go +++ b/main.go @@ -161,7 +161,7 @@ func main() { } var maxConcurrentReconciles int - if maxConcurrentReconcilesEnvValue := getIntEnv("MAX_CONCURRENT_RECONCILES"); maxConcurrentReconcilesEnvValue > 0 { + if maxConcurrentReconcilesEnvValue := getIntEnv(controllers.MaxConcurrentReconciles); maxConcurrentReconcilesEnvValue > 0 { maxConcurrentReconciles = maxConcurrentReconcilesEnvValue log.Info(fmt.Sprintf("maxConcurrentReconciles set to %d", maxConcurrentReconciles)) }