Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ability to configure frequency for pod termination via annotation #353

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
206 changes: 175 additions & 31 deletions chaoskube/chaoskube.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"regexp"
"time"

Expand Down Expand Up @@ -32,6 +33,8 @@ import (
type Chaoskube struct {
// a kubernetes client object
Client kubernetes.Interface
// the interval Chaoskube is configured to run at
Interval time.Duration
// a label selector which restricts the pods to choose from
Labels labels.Selector
// an annotation selector which restricts the pods to choose from
Expand All @@ -56,6 +59,11 @@ type Chaoskube struct {
Timezone *time.Location
// minimum age of pods to consider
MinimumAge time.Duration
// the annotation prefix (eg. "chaos.alpha.kubernetes.io") to use when
// looking for configuration overrides in pod annotations
ConfigAnnotationPrefix string
// default frequency for pods lacking annotation
DefaultFrequency string
// an instance of logrus.FieldLogger to write log messages to
Logger log.FieldLogger
// a terminator that terminates victim pods
Expand All @@ -68,7 +76,7 @@ type Chaoskube struct {
EventRecorder record.EventRecorder
// a function to retrieve the current time
Now func() time.Time

// the maximum number of pods to terminate per interval
MaxKill int
// chaos events notifier
Notifier notifier.Notifier
Expand All @@ -85,6 +93,8 @@ var (
msgTimeOfDayExcluded = "time of day excluded"
// msgDayOfYearExcluded is the log message when termination is suspended due to the day of year filter
msgDayOfYearExcluded = "day of year excluded"
// msgFailedToParseAnnotation is the log message when a filter fails to parse a pod annotation
msgFailedToParseAnnotation = "failed to parse annotation, '%v', excluding from candidates"
)

// New returns a new instance of Chaoskube. It expects:
Expand All @@ -95,32 +105,35 @@ var (
// * a logger implementing logrus.FieldLogger to send log output to
// * what specific terminator to use to imbue chaos on victim pods
// * whether to enable/disable dry-run mode
func New(client kubernetes.Interface, labels, annotations, kinds, namespaces, namespaceLabels labels.Selector, includedPodNames, excludedPodNames *regexp.Regexp, excludedWeekdays []time.Weekday, excludedTimesOfDay []util.TimePeriod, excludedDaysOfYear []time.Time, timezone *time.Location, minimumAge time.Duration, logger log.FieldLogger, dryRun bool, terminator terminator.Terminator, maxKill int, notifier notifier.Notifier) *Chaoskube {
func New(client kubernetes.Interface, interval time.Duration, labels, annotations, kinds, namespaces, namespaceLabels labels.Selector, includedPodNames, excludedPodNames *regexp.Regexp, excludedWeekdays []time.Weekday, excludedTimesOfDay []util.TimePeriod, excludedDaysOfYear []time.Time, timezone *time.Location, minimumAge time.Duration, configAnnotationPrefix, defaultFrequency string, logger log.FieldLogger, dryRun bool, terminator terminator.Terminator, maxKill int, notifier notifier.Notifier) *Chaoskube {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "chaoskube"})

return &Chaoskube{
Client: client,
Labels: labels,
Annotations: annotations,
Kinds: kinds,
Namespaces: namespaces,
NamespaceLabels: namespaceLabels,
IncludedPodNames: includedPodNames,
ExcludedPodNames: excludedPodNames,
ExcludedWeekdays: excludedWeekdays,
ExcludedTimesOfDay: excludedTimesOfDay,
ExcludedDaysOfYear: excludedDaysOfYear,
Timezone: timezone,
MinimumAge: minimumAge,
Logger: logger,
DryRun: dryRun,
Terminator: terminator,
EventRecorder: recorder,
Now: time.Now,
MaxKill: maxKill,
Notifier: notifier,
Client: client,
Interval: interval,
Labels: labels,
Annotations: annotations,
Kinds: kinds,
Namespaces: namespaces,
NamespaceLabels: namespaceLabels,
IncludedPodNames: includedPodNames,
ExcludedPodNames: excludedPodNames,
ExcludedWeekdays: excludedWeekdays,
ExcludedTimesOfDay: excludedTimesOfDay,
ExcludedDaysOfYear: excludedDaysOfYear,
Timezone: timezone,
MinimumAge: minimumAge,
ConfigAnnotationPrefix: configAnnotationPrefix,
DefaultFrequency: defaultFrequency,
Logger: logger,
DryRun: dryRun,
Terminator: terminator,
EventRecorder: recorder,
Now: time.Now,
MaxKill: maxKill,
Notifier: notifier,
}
}

Expand Down Expand Up @@ -169,7 +182,7 @@ func (c *Chaoskube) TerminateVictims(ctx context.Context) error {
}
}

victims, err := c.Victims(ctx)
victims, err := c.Victims(ctx, now)
if err == errPodNotFound {
c.Logger.Debug(msgVictimNotFound)
return nil
Expand All @@ -188,8 +201,8 @@ func (c *Chaoskube) TerminateVictims(ctx context.Context) error {
}

// Victims returns up to N pods as configured by MaxKill flag
func (c *Chaoskube) Victims(ctx context.Context) ([]v1.Pod, error) {
pods, err := c.Candidates(ctx)
func (c *Chaoskube) Victims(ctx context.Context, now time.Time) ([]v1.Pod, error) {
pods, err := c.Candidates(ctx, now)
if err != nil {
return []v1.Pod{}, err
}
Expand All @@ -208,7 +221,7 @@ func (c *Chaoskube) Victims(ctx context.Context) ([]v1.Pod, error) {

// Candidates returns the list of pods that are available for termination.
// It returns all pods that match the configured label, annotation and namespace selectors.
func (c *Chaoskube) Candidates(ctx context.Context) ([]v1.Pod, error) {
func (c *Chaoskube) Candidates(ctx context.Context, now time.Time) ([]v1.Pod, error) {
listOptions := metav1.ListOptions{LabelSelector: c.Labels.String()}

podList, err := c.Client.CoreV1().Pods(v1.NamespaceAll).List(ctx, listOptions)
Expand All @@ -234,10 +247,13 @@ func (c *Chaoskube) Candidates(ctx context.Context) ([]v1.Pod, error) {
pods = filterByAnnotations(pods, c.Annotations)
pods = filterByPhase(pods, v1.PodRunning)
pods = filterTerminatingPods(pods)
pods = filterByMinimumAge(pods, c.MinimumAge, c.Now())
pods = filterByPodName(pods, c.IncludedPodNames, c.ExcludedPodNames)
pods = filterByOwnerReference(pods)

pods = filterByMinimumAge(pods, c.ConfigAnnotationPrefix, c.MinimumAge, c.Now(), c.Logger)
pods = filterByFrequency(pods, c.ConfigAnnotationPrefix, c.DefaultFrequency, c.Interval, c.Logger)
pods = filterByTime(pods, c.ConfigAnnotationPrefix, now, c.Logger)

pods = filterByOwnerReference(pods)
return pods, nil
}

Expand Down Expand Up @@ -470,16 +486,31 @@ func filterTerminatingPods(pods []v1.Pod) []v1.Pod {

// filterByMinimumAge filters pods by creation time. Only pods
// older than minimumAge are returned
func filterByMinimumAge(pods []v1.Pod, minimumAge time.Duration, now time.Time) []v1.Pod {
if minimumAge <= time.Duration(0) {
func filterByMinimumAge(pods []v1.Pod, annotationPrefix string, minimumAge time.Duration, now time.Time, logger log.FieldLogger) []v1.Pod {
if annotationPrefix == "" && minimumAge <= time.Duration(0) {
return pods
}

creationTime := now.Add(-minimumAge)
annotation := util.FormatAnnotation(annotationPrefix, "minimum-age")

defaultCreationTime := now.Add(-minimumAge)
filteredList := []v1.Pod{}

for _, pod := range pods {
text, ok := pod.Annotations[annotation]

// Pods missing the minimum-age annotation fall back to the configured default
creationTime := defaultCreationTime
if ok {
minimumAgeOverride, err := time.ParseDuration(text)
if err != nil {
logger.WithField("err", err).Warnf(msgFailedToParseAnnotation, annotation)
continue
}

creationTime = now.Add(-minimumAgeOverride)
}

if pod.ObjectMeta.CreationTimestamp.Time.Before(creationTime) {
filteredList = append(filteredList, pod)
}
Expand Down Expand Up @@ -533,3 +564,116 @@ func filterByOwnerReference(pods []v1.Pod) []v1.Pod {

return filteredList
}

// filterByFrequency randomly thins out the candidate pods according to a
// per-pod "frequency" annotation (formatted from annotationPrefix via
// util.FormatAnnotation).
//
// For each pod the frequency text is taken from the annotation, or from
// defaultFrequency when the annotation is absent. The text is handed to
// util.ParseFrequency together with interval; the pod is kept only when the
// returned chance is greater than a fresh rand.Float64() draw.
// NOTE(review): chance is presumably the probability of termination per
// interval — confirm against util.ParseFrequency.
//
// Special cases:
//   - if both annotationPrefix and defaultFrequency are empty, pods is
//     returned unchanged;
//   - a pod without the annotation is always kept when defaultFrequency is
//     empty;
//   - a pod whose frequency text fails to parse is dropped and a warning
//     naming the pod is logged.
func filterByFrequency(pods []v1.Pod, annotationPrefix string, defaultFrequency string, interval time.Duration, logger log.FieldLogger) []v1.Pod {
	if annotationPrefix == "" && defaultFrequency == "" {
		return pods
	}

	annotation := util.FormatAnnotation(annotationPrefix, "frequency")

	filteredList := []v1.Pod{}
	for _, pod := range pods {
		text, ok := pod.Annotations[annotation]
		if !ok {
			// Don't filter out pods missing the frequency annotation unless a
			// default frequency has been configured for them.
			if defaultFrequency == "" {
				filteredList = append(filteredList, pod)
				continue
			}
			text = defaultFrequency
		}

		chance, err := util.ParseFrequency(text, interval)
		if err != nil {
			// Identify the pod so the operator can find the bad annotation.
			logger.WithField("err", err).WithField("pod-name", pod.Name).WithField("pod-namespace", pod.Namespace).
				Warnf(msgFailedToParseAnnotation, annotation)
			continue
		}

		if chance > rand.Float64() {
			filteredList = append(filteredList, pod)
		}
	}

	return filteredList
}

// filterByTime drops pods whose own annotations exclude the moment now from
// termination. The annotations consulted (all formatted from annotationPrefix
// via util.FormatAnnotation) are:
//
//   - "timezone": location name passed to time.LoadLocation; now is converted
//     into that location before the checks below are evaluated;
//   - "excluded-weekdays": parsed by util.ParseWeekdays; the pod is dropped
//     when the local weekday is listed;
//   - "excluded-times-of-day": parsed by util.ParseTimePeriods; the pod is
//     dropped when any period's Includes reports true for the local time;
//   - "excluded-days-of-year": parsed by util.ParseDays; the pod is dropped
//     when any entry has the same day and month as the local time.
//
// If annotationPrefix is empty, pods is returned unchanged. A pod carrying a
// timezone, times-of-day or days-of-year annotation that fails to parse is
// dropped and a warning naming the pod is logged. Pods with none of these
// annotations are always kept.
func filterByTime(pods []v1.Pod, annotationPrefix string, now time.Time, logger log.FieldLogger) []v1.Pod {
	if annotationPrefix == "" {
		return pods
	}

	timezoneAnnotation := util.FormatAnnotation(annotationPrefix, "timezone")
	weekdaysAnnotation := util.FormatAnnotation(annotationPrefix, "excluded-weekdays")
	timesOfDayAnnotation := util.FormatAnnotation(annotationPrefix, "excluded-times-of-day")
	daysOfYearAnnotation := util.FormatAnnotation(annotationPrefix, "excluded-days-of-year")

	// warnInvalid reports an unparsable annotation with the same set of fields
	// for every annotation kind, so the offending pod can always be located.
	warnInvalid := func(podName, podNamespace, annotation string, err error) {
		logger.WithField("err", err).WithField("pod-name", podName).WithField("pod-namespace", podNamespace).
			Warnf(msgFailedToParseAnnotation, annotation)
	}

	filteredList := []v1.Pod{}

checkingPods:
	for _, pod := range pods {
		localNow := now

		// Timezone: evaluate all later checks in the pod's own location.
		if text, ok := pod.Annotations[timezoneAnnotation]; ok {
			location, err := time.LoadLocation(text)
			if err != nil {
				warnInvalid(pod.Name, pod.Namespace, timezoneAnnotation, err)
				continue checkingPods
			}

			localNow = localNow.In(location)
		}

		// Weekdays
		if text, ok := pod.Annotations[weekdaysAnnotation]; ok {
			for _, wd := range util.ParseWeekdays(text) {
				if wd == localNow.Weekday() {
					continue checkingPods
				}
			}
		}

		// Times of day
		if text, ok := pod.Annotations[timesOfDayAnnotation]; ok {
			periods, err := util.ParseTimePeriods(text)
			if err != nil {
				warnInvalid(pod.Name, pod.Namespace, timesOfDayAnnotation, err)
				continue checkingPods
			}

			for _, tp := range periods {
				if tp.Includes(localNow) {
					continue checkingPods
				}
			}
		}

		// Days of year: only day and month are compared, the year is ignored.
		if text, ok := pod.Annotations[daysOfYearAnnotation]; ok {
			days, err := util.ParseDays(text)
			if err != nil {
				warnInvalid(pod.Name, pod.Namespace, daysOfYearAnnotation, err)
				continue checkingPods
			}

			for _, d := range days {
				if d.Day() == localNow.Day() && d.Month() == localNow.Month() {
					continue checkingPods
				}
			}
		}

		filteredList = append(filteredList, pod)
	}

	return filteredList
}
Loading