diff --git a/docs/configure.md b/docs/configure.md index 12640b3b5..5150b5c71 100644 --- a/docs/configure.md +++ b/docs/configure.md @@ -155,7 +155,13 @@ spec: - `Cluster`: 如果`service`中不显式指定 `externalTrafficPolicy` 字段的值,则默认为`Cluster`;这种模式下,可以通过给服务添加相关注解来指定LB监听器backend的添加规则 `Cluster`模式下,目前支持的 `service` 注解有: -- 使用指定Label的Worker节点作为后端服务器, `service.beta.kubernetes.io/qingcloud-lb-backend-label`,可以指定多个Label,多个Label以逗号分隔。例如:`key1=value1,key2=value2`,多个Label之间是And关系。同时,在需要成为后端服务器的Worker节点打上`key1=value1,key2=value2`的Label;只有服务指定的所有Label的key和value都和Worker节点匹配时,Worker节点会被选为服务的后端服务器;没有此注解则添加所有Worker节点为backend;通过注解过滤节点后,如果没有满足条件的节点,为了避免服务中断,会添加所有Worker节点为后端服务器; +> 以下各种过滤节点的方式不能结合使用,如果指定了多个,则会按照以下注解的说明顺序选用第一个匹配到的注解,并按照该方法过滤节点; +- 使用指定Label的Worker节点作为后端服务器, `service.beta.kubernetes.io/qingcloud-lb-backend-label`,可以指定多个Label,多个Label以逗号分隔。例如:`key1=value1,key2=value2`,多个Label之间是And关系。同时,在需要成为后端服务器的Worker节点打上`key1=value1,key2=value2`的Label;只有服务指定的所有Label的key和value都和Worker节点匹配时,Worker节点会被选为服务的后端服务器;特殊情况说明: + - 没有此注解则添加所有Worker节点为backend; + - 通过注解过滤节点后,如果没有满足条件的节点,为了避免服务中断,会添加所有Worker节点为后端服务器; +- 使用指定数量的Worker节点作为后端服务器,`service.beta.kubernetes.io/qingcloud-lb-backend-count`,通过此注解指定该服务使用的lb backend节点数量;如果服务添加了该注解,就认为用户想使用该特性;如果指定的数量为0或者不在可用节点数量的范围内,则使用默认值:集群所有节点的1/3;如果集群中节点状态发生变化,但是当前服务的lb后端数量就是用户指定的数量,并且已添加的所有lb后端节点在集群中都是ready状态的,则不会更新lb的backend;默认值特殊情况说明: + - 如果集群总worker节点数少于3个,则添加所有worker节点为backend,不再按照比例计算节点数; + - 如果集群总worker节点数多于3个,若按照比例计算后少于3个,则设置为3个; > 本章节所说的"所有Worker节点"特指所有 `Ready` 状态的Worker节点; @@ -184,6 +190,7 @@ spec: ``` #### Cluster模式 +##### 使用指定Label的Worker节点作为后端服务器 将服务的`externalTrafficPolicy`指定为`Cluster`,并在服务的注解`service.beta.kubernetes.io/qingcloud-lb-backend-label`中指定要添加为backend的worker节点的label: ```yaml @@ -232,6 +239,28 @@ status: ... ``` +##### 使用指定数量的Worker节点作为后端服务器 + +```yaml +kind: Service +apiVersion: v1 +metadata: + name: reuse-lb + annotations: + service.beta.kubernetes.io/qingcloud-load-balancer-eip-strategy: "reuse-lb" + service.beta.kubernetes.io/qingcloud-load-balancer-id: "lb-oglqftju" + service.beta.kubernetes.io/qingcloud-lb-backend-count: "3" +spec: + externalTrafficPolicy: Cluster + selector: + app: mylbapp + type: LoadBalancer + ports: + - name: http + port: 8090 + protocol: TCP + targetPort: 80 +``` ## 配置内网负载均衡器 ### 已知问题 diff --git a/pkg/qingcloud/annotations.go b/pkg/qingcloud/annotations.go index 0b076e059..96f671c28 100644 --- a/pkg/qingcloud/annotations.go +++ b/pkg/qingcloud/annotations.go @@ -80,10 +80,14 @@ const ( ServiceAnnotationListenerServerCertificate = "service.beta.kubernetes.io/qingcloud-lb-listener-cert" // port:protocol, such as "443:https,80:http" ServiceAnnotationListenerProtocol = "service.beta.kubernetes.io/qingcloud-lb-listener-protocol" + // port:timeout, such as "443:50", the value must in range 10 ~ 86400 + ServiceAnnotationListenerTimeout = "service.beta.kubernetes.io/qingcloud-lb-listener-timeout" // 5. Configure backend // backend label, such as "key1=value1,key2=value2" ServiceAnnotationBackendLabel = "service.beta.kubernetes.io/qingcloud-lb-backend-label" + // backend count limit, if value is 0 or greater than cluster ready worker, will use default value : 1/3 of cluster ready worker + ServiceAnnotationBackendCount = "service.beta.kubernetes.io/qingcloud-lb-backend-count" ) type LoadBalancerConfig struct { @@ -105,7 +109,9 @@ type LoadBalancerConfig struct { Protocol *string //backend - BackendLabel string + BackendLabel string + BackendCountConfig string + BackendCountResult int //It's just for defining names, nothing more. NetworkType string @@ -171,6 +177,13 @@ func (qc *QingCloud) ParseServiceLBConfig(cluster string, service *v1.Service) ( if backendLabel, ok := annotation[ServiceAnnotationBackendLabel]; ok { config.BackendLabel = backendLabel } + if backendCount, ok := annotation[ServiceAnnotationBackendCount]; ok { + _, err := strconv.Atoi(backendCount) + if err != nil { + return nil, fmt.Errorf("please spec a valid value of loadBalancer backend count") + } + config.BackendCountConfig = backendCount + } networkType := annotation[ServiceAnnotationLoadBalancerNetworkType] if config.VxNetID == nil && qc.Config.DefaultVxNetForLB != "" { diff --git a/pkg/qingcloud/loadbalancer_test.go b/pkg/qingcloud/loadbalancer_test.go index 27365f7c8..0abca0dd6 100644 --- a/pkg/qingcloud/loadbalancer_test.go +++ b/pkg/qingcloud/loadbalancer_test.go @@ -400,7 +400,7 @@ func TestDiffListeners(t *testing.T) { } for _, tc := range testCases { - toDelete, toAdd := diffListeners(tc.listeners, tc.conf, tc.ports) + toDelete, toAdd, _ := diffListeners(tc.listeners, tc.conf, tc.ports) // fmt.Printf("delete=%s, add=%s", spew.Sdump(toDelete), spew.Sdump(toAdd)) if !reflect.DeepEqual(toDelete, tc.toDelete) || !reflect.DeepEqual(toAdd, tc.toAdd) { t.Fail() diff --git a/pkg/qingcloud/loadbalancer_utils.go b/pkg/qingcloud/loadbalancer_utils.go index f548e204e..449112f11 100644 --- a/pkg/qingcloud/loadbalancer_utils.go +++ b/pkg/qingcloud/loadbalancer_utils.go @@ -1,7 +1,9 @@ package qingcloud import ( + "crypto/rand" "fmt" + "math/big" "strconv" "strings" @@ -152,7 +154,7 @@ func getProtocol(annotationConf map[int]string, port int) *string { } } -func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerConfig, ports []v1.ServicePort) (toDelete []*string, toAdd []v1.ServicePort) { +func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerConfig, ports []v1.ServicePort) (toDelete []*string, toAdd []v1.ServicePort, toKeep []*apis.LoadBalancerListener) { svcNodePort := make(map[string]int) for _, listener := range listeners { if len(listener.Status.LoadBalancerBackends) > 0 { @@ -197,6 +199,8 @@ func diffListeners(listeners []*apis.LoadBalancerListener, conf *LoadBalancerCon } if delete { toDelete = append(toDelete, listener.Status.LoadBalancerListenerID) + } else { + toKeep = append(toKeep, listener) } } @@ -521,3 +525,18 @@ func equalProtocol(listener *apis.LoadBalancerListener, conf *LoadBalancerConfig } return false } + +func getRandomNodes(nodes []*v1.Node, count int) (result []*v1.Node) { + resultMap := make(map[int64]bool) + length := int64(len(nodes)) + + for i := 0; i < count; { + r, _ := rand.Int(rand.Reader, big.NewInt(length)) + if !resultMap[r.Int64()] { + result = append(result, nodes[r.Int64()]) + resultMap[r.Int64()] = true + i++ + } + } + return +} diff --git a/pkg/qingcloud/qingcloud.go b/pkg/qingcloud/qingcloud.go index 82f139e77..ca0e04bdc 100644 --- a/pkg/qingcloud/qingcloud.go +++ b/pkg/qingcloud/qingcloud.go @@ -11,6 +11,7 @@ import ( "context" "fmt" "io" + "strconv" "github.com/davecgh/go-spew/spew" yaml "gopkg.in/yaml.v2" @@ -27,8 +28,9 @@ import ( ) const ( - ProviderName = "qingcloud" - QYConfigPath = "/etc/qingcloud/config.yaml" + ProviderName = "qingcloud" + QYConfigPath = "/etc/qingcloud/config.yaml" + DefaultBackendCount = 3 ) type Config struct { @@ -232,7 +234,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service * klog.Infof("The loadbalancer %s has the following listeners %s", *lb.Status.LoadBalancerID, spew.Sdump(listenerIDs)) if len(listenerIDs) <= 0 { klog.Infof("creating listeners for loadbalancers %s, service ports %s", *lb.Status.LoadBalancerID, spew.Sdump(service.Spec.Ports)) - if err = qc.createListenersAndBackends(conf, lb, service.Spec.Ports, nodes); err != nil { + if err = qc.createListenersAndBackends(conf, lb, service.Spec.Ports, nodes, service); err != nil { klog.Errorf("createListenersAndBackends for loadbalancer %s error: %v", *lb.Status.LoadBalancerID, err) return nil, err } @@ -244,7 +246,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service * } //update listerner - toDelete, toAdd := diffListeners(listeners, conf, service.Spec.Ports) + toDelete, toAdd, toKeep := diffListeners(listeners, conf, service.Spec.Ports) if len(toDelete) > 0 { klog.Infof("listeners %s will be deleted for lb %s", spew.Sdump(toDelete), *lb.Status.LoadBalancerID) err = qc.Client.DeleteListener(toDelete) @@ -256,7 +258,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service * if len(toAdd) > 0 { klog.Infof("listeners %s will be added for lb %s", spew.Sdump(toAdd), *lb.Status.LoadBalancerID) - err = qc.createListenersAndBackends(conf, lb, toAdd, nodes) + err = qc.createListenersAndBackends(conf, lb, toAdd, nodes, service) if err != nil { return nil, err } @@ -264,30 +266,29 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service * } //update backend; for example, service annotation for backend label changed - if len(toAdd) == 0 && len(toDelete) == 0 { - for _, listener := range listeners { - toDelete, toAdd := diffBackend(listener, nodes) - - if len(toDelete) > 0 { - klog.Infof("backends %s will be deleted for listener %s(%s) of lb %s", - spew.Sdump(toDelete), *listener.Spec.LoadBalancerListenerName, *listener.Spec.LoadBalancerListenerID, *lb.Status.LoadBalancerID) - err = qc.Client.DeleteBackends(toDelete) - if err != nil { - return nil, err - } - modify = true + for _, listener := range toKeep { + // toDelete, toAdd := diffBackend(listener, nodes) + toDelete, toAdd := qc.diffBackend(listener, nodes, conf, service) + + if len(toDelete) > 0 { + klog.Infof("backends %s will be deleted for listener %s(%s) of lb %s", + spew.Sdump(toDelete), *listener.Spec.LoadBalancerListenerName, *listener.Spec.LoadBalancerListenerID, *lb.Status.LoadBalancerID) + err = qc.Client.DeleteBackends(toDelete) + if err != nil { + return nil, err } + modify = true + } - toAddBackends := generateLoadBalancerBackends(toAdd, listener, service.Spec.Ports) - if len(toAddBackends) > 0 { - klog.Infof("backends %s will be added for listener %s(%s) of lb %s", - spew.Sdump(toAddBackends), *listener.Spec.LoadBalancerListenerName, *listener.Spec.LoadBalancerListenerID, *lb.Status.LoadBalancerID) - _, err = qc.Client.CreateBackends(toAddBackends) - if err != nil { - return nil, err - } - modify = true + toAddBackends := generateLoadBalancerBackends(toAdd, listener, service.Spec.Ports) + if len(toAddBackends) > 0 { + klog.Infof("backends %s will be added for listener %s(%s) of lb %s", + spew.Sdump(toAddBackends), *listener.Spec.LoadBalancerListenerName, *listener.Spec.LoadBalancerListenerID, *lb.Status.LoadBalancerID) + _, err = qc.Client.CreateBackends(toAddBackends) + if err != nil { + return nil, err } + modify = true } } @@ -316,6 +317,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service * //1.2 prepare sg //default sg set by Client auto //1.3 create lb + klog.Infof("creating loadbalance for service %s/%s", service.Namespace, service.Name) lb, err = qc.Client.CreateLB(&apis.LoadBalancer{ Spec: apis.LoadBalancerSpec{ LoadBalancerName: &conf.LoadBalancerName, @@ -331,7 +333,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service * } //create listener - if err = qc.createListenersAndBackends(conf, lb, service.Spec.Ports, nodes); err != nil { + if err = qc.createListenersAndBackends(conf, lb, service.Spec.Ports, nodes, service); err != nil { return nil, err } } else { @@ -382,7 +384,8 @@ func (qc *QingCloud) UpdateLoadBalancer(ctx context.Context, _ string, service * } for _, listener := range listeners { - toDelete, toAdd := diffBackend(listener, nodes) + // toDelete, toAdd := diffBackend(listener, nodes) + toDelete, toAdd := qc.diffBackend(listener, nodes, conf, service) if len(toDelete) > 0 { klog.Infof("backends %s will be deleted for listener %s(%s) of lb %s", @@ -414,7 +417,7 @@ func (qc *QingCloud) UpdateLoadBalancer(ctx context.Context, _ string, service * return qc.Client.UpdateLB(lb.Status.LoadBalancerID) } -func (qc *QingCloud) createListenersAndBackends(conf *LoadBalancerConfig, status *apis.LoadBalancer, ports []v1.ServicePort, nodes []*v1.Node) error { +func (qc *QingCloud) createListenersAndBackends(conf *LoadBalancerConfig, status *apis.LoadBalancer, ports []v1.ServicePort, nodes []*v1.Node, svc *v1.Service) error { listeners, err := generateLoadBalancerListeners(conf, status, ports) if err != nil { klog.Errorf("generateLoadBalancerListeners for loadbalancer %s error: %v", *status.Status.LoadBalancerID, err) @@ -426,7 +429,19 @@ func (qc *QingCloud) createListenersAndBackends(conf *LoadBalancerConfig, status return err } - //create backend + // filter backend nodes by count + if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeCluster && conf.BackendCountConfig != "" { + klog.Infof("service %s/%s has lb backend count annotation, try to get %d random nodes as backend", svc.Namespace, svc.Name, conf.BackendCountResult) + nodes = getRandomNodes(nodes, conf.BackendCountResult) + + var resultNames []string + for _, node := range nodes { + resultNames = append(resultNames, node.Name) + } + klog.Infof("get random nodes result for service %s/%s: %v", svc.Namespace, svc.Name, resultNames) + } + + // create backend for _, listener := range listeners { backends := generateLoadBalancerBackends(nodes, listener, ports) _, err = qc.Client.CreateBackends(backends) @@ -515,7 +530,7 @@ func (qc *QingCloud) filterNodes(ctx context.Context, svc *v1.Service, nodes []* } } } else { - if lbconfog.BackendLabel != "" { + if lbconfog.BackendLabel != "" { // filter by node label klog.Infof("filter nodes for service %s/%s by backend label: %s", svc.Namespace, svc.Name, lbconfog.BackendLabel) // filter by label @@ -543,6 +558,28 @@ func (qc *QingCloud) filterNodes(ctx context.Context, svc *v1.Service, nodes []* klog.Infof("there are no available nodes for service %s/%s, use all nodes!", svc.Namespace, svc.Name) newNodes = nodes } + // clear lb backend count config + lbconfog.BackendCountConfig = "" + } else if lbconfog.BackendCountConfig != "" { //filter by backend count config + var backendCountResult int + + backendCountConfig, _ := strconv.Atoi(lbconfog.BackendCountConfig) + if backendCountConfig > 0 && backendCountConfig <= len(nodes) { + backendCountResult = backendCountConfig + } else { + //invalid count config, use default value (1/3 of all nodes) + if len(nodes) <= 3 { + backendCountResult = len(nodes) + } else { + backendCountResult = len(nodes) / 3 + if backendCountResult < 3 { + backendCountResult = DefaultBackendCount + } + } + } + + lbconfog.BackendCountResult = backendCountResult + newNodes = nodes } else { // no need to filter newNodes = nodes diff --git a/pkg/qingcloud/qingcloud_utils.go b/pkg/qingcloud/qingcloud_utils.go index caa6ea3b4..9baad0b8e 100644 --- a/pkg/qingcloud/qingcloud_utils.go +++ b/pkg/qingcloud/qingcloud_utils.go @@ -4,9 +4,11 @@ import ( "fmt" "github.com/davecgh/go-spew/spew" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "github.com/yunify/qingcloud-cloud-controller-manager/pkg/apis" + "github.com/yunify/qingcloud-cloud-controller-manager/pkg/util" ) func (qc *QingCloud) prepareEip(eipSource *string) (eip *apis.EIP, err error) { @@ -143,3 +145,40 @@ func (qc *QingCloud) updateLBEip(config *LoadBalancerConfig, lb *apis.LoadBalanc return nil } + +func (qc *QingCloud) diffBackend(listener *apis.LoadBalancerListener, nodes []*v1.Node, conf *LoadBalancerConfig, svc *v1.Service) (toDelete []*string, toAdd []*v1.Node) { + var backendLeftID []*string + for _, backend := range listener.Status.LoadBalancerBackends { + if !nodesHasBackend(*backend.Spec.LoadBalancerBackendName, nodes) { + toDelete = append(toDelete, backend.Status.LoadBalancerBackendID) + } else { + backendLeftID = append(backendLeftID, backend.Status.LoadBalancerBackendID) + } + } + + // filter backend nodes by count + if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeCluster && conf.BackendCountConfig != "" { + backendLeftCount := len(listener.Status.LoadBalancerBackends) - len(toDelete) + if backendLeftCount > conf.BackendCountResult { + // delete some + toDelete = append(toDelete, util.GetRandomItems(backendLeftID, backendLeftCount-conf.BackendCountResult)...) + } else { + // add some + var nodeLeft []*v1.Node + for _, node := range nodes { + if !backendsHasNode(node, listener.Status.LoadBalancerBackends) { + nodeLeft = append(nodeLeft, node) + } + } + toAdd = append(toAdd, getRandomNodes(nodeLeft, conf.BackendCountResult-backendLeftCount)...) + } + } else { + for _, node := range nodes { + if !backendsHasNode(node, listener.Status.LoadBalancerBackends) { + toAdd = append(toAdd, node) + } + } + } + + return +} diff --git a/pkg/util/util.go b/pkg/util/util.go index cc34c1b97..9e930c3e7 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -5,6 +5,8 @@ package util import ( + "crypto/rand" + "math/big" "strings" "time" @@ -81,3 +83,18 @@ func TwoArrayEqual(a []int, b []int) bool { } return true } + +func GetRandomItems(items []*string, count int) (result []*string) { + resultMap := make(map[int64]bool) + length := int64(len(items)) + + for i := 0; i < count; { + r, _ := rand.Int(rand.Reader, big.NewInt(length)) + if !resultMap[r.Int64()] { + result = append(result, items[r.Int64()]) + resultMap[r.Int64()] = true + i++ + } + } + return +}