diff --git a/pkg/executor/lb.go b/pkg/executor/lb.go index d83a9b71c..6611fa12f 100644 --- a/pkg/executor/lb.go +++ b/pkg/executor/lb.go @@ -106,7 +106,7 @@ func (q *qingCloudLoadBalanceExecutor) Stop(id string) error { return qcclient.WaitJob(q.jobapi, *output.JobID, operationWaitTimeout, waitInterval) } -func getEipsFromLB(lb *qcservice.LoadBalancer) []string { +func getEipIdsFromLB(lb *qcservice.LoadBalancer) []string { var eips []string if lb == nil { return eips @@ -125,6 +125,25 @@ func getEipsFromLB(lb *qcservice.LoadBalancer) []string { return eips } +func GetEipsFromLB(lb *qcservice.LoadBalancer) []string { + var eips []string + if lb == nil { + return eips + } + var checkEip = func(eip *qcservice.EIP) { + if eip.EIPAddr != nil && *eip.EIPAddr != "" { + eips = append(eips, *eip.EIPAddr) + } + } + for _, eip := range lb.EIPs { + checkEip(eip) + } + for _, eip := range lb.Cluster { + checkEip(eip) + } + return eips +} + func (q *qingCloudLoadBalanceExecutor) Create(input *qcservice.CreateLoadBalancerInput) (*qcservice.LoadBalancer, error) { klog.V(2).Infof("Creating LB: %+v", *input) name := *input.LoadBalancerName @@ -151,7 +170,7 @@ func (q *qingCloudLoadBalanceExecutor) Create(input *qcservice.CreateLoadBalance } else { klog.Infof("Attach tag %s to loadBalancer %s done", q.tagIDs, lbID) } - var eips = getEipsFromLB(lb) + var eips = getEipIdsFromLB(lb) if len(eips) > 0 { err = AttachTagsToResources(q.tagapi, q.tagIDs, eips, "eip") if err != nil { @@ -281,7 +300,7 @@ func (q *qingCloudLoadBalanceExecutor) Delete(id string) error { if err != nil { return newServerErrorOfLoadBalancer(id, "Wait Deletion Done", err) } - var eips = getEipsFromLB(lb) + var eips = getEipIdsFromLB(lb) if len(eips) > 0 { err = DetachTagsFromResources(q.tagapi, q.tagIDs, eips, "eip") if err != nil { diff --git a/pkg/loadbalance/loadbalancer.go b/pkg/loadbalance/loadbalancer.go index 27c6771bc..2fa14fefa 100644 --- a/pkg/loadbalance/loadbalancer.go +++ b/pkg/loadbalance/loadbalancer.go @@ -358,7 +358,7 @@ func (l *LoadBalancer) UpdateQingCloudLB() error { klog.Errorf("Failed to make loadbalancer %s go into effect", l.Name) return err } - return nil + return l.LoadQcLoadBalancer() } // GetService return service of this loadbalancer @@ -502,11 +502,9 @@ func (l *LoadBalancer) GenerateK8sLoadBalancer() error { status.Ingress = append(status.Ingress, corev1.LoadBalancerIngress{IP: *ip}) } } else { - for _, eip := range l.Status.QcLoadBalancer.Cluster { - status.Ingress = append(status.Ingress, corev1.LoadBalancerIngress{IP: *eip.EIPAddr}) - } - for _, ip := range l.Status.QcLoadBalancer.EIPs { - status.Ingress = append(status.Ingress, corev1.LoadBalancerIngress{IP: *ip.EIPAddr}) + var eips = executor.GetEipsFromLB(l.Status.QcLoadBalancer) + for _, e := range eips { + status.Ingress = append(status.Ingress, corev1.LoadBalancerIngress{IP: e}) } } @@ -514,6 +512,7 @@ func (l *LoadBalancer) GenerateK8sLoadBalancer() error { return fmt.Errorf("Have no ip yet") } l.Status.K8sLoadBalancerStatus = status + l.service.Status.LoadBalancer = *status return nil } diff --git a/pkg/qingcloud/loadbalancer_impl.go b/pkg/qingcloud/loadbalancer_impl.go index b96c336f7..209426557 100644 --- a/pkg/qingcloud/loadbalancer_impl.go +++ b/pkg/qingcloud/loadbalancer_impl.go @@ -8,8 +8,8 @@ import ( "github.com/yunify/qingcloud-cloud-controller-manager/pkg/errors" "github.com/yunify/qingcloud-cloud-controller-manager/pkg/executor" "github.com/yunify/qingcloud-cloud-controller-manager/pkg/loadbalance" - v1 "k8s.io/api/core/v1" - cloudprovider "k8s.io/cloud-provider" + "k8s.io/api/core/v1" + "k8s.io/cloud-provider" "k8s.io/klog" ) @@ -51,6 +51,8 @@ func (qc *QingCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) { // GetLoadBalancer returns whether the specified load balancer exists, and // if so, what its status is. func (qc *QingCloud) GetLoadBalancer(ctx context.Context, _ string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) { + patcher := newServicePatcher(qc.corev1interface, service) + defer patcher.Patch() lb, err := qc.newLoadBalance(ctx, service, nil, false) if err != nil { return nil, false, err @@ -80,6 +82,8 @@ func (qc *QingCloud) GetLoadBalancerName(_ context.Context, _ string, service *v // parameters as read-only and not modify them. // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { + patcher := newServicePatcher(qc.corev1interface, service) + defer patcher.Patch() startTime := time.Now() klog.Infof("===============EnsureLoadBalancer for %s", service.Namespace+"/"+service.Name) defer func() { @@ -105,6 +109,8 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service * // parameters as read-only and not modify them. // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager func (qc *QingCloud) UpdateLoadBalancer(ctx context.Context, _ string, service *v1.Service, nodes []*v1.Node) error { + patcher := newServicePatcher(qc.corev1interface, service) + defer patcher.Patch() klog.Infof("===============UpdateLoadBalancer for %s", service.Namespace+"/"+service.Name) startTime := time.Now() defer func() { diff --git a/pkg/qingcloud/patch.go b/pkg/qingcloud/patch.go new file mode 100644 index 000000000..fc879b132 --- /dev/null +++ b/pkg/qingcloud/patch.go @@ -0,0 +1,70 @@ +package qingcloud + +import ( + "fmt" + "k8s.io/klog" +) +import corev1 "k8s.io/client-go/kubernetes/typed/core/v1" +import "k8s.io/api/core/v1" +import "k8s.io/apimachinery/pkg/types" +import "encoding/json" +import "k8s.io/apimachinery/pkg/util/strategicpatch" + +type servicePatcher struct { + kclient corev1.CoreV1Interface + base *v1.Service + updated *v1.Service +} + +func newServicePatcher(kclient corev1.CoreV1Interface, base *v1.Service) servicePatcher { + return servicePatcher{ + kclient: kclient, + base: base.DeepCopy(), + updated: base, + } +} + +func (sp *servicePatcher) serviceName() string { + return fmt.Sprintf("%s/%s", sp.base.Namespace, sp.base.Name) +} + +func (sp *servicePatcher) Patch() { + var err = sp.patch() + if err != nil { + klog.Infof("Failed to patch service %s %+v, error: %+v", sp.serviceName(), sp.updated.Status, err) + } else { + klog.Infof("Patch service %s %+v success", sp.serviceName(), sp.updated.Status) + } +} + +func (sp *servicePatcher) patch() error { + // Reset spec to make sure only patch for Status or ObjectMeta. + sp.updated.Spec = sp.base.Spec + + patchBytes, err := getPatchBytes(sp.base, sp.updated) + if err != nil { + return err + } + + _, err = sp.kclient.Services(sp.base.Namespace).Patch(sp.base.Name, types.StrategicMergePatchType, patchBytes, "status") + return err +} + +func getPatchBytes(oldSvc, newSvc *v1.Service) ([]byte, error) { + oldData, err := json.Marshal(oldSvc) + if err != nil { + return nil, fmt.Errorf("failed to Marshal oldData for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err) + } + + newData, err := json.Marshal(newSvc) + if err != nil { + return nil, fmt.Errorf("failed to Marshal newData for svc %s/%s: %v", newSvc.Namespace, newSvc.Name, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Service{}) + if err != nil { + return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err) + } + return patchBytes, nil + +} diff --git a/pkg/qingcloud/qingcloud.go b/pkg/qingcloud/qingcloud.go index 26f887f57..c5f264d13 100644 --- a/pkg/qingcloud/qingcloud.go +++ b/pkg/qingcloud/qingcloud.go @@ -16,6 +16,7 @@ import ( yaml "gopkg.in/yaml.v2" "k8s.io/client-go/informers" corev1informer "k8s.io/client-go/informers/core/v1" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" cloudprovider "k8s.io/cloud-provider" "k8s.io/klog" ) @@ -57,6 +58,7 @@ type QingCloud struct { nodeInformer corev1informer.NodeInformer serviceInformer corev1informer.ServiceInformer + corev1interface corev1.CoreV1Interface } func init() { @@ -158,6 +160,8 @@ func (qc *QingCloud) Initialize(clientBuilder cloudprovider.ControllerClientBuil serviceInformer := sharedInformer.Core().V1().Services() go serviceInformer.Informer().Run(stop) qc.serviceInformer = serviceInformer + + qc.corev1interface = clientset.CoreV1() } func (qc *QingCloud) Clusters() (cloudprovider.Clusters, bool) {