[clusteragent/autoscaling] Use PodWatcher to update current replicas in status #28857

Merged
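At a glance, the change makes the controller compute the status' current replica count from the pods tracked by the PodWatcher for the target owner, instead of reading scale.Status.Replicas in the horizontal controller. Below is a minimal, self-contained sketch of that idea (not code from this PR; Pod, podWatcher and fakeWatcher are simplified stand-ins for the real types in pkg/clusteragent/autoscaling/workload):

package main

import "fmt"

// NamespacedPodOwner identifies the workload whose pods are counted
// (namespace + owner name + owner kind, e.g. a Deployment).
type NamespacedPodOwner struct {
	Namespace string
	Name      string
	Kind      string
}

// Pod is a minimal stand-in for the watcher's pod model.
type Pod struct {
	Name string
}

// podWatcher is a stand-in for the component that indexes running pods by owner.
type podWatcher interface {
	GetPodsForOwner(owner NamespacedPodOwner) []*Pod
}

// fakeWatcher returns a fixed pod list for any owner, for demonstration only.
type fakeWatcher struct {
	pods []*Pod
}

func (w fakeWatcher) GetPodsForOwner(NamespacedPodOwner) []*Pod {
	return w.pods
}

// currentReplicas mirrors the new logic: the replica count written to the
// autoscaler status is the number of pods the watcher knows for the owner.
func currentReplicas(w podWatcher, owner NamespacedPodOwner) int32 {
	return int32(len(w.GetPodsForOwner(owner)))
}

func main() {
	w := fakeWatcher{pods: []*Pod{{Name: "a"}, {Name: "b"}, {Name: "c"}}}
	owner := NamespacedPodOwner{Namespace: "default", Name: "my-app", Kind: "Deployment"}
	fmt.Println(currentReplicas(w, owner)) // prints 3
}

In the actual diff below, this corresponds to the c.podWatcher.GetPodsForOwner(target) / SetCurrentReplicas calls added to syncPodAutoscaler and the removal of SetCurrentReplicas(scale.Status.Replicas) from the horizontal controller.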
32 changes: 27 additions & 5 deletions pkg/clusteragent/autoscaling/workload/controller.go
@@ -14,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
scaleclient "k8s.io/client-go/scale"
@@ -272,16 +273,37 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
// Reaching this point, we had an error in processing, clearing up global error
podAutoscalerInternal.SetError(nil)

targetGVK, targetErr := podAutoscalerInternal.TargetGVK()
if targetErr != nil {
log.Errorf("Failed to get target GVK for PodAutoscaler: %s/%s, err: %v", ns, name, targetErr)
@vboulineau (Contributor) commented on Aug 30, 2024:

This is a functional error and will be reflected in the status; we should not increase the number of error logs in the DCA for a functional error (usually a mistake in the name).
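A rough sketch of the pattern the comment describes (not an exact suggestion block; names are taken from the surrounding diff):

// Functional errors (e.g. an unresolvable targetRef) are recorded on the
// internal model so they surface in the DatadogPodAutoscaler status on the
// next status update, rather than being emitted via log.Errorf in the DCA.
targetGVK, targetErr := podAutoscalerInternal.TargetGVK()
if targetErr != nil {
	podAutoscalerInternal.SetError(targetErr) // reported through status, not logs
}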

podAutoscalerInternal.SetError(targetErr)
}
target := NamespacedPodOwner{
Namespace: podAutoscalerInternal.Namespace(),
Name: podAutoscalerInternal.Spec().TargetRef.Name,
Kind: targetGVK.Kind,
}

// Now that everything is synced, we can perform the actual processing
result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)
result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal, targetGVK, target)

// Update current replicas
if targetErr == nil {
pods := c.podWatcher.GetPodsForOwner(target)
currentReplicas := len(pods)
podAutoscalerInternal.SetCurrentReplicas(int32(currentReplicas))
}

// Update status based on latest state
statusErr := c.updatePodAutoscalerStatus(ctx, podAutoscalerInternal, podAutoscaler)
if statusErr != nil {
log.Errorf("Failed to update status for PodAutoscaler: %s/%s, err: %v", ns, name, statusErr)
}

// We want to return the status error if there is no other error, so it counts toward the requeue retries.
if err == nil {
if targetErr != nil {
err = targetErr
} else if statusErr != nil {
err = statusErr
}
}
@@ -290,15 +312,15 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
return result, err
}

func (c *Controller) handleScaling(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, podAutoscalerInternal *model.PodAutoscalerInternal) (autoscaling.ProcessResult, error) {
func (c *Controller) handleScaling(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, podAutoscalerInternal *model.PodAutoscalerInternal, targetGVK schema.GroupVersionKind, target NamespacedPodOwner) (autoscaling.ProcessResult, error) {
// TODO: While horizontal scaling is in progress we should not start vertical scaling
// While vertical scaling is in progress we should only allow horizontal upscale
horizontalRes, err := c.horizontalController.sync(ctx, podAutoscaler, podAutoscalerInternal)
if err != nil {
return horizontalRes, err
}

verticalRes, err := c.verticalController.sync(ctx, podAutoscaler, podAutoscalerInternal)
verticalRes, err := c.verticalController.sync(ctx, podAutoscaler, podAutoscalerInternal, targetGVK, target)
if err != nil {
return verticalRes, err
}
@@ -79,9 +79,6 @@ func (hr *horizontalController) sync(ctx context.Context, podAutoscaler *datadog
return autoscaling.Requeue, err
}

// Update current replicas
autoscalerInternal.SetCurrentReplicas(scale.Status.Replicas)

return hr.performScaling(ctx, podAutoscaler, autoscalerInternal, gr, scale)
}

@@ -111,7 +111,6 @@ func (f *horizontalControllerFixture) testScalingDecision(args horizontalScaling
f.scaler.AssertNumberOfCalls(f.t, "get", 1)
f.scaler.AssertNumberOfCalls(f.t, "update", expectedUpdateCalls)

args.fakePai.CurrentReplicas = pointer.Ptr[int32](args.statusReplicas)
if scaleActionExpected && args.scaleError == nil {
// Update fakePai with the new expected state
action := &datadoghq.DatadogPodAutoscalerHorizontalAction{
@@ -142,8 +141,9 @@ func TestHorizontalControllerSyncPrerequisites(t *testing.T) {
autoscalerName := "test"

fakePai := &model.FakePodAutoscalerInternal{
Namespace: autoscalerNamespace,
Name: autoscalerName,
Namespace: autoscalerNamespace,
Name: autoscalerName,
CurrentReplicas: pointer.Ptr[int32](5),
}

// Test case: no Spec, no action taken
@@ -165,7 +165,7 @@ func TestHorizontalControllerSyncPrerequisites(t *testing.T) {
model.AssertPodAutoscalersEqual(t, fakePai.Build(), autoscaler)

// Test case: Correct Spec and GVK, but no scaling values
// Should only update replica count
// Should do nothing
expectedGVK := schema.GroupVersionKind{
Group: "apps",
Version: "v1",
@@ -304,7 +304,8 @@ func TestHorizontalControllerSyncScaleDecisions(t *testing.T) {
Replicas: 5,
},
},
TargetGVK: expectedGVK,
TargetGVK: expectedGVK,
CurrentReplicas: pointer.Ptr[int32](5),
}

// Step: same number of replicas, no action taken, only updating status
14 changes: 1 addition & 13 deletions pkg/clusteragent/autoscaling/workload/controller_vertical.go
@@ -56,7 +56,7 @@ func newVerticalController(clock clock.Clock, eventRecorder record.EventRecorder
return res
}

func (u *verticalController) sync(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, autoscalerInternal *model.PodAutoscalerInternal) (autoscaling.ProcessResult, error) {
func (u *verticalController) sync(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, autoscalerInternal *model.PodAutoscalerInternal, targetGVK schema.GroupVersionKind, target NamespacedPodOwner) (autoscaling.ProcessResult, error) {
scalingValues := autoscalerInternal.ScalingValues()

// Check if the autoscaler has a vertical scaling recommendation
@@ -67,18 +67,6 @@ func (u *verticalController) sync(ctx context.Context, podAutoscaler *datadoghq.
}

recomendationID := scalingValues.Vertical.ResourcesHash
targetGVK, err := autoscalerInternal.TargetGVK()
if err != nil {
autoscalerInternal.SetError(err)
return autoscaling.NoRequeue, err
}

// Get the pod owner from the workload
target := NamespacedPodOwner{
Namespace: autoscalerInternal.Namespace(),
Name: autoscalerInternal.Spec().TargetRef.Name,
Kind: targetGVK.Kind,
}

// Get the pods for the pod owner
pods := u.podWatcher.GetPodsForOwner(target)