Skip to content

Commit

Permalink
Adding PDB
Browse files Browse the repository at this point in the history
NOTE: not adding yaml - all the yaml fields are marked as obsolete and we now have a dedicated pipeline for yamls so I presume there is no need to add the field
  • Loading branch information
jbartosik committed Oct 23, 2024
1 parent 4aca93a commit 9a3c9a3
Show file tree
Hide file tree
Showing 7 changed files with 676 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func NewCollectorInventory(cfg config.Component, store workloadmeta.Component) *
k8sCollectors.NewNodeCollectorVersions(),
k8sCollectors.NewPersistentVolumeClaimCollectorVersions(),
k8sCollectors.NewPersistentVolumeCollectorVersions(),
k8sCollectors.NewPodDisruptionBudgetCollectorVersions(),
k8sCollectors.NewReplicaSetCollectorVersions(),
k8sCollectors.NewRoleBindingCollectorVersions(),
k8sCollectors.NewRoleCollectorVersions(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build kubeapiserver && orchestrator

package k8s

import (
"k8s.io/apimachinery/pkg/labels"
v1policyinformer "k8s.io/client-go/informers/policy/v1"
v1policylister "k8s.io/client-go/listers/policy/v1"
"k8s.io/client-go/tools/cache"

"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors"
k8sProcessors "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/k8s"
"github.com/DataDog/datadog-agent/pkg/orchestrator"
)

// NewPodDisruptionBudgetCollectorVersions builds the group of collector versions.
func NewPodDisruptionBudgetCollectorVersions() collectors.CollectorVersions {
return collectors.NewCollectorVersions(
NewPodDisruptionBudgetCollectorVersion(),
)
}

// PodDisruptionBudgetCollector is a collector for Kubernetes Pod Disruption Budgets.
type PodDisruptionBudgetCollector struct {
informer v1policyinformer.PodDisruptionBudgetInformer
lister v1policylister.PodDisruptionBudgetLister
metadata *collectors.CollectorMetadata
processor *processors.Processor
}

// NewPodDisruptionBudgetCollectorVersion creates a new collector for the Kubernetes Pod Disruption Budget
// resource.
func NewPodDisruptionBudgetCollectorVersion() *PodDisruptionBudgetCollector {
return &PodDisruptionBudgetCollector{
informer: nil,
lister: nil,
metadata: &collectors.CollectorMetadata{
IsDefaultVersion: true,
IsStable: true,
IsMetadataProducer: true,
IsManifestProducer: true,
SupportsManifestBuffering: true,
Name: "poddisruptionbudgets",
NodeType: orchestrator.K8sPodDisruptionBudget,
Version: "policy/v1",
},
processor: processors.NewProcessor(new(k8sProcessors.PodDisruptionBudgetHandlers)),
}
}

// Informer returns the shared informer.
func (c *PodDisruptionBudgetCollector) Informer() cache.SharedInformer {
return c.informer.Informer()
}

// Init is used to initialize the collector.
func (c *PodDisruptionBudgetCollector) Init(rcfg *collectors.CollectorRunConfig) {
c.informer = rcfg.OrchestratorInformerFactory.InformerFactory.Policy().V1().PodDisruptionBudgets()
c.lister = c.informer.Lister()
}

// Metadata is used to access information about the collector.
func (c *PodDisruptionBudgetCollector) Metadata() *collectors.CollectorMetadata {
return c.metadata
}

// Run triggers the collection process.
func (c *PodDisruptionBudgetCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors.CollectorRunResult, error) {
list, err := c.lister.List(labels.Everything())
if err != nil {
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

if processed == -1 {
return nil, collectors.ErrProcessingPanic
}

result := &collectors.CollectorRunResult{
Result: processResult,
ResourcesListed: len(list),
ResourcesProcessed: processed,
}

return result, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build orchestrator

package k8s

import (
model "github.com/DataDog/agent-payload/v5/process"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/common"
k8sTransformers "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/transformers/k8s"
"github.com/DataDog/datadog-agent/pkg/orchestrator/redact"

policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/types"
)

// PodDisruptionBudgetHandlers implements the Handlers interface for Kubernetes NetworkPolicy.
type PodDisruptionBudgetHandlers struct {
common.BaseHandlers
}

// AfterMarshalling is a handler called after resource marshalling.
func (h *PodDisruptionBudgetHandlers) AfterMarshalling(_ processors.ProcessorContext, _, _ interface{}, _ []byte) (skip bool) {
return
}

// BuildMessageBody is a handler called to build a message body out of a list of
// extracted resources.
func (h *PodDisruptionBudgetHandlers) BuildMessageBody(ctx processors.ProcessorContext, resourceModels []interface{}, groupSize int) model.MessageBody {
pctx := ctx.(*processors.K8sProcessorContext)
models := make([]*model.PodDisruptionBudget, 0, len(resourceModels))

for _, m := range resourceModels {
models = append(models, m.(*model.PodDisruptionBudget))
}

return &model.CollectorPodDisruptionBudget{
ClusterName: pctx.Cfg.KubeClusterName,
ClusterId: pctx.ClusterID,
GroupId: pctx.MsgGroupID,
GroupSize: int32(groupSize),
PodDisruptionBudgets: models,
Tags: append(pctx.Cfg.ExtraTags, pctx.ApiGroupVersionTag),
}
}

// ExtractResource is a handler called to extract the resource model out of a raw resource.
func (h *PodDisruptionBudgetHandlers) ExtractResource(_ processors.ProcessorContext, resource interface{}) (resourceModel interface{}) {
r := resource.(*policyv1.PodDisruptionBudget)
return k8sTransformers.ExtractPodDisruptionBudget(r)
}

// ResourceList is a handler called to convert a list passed as a generic
// interface to a list of generic interfaces.
func (h *PodDisruptionBudgetHandlers) ResourceList(_ processors.ProcessorContext, list interface{}) (resources []interface{}) {
resourceList := list.([]*policyv1.PodDisruptionBudget)
resources = make([]interface{}, 0, len(resourceList))

for _, resource := range resourceList {
resources = append(resources, resource)
}

return resources
}

// ResourceUID is a handler called to retrieve the resource UID.
func (h *PodDisruptionBudgetHandlers) ResourceUID(_ processors.ProcessorContext, resource interface{}) types.UID {
return resource.(*policyv1.PodDisruptionBudget).UID
}

// ResourceVersion is a handler called to retrieve the resource version.
func (h *PodDisruptionBudgetHandlers) ResourceVersion(_ processors.ProcessorContext, resource, _ interface{}) string {
return resource.(*policyv1.PodDisruptionBudget).ResourceVersion
}

// ScrubBeforeExtraction is a handler called to redact the raw resource before
// it is extracted as an internal resource model.
func (h *PodDisruptionBudgetHandlers) ScrubBeforeExtraction(_ processors.ProcessorContext, resource interface{}) {
r := resource.(*policyv1.PodDisruptionBudget)
redact.RemoveSensitiveAnnotationsAndLabels(r.Annotations, r.Labels)
}

// ScrubBeforeMarshalling is a handler called to redact the raw resource before
// it is marshalled to generate a manifest.
func (h *PodDisruptionBudgetHandlers) ScrubBeforeMarshalling(_ processors.ProcessorContext, _ interface{}) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build orchestrator

package k8s

import (
model "github.com/DataDog/agent-payload/v5/process"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/transformers"

policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// ExtractPodDisruptionBudget returns the protobuf model corresponding to a Kubernetes
func ExtractPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) *model.PodDisruptionBudget {
if pdb == nil {
return nil
}
result := model.PodDisruptionBudget{
Metadata: extractMetadata(&pdb.ObjectMeta),
Spec: extractPodDisruptionBudgetSpec(&pdb.Spec),
Status: extractPodDisruptionBudgetStatus(&pdb.Status),
}
result.Tags = append(result.Tags, transformers.RetrieveUnifiedServiceTags(pdb.ObjectMeta.Labels)...)
return &result
}

func extractPodDisruptionBudgetSpec(spec *policyv1.PodDisruptionBudgetSpec) *model.PodDisruptionBudgetSpec {
if spec == nil {
return nil
}
result := model.PodDisruptionBudgetSpec{}
result.MinAvailable = extractIntOrString(spec.MinAvailable)
if spec.Selector != nil {
result.Selector = extractLabelSelector(spec.Selector)
}
result.MaxUnavailable = extractIntOrString(spec.MaxUnavailable)
if spec.UnhealthyPodEvictionPolicy != nil {
result.UnhealthyPodEvictionPolicy = string(*spec.UnhealthyPodEvictionPolicy)
}
return &result
}

func extractIntOrString(source *intstr.IntOrString) *model.IntOrString {
if source == nil {
return nil
}
switch source.Type {
case intstr.Int:
return &model.IntOrString{
Type: model.IntOrString_Int,
IntVal: source.IntVal,
}
case intstr.String:
return &model.IntOrString{
Type: model.IntOrString_String,
StrVal: source.StrVal,
}
}
return nil
}

func extractPodDisruptionBudgetStatus(status *policyv1.PodDisruptionBudgetStatus) *model.PodDisruptionBudgetStatus {
if status == nil {
return nil
}
return &model.PodDisruptionBudgetStatus{
DisruptedPods: extractDisruptedPods(status.DisruptedPods),
DisruptionsAllowed: status.DisruptionsAllowed,
CurrentHealthy: status.CurrentHealthy,
DesiredHealthy: status.DesiredHealthy,
ExpectedPods: status.ExpectedPods,
Conditions: extractPodDisruptionBudgetConditions(status.Conditions),
}
}

func extractDisruptedPods(disruptedPodsmap map[string]metav1.Time) map[string]int64 {
result := make(map[string]int64)
for pod, t := range disruptedPodsmap {
result[pod] = t.Time.Unix()
}
return result
}
func extractPodDisruptionBudgetConditions(conditions []metav1.Condition) []*model.Condition {
result := make([]*model.Condition, 0)
for _, condition := range conditions {
result = append(result, &model.Condition{
Type: condition.Type,
Status: string(condition.Status),
LastTransitionTime: condition.LastTransitionTime.Time.Unix(),
Reason: condition.Reason,
Message: condition.Message,
})
}
return result
}
Loading

0 comments on commit 9a3c9a3

Please sign in to comment.