diff --git a/controllers/rediscluster_controller.go b/controllers/rediscluster_controller.go
index d3f6e0df6..ccbeebf8c 100644
--- a/controllers/rediscluster_controller.go
+++ b/controllers/rediscluster_controller.go
@@ -99,6 +99,39 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
 		return ctrl.Result{RequeueAfter: time.Second * 10}, nil
 	}
 
+	if followerReplicas < instance.Status.ReadyFollowerReplicas {
+		reqLogger.Info("Redis cluster is downscaling...", "Ready.FollowerReplicas", instance.Status.ReadyFollowerReplicas, "Expected.FollowerReplicas", followerReplicas)
+
+		// loop count times to remove the trailing leader/follower pods
+		count := instance.Status.ReadyLeaderReplicas - leaderReplicas
+		for i := int32(0); i < count; i++ {
+			reqLogger.Info("Redis cluster is downscaling", "Loop iteration", i)
+
+			// Important: if the pod at the last index of the leader StatefulSet is not a leader, promote it first.
+			// Check whether that Redis pod currently holds the leader role;
+			// if it does not, make it the leader before removing any nodes.
+			if !(k8sutils.VerifyLeaderPod(ctx, r.K8sClient, r.Log, instance)) {
+				// The last leader pod is currently acting as a replica, so it has to become the master pod.
+				// Trigger a manual failover here to promote it to a leader pod.
+				// ClusterFailover should also cover ClusterReplicate, since the followers have to be re-mapped to the new leader.
+				k8sutils.ClusterFailover(ctx, r.K8sClient, r.Log, instance)
+			}
+			// Step 1: remove the follower node
+			k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, r.Log, instance)
+			// Step 2: reshard the cluster
+			k8sutils.ReshardRedisCluster(r.K8sClient, r.Log, instance, true)
+		}
+		reqLogger.Info("Redis cluster is downscaled... Rebalancing the cluster")
+		// Step 3: rebalance the cluster
+		k8sutils.RebalanceRedisCluster(r.K8sClient, r.Log, instance)
+		reqLogger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
+		err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterReady, status.ReadyClusterReason, leaderReplicas, leaderReplicas, r.Dk8sClient)
+		if err != nil {
+			return ctrl.Result{}, err
+		}
+		return ctrl.Result{RequeueAfter: time.Second * 60}, nil
+	}
+
 	// Mark the cluster status as initializing if there are no leader or follower nodes
 	if (instance.Status.ReadyLeaderReplicas == 0 && instance.Status.ReadyFollowerReplicas == 0) ||
 		instance.Status.ReadyLeaderReplicas != leaderReplicas {
diff --git a/controllers/redisreplication_controller.go b/controllers/redisreplication_controller.go
index 4f2c08000..e3a952123 100644
--- a/controllers/redisreplication_controller.go
+++ b/controllers/redisreplication_controller.go
@@ -73,8 +73,26 @@ func (r *RedisReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Req
 
 	// Check that the Leader and Follower are ready in redis replication
 	if redisReplicationInfo.Status.ReadyReplicas != totalReplicas {
-		reqLogger.Info("Redis replication nodes are not ready yet", "Ready.Replicas", strconv.Itoa(int(redisReplicationInfo.Status.ReadyReplicas)), "Expected.Replicas", totalReplicas)
-		return ctrl.Result{RequeueAfter: time.Second * 60}, nil
+		var realMaster string
+		masterNodes := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, r.Log, instance, "master")
+		slaveNodes := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, r.Log, instance, "slave")
+
+		if redisReplicationInfo.Status.ReadyReplicas == 0 {
+			reqLogger.Info("Redis replication nodes are not ready yet", "Ready.Replicas", strconv.Itoa(int(redisReplicationInfo.Status.ReadyReplicas)), "Expected.Replicas", totalReplicas)
+			return ctrl.Result{RequeueAfter: time.Second * 60}, nil
+		}
+
+		reqLogger.Info("Redis replication has fewer ready replicas than desired", "Ready.Replicas", strconv.Itoa(int(redisReplicationInfo.Status.ReadyReplicas)), "Expected.Replicas", totalReplicas)
+		if len(masterNodes) == int(leaderReplicas) && followerReplicas != 0 && len(slaveNodes) != 0 {
+			realMaster = k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, r.Log, instance, masterNodes)
+			if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "master", []string{realMaster}); err != nil {
+				return ctrl.Result{Requeue: true}, err
+			}
+			if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "slave", slaveNodes); err != nil {
+				return ctrl.Result{RequeueAfter: time.Second * 1}, err
+			}
+		}
+		return ctrl.Result{RequeueAfter: time.Second * 1}, nil
 	}
 
 	var realMaster string
@@ -86,15 +104,29 @@ func (r *RedisReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Req
 		if len(slaveNodes) == 0 {
 			realMaster = masterNodes[0]
 		}
-		err := k8sutils.CreateMasterSlaveReplication(ctx, r.K8sClient, r.Log, instance, masterNodes, realMaster)
+		if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "master", []string{realMaster}); err != nil {
+			return ctrl.Result{Requeue: true}, err
+		}
+		err = k8sutils.CreateMasterSlaveReplication(ctx, r.K8sClient, r.Log, instance, masterNodes, realMaster)
 		if err != nil {
-			return ctrl.Result{RequeueAfter: time.Second * 60}, err
+			return ctrl.Result{Requeue: true}, err
+		}
+		if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "slave", slaveNodes); err != nil {
+			return ctrl.Result{Requeue: true}, err
 		}
 	}
 	realMaster = k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, r.Log, instance, masterNodes)
-	if err := r.UpdateRedisReplicationMaster(ctx, instance, realMaster); err != nil {
+	slaveNodes := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, r.Log, instance, "slave")
+	if err = r.UpdateRedisReplicationMaster(ctx, instance, realMaster); err != nil {
+		return ctrl.Result{}, err
+	}
+	if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "master", []string{realMaster}); err != nil {
 		return ctrl.Result{}, err
 	}
+	if err = k8sutils.UpdateRoleLabelPod(ctx, r.K8sClient, r.Log, instance, "slave", slaveNodes); err != nil {
+		return ctrl.Result{}, err
+	}
+
 	reqLogger.Info("Will reconcile redis operator in again 10 seconds")
 	return ctrl.Result{RequeueAfter: time.Second * 10}, nil
 }
diff --git a/k8sutils/redis-replication.go b/k8sutils/redis-replication.go
index 9bef5a9eb..9f0e1ae29 100644
--- a/k8sutils/redis-replication.go
+++ b/k8sutils/redis-replication.go
@@ -6,6 +6,7 @@ import (
 	redisv1beta2 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2"
 	"github.com/OT-CONTAINER-KIT/redis-operator/pkg/util"
 	"github.com/go-logr/logr"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/utils/ptr"
@@ -31,6 +32,8 @@ func CreateReplicationService(cr *redisv1beta2.RedisReplication, cl kubernetes.I
 	}
 	objectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name, cr.Namespace, labels, annotations)
 	headlessObjectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name+"-headless", cr.Namespace, labels, annotations)
+	masterObjectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name+"-leader", cr.Namespace, labels, annotations)
+	slaveObjectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name+"-follower", cr.Namespace, labels, annotations)
 	additionalObjectMetaInfo := generateObjectMetaInformation(cr.ObjectMeta.Name+"-additional", cr.Namespace, labels, generateServiceAnots(cr.ObjectMeta, additionalServiceAnnotations, epp))
 	err := CreateOrUpdateService(cr.Namespace, headlessObjectMetaInfo, redisReplicationAsOwner(cr), disableMetrics, true, "ClusterIP", redisPort, cl)
 	if err != nil {
@@ -51,6 +54,14 @@ func CreateReplicationService(cr *redisv1beta2.RedisReplication, cl kubernetes.I
 		logger.Error(err, "Cannot create additional service for Redis Replication")
 		return err
 	}
+	err = CreateOrUpdateService(cr.Namespace, masterObjectMetaInfo, redisReplicationAsOwner(cr), disableMetrics, false, additionalServiceType, redisPort, cl)
+	if err != nil {
+		logger.Error(err, "Cannot create leader service for Redis Replication")
+	}
+	err = CreateOrUpdateService(cr.Namespace, slaveObjectMetaInfo, redisReplicationAsOwner(cr), disableMetrics, false, additionalServiceType, redisPort, cl)
+	if err != nil {
+		logger.Error(err, "Cannot create follower service for Redis Replication")
+	}
 	return nil
 }
 
@@ -218,3 +229,23 @@ func IsRedisReplicationReady(ctx context.Context, logger logr.Logger, client kub
 	}
 	return true
 }
+
+// UpdateRoleLabelPod sets the redis-role label on the given pods so the role-scoped leader/follower Services select them.
+func UpdateRoleLabelPod(ctx context.Context, cl kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisReplication, role string, nodes []string) error {
+	for _, node := range nodes {
+		pod, err := cl.CoreV1().Pods(cr.Namespace).Get(ctx, node, metav1.GetOptions{})
+		if err != nil {
+			logger.Error(err, "Cannot get redis replication pod")
+			return err
+		}
+		// set the redis-role label to the desired role
+		metav1.SetMetaDataLabel(&pod.ObjectMeta, "redis-role", role)
+		// persist the updated label on the pod
+		_, err = cl.CoreV1().Pods(cr.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
+		if err != nil {
+			logger.Error(err, "Cannot update redis replication pod")
+			return err
+		}
+	}
+	return nil
+}
diff 
--git a/k8sutils/services.go b/k8sutils/services.go
index 627117277..95672f2f5 100644
--- a/k8sutils/services.go
+++ b/k8sutils/services.go
@@ -36,13 +36,20 @@ func generateServiceDef(serviceMeta metav1.ObjectMeta, epp exporterPortProvider,
 	} else {
 		PortName = "redis-client"
 	}
+	selectorLabels := serviceMeta.GetLabels()
+	if serviceMeta.GetName() == "redis-replication-leader" {
+		selectorLabels["redis-role"] = "master"
+	}
+	if serviceMeta.GetName() == "redis-replication-follower" {
+		selectorLabels["redis-role"] = "slave"
+	}
 	service := &corev1.Service{
 		TypeMeta:   generateMetaInformation("Service", "v1"),
 		ObjectMeta: serviceMeta,
 		Spec: corev1.ServiceSpec{
 			Type:      generateServiceType(serviceType),
 			ClusterIP: "",
-			Selector:  serviceMeta.GetLabels(),
+			Selector:  selectorLabels,
 			Ports: []corev1.ServicePort{
 				{
 					Name: PortName,
diff --git a/k8sutils/services_test.go b/k8sutils/services_test.go
index 5906d61db..8e1a45fcc 100644
--- a/k8sutils/services_test.go
+++ b/k8sutils/services_test.go
@@ -232,6 +232,90 @@ func TestGenerateServiceDef(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "Test redis-replication-leader with ClusterIP service type and metrics enabled",
+			serviceMeta: metav1.ObjectMeta{
+				Name: "test-redis-replication-leader",
+				Labels: map[string]string{
+					"redis-role": "master",
+				},
+			},
+			enableMetrics: defaultExporterPortProvider,
+			headless:      false,
+			serviceType:   "ClusterIP",
+			port:          redisPort,
+			expected: &corev1.Service{
+				TypeMeta: metav1.TypeMeta{
+					Kind:       "Service",
+					APIVersion: "v1",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test-redis-replication-leader",
+					Labels: map[string]string{
+						"redis-role": "master",
+					},
+					OwnerReferences: []metav1.OwnerReference{
+						{},
+					},
+				},
+				Spec: corev1.ServiceSpec{
+					Ports: []corev1.ServicePort{
+						{
+							Name:       "redis-client",
+							Port:       redisPort,
+							TargetPort: intstr.FromInt(int(redisPort)),
+							Protocol:   corev1.ProtocolTCP,
+						},
+						*enableMetricsPort(redisExporterPort),
+					},
+					Selector:  map[string]string{"redis-role": "master"},
+					ClusterIP: "",
+					Type:      corev1.ServiceTypeClusterIP,
+				},
+			},
+		},
+		{
+			name: "Test redis-replication-follower with ClusterIP service type and metrics enabled",
+			serviceMeta: metav1.ObjectMeta{
+				Name: "test-redis-replication-follower",
+				Labels: map[string]string{
+					"redis-role": "slave",
+				},
+			},
+			enableMetrics: defaultExporterPortProvider,
+			headless:      false,
+			serviceType:   "ClusterIP",
+			port:          redisPort,
+			expected: &corev1.Service{
+				TypeMeta: metav1.TypeMeta{
+					Kind:       "Service",
+					APIVersion: "v1",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test-redis-replication-follower",
+					Labels: map[string]string{
+						"redis-role": "slave",
+					},
+					OwnerReferences: []metav1.OwnerReference{
+						{},
+					},
+				},
+				Spec: corev1.ServiceSpec{
+					Ports: []corev1.ServicePort{
+						{
+							Name:       "redis-client",
+							Port:       redisPort,
+							TargetPort: intstr.FromInt(int(redisPort)),
+							Protocol:   corev1.ProtocolTCP,
+						},
+						*enableMetricsPort(redisExporterPort),
+					},
+					Selector:  map[string]string{"redis-role": "slave"},
+					ClusterIP: "",
+					Type:      corev1.ServiceTypeClusterIP,
+				},
+			},
+		},
 	}
 
 	for _, tt := range tests {
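Note (illustration only, not part of the patch): the new leader and follower Services combine the CR's own labels with a redis-role selector, and UpdateRoleLabelPod re-stamps that label whenever the real master changes, so the "-leader" Service always routes to the pod currently acting as master. The sketch below is a minimal, hypothetical client-go check of which pod carries the master role; it assumes a RedisReplication CR named "redis-replication" in the "default" namespace (matching the hard-coded name check in generateServiceDef) and a kubeconfig reachable through the KUBECONFIG environment variable. Those names are illustrative assumptions, not values taken from this diff.

package main

import (
	"context"
	"fmt"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (assumption: KUBECONFIG points at a valid file).
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// The generated leader Service selects the CR's labels plus redis-role=master, the label
	// that UpdateRoleLabelPod keeps pointing at the real master. Listing by the role label
	// alone is enough for a quick check in a namespace running a single replication setup.
	pods, err := client.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{
		LabelSelector: "redis-role=master",
	})
	if err != nil {
		panic(err)
	}
	for _, p := range pods.Items {
		fmt.Printf("leader-backed pod: %s (%s)\n", p.Name, p.Status.PodIP)
	}
}

A kubectl get pods -l redis-role=master query shows the same result; the sketch simply mirrors the selector the Service itself uses.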