diff --git a/cloudprovider/alibabacloud/multi_nlbs.go b/cloudprovider/alibabacloud/multi_nlbs.go new file mode 100644 index 0000000..38ffdb8 --- /dev/null +++ b/cloudprovider/alibabacloud/multi_nlbs.go @@ -0,0 +1,570 @@ +/* +Copyright 2024 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package alibabacloud + +import ( + "context" + "fmt" + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/cloudprovider" + cperrors "github.com/openkruise/kruise-game/cloudprovider/errors" + provideroptions "github.com/openkruise/kruise-game/cloudprovider/options" + "github.com/openkruise/kruise-game/cloudprovider/utils" + "github.com/openkruise/kruise-game/pkg/util" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + log "k8s.io/klog/v2" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" + "strings" + "sync" +) + +const ( + MultiNlbsNetwork = "AlibabaCloud-Multi-NLBs" + AliasMultiNlbs = "Multi-NLBs-Network" + + // ConfigNames defined by OKG + NlbIdNamesConfigName = "NlbIdNames" + + // service annotation defined by OKG + LBIDBelongIndexKey = "game.kruise.io/lb-belong-index" + + // service label defined by OKG + ServiceBelongNetworkTypeKey = "game.kruise.io/network-type" +) + +type MultiNlbsPlugin struct { + maxPort int32 + minPort int32 + blockPorts []int32 + cache [][]bool + // podAllocate format {pod ns/name}: -{lbId: xxx-a, port: -8001 -8002} -{lbId: xxx-b, port: -8001 -8002} + podAllocate map[string]*lbsPorts + mutex sync.RWMutex +} + +type lbsPorts struct { + index int + lbIds []string + ports []int32 + targetPort []int + protocols []corev1.Protocol +} + +func (m *MultiNlbsPlugin) Name() string { + return MultiNlbsNetwork +} + +func (m *MultiNlbsPlugin) Alias() string { + return AliasMultiNlbs +} + +func (m *MultiNlbsPlugin) Init(c client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error { + m.mutex.Lock() + defer m.mutex.Unlock() + nlbOptions := options.(provideroptions.AlibabaCloudOptions).NLBOptions + m.minPort = nlbOptions.MinPort + m.maxPort = nlbOptions.MaxPort + m.blockPorts = nlbOptions.BlockPorts + + svcList := &corev1.ServiceList{} + err := c.List(ctx, svcList, client.MatchingLabels{ServiceBelongNetworkTypeKey: MultiNlbsNetwork}) + if err != nil { + return err + } + m.podAllocate, m.cache = initMultiLBCache(svcList.Items, m.maxPort, m.minPort, m.blockPorts) + + log.Infof("[%s] podAllocate cache complete initialization: ", MultiNlbsNetwork) + for podNsName, lps := range m.podAllocate { + log.Infof("[%s] pod %s: %v", MultiNlbsNetwork, podNsName, *lps) + } + return nil +} + +func initMultiLBCache(svcList []corev1.Service, maxPort, minPort int32, blockPorts []int32) (map[string]*lbsPorts, [][]bool) { + podAllocate := make(map[string]*lbsPorts) + cache := make([][]bool, 0) + + for _, svc := range svcList { + index, err := strconv.Atoi(svc.GetAnnotations()[LBIDBelongIndexKey]) + if err != nil { + continue + } + lenCache := len(cache) + for i := lenCache; i <= index; i++ { + cacheLevel := make([]bool, int(maxPort-minPort)+1) + for _, p := range blockPorts { + cacheLevel[int(p-minPort)] = true + } + cache = append(cache, cacheLevel) + } + + ports := make([]int32, 0) + protocols := make([]corev1.Protocol, 0) + targetPorts := make([]int, 0) + for _, port := range svc.Spec.Ports { + cache[index][(port.Port - minPort)] = true + ports = append(ports, port.Port) + protocols = append(protocols, port.Protocol) + targetPorts = append(targetPorts, port.TargetPort.IntValue()) + } + + nsName := svc.GetNamespace() + "/" + svc.Spec.Selector[SvcSelectorKey] + if podAllocate[nsName] == nil { + podAllocate[nsName] = &lbsPorts{ + index: index, + lbIds: []string{svc.Labels[SlbIdLabelKey]}, + ports: ports, + protocols: protocols, + targetPort: targetPorts, + } + } else { + podAllocate[nsName].lbIds = append(podAllocate[nsName].lbIds, svc.Labels[SlbIdLabelKey]) + } + } + return podAllocate, cache +} + +func (m *MultiNlbsPlugin) OnPodAdded(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) { + return pod, nil +} + +func (m *MultiNlbsPlugin) OnPodUpdated(c client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) { + networkManager := utils.NewNetworkManager(pod, c) + + networkStatus, _ := networkManager.GetNetworkStatus() + networkConfig := networkManager.GetNetworkConfig() + conf, err := parseMultiNLBsConfig(networkConfig) + if err != nil { + return pod, cperrors.NewPluginError(cperrors.ParameterError, err.Error()) + } + if networkStatus == nil { + pod, err := networkManager.UpdateNetworkStatus(gamekruiseiov1alpha1.NetworkStatus{ + CurrentNetworkState: gamekruiseiov1alpha1.NetworkNotReady, + }, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + + endPoints := "" + for i, lbId := range conf.idList[0] { + // get svc + lbName := conf.lbNames[lbId] + svc := &corev1.Service{} + err = c.Get(ctx, types.NamespacedName{ + Name: pod.GetName() + "-" + strings.ToLower(lbName), + Namespace: pod.GetNamespace(), + }, svc) + if err != nil { + if errors.IsNotFound(err) { + service, err := m.consSvc(conf, pod, lbName, c, ctx) + if err != nil { + return pod, cperrors.ToPluginError(err, cperrors.ParameterError) + } + return pod, cperrors.ToPluginError(c.Create(ctx, service), cperrors.ApiCallError) + } + return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error()) + } + + // old svc remain + if svc.OwnerReferences[0].Kind == "Pod" && svc.OwnerReferences[0].UID != pod.UID { + log.Infof("[%s] waitting old svc %s/%s deleted. old owner pod uid is %s, but now is %s", NlbNetwork, svc.Namespace, svc.Name, svc.OwnerReferences[0].UID, pod.UID) + return pod, nil + } + + // update svc + if util.GetHash(conf) != svc.GetAnnotations()[SlbConfigHashKey] { + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + if err != nil { + return pod, cperrors.NewPluginError(cperrors.InternalError, err.Error()) + } + service, err := m.consSvc(conf, pod, lbName, c, ctx) + if err != nil { + return pod, cperrors.ToPluginError(err, cperrors.ParameterError) + } + return pod, cperrors.ToPluginError(c.Update(ctx, service), cperrors.ApiCallError) + } + + // disable network + if networkManager.GetNetworkDisabled() && svc.Spec.Type == corev1.ServiceTypeLoadBalancer { + svc.Spec.Type = corev1.ServiceTypeClusterIP + return pod, cperrors.ToPluginError(c.Update(ctx, svc), cperrors.ApiCallError) + } + + // enable network + if !networkManager.GetNetworkDisabled() && svc.Spec.Type == corev1.ServiceTypeClusterIP { + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + return pod, cperrors.ToPluginError(c.Update(ctx, svc), cperrors.ApiCallError) + } + + // network not ready + if svc.Status.LoadBalancer.Ingress == nil { + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + + // allow not ready containers + if util.IsAllowNotReadyContainers(networkManager.GetNetworkConfig()) { + toUpDateSvc, err := utils.AllowNotReadyContainers(c, ctx, pod, svc, false) + if err != nil { + return pod, err + } + + if toUpDateSvc { + err := c.Update(ctx, svc) + if err != nil { + return pod, cperrors.ToPluginError(err, cperrors.ApiCallError) + } + } + } + + // network ready + internalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + externalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + + endPoints = endPoints + svc.Status.LoadBalancer.Ingress[0].Hostname + "/" + lbName + if i != len(conf.idList[0])-1 { + endPoints = endPoints + "," + } + for _, port := range svc.Spec.Ports { + instrIPort := port.TargetPort + instrEPort := intstr.FromInt(int(port.Port)) + internalAddress := gamekruiseiov1alpha1.NetworkAddress{ + IP: pod.Status.PodIP, + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: port.Name, + Port: &instrIPort, + Protocol: port.Protocol, + }, + }, + } + externalAddress := gamekruiseiov1alpha1.NetworkAddress{ + EndPoint: endPoints, + IP: svc.Status.LoadBalancer.Ingress[0].IP, + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: port.Name, + Port: &instrEPort, + Protocol: port.Protocol, + }, + }, + } + internalAddresses = append(internalAddresses, internalAddress) + externalAddresses = append(externalAddresses, externalAddress) + } + + networkStatus.InternalAddresses = internalAddresses + networkStatus.ExternalAddresses = externalAddresses + } + + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) +} + +func (m *MultiNlbsPlugin) OnPodDeleted(c client.Client, pod *corev1.Pod, ctx context.Context) cperrors.PluginError { + networkManager := utils.NewNetworkManager(pod, c) + networkConfig := networkManager.GetNetworkConfig() + sc, err := parseMultiNLBsConfig(networkConfig) + if err != nil { + return cperrors.NewPluginError(cperrors.ApiCallError, err.Error()) + } + + var podKeys []string + if sc.isFixed { + gss, err := util.GetGameServerSetOfPod(pod, c, ctx) + if err != nil && !errors.IsNotFound(err) { + return cperrors.ToPluginError(err, cperrors.ApiCallError) + } + // gss exists in cluster, do not deAllocate. + if err == nil && gss.GetDeletionTimestamp() == nil { + return nil + } + // gss not exists in cluster, deAllocate all the ports related to it. + for key := range m.podAllocate { + gssName := pod.GetLabels()[gamekruiseiov1alpha1.GameServerOwnerGssKey] + if strings.Contains(key, pod.GetNamespace()+"/"+gssName) { + podKeys = append(podKeys, key) + } + } + } else { + podKeys = append(podKeys, pod.GetNamespace()+"/"+pod.GetName()) + } + + for _, podKey := range podKeys { + m.deAllocate(podKey) + } + + return nil +} + +func init() { + multiNlbsPlugin := MultiNlbsPlugin{ + mutex: sync.RWMutex{}, + } + alibabaCloudProvider.registerPlugin(&multiNlbsPlugin) +} + +type multiNLBsConfig struct { + lbNames map[string]string + idList [][]string + targetPorts []int + protocols []corev1.Protocol + isFixed bool + *nlbHealthConfig +} + +func (m *MultiNlbsPlugin) consSvc(conf *multiNLBsConfig, pod *corev1.Pod, lbName string, c client.Client, ctx context.Context) (*corev1.Service, error) { + podNsName := pod.GetNamespace() + "/" + pod.GetName() + podLbsPorts, err := m.allocate(conf, podNsName) + if err != nil { + return nil, err + } + + var selectId string + for _, lbId := range podLbsPorts.lbIds { + if conf.lbNames[lbId] == lbName { + selectId = lbId + break + } + } + + svcPorts := make([]corev1.ServicePort, 0) + for i := 0; i < len(podLbsPorts.ports); i++ { + svcPorts = append(svcPorts, corev1.ServicePort{ + Name: strconv.Itoa(podLbsPorts.targetPort[i]) + "-" + strings.ToLower(string(podLbsPorts.protocols[i])), + Port: podLbsPorts.ports[i], + TargetPort: intstr.FromInt(podLbsPorts.targetPort[i]), + Protocol: podLbsPorts.protocols[i], + }) + } + + loadBalancerClass := "alibabacloud.com/nlb" + + svcAnnotations := map[string]string{ + SlbListenerOverrideKey: "true", + SlbIdAnnotationKey: selectId, + SlbConfigHashKey: util.GetHash(conf), + LBHealthCheckFlagAnnotationKey: conf.lBHealthCheckFlag, + } + if conf.lBHealthCheckFlag == "on" { + svcAnnotations[LBHealthCheckTypeAnnotationKey] = conf.lBHealthCheckType + svcAnnotations[LBHealthCheckConnectPortAnnotationKey] = conf.lBHealthCheckConnectPort + svcAnnotations[LBHealthCheckConnectTimeoutAnnotationKey] = conf.lBHealthCheckConnectTimeout + svcAnnotations[LBHealthCheckIntervalAnnotationKey] = conf.lBHealthCheckInterval + svcAnnotations[LBHealthyThresholdAnnotationKey] = conf.lBHealthyThreshold + svcAnnotations[LBUnhealthyThresholdAnnotationKey] = conf.lBUnhealthyThreshold + if conf.lBHealthCheckType == "http" { + svcAnnotations[LBHealthCheckDomainAnnotationKey] = conf.lBHealthCheckDomain + svcAnnotations[LBHealthCheckUriAnnotationKey] = conf.lBHealthCheckUri + svcAnnotations[LBHealthCheckMethodAnnotationKey] = conf.lBHealthCheckMethod + } + } + svcAnnotations[LBIDBelongIndexKey] = strconv.Itoa(podLbsPorts.index) + + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.GetName() + "-" + strings.ToLower(lbName), + Namespace: pod.GetNamespace(), + Annotations: svcAnnotations, + Labels: map[string]string{ + ServiceBelongNetworkTypeKey: MultiNlbsNetwork, + }, + OwnerReferences: getSvcOwnerReference(c, ctx, pod, conf.isFixed), + }, + Spec: corev1.ServiceSpec{ + AllocateLoadBalancerNodePorts: ptr.To[bool](false), + ExternalTrafficPolicy: corev1.ServiceExternalTrafficPolicyTypeLocal, + Type: corev1.ServiceTypeLoadBalancer, + Selector: map[string]string{ + SvcSelectorKey: pod.GetName(), + }, + Ports: svcPorts, + LoadBalancerClass: &loadBalancerClass, + }, + }, nil +} + +func (m *MultiNlbsPlugin) allocate(conf *multiNLBsConfig, nsName string) (*lbsPorts, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + // check if pod is already allocated + if m.podAllocate[nsName] != nil { + return m.podAllocate[nsName], nil + } + + // if the pod has not been allocated, allocate new ports to it + var ports []int32 + needNum := len(conf.targetPorts) + index := -1 + + // init cache according to conf.idList + lenCache := len(m.cache) + for i := lenCache; i < len(conf.idList); i++ { + cacheLevel := make([]bool, int(m.maxPort-m.minPort)+1) + for _, p := range m.blockPorts { + cacheLevel[int(p-m.minPort)] = true + } + m.cache = append(m.cache, cacheLevel) + } + + // find allocated ports + for i := 0; i < len(m.cache); i++ { + sum := 0 + ports = make([]int32, 0) + for j := 0; j < len(m.cache[i]); j++ { + if !m.cache[i][j] { + ports = append(ports, int32(j)+m.minPort) + sum++ + if sum == needNum { + index = i + break + } + } + } + if index != -1 { + break + } + } + + if index == -1 { + return nil, fmt.Errorf("no available ports found") + } + for _, port := range ports { + m.cache[index][port-m.minPort] = true + } + m.podAllocate[nsName] = &lbsPorts{ + index: index, + lbIds: conf.idList[index], + ports: ports, + protocols: conf.protocols, + targetPort: conf.targetPorts, + } + log.Infof("[%s] pod %s allocated: lbIds %v; ports %v", MultiNlbsNetwork, nsName, conf.idList[index], ports) + return m.podAllocate[nsName], nil +} + +func (m *MultiNlbsPlugin) deAllocate(nsName string) { + m.mutex.Lock() + defer m.mutex.Unlock() + + podLbsPorts := m.podAllocate[nsName] + if podLbsPorts == nil { + return + } + for _, port := range podLbsPorts.ports { + m.cache[podLbsPorts.index][port-m.minPort] = false + } + delete(m.podAllocate, nsName) + + log.Infof("[%s] pod %s deallocate: lbIds %s ports %v", MultiNlbsNetwork, nsName, podLbsPorts.lbIds, podLbsPorts.ports) +} + +func parseMultiNLBsConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (*multiNLBsConfig, error) { + // lbNames format {id}: {name} + lbNames := make(map[string]string) + idList := make([][]string, 0) + nameNums := make(map[string]int) + ports := make([]int, 0) + protocols := make([]corev1.Protocol, 0) + isFixed := false + + for _, c := range conf { + switch c.Name { + case NlbIdNamesConfigName: + for _, nlbIdNamesConfig := range strings.Split(c.Value, ",") { + if nlbIdNamesConfig != "" { + idName := strings.Split(nlbIdNamesConfig, "/") + if len(idName) != 2 { + return nil, fmt.Errorf("invalid NlbIdNames %s. You should input as the format {nlb-id-0}/{name-0}", c.Value) + } + + id := idName[0] + name := idName[1] + + nameNum := nameNums[name] + if nameNum >= len(idList) { + idList = append(idList, []string{id}) + } else { + idList[nameNum] = append(idList[nameNum], id) + } + nameNums[name]++ + lbNames[id] = name + } + } + case PortProtocolsConfigName: + for _, pp := range strings.Split(c.Value, ",") { + ppSlice := strings.Split(pp, "/") + port, err := strconv.Atoi(ppSlice[0]) + if err != nil { + return nil, fmt.Errorf("invalid PortProtocols %s", c.Value) + } + ports = append(ports, port) + if len(ppSlice) != 2 { + protocols = append(protocols, corev1.ProtocolTCP) + } else { + protocols = append(protocols, corev1.Protocol(ppSlice[1])) + } + } + case FixedConfigName: + v, err := strconv.ParseBool(c.Value) + if err != nil { + return nil, fmt.Errorf("invalid Fixed %s", c.Value) + } + isFixed = v + } + } + + // check idList + if len(idList) == 0 { + return nil, fmt.Errorf("invalid NlbIdNames. You should input as the format {nlb-id-0}/{name-0}") + } + num := len(idList[0]) + for i := 1; i < len(idList); i++ { + if num != len(idList[i]) { + return nil, fmt.Errorf("invalid NlbIdNames. The number of names should be same") + } + num = len(idList[i]) + } + + // check ports & protocols + if len(ports) == 0 || len(protocols) == 0 { + return nil, fmt.Errorf("invalid PortProtocols, which can not be empty") + } + + nlbHealthConfig, err := parseNlbHealthConfig(conf) + if err != nil { + return nil, err + } + + return &multiNLBsConfig{ + lbNames: lbNames, + idList: idList, + targetPorts: ports, + protocols: protocols, + isFixed: isFixed, + nlbHealthConfig: nlbHealthConfig, + }, nil +} diff --git a/cloudprovider/alibabacloud/multi_nlbs_test.go b/cloudprovider/alibabacloud/multi_nlbs_test.go new file mode 100644 index 0000000..4e80e7d --- /dev/null +++ b/cloudprovider/alibabacloud/multi_nlbs_test.go @@ -0,0 +1,453 @@ +/* +Copyright 2024 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package alibabacloud + +import ( + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "reflect" + "sync" + "testing" +) + +func TestParseMultiNLBsConfig(t *testing.T) { + tests := []struct { + conf []gamekruiseiov1alpha1.NetworkConfParams + multiNLBsConfig *multiNLBsConfig + }{ + // case 0 + { + conf: []gamekruiseiov1alpha1.NetworkConfParams{ + { + Name: NlbIdNamesConfigName, + Value: "id-xx-A/dianxin,id-xx-B/liantong,id-xx-C/dianxin,id-xx-D/liantong", + }, + { + Name: PortProtocolsConfigName, + Value: "80/TCP,80/UDP", + }, + }, + multiNLBsConfig: &multiNLBsConfig{ + lbNames: map[string]string{ + "id-xx-A": "dianxin", + "id-xx-B": "liantong", + "id-xx-C": "dianxin", + "id-xx-D": "liantong", + }, + idList: [][]string{ + { + "id-xx-A", "id-xx-B", + }, + { + "id-xx-C", "id-xx-D", + }, + }, + }, + }, + // case 1 + { + conf: []gamekruiseiov1alpha1.NetworkConfParams{ + { + Name: NlbIdNamesConfigName, + Value: "id-xx-A/dianxin,id-xx-B/dianxin,id-xx-C/dianxin,id-xx-D/liantong,id-xx-E/liantong,id-xx-F/liantong", + }, + { + Name: PortProtocolsConfigName, + Value: "80/TCP,80/UDP", + }, + }, + multiNLBsConfig: &multiNLBsConfig{ + lbNames: map[string]string{ + "id-xx-A": "dianxin", + "id-xx-B": "dianxin", + "id-xx-C": "dianxin", + "id-xx-D": "liantong", + "id-xx-E": "liantong", + "id-xx-F": "liantong", + }, + idList: [][]string{ + { + "id-xx-A", "id-xx-D", + }, + { + "id-xx-B", "id-xx-E", + }, + { + "id-xx-C", "id-xx-F", + }, + }, + }, + }, + } + + for i, tt := range tests { + actual, err := parseMultiNLBsConfig(tt.conf) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(actual.lbNames, tt.multiNLBsConfig.lbNames) { + t.Errorf("case %d: parseMultiNLBsConfig lbNames actual: %v, expect: %v", i, actual.lbNames, tt.multiNLBsConfig.lbNames) + } + if !reflect.DeepEqual(actual.idList, tt.multiNLBsConfig.idList) { + t.Errorf("case %d: parseMultiNLBsConfig idList actual: %v, expect: %v", i, actual.idList, tt.multiNLBsConfig.idList) + } + } +} + +func TestAllocate(t *testing.T) { + tests := []struct { + plugin *MultiNlbsPlugin + conf *multiNLBsConfig + nsName string + lbsPorts *lbsPorts + cacheAfter [][]bool + podAllocateAfter map[string]*lbsPorts + }{ + // case 0: cache is nil + { + plugin: &MultiNlbsPlugin{ + maxPort: int32(8002), + minPort: int32(8000), + blockPorts: []int32{8001}, + mutex: sync.RWMutex{}, + podAllocate: make(map[string]*lbsPorts), + cache: make([][]bool, 0), + }, + conf: &multiNLBsConfig{ + lbNames: map[string]string{ + "id-xx-A": "dianxin", + "id-xx-B": "liantong", + "id-xx-C": "dianxin", + "id-xx-D": "liantong", + }, + idList: [][]string{ + { + "id-xx-A", "id-xx-B", + }, + { + "id-xx-C", "id-xx-D", + }, + }, + targetPorts: []int{80, 80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}, + }, + nsName: "default/test-0", + lbsPorts: &lbsPorts{ + index: 0, + lbIds: []string{"id-xx-A", "id-xx-B"}, + ports: []int32{8000, 8002}, + targetPort: []int{80, 80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}, + }, + cacheAfter: [][]bool{{true, true, true}, {false, true, false}}, + podAllocateAfter: map[string]*lbsPorts{ + "default/test-0": { + index: 0, + lbIds: []string{"id-xx-A", "id-xx-B"}, + ports: []int32{8000, 8002}, + targetPort: []int{80, 80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}, + }, + }, + }, + // case 1: cache not nil & new pod + { + plugin: &MultiNlbsPlugin{ + maxPort: int32(8002), + minPort: int32(8000), + blockPorts: []int32{8001}, + mutex: sync.RWMutex{}, + podAllocate: map[string]*lbsPorts{ + "default/test-0": { + index: 0, + lbIds: []string{"id-xx-A", "id-xx-B"}, + ports: []int32{8000}, + targetPort: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + }, + cache: [][]bool{{true, true, false}}, + }, + conf: &multiNLBsConfig{ + lbNames: map[string]string{ + "id-xx-A": "dianxin", + "id-xx-B": "liantong", + "id-xx-C": "dianxin", + "id-xx-D": "liantong", + }, + idList: [][]string{ + { + "id-xx-A", "id-xx-B", + }, + { + "id-xx-C", "id-xx-D", + }, + }, + targetPorts: []int{80, 80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}, + }, + nsName: "default/test-1", + lbsPorts: &lbsPorts{ + index: 1, + lbIds: []string{"id-xx-C", "id-xx-D"}, + ports: []int32{8000, 8002}, + targetPort: []int{80, 80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}, + }, + cacheAfter: [][]bool{{true, true, false}, {true, true, true}}, + podAllocateAfter: map[string]*lbsPorts{ + "default/test-0": { + index: 0, + lbIds: []string{"id-xx-A", "id-xx-B"}, + ports: []int32{8000}, + targetPort: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + "default/test-1": { + index: 1, + lbIds: []string{"id-xx-C", "id-xx-D"}, + ports: []int32{8000, 8002}, + targetPort: []int{80, 80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}, + }, + }, + }, + // case 2: cache not nil & old pod + { + plugin: &MultiNlbsPlugin{ + maxPort: int32(8002), + minPort: int32(8000), + blockPorts: []int32{8001}, + mutex: sync.RWMutex{}, + podAllocate: map[string]*lbsPorts{ + "default/test-0": { + index: 0, + lbIds: []string{"id-xx-A", "id-xx-B"}, + ports: []int32{8000}, + targetPort: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + }, + cache: [][]bool{{true, true, false}}, + }, + conf: &multiNLBsConfig{ + lbNames: map[string]string{ + "id-xx-A": "dianxin", + "id-xx-B": "liantong", + "id-xx-C": "dianxin", + "id-xx-D": "liantong", + }, + idList: [][]string{ + { + "id-xx-A", "id-xx-B", + }, + { + "id-xx-C", "id-xx-D", + }, + }, + targetPorts: []int{80, 80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}, + }, + nsName: "default/test-0", + lbsPorts: &lbsPorts{ + index: 0, + lbIds: []string{"id-xx-A", "id-xx-B"}, + ports: []int32{8000}, + targetPort: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + cacheAfter: [][]bool{{true, true, false}}, + podAllocateAfter: map[string]*lbsPorts{ + "default/test-0": { + index: 0, + lbIds: []string{"id-xx-A", "id-xx-B"}, + ports: []int32{8000}, + targetPort: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + }, + }, + } + + for i, tt := range tests { + plugin := tt.plugin + lbsPorts, err := plugin.allocate(tt.conf, tt.nsName) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(lbsPorts, tt.lbsPorts) { + t.Errorf("case %d: allocate actual: %v, expect: %v", i, lbsPorts, tt.lbsPorts) + } + if !reflect.DeepEqual(plugin.podAllocate, tt.podAllocateAfter) { + t.Errorf("case %d: podAllocate actual: %v, expect: %v", i, plugin.podAllocate, tt.podAllocateAfter) + } + if !reflect.DeepEqual(plugin.cache, tt.cacheAfter) { + t.Errorf("case %d: cache actual: %v, expect: %v", i, plugin.cache, tt.cacheAfter) + } + } +} + +func TestDeAllocate(t *testing.T) { + tests := []struct { + plugin *MultiNlbsPlugin + nsName string + cacheAfter [][]bool + podAllocateAfter map[string]*lbsPorts + }{ + { + plugin: &MultiNlbsPlugin{ + maxPort: int32(8002), + minPort: int32(8000), + blockPorts: []int32{8001}, + mutex: sync.RWMutex{}, + podAllocate: map[string]*lbsPorts{ + "default/test-0": { + index: 0, + lbIds: []string{"id-xx-A", "id-xx-B"}, + ports: []int32{8000}, + targetPort: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + "default/test-1": { + index: 1, + lbIds: []string{"id-xx-C", "id-xx-D"}, + ports: []int32{8000, 8002}, + targetPort: []int{80, 80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}, + }, + }, + cache: [][]bool{{true, true, false}, {true, true, true}}, + }, + nsName: "default/test-1", + cacheAfter: [][]bool{{true, true, false}, {false, true, false}}, + podAllocateAfter: map[string]*lbsPorts{ + "default/test-0": { + index: 0, + lbIds: []string{"id-xx-A", "id-xx-B"}, + ports: []int32{8000}, + targetPort: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + }, + }, + } + + for i, tt := range tests { + plugin := tt.plugin + plugin.deAllocate(tt.nsName) + + if !reflect.DeepEqual(plugin.podAllocate, tt.podAllocateAfter) { + t.Errorf("case %d: podAllocate actual: %v, expect: %v", i, plugin.podAllocate, tt.podAllocateAfter) + } + if !reflect.DeepEqual(plugin.cache, tt.cacheAfter) { + t.Errorf("case %d: cache actual: %v, expect: %v", i, plugin.cache, tt.cacheAfter) + } + } +} + +func TestInitMultiLBCache(t *testing.T) { + tests := []struct { + svcList []corev1.Service + maxPort int32 + minPort int32 + blockPorts []int32 + podAllocate map[string]*lbsPorts + cache [][]bool + }{ + { + svcList: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + LBIDBelongIndexKey: "0", + }, + Labels: map[string]string{ + SlbIdLabelKey: "xxx-A", + ServiceBelongNetworkTypeKey: MultiNlbsNetwork, + }, + Namespace: "ns-0", + Name: "name-0", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + Selector: map[string]string{ + SvcSelectorKey: "pod-A", + }, + Ports: []corev1.ServicePort{ + { + TargetPort: intstr.FromInt(80), + Port: 666, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + LBIDBelongIndexKey: "0", + }, + Labels: map[string]string{ + SlbIdLabelKey: "xxx-B", + ServiceBelongNetworkTypeKey: MultiNlbsNetwork, + }, + Namespace: "ns-0", + Name: "name-1", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + Selector: map[string]string{ + SvcSelectorKey: "pod-A", + }, + Ports: []corev1.ServicePort{ + { + TargetPort: intstr.FromInt(80), + Port: 666, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + }, + maxPort: int32(667), + minPort: int32(665), + blockPorts: []int32{}, + podAllocate: map[string]*lbsPorts{ + "ns-0/pod-A": { + index: 0, + lbIds: []string{"xxx-A", "xxx-B"}, + ports: []int32{666}, + targetPort: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + }, + cache: [][]bool{{false, true, false}}, + }, + } + for i, tt := range tests { + podAllocate, cache := initMultiLBCache(tt.svcList, tt.maxPort, tt.minPort, tt.blockPorts) + + if !reflect.DeepEqual(podAllocate, tt.podAllocate) { + t.Errorf("case %d: podAllocate actual: %v, expect: %v", i, podAllocate, tt.podAllocate) + } + if !reflect.DeepEqual(cache, tt.cache) { + t.Errorf("case %d: cache actual: %v, expect: %v", i, cache, tt.cache) + } + } +} diff --git a/cloudprovider/alibabacloud/nlb.go b/cloudprovider/alibabacloud/nlb.go index 8bd9927..6a25c1b 100644 --- a/cloudprovider/alibabacloud/nlb.go +++ b/cloudprovider/alibabacloud/nlb.go @@ -77,10 +77,14 @@ type NlbPlugin struct { } type nlbConfig struct { - lbIds []string - targetPorts []int - protocols []corev1.Protocol - isFixed bool + lbIds []string + targetPorts []int + protocols []corev1.Protocol + isFixed bool + *nlbHealthConfig +} + +type nlbHealthConfig struct { lBHealthCheckFlag string lBHealthCheckType string lBHealthCheckConnectPort string @@ -442,16 +446,7 @@ func parseNlbConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (*nlbConfig, ports := make([]int, 0) protocols := make([]corev1.Protocol, 0) isFixed := false - lBHealthCheckFlag := "on" - lBHealthCheckType := "tcp" - lBHealthCheckConnectPort := "0" - lBHealthCheckConnectTimeout := "5" - lBHealthCheckInterval := "10" - lBUnhealthyThreshold := "2" - lBHealthyThreshold := "2" - lBHealthCheckUri := "" - lBHealthCheckDomain := "" - lBHealthCheckMethod := "" + for _, c := range conf { switch c.Name { case NlbIdsConfigName: @@ -480,6 +475,37 @@ func parseNlbConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (*nlbConfig, continue } isFixed = v + } + } + + nlbHealthConfig, err := parseNlbHealthConfig(conf) + if err != nil { + return nil, err + } + + return &nlbConfig{ + lbIds: lbIds, + protocols: protocols, + targetPorts: ports, + isFixed: isFixed, + nlbHealthConfig: nlbHealthConfig, + }, nil +} + +func parseNlbHealthConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (*nlbHealthConfig, error) { + lBHealthCheckFlag := "on" + lBHealthCheckType := "tcp" + lBHealthCheckConnectPort := "0" + lBHealthCheckConnectTimeout := "5" + lBHealthCheckInterval := "10" + lBUnhealthyThreshold := "2" + lBHealthyThreshold := "2" + lBHealthCheckUri := "" + lBHealthCheckDomain := "" + lBHealthCheckMethod := "" + + for _, c := range conf { + switch c.Name { case LBHealthCheckFlagConfigName: flag := strings.ToLower(c.Value) if flag != "on" && flag != "off" { @@ -555,11 +581,8 @@ func parseNlbConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (*nlbConfig, lBHealthCheckMethod = method } } - return &nlbConfig{ - lbIds: lbIds, - protocols: protocols, - targetPorts: ports, - isFixed: isFixed, + + return &nlbHealthConfig{ lBHealthCheckFlag: lBHealthCheckFlag, lBHealthCheckType: lBHealthCheckType, lBHealthCheckConnectPort: lBHealthCheckConnectPort, diff --git a/cloudprovider/alibabacloud/nlb_test.go b/cloudprovider/alibabacloud/nlb_test.go index 5d1430f..4a9b224 100644 --- a/cloudprovider/alibabacloud/nlb_test.go +++ b/cloudprovider/alibabacloud/nlb_test.go @@ -113,20 +113,22 @@ func TestParseNlbConfig(t *testing.T) { }, }, nlbConfig: &nlbConfig{ - lbIds: []string{"xxx-A"}, - targetPorts: []int{80}, - protocols: []corev1.Protocol{corev1.ProtocolTCP}, - isFixed: false, - lBHealthCheckFlag: "on", - lBHealthCheckType: "http", - lBHealthCheckConnectPort: "6000", - lBHealthCheckConnectTimeout: "100", - lBHealthCheckInterval: "30", - lBHealthCheckUri: "/another?valid", - lBHealthCheckDomain: "www.test.com", - lBHealthCheckMethod: "head", - lBHealthyThreshold: "5", - lBUnhealthyThreshold: "5", + lbIds: []string{"xxx-A"}, + targetPorts: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + isFixed: false, + nlbHealthConfig: &nlbHealthConfig{ + lBHealthCheckFlag: "on", + lBHealthCheckType: "http", + lBHealthCheckConnectPort: "6000", + lBHealthCheckConnectTimeout: "100", + lBHealthCheckInterval: "30", + lBHealthCheckUri: "/another?valid", + lBHealthCheckDomain: "www.test.com", + lBHealthCheckMethod: "head", + lBHealthyThreshold: "5", + lBUnhealthyThreshold: "5", + }, }, }, { @@ -145,20 +147,22 @@ func TestParseNlbConfig(t *testing.T) { }, }, nlbConfig: &nlbConfig{ - lbIds: []string{"xxx-A", "xxx-B"}, - targetPorts: []int{81, 82, 83}, - protocols: []corev1.Protocol{corev1.ProtocolUDP, corev1.ProtocolTCP, corev1.ProtocolTCP}, - isFixed: true, - lBHealthCheckFlag: "on", - lBHealthCheckType: "tcp", - lBHealthCheckConnectPort: "0", - lBHealthCheckConnectTimeout: "5", - lBHealthCheckInterval: "10", - lBUnhealthyThreshold: "2", - lBHealthyThreshold: "2", - lBHealthCheckUri: "", - lBHealthCheckDomain: "", - lBHealthCheckMethod: "", + lbIds: []string{"xxx-A", "xxx-B"}, + targetPorts: []int{81, 82, 83}, + protocols: []corev1.Protocol{corev1.ProtocolUDP, corev1.ProtocolTCP, corev1.ProtocolTCP}, + isFixed: true, + nlbHealthConfig: &nlbHealthConfig{ + lBHealthCheckFlag: "on", + lBHealthCheckType: "tcp", + lBHealthCheckConnectPort: "0", + lBHealthCheckConnectTimeout: "5", + lBHealthCheckInterval: "10", + lBUnhealthyThreshold: "2", + lBHealthyThreshold: "2", + lBHealthCheckUri: "", + lBHealthCheckDomain: "", + lBHealthCheckMethod: "", + }, }, }, } @@ -213,17 +217,19 @@ func TestNlbPlugin_consSvc(t *testing.T) { protocols: []corev1.Protocol{ corev1.ProtocolTCP, }, - isFixed: false, - lBHealthCheckFlag: "on", - lBHealthCheckType: "tcp", - lBHealthCheckConnectPort: "0", - lBHealthCheckConnectTimeout: "5", - lBHealthCheckInterval: "10", - lBUnhealthyThreshold: "2", - lBHealthyThreshold: "2", - lBHealthCheckUri: "", - lBHealthCheckDomain: "", - lBHealthCheckMethod: "", + isFixed: false, + nlbHealthConfig: &nlbHealthConfig{ + lBHealthCheckFlag: "on", + lBHealthCheckType: "tcp", + lBHealthCheckConnectPort: "0", + lBHealthCheckConnectTimeout: "5", + lBHealthCheckInterval: "10", + lBUnhealthyThreshold: "2", + lBHealthyThreshold: "2", + lBHealthCheckUri: "", + lBHealthCheckDomain: "", + lBHealthCheckMethod: "", + }, }, pod: &corev1.Pod{ TypeMeta: metav1.TypeMeta{ @@ -252,17 +258,19 @@ func TestNlbPlugin_consSvc(t *testing.T) { protocols: []corev1.Protocol{ corev1.ProtocolTCP, }, - isFixed: false, - lBHealthCheckFlag: "on", - lBHealthCheckType: "tcp", - lBHealthCheckConnectPort: "0", - lBHealthCheckConnectTimeout: "5", - lBHealthCheckInterval: "10", - lBUnhealthyThreshold: "2", - lBHealthyThreshold: "2", - lBHealthCheckUri: "", - lBHealthCheckDomain: "", - lBHealthCheckMethod: "", + isFixed: false, + nlbHealthConfig: &nlbHealthConfig{ + lBHealthCheckFlag: "on", + lBHealthCheckType: "tcp", + lBHealthCheckConnectPort: "0", + lBHealthCheckConnectTimeout: "5", + lBHealthCheckInterval: "10", + lBUnhealthyThreshold: "2", + lBHealthyThreshold: "2", + lBHealthCheckUri: "", + lBHealthCheckDomain: "", + lBHealthCheckMethod: "", + }, }), LBHealthCheckFlagAnnotationKey: "on", LBHealthCheckTypeAnnotationKey: "tcp",