From 46ff0c8a939ad76bbf604bcfa73b4d9669432f97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= Date: Fri, 17 Jan 2020 16:44:57 +0100 Subject: [PATCH] main.go: enable cleanup of old PVCs This commit enables the controller to clean up the PVCs of Thanos Receive Pods that are watched by it. Whenever a receiver is deleted, the controller will spawn a helper container that mounts all PVCs specified by the StatefulSet for that container and `rm -rf`s the contents of them. Tested on a kind cluster. A follow up PR will add E2E tests and verify this functionality. --- main.go | 239 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 207 insertions(+), 32 deletions(-) diff --git a/main.go b/main.go index ead37f9..17ad119 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "net/http/pprof" "os" "os/signal" + "path/filepath" "strings" "sync" "syscall" @@ -30,27 +31,30 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/workqueue" ) -type label = string - const ( resyncPeriod = 5 * time.Minute + pollPeriod = 1 * time.Second + pollTimeout = 5 * time.Minute internalServerShutdownTimeout = time.Second hashringLabelKey = "controller.receive.thanos.io/hashring" // Metric label values - fetch label = "fetch" - decode label = "decode" - save label = "save" - create label = "create" - update label = "update" - other label = "other" + fetchLabel = "fetch" + decodeLabel = "decode" + saveLabel = "save" + createLabel = "create" + deleteLabel = "delete" + updateLabel = "update" + otherLabel = "other" ) func main() { @@ -65,6 +69,8 @@ func main() { Port int Scheme string InternalAddr string + CleanupImage string + CleanUp bool }{} flag.StringVar(&config.KubeConfig, "kubeconfig", "", "Path to kubeconfig") @@ -77,6 +83,8 @@ func main() { flag.IntVar(&config.Port, "port", 10901, "The port on which receive components are listening for write requests") flag.StringVar(&config.Scheme, "scheme", "http", "The URL scheme on which receive components accept write requests") flag.StringVar(&config.InternalAddr, "internal-addr", ":8080", "The address on which internal server runs") + flag.StringVar(&config.CleanupImage, "cleanup-image", "busybox", "The container image to use for cleanup operations") + flag.BoolVar(&config.CleanUp, "cleanup", true, "Should the controller clean up PVCs?") flag.Parse() logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) @@ -131,6 +139,8 @@ func main() { scheme: config.Scheme, labelKey: labelKey, labelValue: labelValue, + cleanupImage: config.CleanupImage, + cleanUp: config.CleanUp, } c := newController(klient, logger, opt) c.registerMetrics(reg) @@ -297,19 +307,25 @@ type options struct { scheme string labelKey string labelValue string + cleanupImage string + cleanUp bool } type controller struct { options *options - queue *queue + podQ workqueue.RateLimitingInterface + stsQ *queue logger log.Logger klient kubernetes.Interface cmapInf cache.SharedIndexInformer + podInf cache.SharedIndexInformer ssetInf cache.SharedIndexInformer reconcileAttempts prometheus.Counter reconcileErrors *prometheus.CounterVec + cleanupAttempts prometheus.Counter + cleanupErrors *prometheus.CounterVec configmapChangeAttempts prometheus.Counter configmapChangeErrors *prometheus.CounterVec configmapHash prometheus.Gauge @@ -325,11 +341,13 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) * return &controller{ options: o, - queue: newQueue(), + podQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pod"), + stsQ: newQueue(), logger: logger, klient: klient, cmapInf: coreinformers.NewConfigMapInformer(klient, o.namespace, resyncPeriod, nil), + podInf: coreinformers.NewPodInformer(klient, o.namespace, resyncPeriod, nil), ssetInf: appsinformers.NewFilteredStatefulSetInformer(klient, o.namespace, resyncPeriod, nil, func(lo *v1.ListOptions) { lo.LabelSelector = labels.Set{o.labelKey: o.labelValue}.String() }), @@ -344,6 +362,16 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) * }, []string{"type"}, ), + cleanupAttempts: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_receive_controller_cleanup_attempts_total", + Help: "Total number of cleanups.", + }), + cleanupErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_receive_controller_cleanup_errors_total", + Help: "Total number of cleanup errors.", + }, + []string{"type"}, + ), configmapChangeAttempts: prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_receive_controller_configmap_change_attempts_total", Help: "Total number of configmap change attempts.", @@ -384,13 +412,14 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) * func (c *controller) registerMetrics(reg *prometheus.Registry) { if reg != nil { c.reconcileAttempts.Add(0) - c.reconcileErrors.WithLabelValues(fetch).Add(0) - c.reconcileErrors.WithLabelValues(decode).Add(0) - c.reconcileErrors.WithLabelValues(save).Add(0) + c.reconcileErrors.WithLabelValues(fetchLabel).Add(0) + c.reconcileErrors.WithLabelValues(decodeLabel).Add(0) + c.reconcileErrors.WithLabelValues(saveLabel).Add(0) c.configmapChangeAttempts.Add(0) - c.configmapChangeErrors.WithLabelValues(create).Add(0) - c.configmapChangeErrors.WithLabelValues(update).Add(0) - c.configmapChangeErrors.WithLabelValues(other).Add(0) + c.configmapChangeErrors.WithLabelValues(createLabel).Add(0) + c.configmapChangeErrors.WithLabelValues(deleteLabel).Add(0) + c.configmapChangeErrors.WithLabelValues(updateLabel).Add(0) + c.configmapChangeErrors.WithLabelValues(otherLabel).Add(0) reg.MustRegister( c.reconcileAttempts, c.reconcileErrors, @@ -405,24 +434,35 @@ func (c *controller) registerMetrics(reg *prometheus.Registry) { } func (c *controller) run(stop <-chan struct{}) error { - defer c.queue.stop() + defer c.podQ.ShutDown() + defer c.stsQ.stop() go c.cmapInf.Run(stop) go c.ssetInf.Run(stop) + if c.options.cleanUp { + go c.podInf.Run(stop) + } if err := c.waitForCacheSync(stop); err != nil { return err } c.cmapInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(_ interface{}) { c.queue.add() }, - DeleteFunc: func(_ interface{}) { c.queue.add() }, - UpdateFunc: func(_, _ interface{}) { c.queue.add() }, + AddFunc: func(_ interface{}) { c.stsQ.add() }, + DeleteFunc: func(_ interface{}) { c.stsQ.add() }, + UpdateFunc: func(_, _ interface{}) { c.stsQ.add() }, }) c.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(_ interface{}) { c.queue.add() }, - DeleteFunc: func(_ interface{}) { c.queue.add() }, - UpdateFunc: func(_, _ interface{}) { c.queue.add() }, + AddFunc: func(_ interface{}) { c.stsQ.add() }, + DeleteFunc: func(_ interface{}) { c.stsQ.add() }, + UpdateFunc: func(_, _ interface{}) { c.stsQ.add() }, }) - go c.worker() + go c.stsWorker() + + if c.options.cleanUp { + c.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { c.podQ.Add(obj) }, + }) + go c.podWorker() + } <-stop return nil @@ -437,6 +477,10 @@ func (c *controller) waitForCacheSync(stop <-chan struct{}) error { }{ {"ConfigMap", c.cmapInf}, {"StatefulSet", c.ssetInf}, + {"Pod", c.podInf}, + } + if !c.options.cleanUp { + informers = informers[:1] } for _, inf := range informers { if !cache.WaitForCacheSync(stop, inf.informer.HasSynced) { @@ -453,17 +497,148 @@ func (c *controller) waitForCacheSync(stop <-chan struct{}) error { return nil } -func (c *controller) worker() { - for c.queue.get() { +func (c *controller) podWorker() { + fn := func() bool { + key, quit := c.podQ.Get() + if quit { + return false + } + defer c.podQ.Done(key) + if err := c.cleanUp(key.(*corev1.Pod)); err != nil { + level.Error(c.logger).Log("msg", "unable to clean up PVC", "err", err) + c.podQ.AddRateLimited(key) + return true + } + c.podQ.Forget(key) + return true + } + for fn() { + } +} + +func (c *controller) stsWorker() { + for c.stsQ.get() { c.sync() } } +func (c *controller) resolvePodOwnerRef(namespace string, refs []metav1.OwnerReference) (*appsv1.StatefulSet, error) { + for _, ref := range refs { + // If the owner reference points at the wrong kind of object, skip. + if ref.Kind != "StatefulSet" { + continue + } + // If the owner reference points at something that we don't have, then skip. + obj, ok, err := c.ssetInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", namespace, ref.Name)) + if !ok { + continue + } + if err != nil { + return nil, err + } + sts := obj.(*appsv1.StatefulSet) + if sts.UID != ref.UID { + return nil, errors.Wrap(err, "owner reference UID does not match StatefulSet") + } + return sts, nil + } + return nil, nil +} + +func (c *controller) cleanUp(pod *corev1.Pod) error { + c.cleanupAttempts.Inc() + sts, err := c.resolvePodOwnerRef(pod.Namespace, pod.GetOwnerReferences()) + if err != nil { + c.cleanupErrors.WithLabelValues(fetchLabel).Inc() + return errors.Wrap(err, "could not get StatefulSet") + } + // This probably means that the Pod did not belong to a StatefulSet with + // our label selector, i.e. not a StatefulSet we are watching. + if sts == nil { + return nil + } + // Nothing to clean up. + if len(sts.Spec.VolumeClaimTemplates) == 0 { + return nil + } + + helper := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cleanup-" + pod.Name, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Name: "cleanup", + Image: c.options.cleanupImage, + Command: []string{"rm", "-rf"}, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + } + + var v corev1.Volume + var name, mountPath string + for _, t := range sts.Spec.VolumeClaimTemplates { + name = fmt.Sprintf("%s-%s", t.Name, pod.Name) + v = corev1.Volume{ + Name: name, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: name, + }, + }, + } + helper.Spec.Volumes = append(helper.Spec.Volumes, v) + mountPath = filepath.Join("/pvc", v.Name) + helper.Spec.Containers[0].VolumeMounts = append(helper.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ + Name: v.Name, + MountPath: mountPath, + }) + helper.Spec.Containers[0].Command = append(helper.Spec.Containers[0].Command, filepath.Join(mountPath, "*")) + } + + if _, err := c.klient.CoreV1().Pods(pod.Namespace).Create(helper); err != nil && !kerrors.IsAlreadyExists(err) { + c.cleanupErrors.WithLabelValues(createLabel).Inc() + return errors.Wrap(err, "unable to create the cleanup Pod") + } + + defer func() { + err := c.klient.CoreV1().Pods(pod.Namespace).Delete(helper.Name, &metav1.DeleteOptions{}) + if err != nil { + level.Error(c.logger).Log("msg", "unable to delete the cleanup Pod", "err", err) + c.cleanupErrors.WithLabelValues(deleteLabel).Inc() + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), pollTimeout) + defer cancel() + + err = wait.PollUntil(pollPeriod, + func() (bool, error) { + if p, _, err := c.podInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", pod.Namespace, helper.Name)); err != nil { + return false, err + } else if p.(*corev1.Pod).Status.Phase == corev1.PodSucceeded { + return true, nil + } + return false, nil + }, + ctx.Done()) + if err != nil { + c.cleanupErrors.WithLabelValues(fetchLabel).Inc() + return errors.Wrap(err, "clean up PersistentVolumeClaim") + } + + return nil +} + func (c *controller) sync() { c.reconcileAttempts.Inc() configMap, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapName)) if !ok || err != nil { - c.reconcileErrors.WithLabelValues(fetch).Inc() + c.reconcileErrors.WithLabelValues(fetchLabel).Inc() level.Warn(c.logger).Log("msg", "could not fetch ConfigMap", "err", err, "name", c.options.configMapName) return } @@ -471,7 +646,7 @@ func (c *controller) sync() { var hashrings []receive.HashringConfig if err := json.Unmarshal([]byte(cm.Data[c.options.fileName]), &hashrings); err != nil { - c.reconcileErrors.WithLabelValues(decode).Inc() + c.reconcileErrors.WithLabelValues(decodeLabel).Inc() level.Warn(c.logger).Log("msg", "failed to decode configuration", "err", err) return } @@ -488,7 +663,7 @@ func (c *controller) sync() { c.populate(hashrings, statefulsets) if err := c.saveHashring(hashrings); err != nil { - c.reconcileErrors.WithLabelValues(save).Inc() + c.reconcileErrors.WithLabelValues(saveLabel).Inc() level.Error(c.logger).Log("msg", "failed to save hashrings") } } @@ -539,14 +714,14 @@ func (c *controller) saveHashring(hashring []receive.HashringConfig) error { if kerrors.IsNotFound(err) { _, err = c.klient.CoreV1().ConfigMaps(c.options.namespace).Create(cm) if err != nil { - c.configmapChangeErrors.WithLabelValues(create).Inc() + c.configmapChangeErrors.WithLabelValues(createLabel).Inc() return err } c.configmapLastSuccessfulChangeTime.Set(float64(time.Now().Unix())) return nil } if err != nil { - c.configmapChangeErrors.WithLabelValues(other).Inc() + c.configmapChangeErrors.WithLabelValues(otherLabel).Inc() return err } @@ -556,7 +731,7 @@ func (c *controller) saveHashring(hashring []receive.HashringConfig) error { _, err = c.klient.CoreV1().ConfigMaps(c.options.namespace).Update(cm) if err != nil { - c.configmapChangeErrors.WithLabelValues(update).Inc() + c.configmapChangeErrors.WithLabelValues(updateLabel).Inc() return err }