From 245988f515e55d626329fc58253961bb6467cfc8 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Wed, 22 Mar 2023 17:01:21 +0800 Subject: [PATCH] [#574] Use k8s jobs to do scaledown --- config/rbac/role.yaml | 8 + .../activemqartemisscaledown_controller.go | 9 + ...ctivemqartemisscaledown_controller_test.go | 244 ++++++++++---- controllers/suite_test.go | 24 +- deploy/cluster_role.yaml | 8 + deploy/role.yaml | 8 + pkg/draincontroller/controller.go | 310 +++++++++++------- pkg/draincontroller/draincontroller_test.go | 49 --- pkg/utils/common/common.go | 10 + 9 files changed, 440 insertions(+), 230 deletions(-) delete mode 100644 pkg/draincontroller/draincontroller_test.go diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 4454b4092..248bd1310 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -41,6 +41,14 @@ rules: - deployments/finalizers verbs: - update +- apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - list - apiGroups: - broker.amq.io resources: diff --git a/controllers/activemqartemisscaledown_controller.go b/controllers/activemqartemisscaledown_controller.go index e522bf89c..0b8194701 100644 --- a/controllers/activemqartemisscaledown_controller.go +++ b/controllers/activemqartemisscaledown_controller.go @@ -51,6 +51,7 @@ type ActiveMQArtemisScaledownReconciler struct { //+kubebuilder:rbac:groups=broker.amq.io,namespace=activemq-artemis-operator,resources=activemqartemisscaledowns,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=broker.amq.io,namespace=activemq-artemis-operator,resources=activemqartemisscaledowns/status,verbs=get;update;patch //+kubebuilder:rbac:groups=broker.amq.io,namespace=activemq-artemis-operator,resources=activemqartemisscaledowns/finalizers,verbs=update +//+kubebuilder:rbac:groups=batch,namespace=activemq-artemis-operator,resources=jobs,verbs=create;list;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current 
state of the cluster closer to the desired state. @@ -99,6 +100,14 @@ func (r *ActiveMQArtemisScaledownReconciler) Reconcile(ctx context.Context, requ return ctrl.Result{}, nil } +//for testing +func (r *ActiveMQArtemisScaledownReconciler) GetDrainController(namespace string) *draincontroller.Controller { + if inst, ok := controllers[namespace]; ok { + return inst + } + return nil +} + func (r *ActiveMQArtemisScaledownReconciler) getDrainController(localOnly bool, namespace string, kubeClient *kubernetes.Clientset, instance *brokerv1beta1.ActiveMQArtemisScaledown) (kubeinformers.SharedInformerFactory, *draincontroller.Controller, bool) { var kubeInformerFactory kubeinformers.SharedInformerFactory var controllerInstance *draincontroller.Controller diff --git a/controllers/activemqartemisscaledown_controller_test.go b/controllers/activemqartemisscaledown_controller_test.go index 00286e9f1..e85ae9c09 100644 --- a/controllers/activemqartemisscaledown_controller_test.go +++ b/controllers/activemqartemisscaledown_controller_test.go @@ -22,23 +22,31 @@ import ( "bytes" "context" "os" - "strings" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" + "encoding/json" + brokerv1beta1 "github.com/artemiscloud/activemq-artemis-operator/api/v1beta1" + drainctrl "github.com/artemiscloud/activemq-artemis-operator/pkg/draincontroller" "github.com/artemiscloud/activemq-artemis-operator/pkg/resources/environments" "github.com/artemiscloud/activemq-artemis-operator/pkg/utils/common" "github.com/artemiscloud/activemq-artemis-operator/pkg/utils/namer" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/remotecommand" ) @@ -72,7 +80,6 @@ var _ = Describe("Scale down controller", func() { Expect(k8sClient.Create(ctx, brokerCrd)).Should(Succeed()) createdBrokerCrd := &brokerv1beta1.ActiveMQArtemis{} - getPersistedVersionedCrd(brokerCrd.ObjectMeta.Name, defaultNamespace, createdBrokerCrd) By("verifying two ready") Eventually(func(g Gomega) { @@ -86,11 +93,11 @@ var _ = Describe("Scale down controller", func() { By("Sending a message to 1") podWithOrdinal := namer.CrToSS(brokerCrd.Name) + "-1" - sendCmd := []string{"amq-broker/bin/artemis", "producer", "--user", "Jay", "--password", "activemq", "--url", "tcp://" + podWithOrdinal + ":61616", "--message-count", "1", "--destination", "queue://DLQ", "--verbose"} + sendCmd := []string{"amq-broker/bin/artemis", "producer", "--user", "Jay", "--password", "activemq", "--url", "tcp://" + podWithOrdinal + ":61616", "--message-count", "1", "--destination", "DLQ", "--verbose"} - content, err := RunCommandInPod(podWithOrdinal, brokerName+"-container", sendCmd) + stdout, _, err := 
RunCommandInPod(podWithOrdinal, brokerName+"-container", sendCmd) Expect(err).To(BeNil()) - Expect(*content).Should(ContainSubstring("Produced: 1 messages")) + Expect(*stdout).Should(ContainSubstring("Produced: 1 messages")) By("Scaling down to ss-0") podWithOrdinal = namer.CrToSS(brokerCrd.Name) + "-0" @@ -101,40 +108,30 @@ var _ = Describe("Scale down controller", func() { createdBrokerCrd.Spec.DeploymentPlan.Size = common.Int32ToPtr(1) // not checking return from update as it will error on repeat as there is no change // which is expected - k8sClient.Update(ctx, createdBrokerCrd) - By("checking scale down to 0 complete?") + g.Expect(k8sClient.Update(ctx, createdBrokerCrd)).Should(Succeed()) + + By("checking statefulset scaled down to 1 pod") g.Expect(len(createdBrokerCrd.Status.PodStatus.Ready)).Should(BeEquivalentTo(1)) + + }, existingClusterTimeout, interval*2).Should(Succeed()) + + By("Checking messsage count on broker 0") + Eventually(func(g Gomega) { // This moment a drainer pod will come up and do the message migration - // so the pod number will change from 1 to 2 and back to 1. // checking message count on broker 0 to make sure scale down finally happens. 
- By("Checking messsage count on broker 0") - //./artemis queue stat --silent --url tcp://artemis-broker-ss-0:61616 --queueName DLQ - queryCmd := []string{"amq-broker/bin/artemis", "queue", "stat", "--silent", "--url", "tcp://" + podWithOrdinal + ":61616", "--queueName", "DLQ"} - stdout, err := RunCommandInPod(podWithOrdinal, brokerName+"-container", queryCmd) - g.Expect(err).To(BeNil()) - fields := strings.Split(*stdout, "|") - g.Expect(fields[4]).To(Equal("MESSAGE_COUNT"), *stdout) - g.Expect(strings.TrimSpace(fields[14])).To(Equal("1"), *stdout) - + checkMessageCountOnPod(brokerName, podWithOrdinal, "DLQ", 1, g) }, existingClusterTimeout, interval*2).Should(Succeed()) By("Receiving a message from 0") - rcvCmd := []string{"amq-broker/bin/artemis", "consumer", "--user", "Jay", "--password", "activemq", "--url", "tcp://" + podWithOrdinal + ":61616", "--message-count", "1", "--destination", "queue://DLQ", "--receive-timeout", "10000", "--break-on-null", "--verbose"} - content, err = RunCommandInPod(podWithOrdinal, brokerName+"-container", rcvCmd) + rcvCmd := []string{"amq-broker/bin/artemis", "consumer", "--user", "Jay", "--password", "activemq", "--url", "tcp://" + podWithOrdinal + ":61616", "--message-count", "1", "--destination", "DLQ", "--receive-timeout", "10000", "--break-on-null", "--verbose"} + stdout, _, err = RunCommandInPod(podWithOrdinal, brokerName+"-container", rcvCmd) Expect(err).To(BeNil()) - Expect(*content).Should(ContainSubstring("JMS Message ID:")) - - By("accessing drain pod") - drainPod := &corev1.Pod{} - drainPodKey := types.NamespacedName{Name: brokerName + "-ss-1", Namespace: defaultNamespace} - By("flipping MessageMigration to release drain pod CR, and PVC") - Expect(k8sClient.Get(ctx, drainPodKey, drainPod)).Should(Succeed()) + Expect(*stdout).Should(ContainSubstring("JMS Message ID:")) Eventually(func(g Gomega) { - getPersistedVersionedCrd(brokerCrd.ObjectMeta.Name, defaultNamespace, createdBrokerCrd) By("flipping message migration state 
(from default true) on brokerCr") booleanFalse := false @@ -144,19 +141,144 @@ var _ = Describe("Scale down controller", func() { }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) - By("verifying drain pod gone") + By("verify drain job gone") + drainJobKey := types.NamespacedName{Name: "job-" + namer.CrToSS(brokerCrd.Name) + "-1", Namespace: defaultNamespace} + drainJob := &batchv1.Job{} Eventually(func(g Gomega) { - g.Expect(k8sClient.Get(ctx, drainPodKey, drainPod)).ShouldNot(Succeed()) - By("drain pod gone") + g.Expect(k8sClient.Get(ctx, drainJobKey, drainJob)).ShouldNot(Succeed()) }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) + By("verify drain pod is deleted") + jobName := "job-" + namer.CrToSS(brokerCrd.Name) + "-1" + drainPodSelector := labels.NewSelector() + jobLabel, err := labels.NewRequirement("job-name", selection.Equals, []string{jobName}) + Expect(err).To(BeNil()) + drainPodSelector = drainPodSelector.Add(*jobLabel) + Eventually(func(g Gomega) { + pods := &corev1.PodList{} + k8sClient.List(context.TODO(), pods, &client.ListOptions{LabelSelector: drainPodSelector, Namespace: defaultNamespace}) + g.Expect(len(pods.Items)).To(Equal(0)) + }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) + + By("clean up broker") Expect(k8sClient.Delete(ctx, createdBrokerCrd)).Should(Succeed()) } }) + + It("scaledown failure test", Label("basic-scaledown-failure"), func() { + + if os.Getenv("USE_EXISTING_CLUSTER") == "true" { + + brokerName := NextSpecResourceName() + ctx := context.Background() + + brokerCrd, createdBrokerCrd := DeployCustomBroker(defaultNamespace, func(candidate *brokerv1beta1.ActiveMQArtemis) { + candidate.Name = brokerName + candidate.Spec.DeploymentPlan.Clustered = &boolTrue + candidate.Spec.DeploymentPlan.Size = common.Int32ToPtr(2) + candidate.Spec.DeploymentPlan.PersistenceEnabled = true + candidate.Spec.DeploymentPlan.MessageMigration = &boolTrue + 
candidate.Spec.DeploymentPlan.ReadinessProbe = &corev1.Probe{ + InitialDelaySeconds: 1, + PeriodSeconds: 5, + } + }) + + By("verifying two ready") + Eventually(func(g Gomega) { + getPersistedVersionedCrd(brokerCrd.ObjectMeta.Name, defaultNamespace, createdBrokerCrd) + By("Check ready status") + g.Expect(len(createdBrokerCrd.Status.PodStatus.Ready)).Should(BeEquivalentTo(2)) + + }, existingClusterTimeout, interval).Should(Succeed()) + + By("Sending a message to 1") + podWithOrdinal := namer.CrToSS(brokerCrd.Name) + "-1" + + sendCmd := []string{"amq-broker/bin/artemis", "producer", "--user", "Jay", "--password", "activemq", "--url", "tcp://" + podWithOrdinal + ":61616", "--message-count", "1", "--destination", "DLQ", "--verbose"} + + content, _, err := RunCommandInPod(podWithOrdinal, brokerName+"-container", sendCmd) + Expect(err).To(BeNil()) + Expect(*content).Should(ContainSubstring("Produced: 1 messages")) + + By("Make scaledown to fail") + drainController := scaleDownRconciler.GetDrainController("*") + Expect(drainController).NotTo(BeNil()) + + failDrainCommand := []string{ + "/bin/sh", + "-c", + "echo \"To fail the drainer\" ; exit 1", + } + + drainController.SetDrainCommand(failDrainCommand) + + By("Scaling down to fail") + Eventually(func(g Gomega) { + getPersistedVersionedCrd(brokerCrd.ObjectMeta.Name, defaultNamespace, createdBrokerCrd) + createdBrokerCrd.Spec.DeploymentPlan.Size = common.Int32ToPtr(1) + // the update is asserted so that a failed attempt + // is retried by the enclosing Eventually + g.Expect(k8sClient.Update(ctx, createdBrokerCrd)).Should(Succeed()) + }, existingClusterTimeout, interval*2).Should(Succeed()) + + By("Checking job status") + jobKey := types.NamespacedName{Name: "job-" + namer.CrToSS(brokerCrd.Name) + "-1", Namespace: defaultNamespace} + job := &batchv1.Job{} + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, jobKey, job)).Should(Succeed()) + g.Expect(job.Status.Failed).To(Equal(int32(1))) + }, 
existingClusterTimeout, existingClusterInterval).Should(Succeed()) + + By("checking pod 0 didn't get the message") + podWithOrdinal = namer.CrToSS(brokerCrd.Name) + "-0" + Eventually(func(g Gomega) { + checkMessageCountOnPod(brokerName, podWithOrdinal, "DLQ", 0, g) + }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) + + By("Create new job spec to make it pass") + ssKey := types.NamespacedName{ + Name: namer.CrToSS(brokerCrd.Name), + Namespace: defaultNamespace, + } + currentSS := &appsv1.StatefulSet{} + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, ssKey, currentSS)).Should(Succeed()) + }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) + + drainController.SetDrainCommand(drainctrl.DefaultDrainCommand) + + newJobSpec, err := drainController.NewDrainJob(currentSS, 1, job.Name+"-new") + Expect(err).To(BeNil()) + jobs := k8sClientSet.BatchV1().Jobs(defaultNamespace) + + _, err = jobs.Create(context.TODO(), newJobSpec, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + By("checking new job eventually complete") + newJobKey := types.NamespacedName{Name: job.Name + "-new", Namespace: defaultNamespace} + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, newJobKey, newJobSpec)).To(Succeed()) + g.Expect(newJobSpec.Status.Succeeded).To(Equal(int32(1))) + g.Expect(newJobSpec.Status.Failed).To(Equal(int32(0))) + }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) + + By("checking pod 0 got the message") + Eventually(func(g Gomega) { + checkMessageCountOnPod(brokerName, podWithOrdinal, "DLQ", 1, g) + }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) + + By("clean up") + deletePolicy := metav1.DeletePropagationBackground + Expect(k8sClient.Delete(ctx, job, &client.DeleteOptions{PropagationPolicy: &deletePolicy})) + Expect(k8sClient.Delete(ctx, newJobSpec, &client.DeleteOptions{PropagationPolicy: &deletePolicy})).Should(Succeed()) + Expect(k8sClient.Delete(ctx, 
createdBrokerCrd)).Should(Succeed()) + } + }) }) - It("Toleration ok, verify scaledown", func() { + It("Toleration ok, verify scaledown", Label("scaledown-toleration"), func() { // some required services on crc get evicted which invalidates this test of taints isOpenshift, err := environments.DetectOpenshift() @@ -218,7 +340,7 @@ var _ = Describe("Scale down controller", func() { By("Sending a message to Host: " + podWithOrdinal) sendCmd := []string{"amq-broker/bin/artemis", "producer", "--user", "Jay", "--password", "activemq", "--url", "tcp://" + podWithOrdinal + ":61616", "--message-count", "1"} - content, err := RunCommandInPod(podWithOrdinal, brokerKey.Name+"-container", sendCmd) + content, _, err := RunCommandInPod(podWithOrdinal, brokerKey.Name+"-container", sendCmd) Expect(err).To(BeNil()) @@ -243,7 +365,7 @@ var _ = Describe("Scale down controller", func() { recvCmd := []string{"amq-broker/bin/artemis", "consumer", "--user", "Jay", "--password", "activemq", "--url", "tcp://" + podWithOrdinal + ":61616", "--message-count", "1", "--receive-timeout", "10000", "--break-on-null", "--verbose"} - content, err = RunCommandInPod(podWithOrdinal, brokerKey.Name+"-container", recvCmd) + content, _, err = RunCommandInPod(podWithOrdinal, brokerKey.Name+"-container", recvCmd) Expect(err).To(BeNil()) @@ -263,14 +385,17 @@ var _ = Describe("Scale down controller", func() { g.Expect(k8sClient.Update(ctx, &node)).Should(Succeed()) }, timeout*2, interval).Should(Succeed()) - By("accessing drain pod") - drainPod := &corev1.Pod{} - drainPodKey := types.NamespacedName{Name: brokerKey.Name + "-ss-1", Namespace: defaultNamespace} - By("flipping MessageMigration to release drain pod CR, and PVC") - Expect(k8sClient.Get(ctx, drainPodKey, drainPod)).Should(Succeed()) - + //here the drain job/pod should have gone. 
+ //so check that it is gone rather than that it exists + By("checking drain job gone") + jobKey := types.NamespacedName{Name: "job-" + namer.CrToSS(crd.Name) + "-1", Namespace: defaultNamespace} + job := &batchv1.Job{} Eventually(func(g Gomega) { + err := k8sClient.Get(ctx, jobKey, job) + g.Expect(errors.IsNotFound(err)).To(BeTrue(), "error", err) + }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) + Eventually(func(g Gomega) { g.Expect(k8sClient.Get(ctx, brokerKey, createdCrd)).Should(Succeed()) By("flipping message migration state (from default true) on brokerCr") booleanFalse := false @@ -280,18 +405,13 @@ var _ = Describe("Scale down controller", func() { }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) - By("verifying drain pod gone") - Eventually(func(g Gomega) { - g.Expect(k8sClient.Get(ctx, drainPodKey, drainPod)).ShouldNot(Succeed()) - By("drain pod gone") - }, existingClusterTimeout, existingClusterInterval).Should(Succeed()) - Expect(k8sClient.Delete(ctx, createdCrd)).Should(Succeed()) } }) + }) -func RunCommandInPod(podName string, containerName string, command []string) (*string, error) { +func RunCommandInPod(podName string, containerName string, command []string) (*string, *string, error) { gvk := schema.GroupVersionKind{ Group: "", Version: "v1", @@ -316,27 +436,33 @@ func RunCommandInPod(podName string, containerName string, command []string) (*s exec, err := remotecommand.NewSPDYExecutor(testEnv.Config, "POST", execReq.URL()) if err != nil { - return nil, err + return nil, nil, err } - var consumerCapturedOut bytes.Buffer + var capturedOut bytes.Buffer + var capturedErr bytes.Buffer err = exec.Stream(remotecommand.StreamOptions{ Stdin: os.Stdin, - Stdout: &consumerCapturedOut, - Stderr: os.Stderr, + Stdout: &capturedOut, + Stderr: &capturedErr, Tty: false, }) - if err != nil { - return nil, err - } - //try get some content if any - Eventually(func(g Gomega) { - g.Expect(consumerCapturedOut.Len() > 0) - }, existingClusterTimeout, 
interval) + stdout := capturedOut.String() + stderr := capturedErr.String() - content := consumerCapturedOut.String() + return &stdout, &stderr, err +} - return &content, nil +func checkMessageCountOnPod(crName string, podName string, queueName string, count int, g Gomega) { + httpOrigin := "Origin: http://" + podName + ":8161" + reqUrl := "http://" + podName + ":8161/console/jolokia/read/org.apache.activemq.artemis:broker=\"amq-broker\",address=\"" + queueName + "\",component=addresses,queue=\"" + queueName + "\",routing-type=\"anycast\",subcomponent=queues/MessageCount" + queryCmd := []string{"curl", "-H", httpOrigin, "-u", "any:any", reqUrl} + stdout, stderr, err := RunCommandInPod(podName, crName+"-container", queryCmd) + g.Expect(err).To(BeNil(), "stdout", stdout, "stderr", stderr) + var response map[string]interface{} + Expect(json.Unmarshal([]byte(*stdout), &response)).To(Succeed()) + g.Expect(response["status"]).To(BeEquivalentTo(200)) + g.Expect(response["value"]).To(BeEquivalentTo(count)) } diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 5bd719c2a..a97b209b7 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -42,6 +42,7 @@ import ( . 
"github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -91,14 +92,15 @@ const ( ) var ( - resCount int64 - specCount int64 - currentDir string - k8sClient client.Client - restConfig *rest.Config - testEnv *envtest.Environment - ctx context.Context - cancel context.CancelFunc + resCount int64 + specCount int64 + currentDir string + k8sClient client.Client + k8sClientSet *kubernetes.Clientset + restConfig *rest.Config + testEnv *envtest.Environment + ctx context.Context + cancel context.CancelFunc // the cluster url clusterUrl *url.URL @@ -112,6 +114,7 @@ var ( brokerReconciler *ActiveMQArtemisReconciler securityReconciler *ActiveMQArtemisSecurityReconciler + scaleDownRconciler *ActiveMQArtemisScaledownReconciler oprRes = []string{ "../deploy/service_account.yaml", @@ -373,7 +376,7 @@ func createControllerManager(disableMetrics bool, watchNamespace string) { err = addressReconciler.SetupWithManager(k8Manager, managerCtx) Expect(err).ToNot(HaveOccurred(), "failed to create address reconciler") - scaleDownRconciler := &ActiveMQArtemisScaledownReconciler{ + scaleDownRconciler = &ActiveMQArtemisScaledownReconciler{ Client: k8Manager.GetClient(), Scheme: k8Manager.GetScheme(), Config: k8Manager.GetConfig(), @@ -580,6 +583,9 @@ func setUpK8sClient() { k8sClient, err = client.New(restConfig, client.Options{Scheme: scheme.Scheme}) Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) + + k8sClientSet, err = kubernetes.NewForConfig(restConfig) + Expect(err).To(BeNil()) } var _ = BeforeSuite(func() { diff --git a/deploy/cluster_role.yaml b/deploy/cluster_role.yaml index 1502b34b5..2f949a508 100644 --- a/deploy/cluster_role.yaml +++ b/deploy/cluster_role.yaml @@ -38,6 +38,14 @@ rules: - deployments/finalizers verbs: - update +- apiGroups: + - batch + resources: + - jobs + 
verbs: + - create + - delete + - list - apiGroups: - broker.amq.io resources: diff --git a/deploy/role.yaml b/deploy/role.yaml index 44ed4d102..a78a0b9cd 100644 --- a/deploy/role.yaml +++ b/deploy/role.yaml @@ -38,6 +38,14 @@ rules: - deployments/finalizers verbs: - update +- apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - list - apiGroups: - broker.amq.io resources: diff --git a/pkg/draincontroller/controller.go b/pkg/draincontroller/controller.go index 9c3f0f881..e3a1da5ce 100644 --- a/pkg/draincontroller/controller.go +++ b/pkg/draincontroller/controller.go @@ -28,7 +28,9 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -44,7 +46,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - "encoding/json" "sort" "strconv" "strings" @@ -52,6 +53,7 @@ import ( rbacutil "github.com/artemiscloud/activemq-artemis-operator/pkg/rbac" "github.com/artemiscloud/activemq-artemis-operator/pkg/resources" "github.com/artemiscloud/activemq-artemis-operator/pkg/resources/secrets" + "github.com/artemiscloud/activemq-artemis-operator/pkg/utils/common" "github.com/artemiscloud/activemq-artemis-operator/pkg/utils/namer" "github.com/artemiscloud/activemq-artemis-operator/pkg/utils/selectors" "k8s.io/apimachinery/pkg/labels" @@ -62,9 +64,14 @@ import ( var dlog = ctrl.Log.WithName("controller_v1beta1activemqartemisscaledown") +var DefaultDrainCommand = []string{ + "/bin/sh", + "-c", + "echo \"Starting the drainer\" ; /opt/amq/bin/drain.sh ; EXIT_CODE=$? ; echo \"Drain completed! Exit code $?\"; exit $EXIT_CODE", +} + const controllerAgentName = "statefulset-drain-controller" -const AnnotationStatefulSet = "statefulsets.kubernetes.io/drainer-pod-owner" // TODO: can we replace this with an OwnerReference with the StatefulSet as the owner? 
-const AnnotationDrainerPodTemplate = "statefulsets.kubernetes.io/drainer-pod-template" +const AnnotationStatefulSet = "statefulsets.kubernetes.io/drainer-pod-owner" const LabelDrainPod = "drain-pod" const DrainServiceAccountName = "drain-pod-service-account" @@ -82,9 +89,6 @@ const ( MessagePVCDeleted = "delete Claim %s in StatefulSet %s successful" ) -// TODO: Remove this hack -var globalPodTemplateJson string = "{\n \"metadata\": {\n \"labels\": {\n \"app\": \"CRNAME-amq-drainer\"\n }\n },\n \"spec\": {\n \"serviceAccount\": \"SERVICE_ACCOUNT\",\n \"serviceAccountName\": \"SERVICE_ACCOUNT_NAME\",\n \"terminationGracePeriodSeconds\": 5,\n \"containers\": [\n {\n \"env\": [\n {\n \"name\": \"AMQ_EXTRA_ARGS\",\n \"value\": \"--no-autotune\"\n },\n {\n \"name\": \"HEADLESS_SVC_NAME\",\n \"value\": \"HEADLESSSVCNAMEVALUE\"\n },\n {\n \"name\": \"PING_SVC_NAME\",\n \"value\": \"PINGSVCNAMEVALUE\"\n },\n {\n \"name\": \"AMQ_USER\",\n \"value\": \"admin\"\n },\n {\n \"name\": \"AMQ_PASSWORD\",\n \"value\": \"admin\"\n },\n {\n \"name\": \"AMQ_ROLE\",\n \"value\": \"admin\"\n },\n {\n \"name\": \"AMQ_NAME\",\n \"value\": \"amq-broker\"\n },\n {\n \"name\": \"AMQ_TRANSPORTS\",\n \"value\": \"openwire,amqp,stomp,mqtt,hornetq\"\n },\n {\n \"name\": \"AMQ_GLOBAL_MAX_SIZE\",\n \"value\": \"100mb\"\n },\n {\n \"name\": \"AMQ_DATA_DIR\",\n \"value\": \"/opt/CRNAME/data\"\n },\n {\n \"name\": \"AMQ_DATA_DIR_LOGGING\",\n \"value\": \"true\"\n },\n {\n \"name\": \"AMQ_CLUSTERED\",\n \"value\": \"true\"\n },\n {\n \"name\": \"AMQ_REPLICAS\",\n \"value\": \"1\"\n },\n {\n \"name\": \"AMQ_CLUSTER_USER\",\n \"value\": \"CLUSTERUSER\"\n },\n {\n \"name\": \"AMQ_CLUSTER_PASSWORD\",\n \"value\": \"CLUSTERPASS\"\n },\n {\n \"name\": \"POD_NAMESPACE\",\n \"valueFrom\": {\n \"fieldRef\": {\n \"fieldPath\": \"metadata.namespace\"\n }\n }\n },\n {\n \"name\": \"OPENSHIFT_DNS_PING_SERVICE_PORT\",\n \"value\": \"7800\"\n }\n ],\n \"image\": \"SSIMAGE\",\n \"name\": \"drainer-amq\",\n\n 
\"command\": [\"/bin/sh\", \"-c\", \"echo \\\"Starting the drainer\\\" ; /opt/amq/bin/drain.sh ; EXIT_CODE=$? ; echo \\\"Drain completed! Exit code $?\\\"; exit $EXIT_CODE\"],\n \"volumeMounts\": [\n {\n \"name\": \"CRNAME\",\n \"mountPath\": \"/opt/CRNAME/data\"\n }\n ]\n }\n ]\n }\n}" - type Controller struct { name string // kubeclientset is a standard kubernetes clientset @@ -120,6 +124,8 @@ type Controller struct { stopCh chan struct{} client client.Client + + drainCommand []string } func eventLog(format string, args ...interface{}) { @@ -212,7 +218,6 @@ func (c *Controller) AddInstance(instance *brokerv1beta1.ActiveMQArtemisScaledow Namespace: instance.Annotations["CRNAMESPACE"], Name: namer.CrToSS(instance.Annotations["CRNAME"]), } - dlog.Info("adding a new scaledown instance", "key", namespacedName) c.ssNamesMap[namespacedName] = instance.Annotations dlog.Info("Added new instance", "key", namespacedName, "now values", len(c.ssNamesMap)) c.ssToCrMap[namespacedName] = instance @@ -367,11 +372,6 @@ func (c *Controller) processStatefulSet(sts *appsv1.StatefulSet) error { } dlog.Info("Statefulset " + sts.Name + " Spec.VolumeClaimTemplates is " + strconv.Itoa((len(sts.Spec.VolumeClaimTemplates)))) - //if sts.Annotations[AnnotationDrainerPodTemplate] == "" { - // log.Info("Ignoring StatefulSet '%s' because it does not define a drain pod template.", sts.Name) - // return nil - //} - claimsGroupedByOrdinal, err := c.getClaims(sts) if err != nil { err = fmt.Errorf("error while getting list of PVCs in namespace %s: %s", sts.Namespace, err) @@ -396,21 +396,27 @@ func (c *Controller) processStatefulSet(sts *appsv1.StatefulSet) error { } // TODO check if the number of claims matches the number of StatefulSet's volumeClaimTemplates. What if it doesn't? 
+ jobName := "job-" + getPodName(sts, ordinal) - podName := getPodName(sts, ordinal) - dlog.Info("got pod name", "name", podName) - - pod, err := c.podLister.Pods(sts.Namespace).Get(podName) + jobs, err := c.kubeclientset.BatchV1().Jobs(sts.Namespace).List(context.TODO(), metav1.ListOptions{}) if err != nil && !errors.IsNotFound(err) { - dlog.Error(err, "Error while getting Pod "+podName) + dlog.Error(err, "Error while getting Job "+jobName) return err } + var job *batchv1.Job = nil + for _, j := range jobs.Items { + if j.Name == jobName { + job = &j + break + } + } + // Is it a drain pod or a regular stateful pod? - if isDrainPod(pod) { - dlog.Info("This is a drain pod", "pod name", podName) - err = c.cleanUpDrainPodIfNeeded(sts, pod, ordinal) + if job != nil { + dlog.V(1).Info("This is a drain job", "job name", jobName, "details", job) + err = c.cleanUpDrainPodIfNeeded(sts, job, ordinal) if err != nil { return err } @@ -425,19 +431,19 @@ func (c *Controller) processStatefulSet(sts *appsv1.StatefulSet) error { // TODO: scale down to zero? should what happens on such events be configurable? there may or may not be anywhere to drain to if int32(ordinal) >= *sts.Spec.Replicas { - dlog.Info("ordinal is greater then replicas", "ordinal", ordinal, "replicas", *sts.Spec.Replicas) + dlog.Info("ordinal is greater than replicas", "ordinal", ordinal, "replicas", *sts.Spec.Replicas) // PVC exists, but its ordinal is higher than the current last stateful pod's ordinal; // this means the PVC is an orphan and should be drained & deleted // If the Pod doesn't exist, we'll create it - if pod == nil { // TODO: what if the PVC doesn't exist here (or what if it's deleted just after we create the pod) - dlog.Info("Found orphaned PVC(s) for ordinal " + strconv.Itoa(ordinal) + ". 
Creating drain pod " + podName) + if job == nil { // TODO: what if the PVC doesn't exist here (or what if it's deleted just after we create the pod) + dlog.Info("Found orphaned PVC(s) for ordinal " + strconv.Itoa(ordinal) + ". Creating drain pod " + jobName) // Check to ensure we have a pod to drain to ordinalZeroPodName := getPodName(sts, 0) ordinalZeroPod, err := c.podLister.Pods(sts.Namespace).Get(ordinalZeroPodName) if err != nil { - dlog.Error(err, "Error while getting ordinal zero pod "+podName+": "+err.Error()) + dlog.Error(err, "Error while getting ordinal zero pod "+ordinalZeroPodName+": "+err.Error()) return err } @@ -469,36 +475,32 @@ func (c *Controller) processStatefulSet(sts *appsv1.StatefulSet) error { continue } - dlog.Info("Creating new drain pod...", "sts", sts) - pod, err := c.newPod(sts, ordinal) + dlog.V(1).Info("Creating new drain job...", "sts", sts) + jobName := "job-" + getPodName(sts, ordinal) + job, err := c.NewDrainJob(sts, ordinal, jobName) if err != nil { - dlog.Error(err, "error creating drain pod") - return fmt.Errorf("can't create drain Pod object: %s", err) + dlog.Error(err, "failed to create job spec") + return err } - dlog.Info("Now creating the drain pod in namespace "+sts.Namespace, "pod", pod) - // needs a proper account for the pod to be created/start. - _, err = c.kubeclientset.CoreV1().Pods(sts.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + dlog.V(1).Info("Created the drain job", "job", job) + + jobs := c.kubeclientset.BatchV1().Jobs(sts.Namespace) + + _, err = jobs.Create(context.TODO(), job, metav1.CreateOptions{}) - // If an error occurs during Create, we'll requeue the item so we can - // attempt processing again later. This could have been caused by a - // temporary network failure, or any other transient reason. 
if err != nil { - dlog.Error(err, "Error while creating drain Pod "+podName+": ") + dlog.Error(err, "failed to create job", "namespace", sts.Namespace) return err } if !c.localOnly { - c.recorder.Event(sts, corev1.EventTypeNormal, SuccessCreate, fmt.Sprintf(MessageDrainPodCreated, podName, sts.Name)) + c.recorder.Event(sts, corev1.EventTypeNormal, SuccessCreate, fmt.Sprintf(MessageDrainPodCreated, jobName, sts.Name)) } continue - //} else { - // log.Info("Pod '%s' exists. Not taking any action.", podName) } } } - - // TODO: add status annotation (what info?) return nil } @@ -575,20 +577,14 @@ func (c *Controller) cleanupDrainRBACResources(namespace string) { } } -func (c *Controller) cleanUpDrainPodIfNeeded(sts *appsv1.StatefulSet, pod *corev1.Pod, ordinal int) error { - // Drain Pod already exists. Check if it's done draining. - podName := getPodName(sts, ordinal) +func (c *Controller) cleanUpDrainPodIfNeeded(sts *appsv1.StatefulSet, job *batchv1.Job, ordinal int) error { - podPhase := pod.Status.Phase - if podPhase == corev1.PodSucceeded || podPhase == corev1.PodFailed { + if common.IsJobStatusConditionTrue(job.Status.Conditions, batchv1.JobComplete) { defer c.cleanupDrainRBACResources(sts.Namespace) - } - switch podPhase { - case (corev1.PodSucceeded): - dlog.Info("Drain pod " + podName + " finished.") + dlog.Info("Drain job " + job.Name + " finished.") if !c.localOnly { - c.recorder.Event(sts, corev1.EventTypeNormal, DrainSuccess, fmt.Sprintf(MessageDrainPodFinished, podName, sts.Name)) + c.recorder.Event(sts, corev1.EventTypeNormal, DrainSuccess, fmt.Sprintf(MessageDrainPodFinished, job.Name, sts.Name)) } for _, pvcTemplate := range sts.Spec.VolumeClaimTemplates { @@ -606,31 +602,24 @@ func (c *Controller) cleanUpDrainPodIfNeeded(sts *appsv1.StatefulSet, pod *corev // TODO what if the user scales up the statefulset and the statefulset controller creates the new pod after we delete the pod but before we delete the PVC // TODO what if we crash after we delete 
the PVC, but before we delete the pod? // - dlog.Info("Deleting drain pod " + podName) - err := c.kubeclientset.CoreV1().Pods(sts.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + dlog.Info("Deleting drain job " + job.Name) + // this delete policy lets the drainer pod to be deleted with the job + deletePolicy := metav1.DeletePropagationBackground + err := c.kubeclientset.BatchV1().Jobs(sts.Namespace).Delete(context.TODO(), job.Name, metav1.DeleteOptions{PropagationPolicy: &deletePolicy}) if err != nil { return err } + if !c.localOnly { - c.recorder.Event(sts, corev1.EventTypeNormal, PodDeleteSuccess, fmt.Sprintf(MessageDrainPodDeleted, podName, sts.Name)) + c.recorder.Event(sts, corev1.EventTypeNormal, PodDeleteSuccess, fmt.Sprintf(MessageDrainPodDeleted, job.Name, sts.Name)) } - - case (corev1.PodFailed): - dlog.Info("Drain pod " + podName + " failed.") - - default: - str := fmt.Sprintf("Drain pod Phase was %s", pod.Status.Phase) - dlog.Info(str) - + } else { + dlog.Info("Drain pod "+job.Name+" not complete.", "job status", job.Status) } return nil } -func isDrainPod(pod *corev1.Pod) bool { - return pod != nil && pod.ObjectMeta.Annotations[AnnotationStatefulSet] != "" -} - // enqueueStatefulSet takes a StatefulSet resource and converts it into a namespace/name // string which is then put onto the work queue. This method should *not* be // passed resources of any type other than StatefulSet. @@ -688,7 +677,7 @@ func (c *Controller) handlePod(obj interface{}) { stsNameFromAnnotation := object.GetAnnotations()[AnnotationStatefulSet] if stsNameFromAnnotation != "" { - dlog.V(5).Info("Found pod with " + AnnotationStatefulSet + " annotation pointing to StatefulSet " + stsNameFromAnnotation + ". Enqueueing StatefulSet.") + dlog.V(5).Info("Found object with " + AnnotationStatefulSet + " annotation pointing to StatefulSet " + stsNameFromAnnotation + ". 
Enqueueing StatefulSet.") sts, err := c.statefulSetLister.StatefulSets(object.GetNamespace()).Get(stsNameFromAnnotation) if err != nil { dlog.V(4).Info("Error retrieving StatefulSet " + stsNameFromAnnotation + ": " + err.Error()) @@ -759,83 +748,177 @@ func (c *Controller) getClusterCredentials(namespace string, ssNames map[string] } } -func (c *Controller) newPod(sts *appsv1.StatefulSet, ordinal int) (*corev1.Pod, error) { +func (c *Controller) GetDrainCommand() []string { + if c.drainCommand == nil { + return DefaultDrainCommand + } + return c.drainCommand +} + +func (c *Controller) SetDrainCommand(cmd []string) { + c.drainCommand = cmd +} + +func (c *Controller) NewDrainJob(sts *appsv1.StatefulSet, ordinal int, jobName string) (*batchv1.Job, error) { ssNamesKey := types.NamespacedName{ Namespace: sts.Namespace, Name: sts.Name, } - dlog.Info("Creating newPod for ss", "ss", ssNamesKey) + dlog.Info("Creating new job for ss", "ss", ssNamesKey) if _, ok := c.ssNamesMap[ssNamesKey]; !ok { dlog.Info("Cannot find drain pod data for statefule set", "namespace", ssNamesKey) return nil, fmt.Errorf("No drain pod data for statefulset " + sts.Name) } - ssNames := c.ssNamesMap[ssNamesKey] + parameters := c.ssNamesMap[ssNamesKey] - //podTemplateJson := sts.Annotations[AnnotationDrainerPodTemplate] - //TODO: Remove this blatant hack - podTemplateJson := globalPodTemplateJson - clusterUser, clusterPassword := c.getClusterCredentials(sts.Namespace, ssNames) - podTemplateJson = strings.Replace(podTemplateJson, "CRNAME", ssNames["CRNAME"], -1) - podTemplateJson = strings.Replace(podTemplateJson, "CLUSTERUSER", clusterUser, 1) - podTemplateJson = strings.Replace(podTemplateJson, "CLUSTERPASS", clusterPassword, 1) - podTemplateJson = strings.Replace(podTemplateJson, "HEADLESSSVCNAMEVALUE", ssNames["HEADLESSSVCNAMEVALUE"], 1) - podTemplateJson = strings.Replace(podTemplateJson, "PINGSVCNAMEVALUE", ssNames["PINGSVCNAMEVALUE"], 1) + var backOffLimit int32 = 0 + var termGracePeriodSecs 
int64 = 5 + clusterUser, clusterPassword := c.getClusterCredentials(sts.Namespace, parameters) + var serviceAccountName string if c.localOnly { - podTemplateJson = strings.Replace(podTemplateJson, "SERVICE_ACCOUNT", os.Getenv("SERVICE_ACCOUNT"), 1) - podTemplateJson = strings.Replace(podTemplateJson, "SERVICE_ACCOUNT_NAME", os.Getenv("SERVICE_ACCOUNT"), 1) + serviceAccountName = os.Getenv("SERVICE_ACCOUNT") } else { // the drain pod is in a different namespace, we need set up a service account with proper permission // and should delete it after drain is done. c.createDrainRBACResources(sts.Namespace) dlog.Info("Setting drain pod service account", "service account name", DrainServiceAccountName) - podTemplateJson = strings.Replace(podTemplateJson, "SERVICE_ACCOUNT", DrainServiceAccountName, 1) - podTemplateJson = strings.Replace(podTemplateJson, "SERVICE_ACCOUNT_NAME", DrainServiceAccountName, 1) + serviceAccountName = DrainServiceAccountName } image := sts.Spec.Template.Spec.Containers[0].Image - //sts.Spec.Template.Spec.Containers[0].Resources = c.resources if image == "" { return nil, fmt.Errorf("No drain pod image configured for StatefulSet " + sts.Name) } - podTemplateJson = strings.Replace(podTemplateJson, "SSIMAGE", image, 1) - if podTemplateJson == "" { - return nil, fmt.Errorf("No drain pod template configured for StatefulSet " + sts.Name) - } - pod := corev1.Pod{} - err := json.Unmarshal([]byte(podTemplateJson), &pod) - if err != nil { - return nil, fmt.Errorf("Can't unmarshal DrainerPodTemplate JSON from annotation: " + err.Error()) - } - pod.Name = getPodName(sts, ordinal) - pod.Namespace = sts.Namespace + podName := getPodName(sts, ordinal) + jobLabels := map[string]string{} + jobLabels["app"] = parameters["CRNAME"] + "-amq-drainer" + jobLabels[LabelDrainPod] = podName - if pod.Labels == nil { - pod.Labels = map[string]string{} - } - pod.Labels[LabelDrainPod] = pod.Name - if pod.Annotations == nil { - pod.Annotations = map[string]string{} - } - 
pod.Annotations[AnnotationStatefulSet] = sts.Name + jobAnnotations := map[string]string{} + jobAnnotations[AnnotationStatefulSet] = sts.Name - // TODO: cannot set blockOwnerDeletion if an ownerReference refers to a resource you can't set finalizers on: User "system:serviceaccount:kube-system:statefulset-drain-controller" cannot update statefulsets/finalizers.apps - if pod.OwnerReferences == nil { - pod.OwnerReferences = []metav1.OwnerReference{} - } + jobOwnerReferences := []metav1.OwnerReference{} ownerCr := c.ssToCrMap[ssNamesKey] - pod.OwnerReferences = append(pod.OwnerReferences, *metav1.NewControllerRef(ownerCr, ownerCr.GroupVersionKind())) - - pod.Spec.RestartPolicy = corev1.RestartPolicyOnFailure - pod.Spec.Containers[0].Resources = c.resources - pod.Spec.Tolerations = sts.Spec.Template.Spec.Tolerations + jobOwnerReferences = append(jobOwnerReferences, *metav1.NewControllerRef(ownerCr, ownerCr.GroupVersionKind())) + + jobSpec := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: sts.Namespace, + Labels: jobLabels, + Annotations: jobAnnotations, + OwnerReferences: jobOwnerReferences, + }, + Spec: batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + }, + Spec: v1.PodSpec{ + ServiceAccountName: serviceAccountName, + TerminationGracePeriodSeconds: &termGracePeriodSecs, + Tolerations: sts.Spec.Template.Spec.Tolerations, + Containers: []v1.Container{ + { + Name: "drainer-amq", + Image: image, + Resources: c.resources, + Env: []v1.EnvVar{ + { + Name: "AMQ_EXTRA_ARGS", + Value: "--no-autotune", + }, + { + Name: "HEADLESS_SVC_NAME", + Value: parameters["HEADLESSSVCNAMEVALUE"], + }, + { + Name: "PING_SVC_NAME", + Value: parameters["PINGSVCNAMEVALUE"], + }, + { + Name: "AMQ_USER", + Value: "admin", + }, + { + Name: "AMQ_PASSWORD", + Value: "admin", + }, + { + Name: "AMQ_ROLE", + Value: "admin", + }, + { + Name: "AMQ_NAME", + Value: "amq-broker", + }, + { + Name: "AMQ_TRANSPORTS", + Value: 
"openwire,amqp,stomp,mqtt,hornetq", + }, + { + Name: "AMQ_GLOBAL_MAX_SIZE", + Value: "100mb", + }, + { + Name: "AMQ_DATA_DIR", + Value: "/opt/" + parameters["CRNAME"] + "/data", + }, + { + Name: "AMQ_CLUSTERED", + Value: "true", + }, + { + Name: "AMQ_REPLICAS", + Value: "1", + }, + { + Name: "AMQ_CLUSTER_USER", + Value: clusterUser, + }, + { + Name: "AMQ_CLUSTER_PASSWORD", + Value: clusterPassword, + }, + { + Name: "POD_NAMESPACE", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + { + Name: "OPENSHIFT_DNS_PING_SERVICE_PORT", + Value: "7800", + }, + }, + Command: c.GetDrainCommand(), + VolumeMounts: []v1.VolumeMount{ + { + Name: parameters["CRNAME"], + MountPath: "/opt/" + parameters["CRNAME"] + "/data", + }, + }, + }, + }, + // only "Never" or "OnFailure" is allowed + // the difference is that "Never" will cause a new Pod started in case of failure + // while with "onFailure" only the container is restarted. + RestartPolicy: v1.RestartPolicyNever, + }, + }, + BackoffLimit: &backOffLimit, + }, + } for _, pvcTemplate := range sts.Spec.VolumeClaimTemplates { - pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ // TODO: override existing volumes with the same name + jobSpec.Spec.Template.Spec.Volumes = append(jobSpec.Spec.Template.Spec.Volumes, corev1.Volume{ Name: pvcTemplate.Name, VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ @@ -845,7 +928,8 @@ func (c *Controller) newPod(sts *appsv1.StatefulSet, ordinal int) (*corev1.Pod, }) } - return &pod, nil + return jobSpec, nil + } func getPodName(sts *appsv1.StatefulSet, ordinal int) string { diff --git a/pkg/draincontroller/draincontroller_test.go b/pkg/draincontroller/draincontroller_test.go deleted file mode 100644 index cb251ca14..000000000 --- a/pkg/draincontroller/draincontroller_test.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -Copyright 2021. 
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package draincontroller - -import ( - "encoding/json" - "testing" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" -) - -func TestDrainController(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Drain Controller Suite") -} - -var _ = Describe("Drain Controller Test", func() { - Context("Global template test", func() { - It("testing global template has correct openshift ping dns service port", func() { - pod := corev1.Pod{} - err := json.Unmarshal([]byte(globalPodTemplateJson), &pod) - Expect(err).Should(Succeed()) - - var servicePort string - envVars := pod.Spec.Containers[0].Env - for _, v := range envVars { - if v.Name == "OPENSHIFT_DNS_PING_SERVICE_PORT" { - servicePort = v.Value - } - } - Expect(servicePort).To(Equal("7800")) - }) - }) -}) diff --git a/pkg/utils/common/common.go b/pkg/utils/common/common.go index a8216946b..6f51ccc97 100644 --- a/pkg/utils/common/common.go +++ b/pkg/utils/common/common.go @@ -8,6 +8,7 @@ import ( "time" "github.com/blang/semver/v4" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -196,3 +197,12 @@ func resolveVersionComponents(desired string) (major, minor, patch *uint64) { func Int32ToPtr(v int32) *int32 { return &v } + +func IsJobStatusConditionTrue(jobConditions []batchv1.JobCondition, jobConditionType batchv1.JobConditionType) bool { + for _, jc := 
range jobConditions { + if jc.Type == jobConditionType && jc.Status == corev1.ConditionTrue { + return true + } + } + return false +}