diff --git a/Dockerfile b/Dockerfile index b2ce32b3..c0768d05 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,6 +13,7 @@ RUN go mod download COPY main.go main.go COPY apis/ apis/ COPY pkg/ pkg/ +COPY cloudprovider/ cloudprovider/ # Build RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go diff --git a/Makefile b/Makefile index c6538b6e..fb4640a7 100644 --- a/Makefile +++ b/Makefile @@ -57,7 +57,7 @@ vet: ## Run go vet against code. .PHONY: test test: manifests generate fmt vet envtest ## Run tests. - KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./pkg/... -coverprofile cover.out + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./pkg/... ./cloudprovider/... -coverprofile cover.out ##@ Build diff --git a/apis/v1alpha1/gameserver_types.go b/apis/v1alpha1/gameserver_types.go index da62434a..b0e7b0e7 100644 --- a/apis/v1alpha1/gameserver_types.go +++ b/apis/v1alpha1/gameserver_types.go @@ -23,11 +23,16 @@ import ( ) const ( - GameServerStateKey = "game.kruise.io/gs-state" - GameServerOpsStateKey = "game.kruise.io/gs-opsState" - GameServerUpdatePriorityKey = "game.kruise.io/gs-update-priority" - GameServerDeletePriorityKey = "game.kruise.io/gs-delete-priority" - GameServerDeletingKey = "game.kruise.io/gs-deleting" + GameServerStateKey = "game.kruise.io/gs-state" + GameServerOpsStateKey = "game.kruise.io/gs-opsState" + GameServerUpdatePriorityKey = "game.kruise.io/gs-update-priority" + GameServerDeletePriorityKey = "game.kruise.io/gs-delete-priority" + GameServerDeletingKey = "game.kruise.io/gs-deleting" + GameServerNetworkType = "game.kruise.io/network-type" + GameServerNetworkConf = "game.kruise.io/network-conf" + GameServerNetworkDisabled = "game.kruise.io/network-disabled" + GameServerNetworkStatus = "game.kruise.io/network-status" + GameServerNetworkTriggerTime = "game.kruise.io/network-trigger-time" ) // GameServerSpec defines the desired state of GameServer @@ -109,6 +114,12 @@ type NetworkStatus struct { type NetworkState string +const ( + NetworkReady NetworkState = "Ready" + NetworkWaiting NetworkState = "Waiting" + NetworkNotReady NetworkState = "NotReady" +) + type NetworkAddress struct { IP string `json:"ip"` // TODO add IPv6 diff --git a/cloudprovider/alibabacloud/alibabacloud.go b/cloudprovider/alibabacloud/alibabacloud.go new file mode 100644 index 00000000..b227721c --- /dev/null +++ b/cloudprovider/alibabacloud/alibabacloud.go @@ -0,0 +1,61 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package alibabacloud + +import ( + "github.com/openkruise/kruise-game/cloudprovider" + "k8s.io/klog/v2" +) + +const ( + AlibabaCloud = "AlibabaCloud" +) + +var ( + alibabaCloudProvider = &Provider{ + plugins: make(map[string]cloudprovider.Plugin), + } +) + +type Provider struct { + plugins map[string]cloudprovider.Plugin +} + +func (ap *Provider) Name() string { + return AlibabaCloud +} + +func (ap *Provider) ListPlugins() (map[string]cloudprovider.Plugin, error) { + if ap.plugins == nil { + return make(map[string]cloudprovider.Plugin), nil + } + + return ap.plugins, nil +} + +// register plugin of cloud provider and different cloud providers +func (ap *Provider) registerPlugin(plugin cloudprovider.Plugin) { + name := plugin.Name() + if name == "" { + klog.Fatal("empty plugin name") + } + ap.plugins[name] = plugin +} + +func NewAlibabaCloudProvider() (cloudprovider.CloudProvider, error) { + return alibabaCloudProvider, nil +} diff --git a/cloudprovider/alibabacloud/apis/v1/doc.go b/cloudprovider/alibabacloud/apis/v1/doc.go new file mode 100644 index 00000000..76ab508d --- /dev/null +++ b/cloudprovider/alibabacloud/apis/v1/doc.go @@ -0,0 +1,4 @@ +// Package v1 contains API Schema definitions for the alibabacloud v1 API group +// +k8s:deepcopy-gen=package,register +// +groupName=alibabacloud.com +package v1 diff --git a/cloudprovider/alibabacloud/apis/v1/poddnat_types.go b/cloudprovider/alibabacloud/apis/v1/poddnat_types.go new file mode 100644 index 00000000..031ddfc0 --- /dev/null +++ b/cloudprovider/alibabacloud/apis/v1/poddnat_types.go @@ -0,0 +1,102 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func init() { + SchemeBuilder.Register(&PodDNAT{}, &PodDNATList{}) +} + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// PodDNAT let you specficy DNAT rule for pod on nat gateway +type PodDNAT struct { + metav1.TypeMeta `json:",inline"` + // Standard object's metadata. + // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata + // +optional + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Spec is the desired state of the PodDNAT. + // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status + Spec PodDNATSpec `json:"spec,omitempty"` + + // 'Status is the current state of the dnat. + // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status + // +optional + Status PodDNATStatus `json:"status,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// PodDNATList is a collection of PodDNAT. +type PodDNATList struct { + metav1.TypeMeta `json:",inline"` + // Standard object's metadata. + // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata + // +optional + metav1.ListMeta `json:"metadata,omitempty"` + + // Items is the list of PodDNAT. + Items []PodDNAT `json:"items"` +} + +// PodDNATSpec describes the PodDNAT the user wishes to exist. +type PodDNATSpec struct { + VSwitch *string `json:"vswitch,omitempty"` // deprecated + ENI *string `json:"eni,omitempty"` // deprecated + ZoneID *string `json:"zoneID,omitempty"` + ExternalIP *string `json:"externalIP,omitempty"` + ExternalPort *string `json:"externalPort,omitempty"` // deprecated + InternalIP *string `json:"internalIP,omitempty"` // pod IP may change + InternalPort *string `json:"internalPort,omitempty"` // deprecated + Protocol *string `json:"protocol,omitempty"` + TableId *string `json:"tableId,omitempty"` // natGateway ID + EntryId *string `json:"entryId,omitempty"` // deprecated + PortMapping []PortMapping `json:"portMapping,omitempty"` +} + +type PortMapping struct { + ExternalPort string `json:"externalPort,omitempty"` + InternalPort string `json:"internalPort,omitempty"` +} + +// PodDNATStatus is the current state of the dnat. +type PodDNATStatus struct { + // created create status + // +optional + Created *string `json:"created,omitempty"` // deprecated + + // entries + // +optional + Entries []Entry `json:"entries,omitempty"` +} + +// Entry record for forwardEntry +type Entry struct { + ExternalPort string `json:"externalPort,omitempty"` + ExternalIP string `json:"externalIP,omitempty"` + InternalPort string `json:"internalPort,omitempty"` + InternalIP string `json:"internalIP,omitempty"` + + ForwardEntryID string `json:"forwardEntryId,omitempty"` + IPProtocol string `json:"ipProtocol,omitempty"` +} diff --git a/cloudprovider/alibabacloud/apis/v1/register.go b/cloudprovider/alibabacloud/apis/v1/register.go new file mode 100644 index 00000000..8eeb8425 --- /dev/null +++ b/cloudprovider/alibabacloud/apis/v1/register.go @@ -0,0 +1,23 @@ +// NOTE: Boilerplate only. Ignore this file. + +// Package v1 contains API Schema definitions for the alibabacloud v1 API group +// +k8s:deepcopy-gen=package,register +// +groupName=alibabacloud.com + +package v1 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +var ( + // SchemeGroupVersion is group version used to register these objects + SchemeGroupVersion = schema.GroupVersion{Group: "alibabacloud.com", Version: "v1"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme + SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} + + // AddToScheme adds the types in this group-version to the given scheme. + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/cloudprovider/alibabacloud/apis/v1/zz_generated.deepcopy.go b/cloudprovider/alibabacloud/apis/v1/zz_generated.deepcopy.go new file mode 100644 index 00000000..7436326b --- /dev/null +++ b/cloudprovider/alibabacloud/apis/v1/zz_generated.deepcopy.go @@ -0,0 +1,210 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by controller-gen. DO NOT EDIT. + +package v1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Entry) DeepCopyInto(out *Entry) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Entry. +func (in *Entry) DeepCopy() *Entry { + if in == nil { + return nil + } + out := new(Entry) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodDNAT) DeepCopyInto(out *PodDNAT) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodDNAT. +func (in *PodDNAT) DeepCopy() *PodDNAT { + if in == nil { + return nil + } + out := new(PodDNAT) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PodDNAT) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodDNATList) DeepCopyInto(out *PodDNATList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]PodDNAT, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodDNATList. +func (in *PodDNATList) DeepCopy() *PodDNATList { + if in == nil { + return nil + } + out := new(PodDNATList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PodDNATList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodDNATSpec) DeepCopyInto(out *PodDNATSpec) { + *out = *in + if in.VSwitch != nil { + in, out := &in.VSwitch, &out.VSwitch + *out = new(string) + **out = **in + } + if in.ENI != nil { + in, out := &in.ENI, &out.ENI + *out = new(string) + **out = **in + } + if in.ZoneID != nil { + in, out := &in.ZoneID, &out.ZoneID + *out = new(string) + **out = **in + } + if in.ExternalIP != nil { + in, out := &in.ExternalIP, &out.ExternalIP + *out = new(string) + **out = **in + } + if in.ExternalPort != nil { + in, out := &in.ExternalPort, &out.ExternalPort + *out = new(string) + **out = **in + } + if in.InternalIP != nil { + in, out := &in.InternalIP, &out.InternalIP + *out = new(string) + **out = **in + } + if in.InternalPort != nil { + in, out := &in.InternalPort, &out.InternalPort + *out = new(string) + **out = **in + } + if in.Protocol != nil { + in, out := &in.Protocol, &out.Protocol + *out = new(string) + **out = **in + } + if in.TableId != nil { + in, out := &in.TableId, &out.TableId + *out = new(string) + **out = **in + } + if in.EntryId != nil { + in, out := &in.EntryId, &out.EntryId + *out = new(string) + **out = **in + } + if in.PortMapping != nil { + in, out := &in.PortMapping, &out.PortMapping + *out = make([]PortMapping, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodDNATSpec. +func (in *PodDNATSpec) DeepCopy() *PodDNATSpec { + if in == nil { + return nil + } + out := new(PodDNATSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodDNATStatus) DeepCopyInto(out *PodDNATStatus) { + *out = *in + if in.Created != nil { + in, out := &in.Created, &out.Created + *out = new(string) + **out = **in + } + if in.Entries != nil { + in, out := &in.Entries, &out.Entries + *out = make([]Entry, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodDNATStatus. +func (in *PodDNATStatus) DeepCopy() *PodDNATStatus { + if in == nil { + return nil + } + out := new(PodDNATStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PortMapping) DeepCopyInto(out *PortMapping) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PortMapping. +func (in *PortMapping) DeepCopy() *PortMapping { + if in == nil { + return nil + } + out := new(PortMapping) + in.DeepCopyInto(out) + return out +} diff --git a/cloudprovider/alibabacloud/natgw.go b/cloudprovider/alibabacloud/natgw.go new file mode 100644 index 00000000..a3783f22 --- /dev/null +++ b/cloudprovider/alibabacloud/natgw.go @@ -0,0 +1,153 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package alibabacloud + +import ( + "context" + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/cloudprovider" + "github.com/openkruise/kruise-game/cloudprovider/alibabacloud/apis/v1" + "github.com/openkruise/kruise-game/cloudprovider/errors" + "github.com/openkruise/kruise-game/cloudprovider/utils" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" + "strings" +) + +const ( + NATGWNetwork = "AlibabaCloud-NATGW" + AliasNATGW = "NATGW-Network" + FixedConfigName = "Fixed" + PortsConfigName = "Ports" + ProtocolConfigName = "Protocol" + DnatAnsKey = "k8s.aliyun.com/pod-dnat" + PortsAnsKey = "k8s.aliyun.com/pod-dnat-expose-port" + ProtocolAnsKey = "k8s.aliyun.com/pod-dnat-expose-protocol" + FixedAnsKey = "k8s.aliyun.com/pod-dnat-fixed" +) + +type NatGwPlugin struct { +} + +func (n NatGwPlugin) Name() string { + return NATGWNetwork +} + +func (n NatGwPlugin) Alias() string { + return AliasNATGW +} + +func (n NatGwPlugin) Init(c client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error { + return nil +} + +func (n NatGwPlugin) OnPodAdded(c client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, errors.PluginError) { + networkManager := utils.NewNetworkManager(pod, c) + conf := networkManager.GetNetworkConfig() + ports, protocol, fixed := parseConfig(conf) + pod.Annotations[DnatAnsKey] = "true" + pod.Annotations[PortsAnsKey] = ports + if protocol != "" { + pod.Annotations[ProtocolAnsKey] = protocol + } + if fixed != "" { + pod.Annotations[FixedAnsKey] = fixed + } + return pod, nil +} + +func (n NatGwPlugin) OnPodUpdated(c client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, errors.PluginError) { + networkManager := utils.NewNetworkManager(pod, c) + + networkStatus, _ := networkManager.GetNetworkStatus() + if networkStatus == nil { + pod, err := networkManager.UpdateNetworkStatus(gamekruiseiov1alpha1.NetworkStatus{ + CurrentNetworkState: gamekruiseiov1alpha1.NetworkWaiting, + }, pod) + return pod, errors.ToPluginError(err, errors.InternalError) + } + + podDNat := &v1.PodDNAT{} + err := c.Get(ctx, types.NamespacedName{ + Name: pod.GetName(), + Namespace: pod.GetNamespace(), + }, podDNat) + if err != nil || podDNat.Status.Entries == nil { + return pod, nil + } + + internalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + externalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + for _, entry := range podDNat.Status.Entries { + instrIPort := intstr.FromString(entry.InternalPort) + instrEPort := intstr.FromString(entry.ExternalPort) + internalAddress := gamekruiseiov1alpha1.NetworkAddress{ + IP: entry.InternalIP, + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: entry.InternalPort, + Port: &instrIPort, + Protocol: corev1.Protocol(strings.ToUpper(entry.IPProtocol)), + }, + }, + } + externalAddress := gamekruiseiov1alpha1.NetworkAddress{ + IP: entry.ExternalIP, + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: entry.InternalPort, + Port: &instrEPort, + Protocol: corev1.Protocol(strings.ToUpper(entry.IPProtocol)), + }, + }, + } + internalAddresses = append(internalAddresses, internalAddress) + externalAddresses = append(externalAddresses, externalAddress) + } + networkStatus.InternalAddresses = internalAddresses + networkStatus.ExternalAddresses = externalAddresses + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, errors.ToPluginError(err, errors.InternalError) +} + +func (n NatGwPlugin) OnPodDeleted(c client.Client, pod *corev1.Pod, ctx context.Context) errors.PluginError { + return nil +} + +func init() { + alibabaCloudProvider.registerPlugin(&NatGwPlugin{}) +} + +func parseConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (string, string, string) { + var ports string + var protocol string + var fixed string + for _, c := range conf { + switch c.Name { + case PortsConfigName: + ports = c.Value + case ProtocolConfigName: + protocol = c.Value + case FixedConfigName: + fixed = c.Value + } + } + return ports, protocol, fixed +} diff --git a/cloudprovider/alibabacloud/slb.go b/cloudprovider/alibabacloud/slb.go new file mode 100644 index 00000000..502a9e7f --- /dev/null +++ b/cloudprovider/alibabacloud/slb.go @@ -0,0 +1,360 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package alibabacloud + +import ( + "context" + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/cloudprovider" + cperrors "github.com/openkruise/kruise-game/cloudprovider/errors" + provideroptions "github.com/openkruise/kruise-game/cloudprovider/options" + "github.com/openkruise/kruise-game/cloudprovider/utils" + "github.com/openkruise/kruise-game/pkg/util" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" + "strings" + "sync" +) + +const ( + SlbNetwork = "AlibabaCloud-SLB" + AliasSLB = "LB-Network" + SlbIdsConfigName = "SlbIds" + PortProtocolsConfigName = "PortProtocols" + SlbListenerOverrideKey = "service.beta.kubernetes.io/alibaba-cloud-loadbalancer-force-override-listeners" + SlbIdAnnotationKey = "service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id" + SlbIdLabelKey = "service.k8s.alibaba/loadbalancer-id" + SvcSelectorKey = "statefulset.kubernetes.io/pod-name" + allocatedPortsKey = "game.kruise.io/AlibabaCloud-SLB-ports-allocated" +) + +type portAllocated map[int32]bool + +type SlbPlugin struct { + maxPort int32 + minPort int32 + cache map[string]portAllocated + mutex sync.RWMutex +} + +func (s *SlbPlugin) Name() string { + return SlbNetwork +} + +func (s *SlbPlugin) Alias() string { + return AliasSLB +} + +func (s *SlbPlugin) Init(c client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error { + s.mutex.Lock() + defer s.mutex.Unlock() + slbOptions := options.(provideroptions.AlibabaCloudOptions).SLBOptions + s.minPort = slbOptions.MinPort + s.maxPort = slbOptions.MaxPort + + svcList := &corev1.ServiceList{} + err := c.List(ctx, svcList) + if err != nil { + return err + } + + s.cache = initLbCache(svcList.Items, s.minPort, s.maxPort) + return nil +} + +func initLbCache(svcList []corev1.Service, minPort, maxPort int32) map[string]portAllocated { + newCache := make(map[string]portAllocated) + for _, svc := range svcList { + lbId := svc.Labels[SlbIdLabelKey] + if lbId != "" && svc.Spec.Type == corev1.ServiceTypeLoadBalancer { + if newCache[lbId] == nil { + newCache[lbId] = make(portAllocated, maxPort-minPort) + for i := minPort; i < maxPort; i++ { + newCache[lbId][i] = false + } + } + for _, port := range getPorts(svc.Spec.Ports) { + if port <= maxPort && port >= minPort { + newCache[lbId][port] = true + } + } + } + } + return newCache +} + +func (s *SlbPlugin) OnPodAdded(c client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) { + networkManager := utils.NewNetworkManager(pod, c) + err := c.Create(ctx, s.createSvc(networkManager.GetNetworkConfig(), pod, c, ctx)) + return pod, cperrors.ToPluginError(err, cperrors.ApiCallError) +} + +func (s *SlbPlugin) OnPodUpdated(c client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) { + networkManager := utils.NewNetworkManager(pod, c) + + networkStatus, _ := networkManager.GetNetworkStatus() + if networkStatus == nil { + pod, err := networkManager.UpdateNetworkStatus(gamekruiseiov1alpha1.NetworkStatus{ + CurrentNetworkState: gamekruiseiov1alpha1.NetworkNotReady, + }, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + + // get svc + svc := &corev1.Service{} + err := c.Get(ctx, types.NamespacedName{ + Name: pod.GetName(), + Namespace: pod.GetNamespace(), + }, svc) + if err != nil { + if errors.IsNotFound(err) { + return pod, cperrors.ToPluginError(c.Create(ctx, s.createSvc(networkManager.GetNetworkConfig(), pod, c, ctx)), cperrors.ApiCallError) + } + return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error()) + } + + // disable network + if networkManager.GetNetworkDisabled() && svc.Spec.Type == corev1.ServiceTypeLoadBalancer { + svc.Spec.Type = corev1.ServiceTypeClusterIP + return pod, cperrors.ToPluginError(c.Update(ctx, svc), cperrors.ApiCallError) + } + + // enable network + if !networkManager.GetNetworkDisabled() && svc.Spec.Type == corev1.ServiceTypeClusterIP { + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + return pod, cperrors.ToPluginError(c.Update(ctx, svc), cperrors.ApiCallError) + } + + // network not ready + if svc.Status.LoadBalancer.Ingress == nil { + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + + // network ready + internalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + externalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + for _, port := range svc.Spec.Ports { + instrIPort := port.TargetPort + instrEPort := intstr.FromInt(int(port.Port)) + internalAddress := gamekruiseiov1alpha1.NetworkAddress{ + IP: pod.Status.PodIP, + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: instrIPort.String(), + Port: &instrIPort, + Protocol: port.Protocol, + }, + }, + } + externalAddress := gamekruiseiov1alpha1.NetworkAddress{ + IP: svc.Status.LoadBalancer.Ingress[0].IP, + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: instrIPort.String(), + Port: &instrEPort, + Protocol: port.Protocol, + }, + }, + } + internalAddresses = append(internalAddresses, internalAddress) + externalAddresses = append(externalAddresses, externalAddress) + } + networkStatus.InternalAddresses = internalAddresses + networkStatus.ExternalAddresses = externalAddresses + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) +} + +func (s *SlbPlugin) OnPodDeleted(c client.Client, pod *corev1.Pod, ctx context.Context) cperrors.PluginError { + svc := &corev1.Service{} + err := c.Get(ctx, types.NamespacedName{ + Name: pod.GetName(), + Namespace: pod.GetNamespace(), + }, svc) + if err != nil { + return cperrors.NewPluginError(cperrors.ApiCallError, err.Error()) + } + + for _, port := range getPorts(svc.Spec.Ports) { + s.deAllocate(svc.Annotations[SlbIdAnnotationKey], port) + } + + return nil +} + +func (s *SlbPlugin) allocate(lbId string, num int) []int32 { + s.mutex.Lock() + defer s.mutex.Unlock() + + var ports []int32 + for i := 0; i < num; i++ { + var port int32 + if s.cache[lbId] == nil { + s.cache[lbId] = make(portAllocated, s.maxPort-s.minPort) + for i := s.minPort; i < s.maxPort; i++ { + s.cache[lbId][i] = false + } + } + + for p, allocated := range s.cache[lbId] { + if !allocated { + port = p + break + } + } + s.cache[lbId][port] = true + ports = append(ports, port) + } + return ports +} + +func (s *SlbPlugin) deAllocate(lbId string, port int32) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.cache[lbId][port] = false +} + +func init() { + slbPlugin := SlbPlugin{ + mutex: sync.RWMutex{}, + } + alibabaCloudProvider.registerPlugin(&slbPlugin) +} + +func parseLbConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (string, []int, []corev1.Protocol, bool) { + var lbId string + ports := make([]int, 0) + protocols := make([]corev1.Protocol, 0) + isFixed := false + for _, c := range conf { + switch c.Name { + case SlbIdsConfigName: + lbId = c.Value + case PortProtocolsConfigName: + for _, pp := range strings.Split(c.Value, ",") { + ppSlice := strings.Split(pp, "/") + port, err := strconv.Atoi(ppSlice[0]) + if err != nil { + continue + } + ports = append(ports, port) + if len(ppSlice) != 2 { + protocols = append(protocols, corev1.ProtocolTCP) + } else { + protocols = append(protocols, corev1.Protocol(ppSlice[1])) + } + } + case FixedConfigName: + v, err := strconv.ParseBool(c.Value) + if err != nil { + continue + } + isFixed = v + } + } + return lbId, ports, protocols, isFixed +} + +func getPorts(ports []corev1.ServicePort) []int32 { + var ret []int32 + for _, port := range ports { + ret = append(ret, port.Port) + } + return ret +} + +func (s *SlbPlugin) createSvc(conf []gamekruiseiov1alpha1.NetworkConfParams, pod *corev1.Pod, c client.Client, ctx context.Context) *corev1.Service { + lbId, targetPorts, protocol, isFixed := parseLbConfig(conf) + + var ports []int32 + allocatedPorts := pod.Annotations[allocatedPortsKey] + if allocatedPorts != "" { + ports = util.StringToInt32Slice(allocatedPorts, ",") + } else { + ports = s.allocate(lbId, len(targetPorts)) + pod.Annotations[allocatedPortsKey] = util.Int32SliceToString(ports, ",") + } + + svcPorts := make([]corev1.ServicePort, 0) + for i := 0; i < len(targetPorts); i++ { + svcPorts = append(svcPorts, corev1.ServicePort{ + Name: strconv.Itoa(targetPorts[i]), + Port: ports[i], + Protocol: protocol[i], + TargetPort: intstr.FromInt(targetPorts[i]), + }) + } + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.GetName(), + Namespace: pod.GetNamespace(), + Annotations: map[string]string{ + SlbListenerOverrideKey: "true", + SlbIdAnnotationKey: lbId, + }, + OwnerReferences: getSvcOwnerReference(c, ctx, pod, isFixed), + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + Selector: map[string]string{ + SvcSelectorKey: pod.GetName(), + }, + Ports: svcPorts, + }, + } + return svc +} + +func getSvcOwnerReference(c client.Client, ctx context.Context, pod *corev1.Pod, isFixed bool) []metav1.OwnerReference { + ownerReferences := []metav1.OwnerReference{ + { + APIVersion: pod.APIVersion, + Kind: pod.Kind, + Name: pod.GetName(), + UID: pod.GetUID(), + Controller: pointer.BoolPtr(true), + BlockOwnerDeletion: pointer.BoolPtr(true), + }, + } + if isFixed { + gss, err := util.GetGameServerSetOfPod(pod, c, ctx) + if err == nil { + ownerReferences = []metav1.OwnerReference{ + { + APIVersion: gss.APIVersion, + Kind: gss.Kind, + Name: gss.GetName(), + UID: gss.GetUID(), + Controller: pointer.BoolPtr(true), + BlockOwnerDeletion: pointer.BoolPtr(true), + }, + } + } + } + return ownerReferences +} diff --git a/cloudprovider/alibabacloud/slb_sp.go b/cloudprovider/alibabacloud/slb_sp.go new file mode 100644 index 00000000..85057aca --- /dev/null +++ b/cloudprovider/alibabacloud/slb_sp.go @@ -0,0 +1,334 @@ +package alibabacloud + +import ( + "context" + "fmt" + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/cloudprovider" + cperrors "github.com/openkruise/kruise-game/cloudprovider/errors" + "github.com/openkruise/kruise-game/cloudprovider/utils" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" + "strings" + "sync" +) + +const ( + SlbSPNetwork = "AlibabaCloud-SLB-SharedPort" + SvcSLBSPLabel = "game.kruise.io/AlibabaCloud-SLB-SharedPort" +) + +const ( + ErrorUpperLimit = "the number of backends supported by slb reaches the upper limit" +) + +func init() { + slbSpPlugin := SlbSpPlugin{ + mutex: sync.RWMutex{}, + } + alibabaCloudProvider.registerPlugin(&slbSpPlugin) +} + +type SlbSpPlugin struct { + numBackends map[string]int + podSlbId map[string]string + mutex sync.RWMutex +} + +type lbSpConfig struct { + lbIds []string + ports []int + protocols []corev1.Protocol +} + +func (s *SlbSpPlugin) OnPodAdded(c client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) { + networkManager := utils.NewNetworkManager(pod, c) + podNetConfig := parseLbSpConfig(networkManager.GetNetworkConfig()) + + lbId, err := s.getOrAllocate(podNetConfig, pod) + if err != nil { + return pod, cperrors.NewPluginError(cperrors.ParameterError, err.Error()) + } + + // Get Svc + svc := &corev1.Service{} + err = c.Get(ctx, types.NamespacedName{ + Namespace: pod.GetNamespace(), + Name: lbId, + }, svc) + if err != nil { + if errors.IsNotFound(err) { + // Create Svc + return pod, cperrors.ToPluginError(s.createSvc(c, ctx, pod, podNetConfig, lbId), cperrors.ApiCallError) + } + return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error()) + } + + pod, err = networkManager.UpdateNetworkStatus(gamekruiseiov1alpha1.NetworkStatus{ + CurrentNetworkState: gamekruiseiov1alpha1.NetworkNotReady, + }, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) +} + +func (s *SlbSpPlugin) OnPodUpdated(c client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) { + networkManager := utils.NewNetworkManager(pod, c) + networkStatus, _ := networkManager.GetNetworkStatus() + if networkStatus == nil { + pod, err := networkManager.UpdateNetworkStatus(gamekruiseiov1alpha1.NetworkStatus{ + CurrentNetworkState: gamekruiseiov1alpha1.NetworkNotReady, + }, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + + podNetConfig := parseLbSpConfig(networkManager.GetNetworkConfig()) + podSlbId, err := s.getOrAllocate(podNetConfig, pod) + if err != nil { + return pod, cperrors.NewPluginError(cperrors.ParameterError, err.Error()) + } + + // Get Svc + svc := &corev1.Service{} + err = c.Get(context.Background(), types.NamespacedName{ + Namespace: pod.GetNamespace(), + Name: podSlbId, + }, svc) + if err != nil { + if errors.IsNotFound(err) { + // Create Svc + return pod, cperrors.ToPluginError(s.createSvc(c, ctx, pod, podNetConfig, podSlbId), cperrors.ApiCallError) + } + return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error()) + } + + _, hasLabel := pod.Labels[SlbIdLabelKey] + // disable network + if networkManager.GetNetworkDisabled() && hasLabel { + newLabels := pod.GetLabels() + delete(newLabels, SlbIdLabelKey) + pod.Labels = newLabels + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + + // enable network + if !networkManager.GetNetworkDisabled() && !hasLabel { + pod.Labels[SlbIdLabelKey] = podSlbId + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) + } + + // network not ready + if svc.Status.LoadBalancer.Ingress == nil { + return pod, nil + } + + // network ready + internalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + externalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0) + for _, port := range svc.Spec.Ports { + instrIPort := port.TargetPort + instrEPort := intstr.FromInt(int(port.Port)) + internalAddress := gamekruiseiov1alpha1.NetworkAddress{ + IP: pod.Status.PodIP, + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: instrIPort.String(), + Port: &instrIPort, + Protocol: port.Protocol, + }, + }, + } + externalAddress := gamekruiseiov1alpha1.NetworkAddress{ + IP: svc.Status.LoadBalancer.Ingress[0].IP, + Ports: []gamekruiseiov1alpha1.NetworkPort{ + { + Name: instrIPort.String(), + Port: &instrEPort, + Protocol: port.Protocol, + }, + }, + } + internalAddresses = append(internalAddresses, internalAddress) + externalAddresses = append(externalAddresses, externalAddress) + } + networkStatus.InternalAddresses = internalAddresses + networkStatus.ExternalAddresses = externalAddresses + networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkReady + pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod) + return pod, cperrors.ToPluginError(err, cperrors.InternalError) +} + +func (s *SlbSpPlugin) OnPodDeleted(c client.Client, pod *corev1.Pod, ctx context.Context) cperrors.PluginError { + s.deAllocate(pod.GetNamespace() + "/" + pod.GetName()) + return nil +} + +func (s *SlbSpPlugin) Name() string { + return SlbSPNetwork +} + +func (s *SlbSpPlugin) Alias() string { + return "" +} + +func (s *SlbSpPlugin) Init(c client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + svcList := &corev1.ServiceList{} + err := c.List(ctx, svcList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + SvcSLBSPLabel: "true", + })}) + if err != nil { + return err + } + + numBackends := make(map[string]int) + podSlbId := make(map[string]string) + for _, svc := range svcList.Items { + slbId := svc.Labels[SlbIdLabelKey] + podList := &corev1.PodList{} + err := c.List(ctx, podList, &client.ListOptions{ + Namespace: svc.GetNamespace(), + LabelSelector: labels.SelectorFromSet(map[string]string{ + SlbIdLabelKey: slbId, + })}) + if err != nil { + return err + } + num := len(podList.Items) + numBackends[slbId] += num + for _, pod := range podList.Items { + podSlbId[pod.GetNamespace()+"/"+pod.GetName()] = slbId + } + } + + s.numBackends = numBackends + s.podSlbId = podSlbId + return nil +} + +func (s *SlbSpPlugin) createSvc(c client.Client, ctx context.Context, pod *corev1.Pod, podConfig *lbSpConfig, lbId string) error { + svcPorts := make([]corev1.ServicePort, 0) + for i := 0; i < len(podConfig.ports); i++ { + svcPorts = append(svcPorts, corev1.ServicePort{ + Name: strconv.Itoa(podConfig.ports[i]), + Port: int32(podConfig.ports[i]), + Protocol: podConfig.protocols[i], + TargetPort: intstr.FromInt(podConfig.ports[i]), + }) + } + + return c.Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: lbId, + Namespace: pod.GetNamespace(), + Annotations: map[string]string{ + SlbIdAnnotationKey: lbId, + SlbListenerOverrideKey: "true", + }, + Labels: map[string]string{ + SvcSLBSPLabel: "true", + }, + OwnerReferences: getSvcOwnerReference(c, ctx, pod, true), + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + Selector: map[string]string{ + SlbIdLabelKey: lbId, + }, + Ports: svcPorts, + }, + }) +} + +func (s *SlbSpPlugin) getOrAllocate(podNetConfig *lbSpConfig, pod *corev1.Pod) (string, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if slbId, ok := s.podSlbId[pod.GetNamespace()+"/"+pod.GetName()]; ok { + return slbId, nil + } + + minValue := 200 + selectId := "" + for _, id := range podNetConfig.lbIds { + numBackends := s.numBackends[id] + if numBackends < 200 && numBackends < minValue { + minValue = numBackends + selectId = id + } + } + + if selectId == "" { + return "", fmt.Errorf(ErrorUpperLimit) + } + + s.numBackends[selectId]++ + s.podSlbId[pod.GetNamespace()+"/"+pod.GetName()] = selectId + pod.Labels[SlbIdLabelKey] = selectId + return selectId, nil +} + +func (s *SlbSpPlugin) deAllocate(nsName string) { + s.mutex.Lock() + defer s.mutex.Unlock() + slbId, ok := s.podSlbId[nsName] + if !ok { + return + } + + s.numBackends[slbId]-- + delete(s.podSlbId, nsName) +} + +func parseLbSpConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) *lbSpConfig { + var lbIds []string + var ports []int + var protocols []corev1.Protocol + for _, c := range conf { + switch c.Name { + case SlbIdsConfigName: + lbIds = parseLbIds(c.Value) + case PortProtocolsConfigName: + ports, protocols = parsePortProtocols(c.Value) + } + } + return &lbSpConfig{ + lbIds: lbIds, + ports: ports, + protocols: protocols, + } +} + +func parsePortProtocols(value string) ([]int, []corev1.Protocol) { + ports := make([]int, 0) + protocols := make([]corev1.Protocol, 0) + for _, pp := range strings.Split(value, ",") { + ppSlice := strings.Split(pp, "/") + port, err := strconv.Atoi(ppSlice[0]) + if err != nil { + continue + } + ports = append(ports, port) + if len(ppSlice) != 2 { + protocols = append(protocols, corev1.ProtocolTCP) + } else { + protocols = append(protocols, corev1.Protocol(ppSlice[1])) + } + } + return ports, protocols +} + +func parseLbIds(value string) []string { + return strings.Split(value, ",") +} diff --git a/cloudprovider/alibabacloud/slb_sp_test.go b/cloudprovider/alibabacloud/slb_sp_test.go new file mode 100644 index 00000000..0f0ea9fd --- /dev/null +++ b/cloudprovider/alibabacloud/slb_sp_test.go @@ -0,0 +1,123 @@ +package alibabacloud + +import ( + "fmt" + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "reflect" + "sync" + "testing" +) + +func TestSlpSpAllocate(t *testing.T) { + tests := []struct { + slbsp *SlbSpPlugin + pod *corev1.Pod + podNetConfig *lbSpConfig + numBackends map[string]int + podSlbId map[string]string + expErr error + }{ + { + slbsp: &SlbSpPlugin{ + numBackends: make(map[string]int), + podSlbId: make(map[string]string), + mutex: sync.RWMutex{}, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-name", + Namespace: "pod-ns", + Labels: map[string]string{ + "xxx": "xxx", + }, + }, + }, + podNetConfig: &lbSpConfig{ + lbIds: []string{"lb-xxa"}, + ports: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + numBackends: map[string]int{"lb-xxa": 1}, + podSlbId: map[string]string{"pod-ns/pod-name": "lb-xxa"}, + expErr: nil, + }, + + { + slbsp: &SlbSpPlugin{ + numBackends: map[string]int{"lb-xxa": 200}, + podSlbId: map[string]string{"a-ns/a-name": "lb-xxa"}, + mutex: sync.RWMutex{}, + }, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-name", + Namespace: "pod-ns", + Labels: map[string]string{ + "xxx": "xxx", + }, + }, + }, + podNetConfig: &lbSpConfig{ + lbIds: []string{"lb-xxa"}, + ports: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + numBackends: map[string]int{"lb-xxa": 200}, + podSlbId: map[string]string{"a-ns/a-name": "lb-xxa"}, + expErr: fmt.Errorf(ErrorUpperLimit), + }, + } + + for _, test := range tests { + slbId, err := test.slbsp.getOrAllocate(test.podNetConfig, test.pod) + if (err == nil) != (test.expErr == nil) { + t.Errorf("expect err: %v, but acutal err: %v", test.expErr, err) + } + + if test.pod.GetLabels()[SlbIdLabelKey] != slbId { + t.Errorf("expect pod have slblabel value: %s, but actual value: %s", slbId, test.pod.GetLabels()[SlbIdLabelKey]) + } + + if !reflect.DeepEqual(test.numBackends, test.slbsp.numBackends) { + t.Errorf("expect numBackends: %v, but actual: %v", test.numBackends, test.slbsp.numBackends) + } + + if !reflect.DeepEqual(test.podSlbId, test.slbsp.podSlbId) { + t.Errorf("expect numBackends: %v, but actual: %v", test.podSlbId, test.slbsp.podSlbId) + } + } +} + +func TestParseLbSpConfig(t *testing.T) { + tests := []struct { + conf []gamekruiseiov1alpha1.NetworkConfParams + podNetConfig *lbSpConfig + }{ + { + conf: []gamekruiseiov1alpha1.NetworkConfParams{ + { + Name: PortProtocolsConfigName, + Value: "80", + }, + { + Name: SlbIdsConfigName, + Value: "lb-xxa", + }, + }, + podNetConfig: &lbSpConfig{ + lbIds: []string{"lb-xxa"}, + ports: []int{80}, + protocols: []corev1.Protocol{corev1.ProtocolTCP}, + }, + }, + } + + for _, test := range tests { + podNetConfig := parseLbSpConfig(test.conf) + if !reflect.DeepEqual(podNetConfig, test.podNetConfig) { + t.Errorf("expect podNetConfig: %v, but actual: %v", test.podNetConfig, podNetConfig) + } + } +} diff --git a/cloudprovider/alibabacloud/slb_test.go b/cloudprovider/alibabacloud/slb_test.go new file mode 100644 index 00000000..943c2c37 --- /dev/null +++ b/cloudprovider/alibabacloud/slb_test.go @@ -0,0 +1,200 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package alibabacloud + +import ( + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/pkg/util" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "sync" + "testing" +) + +func TestAllocate(t *testing.T) { + test := struct { + lbId string + slb *SlbPlugin + num int + }{ + lbId: "xxx-A", + slb: &SlbPlugin{ + maxPort: int32(712), + minPort: int32(512), + cache: make(map[string]portAllocated), + mutex: sync.RWMutex{}, + }, + num: 3, + } + + ports := test.slb.allocate(test.lbId, test.num) + for _, port := range ports { + if port > test.slb.maxPort || port < test.slb.minPort { + t.Errorf("allocate port %d, unexpected", port) + } + + test.slb.deAllocate(test.lbId, port) + if test.slb.cache[test.lbId][port] == true { + t.Errorf("deAllocate port %d failed", port) + } + } +} + +func TestParseLbConfig(t *testing.T) { + tests := []struct { + conf []gamekruiseiov1alpha1.NetworkConfParams + lbId string + ports []int + protocol []corev1.Protocol + isFixed bool + }{ + { + conf: []gamekruiseiov1alpha1.NetworkConfParams{ + { + Name: SlbIdsConfigName, + Value: "xxx-A", + }, + { + Name: PortProtocolsConfigName, + Value: "80", + }, + }, + lbId: "xxx-A", + ports: []int{80}, + protocol: []corev1.Protocol{corev1.ProtocolTCP}, + isFixed: false, + }, + { + conf: []gamekruiseiov1alpha1.NetworkConfParams{ + { + Name: SlbIdsConfigName, + Value: "xxx-A", + }, + { + Name: PortProtocolsConfigName, + Value: "81/UDP,82,83/TCP", + }, + { + Name: FixedConfigName, + Value: "true", + }, + }, + lbId: "xxx-A", + ports: []int{81, 82, 83}, + protocol: []corev1.Protocol{corev1.ProtocolUDP, corev1.ProtocolTCP, corev1.ProtocolTCP}, + isFixed: true, + }, + } + + for _, test := range tests { + lbId, ports, protocol, isFixed := parseLbConfig(test.conf) + if lbId != test.lbId { + t.Errorf("lbId expect: %s, actual: %s", test.lbId, lbId) + } + if !util.IsSliceEqual(ports, test.ports) { + t.Errorf("ports expect: %v, actual: %v", test.ports, ports) + } + if len(test.protocol) != len(protocol) { + t.Errorf("protocol expect: %v, actual: %v", test.protocol, protocol) + } + for i := 0; i < len(test.protocol); i++ { + if protocol[i] != test.protocol[i] { + t.Errorf("protocol expect: %v, actual: %v", test.protocol, protocol) + } + } + if isFixed != test.isFixed { + t.Errorf("protocol expect: %v, actual: %v", test.isFixed, isFixed) + } + } +} + +func TestInitLbCache(t *testing.T) { + test := struct { + svcList []corev1.Service + minPort int32 + maxPort int32 + result map[string]portAllocated + }{ + minPort: 512, + maxPort: 712, + result: map[string]portAllocated{ + "xxx-A": map[int32]bool{ + 666: true, + }, + "xxx-B": map[int32]bool{ + 555: true, + }, + }, + svcList: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + SlbIdLabelKey: "xxx-A", + }, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + Selector: map[string]string{ + SvcSelectorKey: "pod-A", + }, + Ports: []corev1.ServicePort{ + { + TargetPort: intstr.FromInt(80), + Port: 666, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + SlbIdLabelKey: "xxx-B", + }, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + Selector: map[string]string{ + SvcSelectorKey: "pod-B", + }, + Ports: []corev1.ServicePort{ + { + TargetPort: intstr.FromInt(80), + Port: 9999, + Protocol: corev1.ProtocolTCP, + }, + { + TargetPort: intstr.FromInt(8080), + Port: 555, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + }, + } + + actual := initLbCache(test.svcList, test.minPort, test.maxPort) + for lb, pa := range test.result { + for port, isAllocated := range pa { + if actual[lb][port] != isAllocated { + t.Errorf("lb %s port %d isAllocated, expect: %t, actual: %t", lb, port, isAllocated, actual[lb][port]) + } + } + } +} diff --git a/cloudprovider/cloud_provider.go b/cloudprovider/cloud_provider.go new file mode 100644 index 00000000..cfe733cf --- /dev/null +++ b/cloudprovider/cloud_provider.go @@ -0,0 +1,54 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudprovider + +import ( + "context" + "github.com/openkruise/kruise-game/cloudprovider/errors" + corev1 "k8s.io/api/core/v1" + client "sigs.k8s.io/controller-runtime/pkg/client" +) + +/* + |-Cloud Provider + |------ Kubernetes + |------ plugins + |------ AlibabaCloud + |------- plugins + |------ others +*/ + +type Plugin interface { + Name() string + // Alias define the plugin with similar func cross multi cloud provider + Alias() string + Init(client client.Client, options CloudProviderOptions, ctx context.Context) error + // Pod Event handler + OnPodAdded(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, errors.PluginError) + OnPodUpdated(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, errors.PluginError) + OnPodDeleted(client client.Client, pod *corev1.Pod, ctx context.Context) errors.PluginError +} + +type CloudProvider interface { + Name() string + ListPlugins() (map[string]Plugin, error) +} + +type CloudProviderOptions interface { + Enabled() bool + Valid() bool +} diff --git a/cloudprovider/config.go b/cloudprovider/config.go new file mode 100644 index 00000000..4da5bfa7 --- /dev/null +++ b/cloudprovider/config.go @@ -0,0 +1,72 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudprovider + +import ( + "github.com/BurntSushi/toml" + "github.com/openkruise/kruise-game/cloudprovider/options" + "k8s.io/klog/v2" +) + +import "flag" + +var Opt *Options + +type Options struct { + CloudProviderConfigFile string +} + +func init() { + Opt = &Options{} +} + +func InitCloudProviderFlags() { + flag.StringVar(&Opt.CloudProviderConfigFile, "provider-config", "/etc/kruise-game/config.toml", "Cloud Provider Config File Path.") +} + +type ConfigFile struct { + Path string +} + +type CloudProviderConfig struct { + KubernetesOptions CloudProviderOptions + AlibabaCloudOptions CloudProviderOptions +} + +type tomlConfigs struct { + Kubernetes options.KubernetesOptions `toml:"kubernetes"` + AlibabaCloud options.AlibabaCloudOptions `toml:"alibabacloud"` +} + +func (cf *ConfigFile) Parse() *CloudProviderConfig { + + var config tomlConfigs + if _, err := toml.DecodeFile(cf.Path, &config); err != nil { + klog.Fatal(err) + } + + return &CloudProviderConfig{ + KubernetesOptions: config.Kubernetes, + AlibabaCloudOptions: config.AlibabaCloud, + } +} + +func NewConfigFile(path string) *ConfigFile { + return &ConfigFile{ + Path: path, + } +} diff --git a/cloudprovider/config_test.go b/cloudprovider/config_test.go new file mode 100644 index 00000000..12558f9f --- /dev/null +++ b/cloudprovider/config_test.go @@ -0,0 +1,70 @@ +package cloudprovider + +import ( + "github.com/openkruise/kruise-game/cloudprovider/options" + "io" + "os" + "reflect" + "testing" +) + +func TestParse(t *testing.T) { + tests := []struct { + fileString string + kubernetes options.KubernetesOptions + alibabacloud options.AlibabaCloudOptions + }{ + { + fileString: ` +[kubernetes] +enable = true + + [kubernetes.hostPort] + max_port = 9000 + min_port = 8000 + +[alibabacloud] +enable = true +`, + kubernetes: options.KubernetesOptions{ + Enable: true, + HostPort: options.HostPortOptions{ + MaxPort: 9000, + MinPort: 8000, + }, + }, + alibabacloud: options.AlibabaCloudOptions{ + Enable: true, + }, + }, + } + + for _, test := range tests { + tempFile := "config" + file, err := os.Create(tempFile) + if err != nil { + t.Errorf("open file failed, because of %s", err.Error()) + } + _, err = io.WriteString(file, test.fileString) + if err != nil { + t.Errorf("write file failed, because of %s", err.Error()) + } + err = file.Close() + if err != nil { + t.Errorf("close file failed, because of %s", err.Error()) + } + + cf := ConfigFile{ + Path: tempFile, + } + cloudProviderConfig := cf.Parse() + + if !reflect.DeepEqual(cloudProviderConfig.AlibabaCloudOptions, test.alibabacloud) { + t.Errorf("expect AlibabaCloudOptions: %v, but got %v", test.alibabacloud, cloudProviderConfig.AlibabaCloudOptions) + } + if !reflect.DeepEqual(cloudProviderConfig.KubernetesOptions, test.kubernetes) { + t.Errorf("expect KubernetesOptions: %v, but got %v", test.kubernetes, cloudProviderConfig.KubernetesOptions) + } + os.Remove(tempFile) + } +} diff --git a/cloudprovider/errors/errors.go b/cloudprovider/errors/errors.go new file mode 100644 index 00000000..5b92604e --- /dev/null +++ b/cloudprovider/errors/errors.go @@ -0,0 +1,72 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package errors + +import "fmt" + +// PluginErrorType describes a high-level category of a given error +type PluginErrorType string + +const ( + // ApiCallError is an error related to communication with k8s API server + ApiCallError PluginErrorType = "apiCallError" + // InternalError is an error inside plugin + InternalError PluginErrorType = "internalError" + // ParameterError is an error related to bad parameters provided by a user + ParameterError PluginErrorType = "parameterError" + // NotImplementedError an error related to be not implemented by developers + NotImplementedError PluginErrorType = "notImplementedError" +) + +type PluginError interface { + // Error implements golang error interface + Error() string + + // Type returns the type of CloudProviderError + Type() PluginErrorType +} + +type pluginErrorImplErrorImpl struct { + errorType PluginErrorType + msg string +} + +func (c pluginErrorImplErrorImpl) Error() string { + return c.msg +} + +func (c pluginErrorImplErrorImpl) Type() PluginErrorType { + return c.errorType +} + +// NewPluginError returns new plugin error with a message constructed from format string +func NewPluginError(errorType PluginErrorType, msg string, args ...interface{}) PluginError { + return pluginErrorImplErrorImpl{ + errorType: errorType, + msg: fmt.Sprintf(msg, args...), + } +} + +func ToPluginError(err error, errorType PluginErrorType) PluginError { + if err == nil { + return nil + } + return pluginErrorImplErrorImpl{ + errorType: errorType, + msg: err.Error(), + } +} diff --git a/cloudprovider/kubernetes/hostPort.go b/cloudprovider/kubernetes/hostPort.go new file mode 100644 index 00000000..83282242 --- /dev/null +++ b/cloudprovider/kubernetes/hostPort.go @@ -0,0 +1,352 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/cloudprovider" + "github.com/openkruise/kruise-game/cloudprovider/errors" + provideroptions "github.com/openkruise/kruise-game/cloudprovider/options" + "github.com/openkruise/kruise-game/cloudprovider/utils" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + log "k8s.io/klog/v2" + "net" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" + "strings" + "sync" +) + +const ( + HostPortNetwork = "Kubernetes-HostPort" + //ContainerPortsKey represents the configuration key when using hostPort. + //Its corresponding value format is as follows, containerName:port1/protocol1,port2/protocol2,... e.g. game-server:25565/TCP + //When no protocol is specified, TCP is used by default + ContainerPortsKey = "ContainerPorts" +) + +type HostPortPlugin struct { + maxPort int32 + minPort int32 + isAllocated map[string]bool + portAmount map[int32]int + amountStat []int + mutex sync.RWMutex +} + +func init() { + hostPortPlugin := HostPortPlugin{ + mutex: sync.RWMutex{}, + isAllocated: make(map[string]bool), + } + kubernetesProvider.registerPlugin(&hostPortPlugin) +} + +func (hpp *HostPortPlugin) Name() string { + return HostPortNetwork +} + +func (hpp *HostPortPlugin) Alias() string { + return "" +} + +func (hpp *HostPortPlugin) OnPodAdded(c client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, errors.PluginError) { + if _, ok := hpp.isAllocated[pod.GetNamespace()+"/"+pod.GetName()]; ok { + return pod, nil + } + + networkManager := utils.NewNetworkManager(pod, c) + conf := networkManager.GetNetworkConfig() + + containerPortsMap, containerProtocolsMap, numToAlloc := parseConfig(conf, pod) + hostPorts := hpp.allocate(numToAlloc, pod.GetNamespace()+"/"+pod.GetName()) + + log.V(5).Infof("pod %s/%s allocated hostPorts %v", pod.GetNamespace(), pod.GetName(), hostPorts) + + // patch pod container ports + containers := pod.Spec.Containers + for cIndex, container := range pod.Spec.Containers { + if ports, ok := containerPortsMap[container.Name]; ok { + containerPorts := container.Ports + for i, port := range ports { + containerPort := corev1.ContainerPort{ + ContainerPort: port, + HostPort: hostPorts[numToAlloc-1], + Protocol: containerProtocolsMap[container.Name][i], + } + containerPorts = append(containerPorts, containerPort) + numToAlloc-- + } + containers[cIndex].Ports = containerPorts + } + } + pod.Spec.Containers = containers + return pod, nil +} + +func (hpp *HostPortPlugin) OnPodUpdated(c client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, errors.PluginError) { + node := &corev1.Node{} + err := c.Get(ctx, types.NamespacedName{ + Name: pod.Spec.NodeName, + }, node) + if err != nil { + return pod, errors.NewPluginError(errors.ApiCallError, err.Error()) + } + iip, eip := getAddress(node) + + networkManager := utils.NewNetworkManager(pod, c) + status, _ := networkManager.GetNetworkStatus() + if status != nil { + return pod, nil + } + + iNetworkPorts := make([]gamekruiseiov1alpha1.NetworkPort, 0) + eNetworkPorts := make([]gamekruiseiov1alpha1.NetworkPort, 0) + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.HostPort >= hpp.minPort && port.HostPort <= hpp.maxPort { + containerPortIs := intstr.FromInt(int(port.ContainerPort)) + hostPortIs := intstr.FromInt(int(port.HostPort)) + iNetworkPorts = append(iNetworkPorts, gamekruiseiov1alpha1.NetworkPort{ + Name: container.Name + "-" + containerPortIs.String(), + Port: &containerPortIs, + Protocol: port.Protocol, + }) + eNetworkPorts = append(eNetworkPorts, gamekruiseiov1alpha1.NetworkPort{ + Name: container.Name + "-" + containerPortIs.String(), + Port: &hostPortIs, + Protocol: port.Protocol, + }) + } + } + } + + networkStatus := gamekruiseiov1alpha1.NetworkStatus{ + InternalAddresses: []gamekruiseiov1alpha1.NetworkAddress{ + { + IP: iip, + Ports: iNetworkPorts, + }, + }, + ExternalAddresses: []gamekruiseiov1alpha1.NetworkAddress{ + { + IP: eip, + Ports: eNetworkPorts, + }, + }, + CurrentNetworkState: gamekruiseiov1alpha1.NetworkReady, + } + + pod, err = networkManager.UpdateNetworkStatus(networkStatus, pod) + return pod, errors.ToPluginError(err, errors.InternalError) +} + +func (hpp *HostPortPlugin) OnPodDeleted(c client.Client, pod *corev1.Pod, ctx context.Context) errors.PluginError { + if _, ok := hpp.isAllocated[pod.GetNamespace()+"/"+pod.GetName()]; !ok { + return nil + } + + hostPorts := make([]int32, 0) + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.HostPort >= hpp.minPort && port.HostPort <= hpp.maxPort { + hostPorts = append(hostPorts, port.HostPort) + } + } + } + + hpp.deAllocate(hostPorts, pod.GetNamespace()+"/"+pod.GetName()) + return nil +} + +func (hpp *HostPortPlugin) Init(c client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error { + hpp.mutex.Lock() + defer hpp.mutex.Unlock() + + hostPortOptions := options.(provideroptions.KubernetesOptions).HostPort + hpp.maxPort = hostPortOptions.MaxPort + hpp.minPort = hostPortOptions.MinPort + + newPortAmount := make(map[int32]int, hpp.maxPort-hpp.minPort+1) + for i := hpp.minPort; i <= hpp.maxPort; i++ { + newPortAmount[i] = 0 + } + podList := &corev1.PodList{} + err := c.List(ctx, podList) + if err != nil { + return err + } + for _, pod := range podList.Items { + if pod.GetAnnotations()[gamekruiseiov1alpha1.GameServerNetworkType] == HostPortNetwork { + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.HostPort >= hpp.minPort && port.HostPort <= hpp.maxPort { + newPortAmount[port.HostPort]++ + hpp.isAllocated[pod.GetNamespace()+"/"+pod.GetName()] = true + } + } + } + } + } + + size := 0 + for _, amount := range newPortAmount { + if amount > size { + size = amount + } + } + newAmountStat := make([]int, size+1) + for _, amount := range newPortAmount { + newAmountStat[amount]++ + } + + hpp.portAmount = newPortAmount + hpp.amountStat = newAmountStat + return nil +} + +func (hpp *HostPortPlugin) allocate(num int, nsname string) []int32 { + hpp.mutex.Lock() + defer hpp.mutex.Unlock() + + hostPorts, index := selectPorts(hpp.amountStat, hpp.portAmount, num) + for _, hostPort := range hostPorts { + hpp.portAmount[hostPort]++ + hpp.amountStat[index]-- + if index+1 >= len(hpp.amountStat) { + hpp.amountStat = append(hpp.amountStat, 0) + } + hpp.amountStat[index+1]++ + } + + hpp.isAllocated[nsname] = true + return hostPorts +} + +func (hpp *HostPortPlugin) deAllocate(hostPorts []int32, nsname string) { + hpp.mutex.Lock() + defer hpp.mutex.Unlock() + + for _, hostPort := range hostPorts { + amount := hpp.portAmount[hostPort] + hpp.portAmount[hostPort]-- + hpp.amountStat[amount]-- + hpp.amountStat[amount-1]++ + } + + delete(hpp.isAllocated, nsname) +} + +func verifyContainerName(containerName string, pod *corev1.Pod) bool { + for _, container := range pod.Spec.Containers { + if container.Name == containerName { + return true + } + } + return false +} + +func getAddress(node *corev1.Node) (string, string) { + var eip string + var iip string + + for _, a := range node.Status.Addresses { + if a.Type == corev1.NodeExternalIP && net.ParseIP(a.Address) != nil { + eip = a.Address + } + } + + for _, a := range node.Status.Addresses { + if a.Type == corev1.NodeExternalDNS { + eip = a.Address + } + } + + for _, a := range node.Status.Addresses { + if a.Type == corev1.NodeInternalIP && net.ParseIP(a.Address) != nil { + iip = a.Address + } + } + + for _, a := range node.Status.Addresses { + if a.Type == corev1.NodeInternalDNS { + iip = a.Address + } + } + + return iip, eip +} + +func parseConfig(conf []gamekruiseiov1alpha1.NetworkConfParams, pod *corev1.Pod) (map[string][]int32, map[string][]corev1.Protocol, int) { + numToAlloc := 0 + containerPortsMap := make(map[string][]int32) + containerProtocolsMap := make(map[string][]corev1.Protocol) + for _, c := range conf { + if c.Name == ContainerPortsKey { + cpSlice := strings.Split(c.Value, ":") + containerName := cpSlice[0] + if verifyContainerName(containerName, pod) && len(cpSlice) == 2 { + ports := make([]int32, 0) + protocols := make([]corev1.Protocol, 0) + for _, portString := range strings.Split(cpSlice[1], ",") { + ppSlice := strings.Split(portString, "/") + // handle port + port, err := strconv.ParseInt(ppSlice[0], 10, 32) + if err != nil { + continue + } + numToAlloc++ + ports = append(ports, int32(port)) + // handle protocol + if len(ppSlice) == 2 { + protocols = append(protocols, corev1.Protocol(ppSlice[1])) + } else { + protocols = append(protocols, corev1.ProtocolTCP) + } + } + containerPortsMap[containerName] = ports + containerProtocolsMap[containerName] = protocols + } + } + } + return containerPortsMap, containerProtocolsMap, numToAlloc +} + +func selectPorts(amountStat []int, portAmount map[int32]int, num int) ([]int32, int) { + var index int + for i, total := range amountStat { + if total >= num { + index = i + break + } + } + + hostPorts := make([]int32, 0) + for hostPort, amount := range portAmount { + if amount == index { + hostPorts = append(hostPorts, hostPort) + num-- + } + if num == 0 { + break + } + } + return hostPorts, index +} diff --git a/cloudprovider/kubernetes/hostPort_test.go b/cloudprovider/kubernetes/hostPort_test.go new file mode 100644 index 00000000..0b5172b0 --- /dev/null +++ b/cloudprovider/kubernetes/hostPort_test.go @@ -0,0 +1,59 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "testing" +) + +func TestSelectPorts(t *testing.T) { + tests := []struct { + amountStat []int + portAmount map[int32]int + num int + shouldIn []int32 + index int + }{ + { + amountStat: []int{8, 3}, + portAmount: map[int32]int{800: 0, 801: 0, 802: 0, 803: 1, 804: 0, 805: 1, 806: 0, 807: 0, 808: 1, 809: 0, 810: 0}, + num: 2, + shouldIn: []int32{800, 801, 802, 804, 806, 807, 809, 810}, + index: 0, + }, + } + + for _, test := range tests { + hostPorts, index := selectPorts(test.amountStat, test.portAmount, test.num) + if index != test.index { + t.Errorf("expect index %v but got %v", test.index, index) + } + + for _, hostPort := range hostPorts { + isIn := false + for _, si := range test.shouldIn { + if si == hostPort { + isIn = true + break + } + } + if !isIn { + t.Errorf("hostPort %d not in expect slice: %v", hostPort, test.shouldIn) + } + } + } +} diff --git a/cloudprovider/kubernetes/kubernetes.go b/cloudprovider/kubernetes/kubernetes.go new file mode 100644 index 00000000..73a8f544 --- /dev/null +++ b/cloudprovider/kubernetes/kubernetes.go @@ -0,0 +1,57 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "github.com/openkruise/kruise-game/cloudprovider" + "k8s.io/klog/v2" +) + +var ( + kubernetesProvider = &Provider{ + plugins: make(map[string]cloudprovider.Plugin), + } +) + +type Provider struct { + plugins map[string]cloudprovider.Plugin +} + +func (kp *Provider) Name() string { + return "Kubernetes" +} + +func (kp *Provider) ListPlugins() (map[string]cloudprovider.Plugin, error) { + if kp.plugins == nil { + return make(map[string]cloudprovider.Plugin), nil + } + + return kp.plugins, nil +} + +// register plugin of cloud provider and different cloud providers +func (kp *Provider) registerPlugin(plugin cloudprovider.Plugin) { + name := plugin.Name() + if name == "" { + klog.Fatal("empty plugin name") + } + kp.plugins[name] = plugin +} + +func NewKubernetesProvider() (cloudprovider.CloudProvider, error) { + return kubernetesProvider, nil +} diff --git a/cloudprovider/manager/provider_manager.go b/cloudprovider/manager/provider_manager.go new file mode 100644 index 00000000..2e231711 --- /dev/null +++ b/cloudprovider/manager/provider_manager.go @@ -0,0 +1,120 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "context" + "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/cloudprovider" + "github.com/openkruise/kruise-game/cloudprovider/alibabacloud" + "github.com/openkruise/kruise-game/cloudprovider/kubernetes" + corev1 "k8s.io/api/core/v1" + log "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ProviderManager struct { + CloudProviders map[string]cloudprovider.CloudProvider + CPOptions map[string]cloudprovider.CloudProviderOptions +} + +func (pm *ProviderManager) FindConfigs(cpName string) cloudprovider.CloudProviderOptions { + return pm.CPOptions[cpName] +} + +func (pm *ProviderManager) RegisterCloudProvider(provider cloudprovider.CloudProvider, options cloudprovider.CloudProviderOptions) { + if provider.Name() == "" { + log.Fatal("EmptyCloudProviderName") + } + + pm.CloudProviders[provider.Name()] = provider + pm.CPOptions[provider.Name()] = options +} + +func (pm *ProviderManager) FindAvailablePlugins(pod *corev1.Pod) (cloudprovider.Plugin, bool) { + pluginType, ok := pod.Annotations[v1alpha1.GameServerNetworkType] + if !ok { + log.V(5).Infof("Pod %s has no plugin configured and skip", pod.Name) + return nil, false + } + + for _, cp := range pm.CloudProviders { + plugins, err := cp.ListPlugins() + if err != nil { + log.Warningf("Cloud provider %s can not list plugins,because of %s", cp.Name(), err.Error()) + continue + } + for _, p := range plugins { + // TODO add multi plugins supported + if p.Name() == pluginType { + return p, true + } + } + } + return nil, false +} + +func (pm *ProviderManager) Init(client client.Client) { + for _, cp := range pm.CloudProviders { + name := cp.Name() + plugins, err := cp.ListPlugins() + if err != nil { + continue + } + log.Infof("Cloud Provider [%s] has been registered with %d plugins", name, len(plugins)) + for _, p := range plugins { + err := p.Init(client, pm.FindConfigs(cp.Name()), context.Background()) + if err != nil { + continue + } + log.Infof("plugin [%s] has been registered", p.Name()) + } + } +} + +// NewProviderManager return a new cloud provider manager instance +func NewProviderManager() (*ProviderManager, error) { + configFile := cloudprovider.NewConfigFile(cloudprovider.Opt.CloudProviderConfigFile) + configs := configFile.Parse() + + pm := &ProviderManager{ + CloudProviders: make(map[string]cloudprovider.CloudProvider), + CPOptions: make(map[string]cloudprovider.CloudProviderOptions), + } + + if configs.KubernetesOptions.Valid() && configs.KubernetesOptions.Enabled() { + // Register default kubernetes network provider + kp, err := kubernetes.NewKubernetesProvider() + if err != nil { + log.Errorf("Failed to initialized kubernetes provider,because of %s", err.Error()) + } else { + pm.RegisterCloudProvider(kp, configs.KubernetesOptions) + } + } + + if configs.AlibabaCloudOptions.Valid() && configs.AlibabaCloudOptions.Enabled() { + // build and register alibaba cloud provider + acp, err := alibabacloud.NewAlibabaCloudProvider() + if err != nil { + log.Errorf("Failed to initialize alibabacloud provider.because of %s", err.Error()) + } else { + pm.RegisterCloudProvider(acp, configs.AlibabaCloudOptions) + } + } + + return pm, nil +} diff --git a/cloudprovider/options/alibabacloud_options.go b/cloudprovider/options/alibabacloud_options.go new file mode 100644 index 00000000..d8cace25 --- /dev/null +++ b/cloudprovider/options/alibabacloud_options.go @@ -0,0 +1,27 @@ +package options + +type AlibabaCloudOptions struct { + Enable bool `toml:"enable"` + SLBOptions SLBOptions `toml:"slb"` +} + +type SLBOptions struct { + MaxPort int32 `toml:"max_port"` + MinPort int32 `toml:"min_port"` +} + +func (o AlibabaCloudOptions) Valid() bool { + // SLB valid + slbOptions := o.SLBOptions + if slbOptions.MaxPort-slbOptions.MinPort != 200 { + return false + } + if slbOptions.MinPort <= 0 { + return false + } + return true +} + +func (o AlibabaCloudOptions) Enabled() bool { + return o.Enable +} diff --git a/cloudprovider/options/kubernetes_options.go b/cloudprovider/options/kubernetes_options.go new file mode 100644 index 00000000..789123f7 --- /dev/null +++ b/cloudprovider/options/kubernetes_options.go @@ -0,0 +1,27 @@ +package options + +type KubernetesOptions struct { + Enable bool `toml:"enable"` + HostPort HostPortOptions `toml:"hostPort"` +} + +type HostPortOptions struct { + MaxPort int32 `toml:"max_port"` + MinPort int32 `toml:"min_port"` +} + +func (o KubernetesOptions) Valid() bool { + // HostPort valid + slbOptions := o.HostPort + if slbOptions.MaxPort <= slbOptions.MinPort { + return false + } + if slbOptions.MinPort <= 0 { + return false + } + return true +} + +func (o KubernetesOptions) Enabled() bool { + return o.Enable +} diff --git a/cloudprovider/utils/network_manager.go b/cloudprovider/utils/network_manager.go new file mode 100644 index 00000000..e4300cb1 --- /dev/null +++ b/cloudprovider/utils/network_manager.go @@ -0,0 +1,144 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + "errors" + "github.com/openkruise/kruise-game/apis/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/json" + log "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" +) + +type NetworkManager struct { + pod *corev1.Pod + networkType string + networkConf []v1alpha1.NetworkConfParams + networkStatus *v1alpha1.NetworkStatus + networkDisabled bool + client client.Client +} + +func (nm *NetworkManager) GetNetworkDisabled() bool { + return nm.networkDisabled +} + +func (nm *NetworkManager) SetNetworkState(disabled bool) error { + patchPod := nm.pod.DeepCopy() + if patchPod == nil { + return errors.New("EmptyPodError") + } + + // Initial annotations if necessary + if patchPod.Annotations == nil { + patchPod.Annotations = make(map[string]string) + } + + patchPod.Annotations[v1alpha1.GameServerNetworkDisabled] = strconv.FormatBool(disabled) + patch := client.MergeFrom(patchPod) + return nm.client.Patch(context.Background(), nm.pod, patch, nil) +} + +func (nm *NetworkManager) GetNetworkStatus() (*v1alpha1.NetworkStatus, error) { + p := nm.pod.DeepCopy() + if p == nil || p.Annotations == nil { + return nil, errors.New("EmptyPodError") + } + networkStatusStr := p.Annotations[v1alpha1.GameServerNetworkStatus] + + if networkStatusStr == "" { + return nil, nil + } + networkStatus := &v1alpha1.NetworkStatus{} + + err := json.Unmarshal([]byte(networkStatusStr), networkStatus) + if err != nil { + log.Errorf("Failed to unmarshal pod %s networkStatus,because of %s", p.Name, err.Error()) + return nil, err + } + + return networkStatus, nil +} + +func (nm *NetworkManager) UpdateNetworkStatus(networkStatus v1alpha1.NetworkStatus, pod *corev1.Pod) (*corev1.Pod, error) { + networkStatusBytes, err := json.Marshal(networkStatus) + if err != nil { + log.Errorf("pod %s can not update networkStatus,because of %s", nm.pod.Name, err.Error()) + return pod, err + } + pod.Annotations[v1alpha1.GameServerNetworkStatus] = string(networkStatusBytes) + return pod, nil +} + +func (nm *NetworkManager) GetNetworkConfig() []v1alpha1.NetworkConfParams { + return nm.networkConf +} + +func (nm *NetworkManager) GetNetworkType() string { + return nm.networkType +} + +func NewNetworkManager(pod *corev1.Pod, client client.Client) *NetworkManager { + var ok bool + var err error + + var networkType string + if networkType, ok = pod.Annotations[v1alpha1.GameServerNetworkType]; !ok { + log.V(5).Infof("Pod %s has no network conf", pod.Name) + return nil + } + + var networkConfStr string + var networkConf []v1alpha1.NetworkConfParams + if networkConfStr, ok = pod.Annotations[v1alpha1.GameServerNetworkConf]; ok { + err = json.Unmarshal([]byte(networkConfStr), &networkConf) + if err != nil { + log.Warningf("Pod %s has invalid network conf, err: %s", pod.Name, err.Error()) + return nil + } + } + + // If valid and use status as default + var networkStatusStr string + networkStatus := &v1alpha1.NetworkStatus{} + if networkStatusStr, ok = pod.Annotations[v1alpha1.GameServerNetworkStatus]; ok { + err = json.Unmarshal([]byte(networkStatusStr), networkStatus) + if err != nil { + log.Warningf("Pod %s has invalid network status, err: %s", pod.Name, err.Error()) + } + } + + var networkDisabled bool + if networkDisabledStr, ok := pod.Annotations[v1alpha1.GameServerNetworkDisabled]; ok { + networkDisabled, err = strconv.ParseBool(networkDisabledStr) + if err != nil { + log.Warningf("Pod %s has invalid network disabled option, err: %s", pod.Name, err.Error()) + } + } + + return &NetworkManager{ + pod: pod, + networkType: networkType, + networkConf: networkConf, + networkStatus: networkStatus, + networkDisabled: networkDisabled, + client: client, + } +} diff --git a/config/crd/bases/alibabacloud.com_poddnats.yaml b/config/crd/bases/alibabacloud.com_poddnats.yaml new file mode 100644 index 00000000..87219e9c --- /dev/null +++ b/config/crd/bases/alibabacloud.com_poddnats.yaml @@ -0,0 +1,96 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.9.0 + creationTimestamp: null + name: poddnats.alibabacloud.com +spec: + group: alibabacloud.com + names: + kind: PodDNAT + listKind: PodDNATList + plural: poddnats + singular: poddnat + scope: Namespaced + versions: + - name: v1 + schema: + openAPIV3Schema: + description: PodDNAT let you specficy DNAT rule for pod on nat gateway + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: 'Spec is the desired state of the PodDNAT. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status' + properties: + eni: + type: string + entryId: + type: string + externalIP: + type: string + externalPort: + type: string + internalIP: + type: string + internalPort: + type: string + portMapping: + items: + properties: + externalPort: + type: string + internalPort: + type: string + type: object + type: array + protocol: + type: string + tableId: + type: string + vswitch: + type: string + zoneID: + type: string + type: object + status: + description: '''Status is the current state of the dnat. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status' + properties: + created: + description: created create status + type: string + entries: + description: entries + items: + description: Entry record for forwardEntry + properties: + externalIP: + type: string + externalPort: + type: string + forwardEntryId: + type: string + internalIP: + type: string + internalPort: + type: string + ipProtocol: + type: string + type: object + type: array + type: object + type: object + served: true + storage: true diff --git a/config/manager/config.toml b/config/manager/config.toml new file mode 100644 index 00000000..69a125b1 --- /dev/null +++ b/config/manager/config.toml @@ -0,0 +1,11 @@ +[kubernetes] +enable = true +[kubernetes.hostPort] +max_port = 9000 +min_port = 8000 + +[alibabacloud] +enable = true +[alibabacloud.slb] +max_port = 700 +min_port = 500 diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 29d11a06..3d73996d 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -7,6 +7,7 @@ generatorOptions: configMapGenerator: - files: - controller_manager_config.yaml + - config.toml name: manager-config apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index e65c4c61..6ce080b3 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -38,6 +38,7 @@ spec: - /manager args: - --leader-elect=false + - --provider-config=/etc/kruise-game/config.toml image: controller:latest name: manager securityContext: @@ -66,5 +67,17 @@ spec: requests: cpu: 10m memory: 64Mi + volumeMounts: + - mountPath: /etc/kruise-game + name: provider-config serviceAccountName: controller-manager terminationGracePeriodSeconds: 10 + volumes: + - configMap: + defaultMode: 420 + items: + - key: config.toml + path: config.toml + name: kruise-game-manager-config + name: provider-config + diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 8f584d23..36f0ab04 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -5,6 +5,13 @@ metadata: creationTimestamp: null name: manager-role rules: +- apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - admissionregistration.k8s.io resources: @@ -27,6 +34,20 @@ rules: - patch - update - watch +- apiGroups: + - alibabacloud.com + resources: + - poddnats + verbs: + - get + - list + - watch +- apiGroups: + - alibabacloud.com + resources: + - poddnats/status + verbs: + - get - apiGroups: - apiextensions.k8s.io resources: @@ -69,6 +90,20 @@ rules: - get - patch - update +- apiGroups: + - "" + resources: + - nodes + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - nodes/status + verbs: + - get - apiGroups: - "" resources: @@ -89,6 +124,26 @@ rules: - get - patch - update +- apiGroups: + - "" + resources: + - services + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - "" + resources: + - services/status + verbs: + - get + - patch + - update - apiGroups: - game.kruise.io resources: diff --git a/go.mod b/go.mod index 6343c102..ab020a7d 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/openkruise/kruise-game go 1.18 require ( + github.com/BurntSushi/toml v0.3.1 github.com/davecgh/go-spew v1.1.1 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.18.1 diff --git a/go.sum b/go.sum index 21db659b..cb89074f 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,7 @@ github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+Z github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= diff --git a/main.go b/main.go index 35da340e..6d2e3d0b 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,8 @@ import ( "flag" kruiseV1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" kruiseV1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" + "github.com/openkruise/kruise-game/cloudprovider" + cpmanager "github.com/openkruise/kruise-game/cloudprovider/manager" controller "github.com/openkruise/kruise-game/pkg/controllers" "github.com/openkruise/kruise-game/pkg/webhook" "os" @@ -29,6 +31,7 @@ import ( // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" + aliv1 "github.com/openkruise/kruise-game/cloudprovider/alibabacloud/apis/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -52,6 +55,8 @@ func init() { utilruntime.Must(gamekruiseiov1alpha1.AddToScheme(scheme)) utilruntime.Must(kruiseV1beta1.AddToScheme(scheme)) utilruntime.Must(kruiseV1alpha1.AddToScheme(scheme)) + + utilruntime.Must(aliv1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } @@ -70,6 +75,9 @@ func main() { "Namespace if specified restricts the manager's cache to watch objects in the desired namespace. Defaults to all namespaces.") flag.StringVar(&syncPeriodStr, "sync-period", "", "Determines the minimum frequency at which watched resources are reconciled.") + // Add cloud provider flags + cloudprovider.InitCloudProviderFlags() + opts := zap.Options{ Development: true, } @@ -117,8 +125,14 @@ func main() { os.Exit(1) } + cloudProviderManager, err := cpmanager.NewProviderManager() + if err != nil { + setupLog.Error(err, "unable to set up cloud provider manager") + os.Exit(1) + } + // create webhook server - wss := webhook.NewWebhookServer(mgr) + wss := webhook.NewWebhookServer(mgr, cloudProviderManager) // validate webhook server if err := wss.SetupWithManager(mgr).Initialize(mgr.GetConfig()); err != nil { setupLog.Error(err, "unable to set up webhook server") @@ -135,17 +149,24 @@ func main() { os.Exit(1) } + signal := ctrl.SetupSignalHandler() go func() { setupLog.Info("setup controllers") if err = controller.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to setup controllers") os.Exit(1) } + + setupLog.Info("waiting for cache sync") + if mgr.GetCache().WaitForCacheSync(signal) { + setupLog.Info("cache synced, cloud provider manager start to init") + cloudProviderManager.Init(mgr.GetClient()) + } }() setupLog.Info("starting kruise-game-manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(signal); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } diff --git a/pkg/controllers/gameserver/gameserver_controller.go b/pkg/controllers/gameserver/gameserver_controller.go index adba8c30..ed9f4c9b 100644 --- a/pkg/controllers/gameserver/gameserver_controller.go +++ b/pkg/controllers/gameserver/gameserver_controller.go @@ -191,7 +191,7 @@ func (r *GameServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) return reconcile.Result{}, nil } - gsm := NewGameServerManager(gs, pod, r.Client) + gsm := NewGameServerManager(gs, pod, r.Client, r.recorder) gss, err := r.getGameServerSet(pod) if err != nil { @@ -204,7 +204,7 @@ func (r *GameServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) podUpdated, err := gsm.SyncGsToPod() if err != nil || podUpdated { - return reconcile.Result{Requeue: podUpdated, RequeueAfter: 3 * time.Second}, err + return reconcile.Result{RequeueAfter: 3 * time.Second}, err } err = gsm.SyncPodToGs(gss) @@ -212,6 +212,10 @@ func (r *GameServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) return reconcile.Result{}, err } + if gsm.WaitOrNot() { + return ctrl.Result{RequeueAfter: NetworkIntervalTime}, nil + } + return ctrl.Result{}, nil } @@ -254,6 +258,11 @@ func (r *GameServerReconciler) initGameServer(pod *corev1.Pod) error { ors = append(ors, or) gs.OwnerReferences = ors + // set Labels + gsLabels := make(map[string]string) + gsLabels[gamekruiseiov1alpha1.GameServerOwnerGssKey] = gss.GetName() + gs.SetLabels(gsLabels) + // set NetWork gs.Spec.NetworkDisabled = false diff --git a/pkg/controllers/gameserver/gameserver_manager.go b/pkg/controllers/gameserver/gameserver_manager.go index e10a4ac7..43ec4952 100644 --- a/pkg/controllers/gameserver/gameserver_manager.go +++ b/pkg/controllers/gameserver/gameserver_manager.go @@ -20,6 +20,7 @@ import ( "context" kruisePub "github.com/openkruise/kruise-api/apps/pub" gameKruiseV1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" + "github.com/openkruise/kruise-game/cloudprovider/utils" "github.com/openkruise/kruise-game/pkg/util" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -27,9 +28,22 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/json" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "reflect" "sigs.k8s.io/controller-runtime/pkg/client" "strconv" + "time" +) + +const ( + NetworkTotalWaitTime = 60 * time.Second + NetworkIntervalTime = 5 * time.Second + TimeFormat = "2006-01-02 15:04:05" +) + +const ( + StateReason = "GsStateChanged" ) type Control interface { @@ -39,12 +53,15 @@ type Control interface { SyncGsToPod() (bool, error) // SyncPodToGs compares the GameServer with pod, and update the GameServer. SyncPodToGs(*gameKruiseV1alpha1.GameServerSet) error + // WaitOrNot compare the current game server network status to decide whether to re-queue. + WaitOrNot() bool } type GameServerManager struct { - gameServer *gameKruiseV1alpha1.GameServer - pod *corev1.Pod - client client.Client + gameServer *gameKruiseV1alpha1.GameServer + pod *corev1.Pod + client client.Client + eventRecorder record.EventRecorder } func (manager GameServerManager) SyncGsToPod() (bool, error) { @@ -58,16 +75,30 @@ func (manager GameServerManager) SyncGsToPod() (bool, error) { updated := false newLabels := make(map[string]string) + newAnnotations := make(map[string]string) if gs.Spec.DeletionPriority.String() != podDeletePriority { newLabels[gameKruiseV1alpha1.GameServerDeletePriorityKey] = gs.Spec.DeletionPriority.String() + if podDeletePriority != "" { + manager.eventRecorder.Eventf(gs, corev1.EventTypeNormal, StateReason, "DeletionPriority turn from %s to %s ", podDeletePriority, gs.Spec.DeletionPriority.String()) + } updated = true } if gs.Spec.UpdatePriority.String() != podUpdatePriority { newLabels[gameKruiseV1alpha1.GameServerUpdatePriorityKey] = gs.Spec.UpdatePriority.String() + if podUpdatePriority != "" { + manager.eventRecorder.Eventf(gs, corev1.EventTypeNormal, StateReason, "UpdatePriority turn from %s to %s ", podUpdatePriority, gs.Spec.UpdatePriority.String()) + } updated = true } if string(gs.Spec.OpsState) != podGsOpsState { newLabels[gameKruiseV1alpha1.GameServerOpsStateKey] = string(gs.Spec.OpsState) + if podGsOpsState != "" { + eventType := corev1.EventTypeNormal + if gs.Spec.OpsState == gameKruiseV1alpha1.Maintaining { + eventType = corev1.EventTypeWarning + } + manager.eventRecorder.Eventf(gs, eventType, StateReason, "OpsState turn from %s to %s ", podGsOpsState, string(gs.Spec.OpsState)) + } updated = true } @@ -104,11 +135,31 @@ func (manager GameServerManager) SyncGsToPod() (bool, error) { } if string(gsState) != podGsState { newLabels[gameKruiseV1alpha1.GameServerStateKey] = string(gsState) + if podGsState != "" { + eventType := corev1.EventTypeNormal + if gsState == gameKruiseV1alpha1.Crash { + eventType = corev1.EventTypeWarning + } + manager.eventRecorder.Eventf(gs, eventType, StateReason, "State turn from %s to %s ", podGsState, string(gsState)) + } updated = true } + if gsState == gameKruiseV1alpha1.Ready && pod.Annotations[gameKruiseV1alpha1.GameServerNetworkType] != "" { + if pod.Annotations[gameKruiseV1alpha1.GameServerNetworkDisabled] != strconv.FormatBool(gs.Spec.NetworkDisabled) { + newAnnotations[gameKruiseV1alpha1.GameServerNetworkDisabled] = strconv.FormatBool(gs.Spec.NetworkDisabled) + updated = true + } + + oldTime, err := time.Parse(TimeFormat, pod.Annotations[gameKruiseV1alpha1.GameServerNetworkTriggerTime]) + if (err == nil && time.Since(oldTime) > NetworkIntervalTime) || (pod.Annotations[gameKruiseV1alpha1.GameServerNetworkTriggerTime] == "") { + newAnnotations[gameKruiseV1alpha1.GameServerNetworkTriggerTime] = time.Now().Format(TimeFormat) + updated = true + } + } + if updated { - patchPod := map[string]interface{}{"metadata": map[string]map[string]string{"labels": newLabels}} + patchPod := map[string]interface{}{"metadata": map[string]map[string]string{"labels": newLabels, "annotations": newAnnotations}} patchPodBytes, err := json.Marshal(patchPod) if err != nil { return updated, err @@ -156,6 +207,7 @@ func (manager GameServerManager) SyncPodToGs(gss *gameKruiseV1alpha1.GameServerS UpdatePriority: &podUpdatePriority, DeletionPriority: &podDeletePriority, ServiceQualitiesCondition: newGsConditions, + NetworkStatus: manager.syncNetworkStatus(), LastTransitionTime: metav1.Now(), } patchStatus := map[string]interface{}{"status": status} @@ -172,6 +224,60 @@ func (manager GameServerManager) SyncPodToGs(gss *gameKruiseV1alpha1.GameServerS return nil } +func (manager GameServerManager) WaitOrNot() bool { + networkStatus := manager.gameServer.Status.NetworkStatus + alreadyWait := time.Since(networkStatus.LastTransitionTime.Time) + if networkStatus.DesiredNetworkState != networkStatus.CurrentNetworkState && alreadyWait < NetworkTotalWaitTime { + klog.Infof("GameServer %s/%s DesiredNetworkState: %s CurrentNetworkState: %s. %v remaining", + manager.gameServer.GetNamespace(), manager.gameServer.GetName(), networkStatus.DesiredNetworkState, networkStatus.CurrentNetworkState, NetworkTotalWaitTime-alreadyWait) + return true + } + return false +} + +func (manager GameServerManager) syncNetworkStatus() gameKruiseV1alpha1.NetworkStatus { + // No Network, return default + gsNetworkStatus := manager.gameServer.Status.NetworkStatus + nm := utils.NewNetworkManager(manager.pod, manager.client) + if nm == nil { + return gameKruiseV1alpha1.NetworkStatus{} + } + + // NetworkStatus Init + if reflect.DeepEqual(gsNetworkStatus, gameKruiseV1alpha1.NetworkStatus{}) { + return gameKruiseV1alpha1.NetworkStatus{ + NetworkType: nm.GetNetworkType(), + DesiredNetworkState: gameKruiseV1alpha1.NetworkReady, + CreateTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + } + } + + // when pod NetworkStatus is nil + podNetworkStatus, _ := nm.GetNetworkStatus() + if podNetworkStatus == nil { + return gsNetworkStatus + } + + gsNetworkStatus.InternalAddresses = podNetworkStatus.InternalAddresses + gsNetworkStatus.ExternalAddresses = podNetworkStatus.ExternalAddresses + gsNetworkStatus.CurrentNetworkState = podNetworkStatus.CurrentNetworkState + + if gsNetworkStatus.DesiredNetworkState != desiredNetworkState(nm.GetNetworkDisabled()) { + gsNetworkStatus.DesiredNetworkState = desiredNetworkState(nm.GetNetworkDisabled()) + gsNetworkStatus.LastTransitionTime = metav1.Now() + } + + return gsNetworkStatus +} + +func desiredNetworkState(disabled bool) gameKruiseV1alpha1.NetworkState { + if disabled { + return gameKruiseV1alpha1.NetworkNotReady + } + return gameKruiseV1alpha1.NetworkReady +} + func syncServiceQualities(serviceQualities []gameKruiseV1alpha1.ServiceQuality, podConditions []corev1.PodCondition, sqConditions []gameKruiseV1alpha1.ServiceQualityCondition) (gameKruiseV1alpha1.GameServerSpec, []gameKruiseV1alpha1.ServiceQualityCondition) { var spec gameKruiseV1alpha1.GameServerSpec var newGsConditions []gameKruiseV1alpha1.ServiceQualityCondition @@ -211,10 +317,11 @@ func syncServiceQualities(serviceQualities []gameKruiseV1alpha1.ServiceQuality, return spec, newGsConditions } -func NewGameServerManager(gs *gameKruiseV1alpha1.GameServer, pod *corev1.Pod, c client.Client) Control { +func NewGameServerManager(gs *gameKruiseV1alpha1.GameServer, pod *corev1.Pod, c client.Client, recorder record.EventRecorder) Control { return &GameServerManager{ - gameServer: gs, - pod: pod, - client: c, + gameServer: gs, + pod: pod, + client: c, + eventRecorder: recorder, } } diff --git a/pkg/controllers/gameserverset/gameserverset_controller.go b/pkg/controllers/gameserverset/gameserverset_controller.go index cce1697d..606cd6a4 100644 --- a/pkg/controllers/gameserverset/gameserverset_controller.go +++ b/pkg/controllers/gameserverset/gameserverset_controller.go @@ -185,6 +185,7 @@ func (r *GameServerSetReconciler) Reconcile(ctx context.Context, req ctrl.Reques klog.Errorf("failed to create advanced statefulset %s in %s,because of %s.", namespacedName.Name, namespacedName.Namespace, err.Error()) return reconcile.Result{}, err } + r.recorder.Event(gss, corev1.EventTypeNormal, CreateWorkloadReason, "created Advanced StatefulSet") return reconcile.Result{}, nil } klog.Errorf("failed to find advanced statefulset %s in %s,because of %s.", namespacedName.Name, namespacedName.Namespace, err.Error()) @@ -203,7 +204,7 @@ func (r *GameServerSetReconciler) Reconcile(ctx context.Context, req ctrl.Reques return reconcile.Result{}, err } - gsm := NewGameServerSetManager(gss, asts, podList.Items, r.Client) + gsm := NewGameServerSetManager(gss, asts, podList.Items, r.Client, r.recorder) // scale game servers if gsm.IsNeedToScale() { @@ -222,6 +223,7 @@ func (r *GameServerSetReconciler) Reconcile(ctx context.Context, req ctrl.Reques klog.Errorf("GameServerSet %s failed to synchronize workload in %s,because of %s.", namespacedName.Name, namespacedName.Namespace, err.Error()) return reconcile.Result{}, err } + r.recorder.Event(gss, corev1.EventTypeNormal, UpdateWorkloadReason, "updated Advanced StatefulSet") return reconcile.Result{}, nil } @@ -238,6 +240,12 @@ func (r *GameServerSetReconciler) Reconcile(ctx context.Context, req ctrl.Reques return reconcile.Result{}, err } + err = gsm.SyncGameServerReplicas() + if err != nil { + klog.Errorf("GameServerSet %s failed to adjust the replicas of GameServers to match that of Pods in %s, because of %s.", namespacedName.Name, namespacedName.Namespace, err.Error()) + return reconcile.Result{}, err + } + return ctrl.Result{}, nil } diff --git a/pkg/controllers/gameserverset/gameserverset_manager.go b/pkg/controllers/gameserverset/gameserverset_manager.go index 70ad7e95..1d318c77 100644 --- a/pkg/controllers/gameserverset/gameserverset_manager.go +++ b/pkg/controllers/gameserverset/gameserverset_manager.go @@ -18,8 +18,9 @@ package gameserverset import ( "context" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/record" "sort" - "strconv" "sync" "time" @@ -46,21 +47,32 @@ type Control interface { IsNeedToScale() bool IsNeedToUpdateWorkload() bool SyncPodProbeMarker() error + SyncGameServerReplicas() error } +const ( + ScaleReason = "Scale" + CreatePPMReason = "CreatePpm" + UpdatePPMReason = "UpdatePpm" + CreateWorkloadReason = "CreateWorkload" + UpdateWorkloadReason = "UpdateWorkload" +) + type GameServerSetManager struct { gameServerSet *gameKruiseV1alpha1.GameServerSet asts *kruiseV1beta1.StatefulSet podList []corev1.Pod client client.Client + eventRecorder record.EventRecorder } -func NewGameServerSetManager(gss *gameKruiseV1alpha1.GameServerSet, asts *kruiseV1beta1.StatefulSet, gsList []corev1.Pod, c client.Client) Control { +func NewGameServerSetManager(gss *gameKruiseV1alpha1.GameServerSet, asts *kruiseV1beta1.StatefulSet, gsList []corev1.Pod, c client.Client, recorder record.EventRecorder) Control { return &GameServerSetManager{ gameServerSet: gss, asts: asts, podList: gsList, client: c, + eventRecorder: recorder, } } @@ -93,10 +105,9 @@ func (manager *GameServerSetManager) GameServerScale() error { gssReserveIds := gss.Spec.ReserveGameServerIds klog.Infof("GameServers %s/%s already has %d replicas, expect to have %d replicas.", gss.GetNamespace(), gss.GetName(), currentReplicas, expectedReplicas) + manager.eventRecorder.Eventf(gss, corev1.EventTypeNormal, ScaleReason, "scale from %d to %d", currentReplicas, expectedReplicas) - newNotExistIds, deleteIds := computeToScaleGs(gssReserveIds, reserveIds, notExistIds, expectedReplicas, gsList) - - asts.Spec.ReserveOrdinals = append(gssReserveIds, newNotExistIds...) + asts.Spec.ReserveOrdinals = append(gssReserveIds, computeToScaleGs(gssReserveIds, reserveIds, notExistIds, expectedReplicas, gsList)...) asts.Spec.Replicas = gss.Spec.Replicas asts.Spec.ScaleStrategy = &kruiseV1beta1.StatefulSetScaleStrategy{ MaxUnavailable: gss.Spec.ScaleStrategy.MaxUnavailable, @@ -117,10 +128,10 @@ func (manager *GameServerSetManager) GameServerScale() error { return err } - return manager.deleteGameServers(deleteIds) + return nil } -func computeToScaleGs(gssReserveIds, reserveIds, notExistIds []int, expectedReplicas int, pods []corev1.Pod) ([]int, []int) { +func computeToScaleGs(gssReserveIds, reserveIds, notExistIds []int, expectedReplicas int, pods []corev1.Pod) []int { workloadManageIds := util.GetIndexListFromPodList(pods) var toAdd []int @@ -182,53 +193,63 @@ func computeToScaleGs(gssReserveIds, reserveIds, notExistIds []int, expectedRepl } } - deleteIds := util.GetSliceInANotInB(workloadManageIds, newManageIds) - - return newNotExistIds, deleteIds + return newNotExistIds } -func (manager *GameServerSetManager) deleteGameServers(deleteIds []int) error { +func (manager *GameServerSetManager) SyncGameServerReplicas() error { gss := manager.gameServerSet + gsList := &gameKruiseV1alpha1.GameServerList{} + err := manager.client.List(context.Background(), gsList, &client.ListOptions{ + Namespace: gss.GetNamespace(), + LabelSelector: labels.SelectorFromSet(map[string]string{ + gameKruiseV1alpha1.GameServerOwnerGssKey: gss.GetName(), + })}) + if err != nil { + return err + } + podIds := util.GetIndexListFromPodList(manager.podList) + + gsMap := make(map[int]int) + deleteIds := make([]int, 0) + for id, gs := range gsList.Items { + gsId := util.GetIndexFromGsName(gs.Name) + if !util.IsNumInList(gsId, podIds) { + gsMap[gsId] = id + deleteIds = append(deleteIds, gsId) + } + } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() errch := make(chan error, len(deleteIds)) var wg sync.WaitGroup - for _, id := range deleteIds { + for _, gsId := range deleteIds { wg.Add(1) - id := id + id := gsId go func(ctx context.Context) { defer wg.Done() defer ctx.Done() - gsName := gss.GetName() + "-" + strconv.Itoa(id) - gs := &gameKruiseV1alpha1.GameServer{} - err := manager.client.Get(ctx, types.NamespacedName{ - Name: gsName, - Namespace: gss.GetNamespace(), - }, gs) + + gs := gsList.Items[gsMap[id]].DeepCopy() + gsLabels := make(map[string]string) + gsLabels[gameKruiseV1alpha1.GameServerDeletingKey] = "true" + patchGs := map[string]interface{}{"metadata": map[string]map[string]string{"labels": gsLabels}} + patchBytes, err := json.Marshal(patchGs) if err != nil { - if !errors.IsNotFound(err) { - errch <- err - } - return - } - newLabels := gs.GetLabels() - if newLabels == nil { - newLabels = make(map[string]string) + errch <- err } - newLabels[gameKruiseV1alpha1.GameServerDeletingKey] = "true" - gs.SetLabels(newLabels) - err = manager.client.Update(ctx, gs) - if err != nil { + err = manager.client.Patch(context.TODO(), gs, client.RawPatch(types.MergePatchType, patchBytes)) + if err != nil && !errors.IsNotFound(err) { errch <- err } }(ctx) } wg.Wait() close(errch) - err := <-errch + err = <-errch if err != nil { - klog.Errorf("failed to delete GameServers %s in %s because of %s.", gss.GetNamespace(), err.Error()) + klog.Errorf("failed to delete GameServers %s in %s because of %s.", gss.GetName(), gss.GetNamespace(), err.Error()) return err } @@ -269,6 +290,7 @@ func (manager *GameServerSetManager) SyncPodProbeMarker() error { return nil } // create ppm + manager.eventRecorder.Event(gss, corev1.EventTypeNormal, CreatePPMReason, "create PodProbeMarker") return c.Create(ctx, createPpm(gss)) } return err @@ -282,6 +304,7 @@ func (manager *GameServerSetManager) SyncPodProbeMarker() error { // update ppm if util.GetHash(gss.Spec.ServiceQualities) != ppm.GetAnnotations()[gameKruiseV1alpha1.PpmHashKey] { ppm.Spec.Probes = constructProbes(gss) + manager.eventRecorder.Event(gss, corev1.EventTypeNormal, UpdatePPMReason, "update PodProbeMarker") return c.Update(ctx, ppm) } return nil diff --git a/pkg/util/gameserver.go b/pkg/util/gameserver.go index 3ef6c745..0dd9a28d 100644 --- a/pkg/util/gameserver.go +++ b/pkg/util/gameserver.go @@ -17,11 +17,15 @@ limitations under the License. package util import ( + "context" + "encoding/json" appspub "github.com/openkruise/kruise-api/apps/pub" kruiseV1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" gameKruiseV1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "strconv" "strings" ) @@ -106,6 +110,14 @@ func GetNewAstsFromGss(gss *gameKruiseV1alpha1.GameServerSet, asts *kruiseV1beta // set pod annotations podAnnotations := gss.Spec.GameServerTemplate.GetAnnotations() + if gss.Spec.Network != nil { + if podAnnotations == nil { + podAnnotations = make(map[string]string) + } + networkConfig, _ := json.Marshal(gss.Spec.Network.NetworkConf) + podAnnotations[gameKruiseV1alpha1.GameServerNetworkConf] = string(networkConfig) + podAnnotations[gameKruiseV1alpha1.GameServerNetworkType] = gss.Spec.Network.NetworkType + } asts.Spec.Template.SetAnnotations(podAnnotations) // set template spec @@ -154,12 +166,18 @@ func GetNewAstsFromGss(gss *gameKruiseV1alpha1.GameServerSet, asts *kruiseV1beta type astsToUpdate struct { UpdateStrategy gameKruiseV1alpha1.UpdateStrategy Template gameKruiseV1alpha1.GameServerTemplate + NetworkConfigs []gameKruiseV1alpha1.NetworkConfParams } func GetAstsHash(gss *gameKruiseV1alpha1.GameServerSet) string { + var networkConfigs []gameKruiseV1alpha1.NetworkConfParams + if gss.Spec.Network != nil { + networkConfigs = gss.Spec.Network.NetworkConf + } return GetHash(astsToUpdate{ UpdateStrategy: gss.Spec.UpdateStrategy, Template: gss.Spec.GameServerTemplate, + NetworkConfigs: networkConfigs, }) } @@ -170,3 +188,13 @@ func AddPrefixGameKruise(s string) string { func RemovePrefixGameKruise(s string) string { return strings.TrimPrefix(s, "game.kruise.io/") } + +func GetGameServerSetOfPod(pod *corev1.Pod, c client.Client, ctx context.Context) (*gameKruiseV1alpha1.GameServerSet, error) { + gssName := pod.GetLabels()[gameKruiseV1alpha1.GameServerOwnerGssKey] + gss := &gameKruiseV1alpha1.GameServerSet{} + err := c.Get(ctx, types.NamespacedName{ + Namespace: pod.GetNamespace(), + Name: gssName, + }, gss) + return gss, err +} diff --git a/pkg/util/slice.go b/pkg/util/slice.go index b80b3b37..c543caf1 100644 --- a/pkg/util/slice.go +++ b/pkg/util/slice.go @@ -65,6 +65,10 @@ func IntSliceToString(number []int, delimiter string) string { return strings.Trim(strings.Replace(fmt.Sprint(number), " ", delimiter, -1), "[]") } +func Int32SliceToString(number []int32, delimiter string) string { + return strings.Trim(strings.Replace(fmt.Sprint(number), " ", delimiter, -1), "[]") +} + func StringToIntSlice(str string, delimiter string) []int { if str == "" { return nil @@ -87,6 +91,28 @@ func StringToIntSlice(str string, delimiter string) []int { return retSlice } +func StringToInt32Slice(str string, delimiter string) []int32 { + if str == "" { + return nil + } + strList := strings.Split(str, delimiter) + if len(strList) == 0 { + return nil + } + var retSlice []int32 + for _, item := range strList { + if item == "" { + continue + } + val, err := strconv.ParseInt(item, 10, 32) + if err != nil { + continue + } + retSlice = append(retSlice, int32(val)) + } + return retSlice +} + func IsSliceEqual(a, b []int) bool { if (a == nil) != (b == nil) { return false diff --git a/pkg/util/slice_test.go b/pkg/util/slice_test.go index ba1bd04f..2705e2a4 100644 --- a/pkg/util/slice_test.go +++ b/pkg/util/slice_test.go @@ -132,6 +132,28 @@ func TestIntSliceToString(t *testing.T) { } } +func TestInt32SliceToString(t *testing.T) { + tests := []struct { + number []int32 + delimiter string + result string + }{ + { + number: []int32{4, 5, 1}, + delimiter: ",", + result: "4,5,1", + }, + } + + for _, test := range tests { + actual := Int32SliceToString(test.number, test.delimiter) + expect := test.result + if expect != actual { + t.Errorf("expect %v but got %v", expect, actual) + } + } +} + func TestStringToIntSlice(t *testing.T) { tests := []struct { str string @@ -156,6 +178,30 @@ func TestStringToIntSlice(t *testing.T) { } } +func TestStringToInt32Slice(t *testing.T) { + tests := []struct { + str string + delimiter string + result []int32 + }{ + { + str: "4,5,1", + delimiter: ",", + result: []int32{4, 5, 1}, + }, + } + + for _, test := range tests { + actual := StringToInt32Slice(test.str, test.delimiter) + expect := test.result + for i := 0; i < len(actual); i++ { + if expect[i] != actual[i] { + t.Errorf("expect %v but got %v", expect, actual) + } + } + } +} + func TestIsSliceEqual(t *testing.T) { tests := []struct { a []int diff --git a/pkg/webhook/mutating_pod.go b/pkg/webhook/mutating_pod.go index 5eaf8195..d93dc9af 100644 --- a/pkg/webhook/mutating_pod.go +++ b/pkg/webhook/mutating_pod.go @@ -19,35 +19,120 @@ package webhook import ( "context" "encoding/json" + "fmt" + "github.com/openkruise/kruise-game/cloudprovider/errors" + "github.com/openkruise/kruise-game/cloudprovider/manager" + admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" "net/http" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + "time" ) +const ( + podMutatingTimeout = 8 * time.Second + mutatingTimeoutReason = "MutatingTimeout" +) + +type patchResult struct { + pod *corev1.Pod + err errors.PluginError +} + type PodMutatingHandler struct { - Client client.Client - decoder *admission.Decoder + Client client.Client + decoder *admission.Decoder + CloudProviderManager *manager.ProviderManager + eventRecorder record.EventRecorder } func (pmh *PodMutatingHandler) Handle(ctx context.Context, req admission.Request) admission.Response { - pod := &corev1.Pod{} - err := pmh.decoder.Decode(req, pod) + // decode request & get pod + pod, err := getPodFromRequest(req, pmh.decoder) if err != nil { - return admission.Errored(http.StatusBadRequest, err) + return admission.Errored(http.StatusInternalServerError, err) + } + + // get the plugin according to pod + plugin, ok := pmh.CloudProviderManager.FindAvailablePlugins(pod) + if !ok { + msg := fmt.Sprintf("Pod %s/%s has no available plugin", pod.Namespace, pod.Name) + return admission.Allowed(msg) + } + + // define context with timeout + ctx, cancel := context.WithTimeout(context.Background(), podMutatingTimeout) + defer cancel() + + // cloud provider plugin patches pod + resultCh := make(chan patchResult, 1) + go func() { + var newPod *corev1.Pod + var pluginError errors.PluginError + switch req.Operation { + case admissionv1.Create: + newPod, pluginError = plugin.OnPodAdded(pmh.Client, pod, ctx) + case admissionv1.Update: + newPod, pluginError = plugin.OnPodUpdated(pmh.Client, pod, ctx) + case admissionv1.Delete: + pluginError = plugin.OnPodDeleted(pmh.Client, pod, ctx) + } + if pluginError != nil { + msg := fmt.Sprintf("Failed to %s pod %s/%s ,because of %s", req.Operation, pod.Namespace, pod.Name, pluginError.Error()) + klog.Warningf(msg) + pmh.eventRecorder.Eventf(pod, corev1.EventTypeWarning, string(pluginError.Type()), msg) + newPod = pod.DeepCopy() + } + resultCh <- patchResult{ + pod: newPod, + err: pluginError, + } + }() + + select { + // timeout + case <-ctx.Done(): + msg := fmt.Sprintf("Failed to %s pod %s/%s, because plugin %s exec timed out", req.Operation, pod.Namespace, pod.Name, plugin.Name()) + pmh.eventRecorder.Eventf(pod, corev1.EventTypeWarning, mutatingTimeoutReason, msg) + return admission.Allowed(msg) + // completed before timeout + case result := <-resultCh: + return getAdmissionResponse(req, result) } +} - pod = mutatingPod(pod, pmh.Client) - // mutate the fields in pod +func getPodFromRequest(req admission.Request, decoder *admission.Decoder) (*corev1.Pod, error) { + pod := &corev1.Pod{} + if req.Operation == admissionv1.Delete { + err := decoder.DecodeRaw(req.OldObject, pod) + return pod, err + } + err := decoder.Decode(req, pod) + return pod, err +} - marshaledPod, err := json.Marshal(pod) +func getAdmissionResponse(req admission.Request, result patchResult) admission.Response { + if result.err != nil { + return admission.Allowed(result.err.Error()) + } + if req.Operation == admissionv1.Delete { + return admission.Allowed("delete successfully") + } + marshaledPod, err := json.Marshal(result.pod) if err != nil { return admission.Errored(http.StatusInternalServerError, err) } return admission.PatchResponseFromRaw(req.Object.Raw, marshaledPod) } -func mutatingPod(pod *corev1.Pod, client client.Client) *corev1.Pod { - - return pod +func NewPodMutatingHandler(client client.Client, decoder *admission.Decoder, cpm *manager.ProviderManager, recorder record.EventRecorder) *PodMutatingHandler { + return &PodMutatingHandler{ + Client: client, + decoder: decoder, + CloudProviderManager: cpm, + eventRecorder: recorder, + } } diff --git a/pkg/webhook/validating_gss.go b/pkg/webhook/validating_gss.go index 9de1b2b2..b74fa127 100644 --- a/pkg/webhook/validating_gss.go +++ b/pkg/webhook/validating_gss.go @@ -21,6 +21,7 @@ import ( "fmt" gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1" "github.com/openkruise/kruise-game/pkg/util" + admissionv1 "k8s.io/api/admission/v1" "net/http" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -38,24 +39,42 @@ func (gvh *GssValidaatingHandler) Handle(ctx context.Context, req admission.Requ return admission.Errored(http.StatusBadRequest, err) } - var reason string - allowed, err := validatingGss(gss, gvh.Client) - if err != nil { - reason = err.Error() + if allowed, reason := validatingGss(gss, gvh.Client); !allowed { + admission.ValidationResponse(allowed, reason) + } + + switch req.Operation { + case admissionv1.Update: + newGss := gss.DeepCopy() + oldGss := &gamekruiseiov1alpha1.GameServerSet{} + err := gvh.decoder.DecodeRaw(req.OldObject, oldGss) + if err != nil { + return admission.Errored(http.StatusBadRequest, err) + } + return validatingUpdate(newGss, oldGss) } - return admission.ValidationResponse(allowed, reason) + return admission.ValidationResponse(true, "pass validating") } -func validatingGss(gss *gamekruiseiov1alpha1.GameServerSet, client client.Client) (bool, error) { +func validatingGss(gss *gamekruiseiov1alpha1.GameServerSet, client client.Client) (bool, string) { // validate reserveGameServerIds rgsIds := gss.Spec.ReserveGameServerIds if util.IsRepeat(rgsIds) { - return false, fmt.Errorf("reserveGameServerIds should not be repeat. Now it is %v", rgsIds) + return false, fmt.Sprintf("reserveGameServerIds should not be repeat. Now it is %v", rgsIds) } if util.IsHasNegativeNum(rgsIds) { - return false, fmt.Errorf("reserveGameServerIds should be greater or equal to 0. Now it is %v", rgsIds) + return false, fmt.Sprintf("reserveGameServerIds should be greater or equal to 0. Now it is %v", rgsIds) } - return true, nil + return true, "general validating success" +} + +func validatingUpdate(newGss, oldGss *gamekruiseiov1alpha1.GameServerSet) admission.Response { + if oldGss.Spec.Network != nil && newGss.Spec.Network != nil { + if oldGss.Spec.Network.NetworkType != "" && newGss.Spec.Network.NetworkType != oldGss.Spec.Network.NetworkType { + return admission.ValidationResponse(false, "change network type is not allowed") + } + } + return admission.ValidationResponse(true, "validatingUpdate success") } diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index ab06eef3..47e32547 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -20,6 +20,7 @@ import ( "context" "flag" "fmt" + manager2 "github.com/openkruise/kruise-game/cloudprovider/manager" "github.com/openkruise/kruise-game/pkg/webhook/util/generator" "github.com/openkruise/kruise-game/pkg/webhook/util/writer" admissionregistrationv1 "k8s.io/api/admissionregistration/v1" @@ -60,17 +61,26 @@ func init() { // +kubebuilder:rbac:groups=apps.kruise.io,resources=podprobemarkers,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=core,resources=services/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch +// +kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get // +kubebuilder:rbac:groups=admissionregistration.k8s.io,resources=mutatingwebhookconfigurations,verbs=create;get;list;watch;update;patch // +kubebuilder:rbac:groups=admissionregistration.k8s.io,resources=validatingwebhookconfigurations,verbs=create;get;list;watch;update;patch // +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups=alibabacloud.com,resources=poddnats,verbs=get;list;watch +// +kubebuilder:rbac:groups=alibabacloud.com,resources=poddnats/status,verbs=get +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch type Webhook struct { mgr manager.Manager + cpm *manager2.ProviderManager } -func NewWebhookServer(mgr manager.Manager) *Webhook { +func NewWebhookServer(mgr manager.Manager, cpm *manager2.ProviderManager) *Webhook { return &Webhook{ mgr: mgr, + cpm: cpm, } } @@ -83,7 +93,8 @@ func (ws *Webhook) SetupWithManager(mgr manager.Manager) *Webhook { if err != nil { log.Fatalln(err) } - server.Register(mutatePodPath, &webhook.Admission{Handler: &PodMutatingHandler{Client: mgr.GetClient(), decoder: decoder}}) + recorder := mgr.GetEventRecorderFor("kruise-game-webhook") + server.Register(mutatePodPath, &webhook.Admission{Handler: NewPodMutatingHandler(mgr.GetClient(), decoder, ws.cpm, recorder)}) server.Register(validateGssPath, &webhook.Admission{Handler: &GssValidaatingHandler{Client: mgr.GetClient(), decoder: decoder}}) return ws } @@ -152,39 +163,11 @@ func checkMutatingConfiguration(dnsName string, kubeClient clientset.Interface, } func createValidatingWebhook(dnsName string, kubeClient clientset.Interface, caBundle []byte) error { - sideEffectClassNone := admissionregistrationv1.SideEffectClassNone - fail := admissionregistrationv1.Fail - webhookConfig := &admissionregistrationv1.ValidatingWebhookConfiguration{ ObjectMeta: metav1.ObjectMeta{ Name: validatingWebhookConfigurationName, }, - Webhooks: []admissionregistrationv1.ValidatingWebhook{ - { - Name: dnsName, - SideEffects: &sideEffectClassNone, - FailurePolicy: &fail, - AdmissionReviewVersions: []string{"v1"}, - ClientConfig: admissionregistrationv1.WebhookClientConfig{ - Service: &admissionregistrationv1.ServiceReference{ - Namespace: webhookServiceNamespace, - Name: webhookServiceName, - Path: &validateGssPath, - }, - CABundle: caBundle, - }, - Rules: []admissionregistrationv1.RuleWithOperations{ - { - Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create, admissionregistrationv1.Update}, - Rule: admissionregistrationv1.Rule{ - APIGroups: []string{"game.kruise.io"}, - APIVersions: []string{"v1alpha1"}, - Resources: []string{"gameserversets"}, - }, - }, - }, - }, - }, + Webhooks: getValidatingWebhookConf(dnsName, caBundle), } if _, err := kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), webhookConfig, metav1.CreateOptions{}); err != nil { @@ -194,39 +177,11 @@ func createValidatingWebhook(dnsName string, kubeClient clientset.Interface, caB } func createMutatingWebhook(dnsName string, kubeClient clientset.Interface, caBundle []byte) error { - sideEffectClassNone := admissionregistrationv1.SideEffectClassNone - ignore := admissionregistrationv1.Ignore - webhookConfig := &admissionregistrationv1.MutatingWebhookConfiguration{ ObjectMeta: metav1.ObjectMeta{ Name: mutatingWebhookConfigurationName, }, - Webhooks: []admissionregistrationv1.MutatingWebhook{ - { - Name: dnsName, - SideEffects: &sideEffectClassNone, - FailurePolicy: &ignore, - AdmissionReviewVersions: []string{"v1"}, - ClientConfig: admissionregistrationv1.WebhookClientConfig{ - Service: &admissionregistrationv1.ServiceReference{ - Namespace: webhookServiceNamespace, - Name: webhookServiceName, - Path: &mutatePodPath, - }, - CABundle: caBundle, - }, - Rules: []admissionregistrationv1.RuleWithOperations{ - { - Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create}, - Rule: admissionregistrationv1.Rule{ - APIGroups: []string{""}, - APIVersions: []string{"v1"}, - Resources: []string{"pods"}, - }, - }, - }, - }, - }, + Webhooks: getMutatingWebhookConf(dnsName, caBundle), } if _, err := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(context.TODO(), webhookConfig, metav1.CreateOptions{}); err != nil { @@ -236,14 +191,7 @@ func createMutatingWebhook(dnsName string, kubeClient clientset.Interface, caBun } func updateValidatingWebhook(vwc *admissionregistrationv1.ValidatingWebhookConfiguration, dnsName string, kubeClient clientset.Interface, caBundle []byte) error { - var mutatingWHs []admissionregistrationv1.ValidatingWebhook - for _, wh := range vwc.Webhooks { - if wh.Name == dnsName { - wh.ClientConfig.CABundle = caBundle - } - mutatingWHs = append(mutatingWHs, wh) - } - vwc.Webhooks = mutatingWHs + vwc.Webhooks = getValidatingWebhookConf(dnsName, caBundle) if _, err := kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Update(context.TODO(), vwc, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("failed to update %s: %v", validatingWebhookConfigurationName, err) } @@ -251,16 +199,71 @@ func updateValidatingWebhook(vwc *admissionregistrationv1.ValidatingWebhookConfi } func updateMutatingWebhook(mwc *admissionregistrationv1.MutatingWebhookConfiguration, dnsName string, kubeClient clientset.Interface, caBundle []byte) error { - var mutatingWHs []admissionregistrationv1.MutatingWebhook - for _, wh := range mwc.Webhooks { - if wh.Name == dnsName { - wh.ClientConfig.CABundle = caBundle - } - mutatingWHs = append(mutatingWHs, wh) - } - mwc.Webhooks = mutatingWHs + mwc.Webhooks = getMutatingWebhookConf(dnsName, caBundle) if _, err := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Update(context.TODO(), mwc, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("failed to update %s: %v", mutatingWebhookConfigurationName, err) } return nil } + +func getValidatingWebhookConf(dnsName string, caBundle []byte) []admissionregistrationv1.ValidatingWebhook { + sideEffectClassNone := admissionregistrationv1.SideEffectClassNone + fail := admissionregistrationv1.Fail + return []admissionregistrationv1.ValidatingWebhook{ + { + Name: dnsName, + SideEffects: &sideEffectClassNone, + FailurePolicy: &fail, + AdmissionReviewVersions: []string{"v1", "v1beta1"}, + ClientConfig: admissionregistrationv1.WebhookClientConfig{ + Service: &admissionregistrationv1.ServiceReference{ + Namespace: webhookServiceNamespace, + Name: webhookServiceName, + Path: &validateGssPath, + }, + CABundle: caBundle, + }, + Rules: []admissionregistrationv1.RuleWithOperations{ + { + Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create, admissionregistrationv1.Update}, + Rule: admissionregistrationv1.Rule{ + APIGroups: []string{"game.kruise.io"}, + APIVersions: []string{"v1alpha1"}, + Resources: []string{"gameserversets"}, + }, + }, + }, + }, + } +} + +func getMutatingWebhookConf(dnsName string, caBundle []byte) []admissionregistrationv1.MutatingWebhook { + sideEffectClassNone := admissionregistrationv1.SideEffectClassNone + ignore := admissionregistrationv1.Ignore + return []admissionregistrationv1.MutatingWebhook{ + { + Name: dnsName, + SideEffects: &sideEffectClassNone, + FailurePolicy: &ignore, + AdmissionReviewVersions: []string{"v1", "v1beta1"}, + ClientConfig: admissionregistrationv1.WebhookClientConfig{ + Service: &admissionregistrationv1.ServiceReference{ + Namespace: webhookServiceNamespace, + Name: webhookServiceName, + Path: &mutatePodPath, + }, + CABundle: caBundle, + }, + Rules: []admissionregistrationv1.RuleWithOperations{ + { + Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create, admissionregistrationv1.Update, admissionregistrationv1.Delete}, + Rule: admissionregistrationv1.Rule{ + APIGroups: []string{""}, + APIVersions: []string{"v1"}, + Resources: []string{"pods"}, + }, + }, + }, + }, + } +}