Skip to content

Commit

Permalink
fix some ip can not allocate after released (#3699)
Browse files Browse the repository at this point in the history
* fix some ip can not allocate after released

---------

Signed-off-by: bobz965 <[email protected]>
  • Loading branch information
zbb88888 authored Feb 6, 2024
1 parent 1794ab8 commit 65bb2b7
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 21 deletions.
17 changes: 17 additions & 0 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,13 @@ func (c *Controller) InitIPAM() error {
continue
}

// retrigger pod update to delete pod in case of kube-ovn-controller crashed while pod is deleting
podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
if pod.DeletionTimestamp != nil {
klog.Infof("enqueue update for deleting pod %s", podKey)
c.updatePodQueue.Add(podKey)
}

podNets, err := c.getPodKubeovnNets(pod)
if err != nil {
klog.Errorf("failed to get pod kubeovn nets %s.%s address %s: %v", pod.Name, pod.Namespace, pod.Annotations[util.IpAddressAnnotation], err)
Expand Down Expand Up @@ -569,6 +576,11 @@ func (c *Controller) initSyncCrdIPs() error {

for _, ipCr := range ips.Items {
ip := ipCr.DeepCopy()
// retrigger ip update to delete ip in case of kube-ovn-controller crashed while ip is deleting
if ip.DeletionTimestamp != nil && util.ContainsString(ip.Finalizers, util.ControllerName) {
klog.Infof("enqueue update for deleting ip %s", ip.Name)
c.updateIPQueue.Add(ip.Name)
}
changed := false
if _, ok := ipMap[ip.Name]; ok && ip.Spec.PodType == "" {
ip.Spec.PodType = util.Vm
Expand Down Expand Up @@ -611,6 +623,11 @@ func (c *Controller) initSyncCrdSubnets() error {
klog.Errorf("failed to calculate subnet %s used ip: %v", subnet.Name, err)
return err
}
// retrigger subnet update to delete subnet in case of kube-ovn-controller crashed while subnet is deleting
if subnet.DeletionTimestamp != nil {
klog.Infof("enqueue update for deleting subnet %s", subnet.Name)
c.addOrUpdateSubnetQueue.Add(subnet.Name)
}
}
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,10 @@ func (c *Controller) handleUpdateIP(key string) error {
}
}
}
if cleanIPAM {
klog.V(3).Infof("release ipam for deleted ip %s from subnet %s", cachedIP.Name, cachedIP.Spec.Subnet)
c.ipam.ReleaseAddressByPod(cachedIP.Name, cachedIP.Spec.Subnet)
if cleanIPAM && cachedIP.Spec.PodName != "" && cachedIP.Spec.Namespace != "" {
podKey := fmt.Sprintf("%s/%s", cachedIP.Spec.Namespace, cachedIP.Spec.PodName)
klog.Infof("ip cr %s release ipam pod key %s from subnet %s", cachedIP.Name, podKey, cachedIP.Spec.Subnet)
c.ipam.ReleaseAddressByPod(podKey, cachedIP.Spec.Subnet)
}
if err = c.handleDelIPFinalizer(cachedIP, util.ControllerName); err != nil {
klog.Errorf("failed to handle del ip finalizer %v", err)
Expand All @@ -261,8 +262,7 @@ func (c *Controller) handleUpdateIP(key string) error {
}

func (c *Controller) handleDelIP(ip *kubeovnv1.IP) error {
klog.V(3).Infof("handle delete ip %s", ip.Name)
klog.V(3).Infof("enqueue update status subnet %s", ip.Spec.Subnet)
klog.Infof("deleting ip %s enqueue update status subnet %s", ip.Name, ip.Spec.Subnet)
c.updateSubnetStatusQueue.Add(ip.Spec.Subnet)
for _, as := range ip.Spec.AttachSubnets {
klog.V(3).Infof("enqueue update attach status for subnet %s", as)
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,7 @@ func appendCheckPodToDel(c *Controller, pod *v1.Pod, ownerRefName, ownerRefKind
klog.Errorf("failed to get namespace %s, %v", pod.Namespace, err)
return false, err
}
key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)

// check if subnet exist in OwnerReference
var ownerRefSubnetExist bool
Expand Down Expand Up @@ -1575,7 +1576,7 @@ func appendCheckPodToDel(c *Controller, pod *v1.Pod, ownerRefName, ownerRefKind
nsSubnetNames := podNs.Annotations[util.LogicalSwitchAnnotation]
// check if pod use the subnet of its ns
if nsSubnetNames != "" && pod.Annotations[util.LogicalSwitchAnnotation] != "" && !util.ContainsString(strings.Split(nsSubnetNames, ","), strings.TrimSpace(pod.Annotations[util.LogicalSwitchAnnotation])) {
klog.Infof("ns %s annotation subnet is %s, which is inconstant with subnet for pod %s, delete pod", pod.Namespace, podNs.Annotations[util.LogicalSwitchAnnotation], pod.Name)
klog.Infof("ns %s annotation subnet is %s, which is inconstant with subnet for pod %s, delete pod", pod.Namespace, podNs.Annotations[util.LogicalSwitchAnnotation], key)
return true, nil
}
}
Expand All @@ -1587,12 +1588,12 @@ func appendCheckPodToDel(c *Controller, pod *v1.Pod, ownerRefName, ownerRefKind
return false, err
}
if podSubnet != nil && !util.CIDRContainIP(podSubnet.Spec.CIDRBlock, pod.Annotations[util.IpAddressAnnotation]) {
klog.Infof("pod's ip %s is not in the range of subnet %s, delete pod", pod.Annotations[util.IpAddressAnnotation], podSubnet.Name)
klog.Infof("pod %s ip %s is not in the range of subnet %s, delete pod", key, pod.Annotations[util.IpAddressAnnotation], podSubnet.Name)
return true, nil
}
// subnet of ownerReference(sts/vm) has been changed, it needs to handle delete pod and create port on the new logical switch
if podSubnet != nil && ownerRefSubnet != "" && podSubnet.Name != ownerRefSubnet {
klog.Infof("Subnet of owner %s has been changed from %s to %s, delete pod %s/%s", ownerRefName, podSubnet.Name, ownerRefSubnet, pod.Namespace, pod.Name)
klog.Infof("Subnet of owner %s has been changed from %s to %s, delete pod %s", ownerRefName, podSubnet.Name, ownerRefSubnet, key)
return true, nil
}

Expand Down
68 changes: 56 additions & 12 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error {

changed, err = checkSubnetChanged(subnet)
if err != nil {
klog.Error(err)
return err
}
if subnet.Spec.Provider == "" {
Expand Down Expand Up @@ -339,14 +340,16 @@ func checkSubnetChanged(subnet *kubeovnv1.Subnet) (bool, error) {
// changed value may be overlapped, so use ret to record value
changed, err = checkAndUpdateCIDR(subnet)
if err != nil {
return changed, err
klog.Error(err)
return false, err
}
if changed {
ret = true
}
changed, err = checkAndUpdateGateway(subnet)
if err != nil {
return changed, err
klog.Error(err)
return false, err
}
if changed {
ret = true
Expand Down Expand Up @@ -483,18 +486,21 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
var err error
c.subnetStatusKeyMutex.Lock(key)
defer c.subnetStatusKeyMutex.Unlock(key)
defer klog.Infof("end handle add or update subnet %s", key)
klog.Infof("start handle add or update subnet %s", key)

cachedSubnet, err := c.subnetsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to get subnet %s error %v", key, err)
return err
}
klog.V(4).Infof("handle add or update subnet %s", cachedSubnet.Name)

subnet := cachedSubnet.DeepCopy()
if err = formatSubnet(subnet, c); err != nil {
klog.Errorf("failed to format subnet %s, %v", subnet.Name, err)
return err
}

Expand Down Expand Up @@ -548,6 +554,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}

if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
klog.Errorf("ipam failed to add or update subnet %s, %v", subnet.Name, err)
return err
}

Expand Down Expand Up @@ -651,6 +658,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
subnet.Status.EnsureStandardConditions()
// If multiple namespace use same ls name, only first one will success
if err := c.ovnLegacyClient.CreateLogicalSwitch(subnet.Name, vpc.Status.Router, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, needRouter); err != nil {
klog.Errorf("failed to create logical switch %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "CreateLogicalSwitchFailed", err.Error())
return err
}
Expand All @@ -674,6 +682,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}

if err := c.ovnLegacyClient.SetLogicalSwitchConfig(subnet.Name, vpc.Status.Router, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, gateway, subnet.Spec.ExcludeIps, needRouter); err != nil {
klog.Errorf("failed to set logical switch %s config, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "SetLogicalSwitchConfigFailed", err.Error())
return err
}
Expand Down Expand Up @@ -704,7 +713,9 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
case util.NetworkTypeStt:
mtu -= util.SttHeaderLength
default:
return fmt.Errorf("invalid network type: %s", c.config.NetworkType)
err = fmt.Errorf("invalid network type: %s", c.config.NetworkType)
klog.Error(err)
return err
}
}
}
Expand Down Expand Up @@ -747,6 +758,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}
if c.config.EnableLb && subnet.Name != c.config.NodeSwitch {
if err := c.ovnLegacyClient.AddLbToLogicalSwitch(subnet.Name, lbs...); err != nil {
klog.Errorf("failed to add lb %v to logical switch %s, %v", lbs, subnet.Name, err)
c.patchSubnetStatus(subnet, "AddLbToLogicalSwitchFailed", err.Error())
return err
}
Expand All @@ -764,19 +776,22 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {

if subnet.Spec.Private {
if err := c.ovnLegacyClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.AllowSubnets); err != nil {
klog.Errorf("failed to set private logical switch %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchFailed", err.Error())
return err
}
c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchSuccess", "")
} else {
if err := c.ovnLegacyClient.ResetLogicalSwitchAcl(subnet.Name); err != nil {
klog.Errorf("failed to reset logical switch acl %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclFailed", err.Error())
return err
}
c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclSuccess", "")
}

if err := c.ovnLegacyClient.UpdateSubnetACL(subnet.Name, subnet.Spec.Acls); err != nil {
klog.Errorf("failed to update subnet acl %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "SetLogicalSwitchAclsFailed", err.Error())
return err
}
Expand All @@ -788,6 +803,8 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
func (c *Controller) handleUpdateSubnetStatus(key string) error {
c.subnetStatusKeyMutex.Lock(key)
defer c.subnetStatusKeyMutex.Unlock(key)
defer klog.Infof("end handle add or update status for subnet %s", key)
klog.Infof("start handle add or update status for subnet %s", key)

cachedSubnet, err := c.subnetsLister.Get(key)
subnet := cachedSubnet.DeepCopy()
Expand All @@ -798,10 +815,15 @@ func (c *Controller) handleUpdateSubnetStatus(key string) error {
return err
}
if util.CheckProtocol(subnet.Spec.CIDRBlock) == kubeovnv1.ProtocolDual {
return calcDualSubnetStatusIP(subnet, c)
err = calcDualSubnetStatusIP(subnet, c)
} else {
return calcSubnetStatusIP(subnet, c)
err = calcSubnetStatusIP(subnet, c)
}
if err != nil {
klog.Error(err)
return err
}
return nil
}

func (c *Controller) handleDeleteRoute(subnet *kubeovnv1.Subnet) error {
Expand All @@ -810,6 +832,7 @@ func (c *Controller) handleDeleteRoute(subnet *kubeovnv1.Subnet) error {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to get vpc %s, %v", subnet.Spec.Vpc, err)
return err
}
return c.deleteStaticRoute(subnet.Spec.CIDRBlock, vpc.Status.Router)
Expand Down Expand Up @@ -925,6 +948,7 @@ func (c *Controller) handleDeleteSubnet(subnet *kubeovnv1.Subnet) error {

for _, vlan := range vlans {
if err = c.updateVlanStatusForSubnetDeletion(vlan, subnet.Name); err != nil {
klog.Errorf("failed to update status of vlan %s: %v", vlan.Name, err)
return err
}
}
Expand Down Expand Up @@ -1123,12 +1147,14 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
for _, pod := range pods {
if pod.Annotations[util.LogicalSwitchAnnotation] == subnet.Name && pod.Annotations[util.IpAddressAnnotation] != "" {
if err := c.deleteStaticRoute(pod.Annotations[util.IpAddressAnnotation], c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route, %v", err)
return err
}
}
}

if err := c.deleteStaticRoute(subnet.Spec.CIDRBlock, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route, %v", err)
return err
}

Expand Down Expand Up @@ -1183,10 +1209,12 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
subnet.Status.ActivateGateway = ""
bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Error(err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "")
if err != nil {
klog.Error(err)
return err
}
}
Expand All @@ -1198,6 +1226,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
}
for _, node := range nodes {
if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil {
klog.Errorf("failed to create port group for distributed subnet %s, %v", subnet.Name, err)
return err
}
if node.Annotations[util.AllocatedAnnotation] != "true" {
Expand Down Expand Up @@ -1320,9 +1349,14 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
subnet.Status.NotReady("NoReadyGateway", "")
bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Error(err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Error(err)
return err
}
return err
}

Expand Down Expand Up @@ -1454,6 +1488,9 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
gw = strings.TrimSpace(gw)
}
node, err := c.nodesLister.Get(gw)
if err != nil {
klog.Errorf("failed to get gw node %s, %v", gw, err)
}
if err == nil && nodeReady(node) {
newActivateNode = node.Name
nodeTunlIPAddr, err = getNodeTunlIP(node)
Expand All @@ -1469,6 +1506,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
subnet.Status.ActivateGateway = newActivateNode
bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Errorf("failed to get subnet %s status: %v", subnet.Name, err)
return err
}
if _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status"); err != nil {
Expand Down Expand Up @@ -1560,7 +1598,6 @@ func (c *Controller) reconcileVlan(subnet *kubeovnv1.Subnet) error {
}

func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) error {

needCalcIP := false
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s ",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
Expand Down Expand Up @@ -1711,10 +1748,15 @@ func calcDualSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error {
subnet.Status.V6UsingIPs = usingIPs
bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Errorf("failed to marshal subnet %s status, %v", subnet.Name, err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
return err
if err != nil {
klog.Errorf("failed to patch subnet %s status, %v", subnet.Name, err)
return err
}
return nil
}

func calcSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error {
Expand Down Expand Up @@ -1796,10 +1838,15 @@ func calcSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error {

bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Errorf("failed to marshal subnet %s status, %v", subnet.Name, err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
return err
if err != nil {
klog.Errorf("failed to patch subnet %s status, %v", subnet.Name, err)
return err
}
return nil
}

func isOvnSubnet(subnet *kubeovnv1.Subnet) bool {
Expand Down Expand Up @@ -2181,7 +2228,6 @@ func (c *Controller) deletePolicyRouteByGatewayType(subnet *kubeovnv1.Subnet, ga
}

func (c *Controller) addPolicyRouteForU2OInterconn(subnet *kubeovnv1.Subnet) error {

var v4Gw, v6Gw string
for _, gw := range strings.Split(subnet.Spec.Gateway, ",") {
switch util.CheckProtocol(gw) {
Expand Down Expand Up @@ -2295,7 +2341,6 @@ func (c *Controller) addPolicyRouteForU2OInterconn(subnet *kubeovnv1.Subnet) err
}

func (c *Controller) deletePolicyRouteForU2OInterconn(subnet *kubeovnv1.Subnet) error {

results, err := c.ovnLegacyClient.CustomFindEntity("Logical_Router_Policy", []string{"_uuid", "match", "priority"},
"external_ids:isU2ORoutePolicy=\"true\"",
fmt.Sprintf("external_ids:vendor=\"%s\"", util.CniTypeName),
Expand Down Expand Up @@ -2347,7 +2392,6 @@ func (c *Controller) addCustomVPCPolicyRoutesForSubnet(subnet *kubeovnv1.Subnet)
}

func (c *Controller) deleteCustomVPCPolicyRoutesForSubnet(subnet *kubeovnv1.Subnet) error {

for _, cidr := range strings.Split(subnet.Spec.CIDRBlock, ",") {
af := 4
if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/vpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ func (c *Controller) addLoadBalancer(vpc string) (*VpcLoadBalancer, error) {
}

func (c *Controller) handleAddOrUpdateVpc(key string) error {
klog.Infof("handle add or update vpc %s", key)

// get latest vpc info
cachedVpc, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().Get(context.Background(), key, metav1.GetOptions{})
if err != nil {
Expand Down
Loading

0 comments on commit 65bb2b7

Please sign in to comment.