From 5e048318067876a9a747eccaae33a0f7051562d8 Mon Sep 17 00:00:00 2001
From: Jennifer Chen
Date: Wed, 28 Aug 2024 10:14:45 -0400
Subject: [PATCH 1/6] Update current replica count using podwatcher

---
 .../autoscaling/workload/controller.go            | 14 ++++++++++++++
 .../autoscaling/workload/controller_horizontal.go |  3 ---
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/pkg/clusteragent/autoscaling/workload/controller.go b/pkg/clusteragent/autoscaling/workload/controller.go
index fb874e4bd3483..8ba9f0b80bf2b 100644
--- a/pkg/clusteragent/autoscaling/workload/controller.go
+++ b/pkg/clusteragent/autoscaling/workload/controller.go
@@ -275,6 +275,20 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
     // Now that everything is synced, we can perform the actual processing
     result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)

+    // Update current replicas
+    targetGVK, err := podAutoscalerInternal.TargetGVK()
+    if err != nil {
+        podAutoscalerInternal.SetError(err)
+        return autoscaling.NoRequeue, err
+    }
+    target := NamespacedPodOwner{
+        Namespace: podAutoscalerInternal.Namespace(),
+        Name:      podAutoscalerInternal.Spec().TargetRef.Name,
+        Kind:      targetGVK.Kind,
+    }
+    currentReplicas := len(c.podWatcher.GetPodsForOwner(target))
+    podAutoscalerInternal.SetCurrentReplicas(int32(currentReplicas))
+
     // Update status based on latest state
     statusErr := c.updatePodAutoscalerStatus(ctx, podAutoscalerInternal, podAutoscaler)
     if statusErr != nil {
diff --git a/pkg/clusteragent/autoscaling/workload/controller_horizontal.go b/pkg/clusteragent/autoscaling/workload/controller_horizontal.go
index 9c3c043d02c21..3a9c0ac675033 100644
--- a/pkg/clusteragent/autoscaling/workload/controller_horizontal.go
+++ b/pkg/clusteragent/autoscaling/workload/controller_horizontal.go
@@ -79,9 +79,6 @@ func (hr *horizontalController) sync(ctx context.Context, podAutoscaler *datadog
         return autoscaling.Requeue, err
     }

-    // Update current replicas
-    autoscalerInternal.SetCurrentReplicas(scale.Status.Replicas)
-
     return hr.performScaling(ctx, podAutoscaler, autoscalerInternal, gr, scale)
 }


From 99abb19c8c7b9fbd7cf8d4f78a308271884b00b3 Mon Sep 17 00:00:00 2001
From: Jennifer Chen
Date: Wed, 28 Aug 2024 10:28:17 -0400
Subject: [PATCH 2/6] Remove update to current replicas in horizontal controller tests

---
 .../workload/controller_horizontal_test.go | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/pkg/clusteragent/autoscaling/workload/controller_horizontal_test.go b/pkg/clusteragent/autoscaling/workload/controller_horizontal_test.go
index 009811fbf4a39..f306f680bbc34 100644
--- a/pkg/clusteragent/autoscaling/workload/controller_horizontal_test.go
+++ b/pkg/clusteragent/autoscaling/workload/controller_horizontal_test.go
@@ -111,7 +111,6 @@ func (f *horizontalControllerFixture) testScalingDecision(args horizontalScaling
     f.scaler.AssertNumberOfCalls(f.t, "get", 1)
     f.scaler.AssertNumberOfCalls(f.t, "update", expectedUpdateCalls)

-    args.fakePai.CurrentReplicas = pointer.Ptr[int32](args.statusReplicas)
     if scaleActionExpected && args.scaleError == nil {
         // Update fakePai with the new expected state
         action := &datadoghq.DatadogPodAutoscalerHorizontalAction{
@@ -142,8 +141,9 @@ func TestHorizontalControllerSyncPrerequisites(t *testing.T) {
     autoscalerName := "test"

     fakePai := &model.FakePodAutoscalerInternal{
-        Namespace: autoscalerNamespace,
-        Name:      autoscalerName,
+        Namespace:       autoscalerNamespace,
+        Name:            autoscalerName,
+        CurrentReplicas: pointer.Ptr[int32](5),
     }

     // Test case: no Spec, no action taken
@@ -165,7 +165,7 @@ func TestHorizontalControllerSyncPrerequisites(t *testing.T) {
     model.AssertPodAutoscalersEqual(t, fakePai.Build(), autoscaler)

     // Test case: Correct Spec and GVK, but no scaling values
-    // Should only update replica count
+    // Should do nothing
     expectedGVK := schema.GroupVersionKind{
         Group:   "apps",
         Version: "v1",
@@ -304,7 +304,8 @@ func TestHorizontalControllerSyncScaleDecisions(t *testing.T) {
                 Replicas: 5,
             },
         },
-        TargetGVK: expectedGVK,
+        TargetGVK:       expectedGVK,
+        CurrentReplicas: pointer.Ptr[int32](5),
     }

     // Step: same number of replicas, no action taken, only updating status

From fdad6fbc14ee0237e60a2d02be83d329b95474e2 Mon Sep 17 00:00:00 2001
From: Jennifer Chen
Date: Wed, 28 Aug 2024 10:43:29 -0400
Subject: [PATCH 3/6] fixup! Update current replica count using podwatcher

---
 pkg/clusteragent/autoscaling/workload/controller.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pkg/clusteragent/autoscaling/workload/controller.go b/pkg/clusteragent/autoscaling/workload/controller.go
index 8ba9f0b80bf2b..361d58cdd59e3 100644
--- a/pkg/clusteragent/autoscaling/workload/controller.go
+++ b/pkg/clusteragent/autoscaling/workload/controller.go
@@ -276,10 +276,10 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
     result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)

     // Update current replicas
-    targetGVK, err := podAutoscalerInternal.TargetGVK()
-    if err != nil {
-        podAutoscalerInternal.SetError(err)
-        return autoscaling.NoRequeue, err
+    targetGVK, targetErr := podAutoscalerInternal.TargetGVK()
+    if targetErr != nil {
+        podAutoscalerInternal.SetError(targetErr)
+        return autoscaling.NoRequeue, targetErr
     }
     target := NamespacedPodOwner{
         Namespace: podAutoscalerInternal.Namespace(),

From ae0a5a44802e4c549b0fe30aab2570e2c840fae0 Mon Sep 17 00:00:00 2001
From: Jennifer Chen
Date: Thu, 29 Aug 2024 16:29:20 -0400
Subject: [PATCH 4/6] Pass target to vertical controller to dedupe

---
 .../autoscaling/workload/controller.go | 30 ++++++++++++-------
 .../workload/controller_vertical.go    | 14 +--------
 2 files changed, 20 insertions(+), 24 deletions(-)

diff --git a/pkg/clusteragent/autoscaling/workload/controller.go b/pkg/clusteragent/autoscaling/workload/controller.go
index 361d58cdd59e3..fd2e09ae62cd6 100644
--- a/pkg/clusteragent/autoscaling/workload/controller.go
+++ b/pkg/clusteragent/autoscaling/workload/controller.go
@@ -14,6 +14,7 @@ import (
     "k8s.io/apimachinery/pkg/api/errors"
     apimeta "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/client-go/dynamic"
     "k8s.io/client-go/dynamic/dynamicinformer"
     scaleclient "k8s.io/client-go/scale"
@@ -272,30 +273,37 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
     // Reaching this point, we had an error in processing, clearing up global error
     podAutoscalerInternal.SetError(nil)

-    // Now that everything is synced, we can perform the actual processing
-    result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)
-
-    // Update current replicas
     targetGVK, targetErr := podAutoscalerInternal.TargetGVK()
     if targetErr != nil {
+        log.Errorf("Failed to get target GVK for PodAutoscaler: %s/%s, err: %v", ns, name, targetErr)
         podAutoscalerInternal.SetError(targetErr)
-        return autoscaling.NoRequeue, targetErr
     }
     target := NamespacedPodOwner{
         Namespace: podAutoscalerInternal.Namespace(),
         Name:      podAutoscalerInternal.Spec().TargetRef.Name,
         Kind:      targetGVK.Kind,
     }
-    currentReplicas := len(c.podWatcher.GetPodsForOwner(target))
-    podAutoscalerInternal.SetCurrentReplicas(int32(currentReplicas))
+
+    // Now that everything is synced, we can perform the actual processing
+    result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal, targetGVK, target)
+
+    // Update current replicas
+    if targetErr == nil {
+        pods := c.podWatcher.GetPodsForOwner(target)
+        currentReplicas := len(pods)
+        podAutoscalerInternal.SetCurrentReplicas(int32(currentReplicas))
+    }

     // Update status based on latest state
     statusErr := c.updatePodAutoscalerStatus(ctx, podAutoscalerInternal, podAutoscaler)
     if statusErr != nil {
         log.Errorf("Failed to update status for PodAutoscaler: %s/%s, err: %v", ns, name, statusErr)
+    }

-        // We want to return the status error if none to count in the requeue retries.
-        if err == nil {
+    if err == nil {
+        if targetErr != nil {
+            err = targetErr
+        } else if statusErr != nil {
             err = statusErr
         }
     }
@@ -304,7 +312,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
     return result, err
 }

-func (c *Controller) handleScaling(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, podAutoscalerInternal *model.PodAutoscalerInternal) (autoscaling.ProcessResult, error) {
+func (c *Controller) handleScaling(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, podAutoscalerInternal *model.PodAutoscalerInternal, targetGVK schema.GroupVersionKind, target NamespacedPodOwner) (autoscaling.ProcessResult, error) {
     // TODO: While horizontal scaling is in progress we should not start vertical scaling
     // While vertical scaling is in progress we should only allow horizontal upscale
     horizontalRes, err := c.horizontalController.sync(ctx, podAutoscaler, podAutoscalerInternal)
@@ -312,7 +320,7 @@ func (c *Controller) handleScaling(ctx context.Context, podAutoscaler *datadoghq
         return horizontalRes, err
     }

-    verticalRes, err := c.verticalController.sync(ctx, podAutoscaler, podAutoscalerInternal)
+    verticalRes, err := c.verticalController.sync(ctx, podAutoscaler, podAutoscalerInternal, targetGVK, target)
     if err != nil {
         return verticalRes, err
     }
diff --git a/pkg/clusteragent/autoscaling/workload/controller_vertical.go b/pkg/clusteragent/autoscaling/workload/controller_vertical.go
index 5ea9d8d4f300a..2a940490d875a 100644
--- a/pkg/clusteragent/autoscaling/workload/controller_vertical.go
+++ b/pkg/clusteragent/autoscaling/workload/controller_vertical.go
@@ -56,7 +56,7 @@ func newVerticalController(clock clock.Clock, eventRecorder record.EventRecorder
     return res
 }

-func (u *verticalController) sync(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, autoscalerInternal *model.PodAutoscalerInternal) (autoscaling.ProcessResult, error) {
+func (u *verticalController) sync(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, autoscalerInternal *model.PodAutoscalerInternal, targetGVK schema.GroupVersionKind, target NamespacedPodOwner) (autoscaling.ProcessResult, error) {
     scalingValues := autoscalerInternal.ScalingValues()

     // Check if the autoscaler has a vertical scaling recommendation
@@ -67,18 +67,6 @@ func (u *verticalController) sync(ctx context.Context, podAutoscaler *datadoghq.
     }

     recomendationID := scalingValues.Vertical.ResourcesHash
-    targetGVK, err := autoscalerInternal.TargetGVK()
-    if err != nil {
-        autoscalerInternal.SetError(err)
-        return autoscaling.NoRequeue, err
-    }
-
-    // Get the pod owner from the workload
-    target := NamespacedPodOwner{
-        Namespace: autoscalerInternal.Namespace(),
-        Name:      autoscalerInternal.Spec().TargetRef.Name,
-        Kind:      targetGVK.Kind,
-    }

     // Get the pods for the pod owner
     pods := u.podWatcher.GetPodsForOwner(target)

From 2e012f0af9a81c89ea00f26324a87500f9c0e9a4 Mon Sep 17 00:00:00 2001
From: Jennifer Chen
Date: Fri, 30 Aug 2024 12:48:53 -0400
Subject: [PATCH 5/6] Remove error log due to user error with setting DPA target

---
 pkg/clusteragent/autoscaling/workload/controller.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pkg/clusteragent/autoscaling/workload/controller.go b/pkg/clusteragent/autoscaling/workload/controller.go
index fd2e09ae62cd6..c5201d54a09a9 100644
--- a/pkg/clusteragent/autoscaling/workload/controller.go
+++ b/pkg/clusteragent/autoscaling/workload/controller.go
@@ -275,7 +275,6 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string

     targetGVK, targetErr := podAutoscalerInternal.TargetGVK()
     if targetErr != nil {
-        log.Errorf("Failed to get target GVK for PodAutoscaler: %s/%s, err: %v", ns, name, targetErr)
         podAutoscalerInternal.SetError(targetErr)
     }
     target := NamespacedPodOwner{

From 734b934b449b12063311e3e14721534bd92e8e23 Mon Sep 17 00:00:00 2001
From: Jennifer Chen
Date: Thu, 5 Sep 2024 12:31:51 -0400
Subject: [PATCH 6/6] fixup! Merge remote-tracking branch 'origin/main' into jenn/CASCL-57_update-current-replicas-with-podwatcher

---
 .../autoscaling/workload/controller.go | 16 +++-------------
 1 file changed, 3 insertions(+), 13 deletions(-)

diff --git a/pkg/clusteragent/autoscaling/workload/controller.go b/pkg/clusteragent/autoscaling/workload/controller.go
index cbff491d19e64..ca32adf79a103 100644
--- a/pkg/clusteragent/autoscaling/workload/controller.go
+++ b/pkg/clusteragent/autoscaling/workload/controller.go
@@ -287,7 +287,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
     targetGVK, targetErr := podAutoscalerInternal.TargetGVK()
     if targetErr != nil {
         podAutoscalerInternal.SetError(targetErr)
-        return autoscaling.NoRequeue, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, validationErr, podAutoscalerInternal, podAutoscaler)
+        return autoscaling.NoRequeue, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, targetErr, podAutoscalerInternal, podAutoscaler)
     }
     target := NamespacedPodOwner{
         Namespace: podAutoscalerInternal.Namespace(),
@@ -296,7 +296,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
     }

     // Now that everything is synced, we can perform the actual processing
-    result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal, targetGVK, target)
+    result, scalingErr := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal, targetGVK, target)

     // Update current replicas
     pods := c.podWatcher.GetPodsForOwner(target)
@@ -304,17 +304,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
     podAutoscalerInternal.SetCurrentReplicas(int32(currentReplicas))

     // Update status based on latest state
-    statusErr := c.updatePodAutoscalerStatus(ctx, podAutoscalerInternal, podAutoscaler)
-    if statusErr != nil {
-        log.Errorf("Failed to update status for PodAutoscaler: %s/%s, err: %v", ns, name, statusErr)
-    }
-
-    // We want to return the status error if none to count in the requeue retries.
-    if err == nil {
-        err = statusErr
-    }
-
-    return result, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, err, podAutoscalerInternal, podAutoscaler)
+    return result, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, scalingErr, podAutoscalerInternal, podAutoscaler)
 }

 func (c *Controller) handleScaling(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, podAutoscalerInternal *model.PodAutoscalerInternal, targetGVK schema.GroupVersionKind, target NamespacedPodOwner) (autoscaling.ProcessResult, error) {
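
Taken together, the series moves the source of truth for "current replicas" from the scale subresource (scale.Status.Replicas, previously set in the horizontal controller) to the cluster-agent pod watcher: the controller resolves the target GVK once, builds a NamespacedPodOwner key, passes both to the horizontal and vertical controllers, and counts the pods the watcher tracks for that owner. The sketch below is a minimal, self-contained illustration of that idea only; the type and method names (NamespacedPodOwner, GetPodsForOwner, SetCurrentReplicas) are borrowed from the patches for readability, while the podWatcher and autoscalerState types here are simplified stand-ins invented for the sketch, not the actual Datadog Agent implementation.

package main

import "fmt"

// NamespacedPodOwner mirrors the key used in the patches: the namespace, name,
// and kind of the workload that owns the pods (simplified stand-in).
type NamespacedPodOwner struct {
	Namespace, Name, Kind string
}

// podWatcher is a toy stand-in for the cluster-agent pod watcher: it indexes
// the pods it has observed by their owner.
type podWatcher struct {
	podsByOwner map[NamespacedPodOwner][]string // owner -> pod names
}

func (w *podWatcher) GetPodsForOwner(owner NamespacedPodOwner) []string {
	return w.podsByOwner[owner]
}

// autoscalerState is a toy stand-in for the PodAutoscalerInternal replica field.
type autoscalerState struct {
	currentReplicas int32
}

func (a *autoscalerState) SetCurrentReplicas(n int32) { a.currentReplicas = n }

func main() {
	watcher := &podWatcher{podsByOwner: map[NamespacedPodOwner][]string{
		{Namespace: "default", Name: "web", Kind: "Deployment"}: {"web-0", "web-1", "web-2"},
	}}
	state := &autoscalerState{}

	// The target is built once from the autoscaler spec (namespace, TargetRef name,
	// resolved GVK kind) and reused for both scaling and replica counting.
	target := NamespacedPodOwner{Namespace: "default", Name: "web", Kind: "Deployment"}

	// Current replicas = number of pods the watcher tracks for the owner,
	// rather than the scale subresource's Status.Replicas.
	state.SetCurrentReplicas(int32(len(watcher.GetPodsForOwner(target))))

	fmt.Println("current replicas:", state.currentReplicas) // prints: current replicas: 3
}

One practical consequence, visible in PATCH 4/6, is that handleScaling and the vertical controller's sync receive the pre-resolved targetGVK and target as parameters, so the owner lookup happens once per reconcile instead of being repeated inside the vertical controller.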