[clusteragent/autoscaling] Check that autoscaling target is not cluster agent (#28723)
jennchenn authored Sep 5, 2024
1 parent db89290 commit c4e3a15
Showing 4 changed files with 262 additions and 27 deletions.
64 changes: 50 additions & 14 deletions pkg/clusteragent/autoscaling/workload/controller.go
@@ -26,7 +26,10 @@ import (
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
"github.com/DataDog/datadog-agent/pkg/util/log"

"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common"
)

const (
@@ -272,22 +275,16 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
// Reaching this point, we had an error in processing, clearing up global error
podAutoscalerInternal.SetError(nil)

- // Now that everything is synced, we can perform the actual processing
- result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)
-
- // Update status based on latest state
- statusErr := c.updatePodAutoscalerStatus(ctx, podAutoscalerInternal, podAutoscaler)
- if statusErr != nil {
- log.Errorf("Failed to update status for PodAutoscaler: %s/%s, err: %v", ns, name, statusErr)
-
- // We want to return the status error if there is no other error, so it counts towards the requeue retries.
- if err == nil {
- err = statusErr
- }
+ // Validate autoscaler requirements
+ validationErr := c.validateAutoscaler(podAutoscaler)
+ if validationErr != nil {
+ podAutoscalerInternal.SetError(validationErr)
+ return autoscaling.NoRequeue, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, validationErr, podAutoscalerInternal, podAutoscaler)
}

- c.store.UnlockSet(key, podAutoscalerInternal, c.ID)
- return result, err
+ // Now that everything is synced, we can perform the actual processing
+ result, scalingErr := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)
+ return result, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, scalingErr, podAutoscalerInternal, podAutoscaler)
}

func (c *Controller) handleScaling(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, podAutoscalerInternal *model.PodAutoscalerInternal) (autoscaling.ProcessResult, error) {
@@ -389,3 +386,42 @@ func (c *Controller) deletePodAutoscaler(ns, name string) error {
}
return nil
}

func (c *Controller) validateAutoscaler(podAutoscaler *datadoghq.DatadogPodAutoscaler) error {
// Check that targetRef is not set to the cluster agent
clusterAgentPodName, err := common.GetSelfPodName()
if err != nil {
return fmt.Errorf("Unable to get the cluster agent pod name: %w", err)
}

var resourceName string
switch owner := podAutoscaler.Spec.TargetRef.Kind; owner {
case "Deployment":
resourceName = kubernetes.ParseDeploymentForPodName(clusterAgentPodName)
case "ReplicaSet":
resourceName = kubernetes.ParseReplicaSetForPodName(clusterAgentPodName)
}

clusterAgentNs := common.GetMyNamespace()

if podAutoscaler.Namespace == clusterAgentNs && podAutoscaler.Spec.TargetRef.Name == resourceName {
return fmt.Errorf("Autoscaling target cannot be set to the cluster agent")
}
return nil
}
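
To make the validation concrete: given the cluster agent pod name used in the tests below, the two parsing helpers added in pkg/util/kubernetes derive the owner names that validateAutoscaler compares against the target ref. A minimal sketch (not part of the commit, assuming the datadog-agent module is importable):

package main

import (
	"fmt"

	"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
)

func main() {
	// Pod name from the test fixture below (DD_POD_NAME).
	podName := "datadog-agent-cluster-agent-7dbf798595-tp9lg"

	// Deployment target: strip the pod suffix, then the ReplicaSet suffix.
	fmt.Println(kubernetes.ParseDeploymentForPodName(podName)) // datadog-agent-cluster-agent

	// ReplicaSet target: strip only the pod suffix.
	fmt.Println(kubernetes.ParseReplicaSetForPodName(podName)) // datadog-agent-cluster-agent-7dbf798595
}

A DatadogPodAutoscaler in the cluster agent's namespace whose targetRef names either derived resource is rejected.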

func (c *Controller) updateAutoscalerStatusAndUnlock(ctx context.Context, key, ns, name string, err error, podAutoscalerInternal model.PodAutoscalerInternal, podAutoscaler *datadoghq.DatadogPodAutoscaler) error {
// Update status based on latest state
statusErr := c.updatePodAutoscalerStatus(ctx, podAutoscalerInternal, podAutoscaler)
if statusErr != nil {
log.Errorf("Failed to update status for PodAutoscaler: %s/%s, err: %v", ns, name, statusErr)

// We want to return the status error if there is no other error, so it counts towards the requeue retries.
if err == nil {
err = statusErr
}
}

c.store.UnlockSet(key, podAutoscalerInternal, c.ID)
return err
}
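
Note that the helper keeps the pre-existing error precedence: the processing error wins, and the status error is surfaced only when processing succeeded, so a failed status write still triggers requeue retries. A standalone sketch of that rule (the pick helper is hypothetical, not in the codebase):

package main

import (
	"errors"
	"fmt"
)

// pick mirrors the precedence in updateAutoscalerStatusAndUnlock: a processing
// error takes priority, and a status error is returned only when there is no
// processing error.
func pick(processingErr, statusErr error) error {
	if processingErr == nil {
		return statusErr
	}
	return processingErr
}

func main() {
	scalingErr := errors.New("scaling failed")
	statusUpdateErr := errors.New("status update failed")

	fmt.Println(pick(nil, statusUpdateErr))        // status update failed
	fmt.Println(pick(scalingErr, statusUpdateErr)) // scaling failed
	fmt.Println(pick(nil, nil))                    // <nil>
}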
121 changes: 121 additions & 0 deletions pkg/clusteragent/autoscaling/workload/controller_test.go
@@ -8,6 +8,8 @@
package workload

import (
"errors"
"fmt"
"testing"
"time"

@@ -26,6 +28,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common"
)

type fixture struct {
@@ -228,3 +231,121 @@ func TestLeaderCreateDeleteRemote(t *testing.T) {
f.RunControllerSync(true, "default/dpa-0")
assert.Len(t, f.store.GetAll(), 0)
}

func TestDatadogPodAutoscalerTargetingClusterAgentErrors(t *testing.T) {
tests := []struct {
name string
targetRef autoscalingv2.CrossVersionObjectReference
}{
{
"target set to cluster agent deployment",
autoscalingv2.CrossVersionObjectReference{
Kind: "Deployment",
Name: "datadog-agent-cluster-agent",
APIVersion: "apps/v1",
},
},
{
"target set to cluster agent replicaset",
autoscalingv2.CrossVersionObjectReference{
Kind: "ReplicaSet",
Name: "datadog-agent-cluster-agent-7dbf798595",
APIVersion: "apps/v1",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testTime := time.Now()
f := newFixture(t, testTime)

t.Setenv("DD_POD_NAME", "datadog-agent-cluster-agent-7dbf798595-tp9lg")
currentNs := common.GetMyNamespace()
id := fmt.Sprintf("%s/dpa-dca", currentNs)

dpaSpec := datadoghq.DatadogPodAutoscalerSpec{
TargetRef: tt.targetRef,
// Local owner means .Spec source of truth is K8S
Owner: datadoghq.DatadogPodAutoscalerLocalOwner,
}

dpa, dpaTyped := newFakePodAutoscaler(currentNs, "dpa-dca", 1, dpaSpec, datadoghq.DatadogPodAutoscalerStatus{})
f.InformerObjects = append(f.InformerObjects, dpa)

expectedDPAError := &datadoghq.DatadogPodAutoscaler{
TypeMeta: metav1.TypeMeta{
Kind: "DatadogPodAutoscaler",
APIVersion: "datadoghq.com/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "dpa-dca",
Namespace: currentNs,
Generation: 1,
UID: dpa.GetUID(),
},
Spec: datadoghq.DatadogPodAutoscalerSpec{
TargetRef: autoscalingv2.CrossVersionObjectReference{
Kind: "",
Name: "",
APIVersion: "",
},
Owner: "",
},
Status: datadoghq.DatadogPodAutoscalerStatus{
Conditions: []datadoghq.DatadogPodAutoscalerCondition{
{
Type: datadoghq.DatadogPodAutoscalerErrorCondition,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.NewTime(testTime),
Reason: "Autoscaling target cannot be set to the cluster agent",
},
{
Type: datadoghq.DatadogPodAutoscalerActiveCondition,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.NewTime(testTime),
},
{
Type: datadoghq.DatadogPodAutoscalerHorizontalAbleToRecommendCondition,
Status: corev1.ConditionUnknown,
LastTransitionTime: metav1.NewTime(testTime),
},
{
Type: datadoghq.DatadogPodAutoscalerVerticalAbleToRecommendCondition,
Status: corev1.ConditionUnknown,
LastTransitionTime: metav1.NewTime(testTime),
},
{
Type: datadoghq.DatadogPodAutoscalerHorizontalScalingLimitedCondition,
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.NewTime(testTime),
},
{
Type: datadoghq.DatadogPodAutoscalerHorizontalAbleToScaleCondition,
Status: corev1.ConditionUnknown,
LastTransitionTime: metav1.NewTime(testTime),
},
{
Type: datadoghq.DatadogPodAutoscalerVerticalAbleToApply,
Status: corev1.ConditionUnknown,
LastTransitionTime: metav1.NewTime(testTime),
},
},
},
}
expectedUnstructuredError, err := autoscaling.ToUnstructured(expectedDPAError)
assert.NoError(t, err)
f.RunControllerSync(true, id)

f.Objects = append(f.Objects, dpaTyped)
f.Actions = nil

f.ExpectUpdateStatusAction(expectedUnstructuredError)
f.RunControllerSync(true, id)
assert.Len(t, f.store.GetAll(), 1)
pai, found := f.store.Get(id)
assert.Truef(t, found, "Expected to find DatadogPodAutoscaler in store")
assert.Equal(t, errors.New("Autoscaling target cannot be set to the cluster agent"), pai.Error())
})
}
}
48 changes: 35 additions & 13 deletions pkg/util/kubernetes/helpers.go
@@ -20,23 +20,23 @@ const Digits = "1234567890"
// ParseDeploymentForReplicaSet gets the deployment name from a replicaset,
// or returns an empty string if no parent deployment is found.
func ParseDeploymentForReplicaSet(name string) string {
- lastDash := strings.LastIndexByte(name, '-')
- if lastDash == -1 {
- // No dash
- return ""
- }
- suffix := name[lastDash+1:]
- if len(suffix) < 3 {
- // Suffix is variable length but we cut off at 3+ characters
- return ""
- }
+ return removeKubernetesNameSuffix(name)
}

- if !stringInRuneset(suffix, Digits) && !stringInRuneset(suffix, KubeAllowedEncodeStringAlphaNums) {
- // Invalid suffix
+ // ParseDeploymentForPodName gets the deployment name from a pod name,
+ // or returns an empty string if no parent deployment is found.
+ func ParseDeploymentForPodName(name string) string {
+ replicaSet := removeKubernetesNameSuffix(name)
+ if replicaSet == "" {
return ""
}
+ return ParseDeploymentForReplicaSet(replicaSet)
+ }

- return name[:lastDash]
+ // ParseReplicaSetForPodName gets the replica set name from a pod name,
+ // or returns an empty string if no parent replica set is found.
+ func ParseReplicaSetForPodName(name string) string {
+ return removeKubernetesNameSuffix(name)
}

// ParseCronJobForJob gets the cronjob name from a job,
@@ -79,3 +79,25 @@ func stringInRuneset(name, subset string) bool {
}
return true
}

// removeKubernetesNameSuffix removes the suffix from a kubernetes name
// or returns an empty string if either the suffix or the name is invalid.
func removeKubernetesNameSuffix(name string) string {
lastDash := strings.LastIndexByte(name, '-')
if lastDash == -1 {
// No dash
return ""
}
suffix := name[lastDash+1:]
if len(suffix) < 3 {
// Suffix is variable length but we cut off at 3+ characters
return ""
}

if !stringInRuneset(suffix, Digits) && !stringInRuneset(suffix, KubeAllowedEncodeStringAlphaNums) {
// Invalid suffix
return ""
}

return name[:lastDash]
}
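
To see the suffix rules in action, here is a small sketch (again assuming the module is importable) that exercises ParseReplicaSetForPodName, a thin wrapper over removeKubernetesNameSuffix, with cases taken from the tests below:

package main

import (
	"fmt"

	"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
)

func main() {
	for _, name := range []string{
		"frontend-2891696001-51234", // numeric suffix -> "frontend-2891696001"
		"frontend-56c89cfff7-tsdww", // 1.8+ alphanumeric suffix -> "frontend-56c89cfff7"
		"frontend-56c89cff-bx",      // suffix shorter than 3 characters -> ""
		"manually-created",          // suffix "created" contains disallowed characters (vowels) -> ""
	} {
		fmt.Printf("%q -> %q\n", name, kubernetes.ParseReplicaSetForPodName(name))
	}
}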
56 changes: 56 additions & 0 deletions pkg/util/kubernetes/helpers_test.go
@@ -40,6 +40,62 @@ func TestParseDeploymentForReplicaSet(t *testing.T) {
}
}

func TestParseDeploymentForPodName(t *testing.T) {
for in, out := range map[string]string{
// Nominal 1.6 cases
"frontend-2891696001-51234": "frontend",
"front-end-2891696001-72346": "front-end",

// Non-deployment 1.6 cases
"frontend2891696001-31-": "",
"-frontend2891696001-21": "",
"manually-created": "",

// 1.8+ nominal cases
"frontend-56c89cfff7-tsdww": "frontend",
"frontend-56c-p2q": "frontend",
"frontend-56c89cff-qhxl8": "frontend",
"frontend-56c89cfff7c2-g9lmb": "frontend",
"front-end-768dd754b7-ptdcc": "front-end",

// 1.8+ non-deployment cases
"frontend-56c89cff-bx": "", // too short
"frontend-56a89cfff7-a": "", // no vowels allowed
} {
t.Run(fmt.Sprintf("case: %s", in), func(t *testing.T) {
assert.Equal(t, out, ParseDeploymentForPodName(in))
})
}
}

func TestParseReplicaSetForPodName(t *testing.T) {
for in, out := range map[string]string{
// Nominal 1.6 cases
"frontend-2891696001-51234": "frontend-2891696001",
"front-end-2891696001-72346": "front-end-2891696001",

// Non-replica-set 1.6 cases
"frontend2891696001-31-": "",
"-frontend2891696001-21": "",
"manually-created": "",

// 1.8+ nominal cases
"frontend-56c89cfff7-tsdww": "frontend-56c89cfff7",
"frontend-56c-p2q": "frontend-56c",
"frontend-56c89cff-qhxl8": "frontend-56c89cff",
"frontend-56c89cfff7c2-g9lmb": "frontend-56c89cfff7c2",
"front-end-768dd754b7-ptdcc": "front-end-768dd754b7",

// 1.8+ non-replica-set cases
"frontend-56c89cff-bx": "", // too short
"frontend-56a89cfff7-a": "", // no vowels allowed
} {
t.Run(fmt.Sprintf("case: %s", in), func(t *testing.T) {
assert.Equal(t, out, ParseReplicaSetForPodName(in))
})
}
}

func TestParseCronJobForJob(t *testing.T) {
for in, out := range map[string]struct {
string