Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix some ip can not allocate after released #3699

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,13 @@ func (c *Controller) InitIPAM() error {
continue
}

// retrigger pod update to delete pod in case of kube-ovn-controller crashed while pod is deleting
podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
if pod.DeletionTimestamp != nil {
klog.Infof("enqueue update for deleting pod %s", podKey)
c.updatePodQueue.Add(podKey)
}

podNets, err := c.getPodKubeovnNets(pod)
if err != nil {
klog.Errorf("failed to get pod kubeovn nets %s.%s address %s: %v", pod.Name, pod.Namespace, pod.Annotations[util.IpAddressAnnotation], err)
Expand Down Expand Up @@ -569,6 +576,11 @@ func (c *Controller) initSyncCrdIPs() error {

for _, ipCr := range ips.Items {
ip := ipCr.DeepCopy()
// retrigger ip update to delete ip in case of kube-ovn-controller crashed while ip is deleting
if ip.DeletionTimestamp != nil && util.ContainsString(ip.Finalizers, util.ControllerName) {
klog.Infof("enqueue update for deleting ip %s", ip.Name)
c.updateIPQueue.Add(ip.Name)
}
changed := false
if _, ok := ipMap[ip.Name]; ok && ip.Spec.PodType == "" {
ip.Spec.PodType = util.Vm
Expand Down Expand Up @@ -611,6 +623,11 @@ func (c *Controller) initSyncCrdSubnets() error {
klog.Errorf("failed to calculate subnet %s used ip: %v", subnet.Name, err)
return err
}
// retrigger subnet update to delete subnet in case of kube-ovn-controller crashed while subnet is deleting
if subnet.DeletionTimestamp != nil {
klog.Infof("enqueue update for deleting subnet %s", subnet.Name)
c.addOrUpdateSubnetQueue.Add(subnet.Name)
}
}
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,10 @@ func (c *Controller) handleUpdateIP(key string) error {
}
}
}
if cleanIPAM {
klog.V(3).Infof("release ipam for deleted ip %s from subnet %s", cachedIP.Name, cachedIP.Spec.Subnet)
c.ipam.ReleaseAddressByPod(cachedIP.Name, cachedIP.Spec.Subnet)
if cleanIPAM && cachedIP.Spec.PodName != "" && cachedIP.Spec.Namespace != "" {
podKey := fmt.Sprintf("%s/%s", cachedIP.Spec.Namespace, cachedIP.Spec.PodName)
klog.Infof("ip cr %s release ipam pod key %s from subnet %s", cachedIP.Name, podKey, cachedIP.Spec.Subnet)
c.ipam.ReleaseAddressByPod(podKey, cachedIP.Spec.Subnet)
}
if err = c.handleDelIPFinalizer(cachedIP, util.ControllerName); err != nil {
klog.Errorf("failed to handle del ip finalizer %v", err)
Expand All @@ -261,8 +262,7 @@ func (c *Controller) handleUpdateIP(key string) error {
}

func (c *Controller) handleDelIP(ip *kubeovnv1.IP) error {
klog.V(3).Infof("handle delete ip %s", ip.Name)
klog.V(3).Infof("enqueue update status subnet %s", ip.Spec.Subnet)
klog.Infof("deleting ip %s enqueue update status subnet %s", ip.Name, ip.Spec.Subnet)
c.updateSubnetStatusQueue.Add(ip.Spec.Subnet)
for _, as := range ip.Spec.AttachSubnets {
klog.V(3).Infof("enqueue update attach status for subnet %s", as)
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,7 @@ func appendCheckPodToDel(c *Controller, pod *v1.Pod, ownerRefName, ownerRefKind
klog.Errorf("failed to get namespace %s, %v", pod.Namespace, err)
return false, err
}
key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)

// check if subnet exist in OwnerReference
var ownerRefSubnetExist bool
Expand Down Expand Up @@ -1575,7 +1576,7 @@ func appendCheckPodToDel(c *Controller, pod *v1.Pod, ownerRefName, ownerRefKind
nsSubnetNames := podNs.Annotations[util.LogicalSwitchAnnotation]
// check if pod use the subnet of its ns
if nsSubnetNames != "" && pod.Annotations[util.LogicalSwitchAnnotation] != "" && !util.ContainsString(strings.Split(nsSubnetNames, ","), strings.TrimSpace(pod.Annotations[util.LogicalSwitchAnnotation])) {
klog.Infof("ns %s annotation subnet is %s, which is inconstant with subnet for pod %s, delete pod", pod.Namespace, podNs.Annotations[util.LogicalSwitchAnnotation], pod.Name)
klog.Infof("ns %s annotation subnet is %s, which is inconstant with subnet for pod %s, delete pod", pod.Namespace, podNs.Annotations[util.LogicalSwitchAnnotation], key)
return true, nil
}
}
Expand All @@ -1587,12 +1588,12 @@ func appendCheckPodToDel(c *Controller, pod *v1.Pod, ownerRefName, ownerRefKind
return false, err
}
if podSubnet != nil && !util.CIDRContainIP(podSubnet.Spec.CIDRBlock, pod.Annotations[util.IpAddressAnnotation]) {
klog.Infof("pod's ip %s is not in the range of subnet %s, delete pod", pod.Annotations[util.IpAddressAnnotation], podSubnet.Name)
klog.Infof("pod %s ip %s is not in the range of subnet %s, delete pod", key, pod.Annotations[util.IpAddressAnnotation], podSubnet.Name)
return true, nil
}
// subnet of ownerReference(sts/vm) has been changed, it needs to handle delete pod and create port on the new logical switch
if podSubnet != nil && ownerRefSubnet != "" && podSubnet.Name != ownerRefSubnet {
klog.Infof("Subnet of owner %s has been changed from %s to %s, delete pod %s/%s", ownerRefName, podSubnet.Name, ownerRefSubnet, pod.Namespace, pod.Name)
klog.Infof("Subnet of owner %s has been changed from %s to %s, delete pod %s", ownerRefName, podSubnet.Name, ownerRefSubnet, key)
return true, nil
}

Expand Down
68 changes: 56 additions & 12 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error {

changed, err = checkSubnetChanged(subnet)
if err != nil {
klog.Error(err)
return err
}
if subnet.Spec.Provider == "" {
Expand Down Expand Up @@ -339,14 +340,16 @@ func checkSubnetChanged(subnet *kubeovnv1.Subnet) (bool, error) {
// changed value may be overlapped, so use ret to record value
changed, err = checkAndUpdateCIDR(subnet)
if err != nil {
return changed, err
klog.Error(err)
return false, err
}
if changed {
ret = true
}
changed, err = checkAndUpdateGateway(subnet)
if err != nil {
return changed, err
klog.Error(err)
return false, err
}
if changed {
ret = true
Expand Down Expand Up @@ -483,18 +486,21 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
var err error
c.subnetStatusKeyMutex.Lock(key)
defer c.subnetStatusKeyMutex.Unlock(key)
defer klog.Infof("end handle add or update subnet %s", key)
klog.Infof("start handle add or update subnet %s", key)

cachedSubnet, err := c.subnetsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to get subnet %s error %v", key, err)
return err
}
klog.V(4).Infof("handle add or update subnet %s", cachedSubnet.Name)

subnet := cachedSubnet.DeepCopy()
if err = formatSubnet(subnet, c); err != nil {
klog.Errorf("failed to format subnet %s, %v", subnet.Name, err)
return err
}

Expand Down Expand Up @@ -548,6 +554,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}

if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
klog.Errorf("ipam failed to add or update subnet %s, %v", subnet.Name, err)
return err
}

Expand Down Expand Up @@ -651,6 +658,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
subnet.Status.EnsureStandardConditions()
// If multiple namespace use same ls name, only first one will success
if err := c.ovnLegacyClient.CreateLogicalSwitch(subnet.Name, vpc.Status.Router, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, needRouter); err != nil {
klog.Errorf("failed to create logical switch %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "CreateLogicalSwitchFailed", err.Error())
return err
}
Expand All @@ -674,6 +682,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}

if err := c.ovnLegacyClient.SetLogicalSwitchConfig(subnet.Name, vpc.Status.Router, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, gateway, subnet.Spec.ExcludeIps, needRouter); err != nil {
klog.Errorf("failed to set logical switch %s config, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "SetLogicalSwitchConfigFailed", err.Error())
return err
}
Expand Down Expand Up @@ -704,7 +713,9 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
case util.NetworkTypeStt:
mtu -= util.SttHeaderLength
default:
return fmt.Errorf("invalid network type: %s", c.config.NetworkType)
err = fmt.Errorf("invalid network type: %s", c.config.NetworkType)
klog.Error(err)
return err
}
}
}
Expand Down Expand Up @@ -747,6 +758,7 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}
if c.config.EnableLb && subnet.Name != c.config.NodeSwitch {
if err := c.ovnLegacyClient.AddLbToLogicalSwitch(subnet.Name, lbs...); err != nil {
klog.Errorf("failed to add lb %v to logical switch %s, %v", lbs, subnet.Name, err)
c.patchSubnetStatus(subnet, "AddLbToLogicalSwitchFailed", err.Error())
return err
}
Expand All @@ -764,19 +776,22 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {

if subnet.Spec.Private {
if err := c.ovnLegacyClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.AllowSubnets); err != nil {
klog.Errorf("failed to set private logical switch %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchFailed", err.Error())
return err
}
c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchSuccess", "")
} else {
if err := c.ovnLegacyClient.ResetLogicalSwitchAcl(subnet.Name); err != nil {
klog.Errorf("failed to reset logical switch acl %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclFailed", err.Error())
return err
}
c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclSuccess", "")
}

if err := c.ovnLegacyClient.UpdateSubnetACL(subnet.Name, subnet.Spec.Acls); err != nil {
klog.Errorf("failed to update subnet acl %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "SetLogicalSwitchAclsFailed", err.Error())
return err
}
Expand All @@ -788,6 +803,8 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
func (c *Controller) handleUpdateSubnetStatus(key string) error {
c.subnetStatusKeyMutex.Lock(key)
defer c.subnetStatusKeyMutex.Unlock(key)
defer klog.Infof("end handle add or update status for subnet %s", key)
klog.Infof("start handle add or update status for subnet %s", key)

cachedSubnet, err := c.subnetsLister.Get(key)
subnet := cachedSubnet.DeepCopy()
Expand All @@ -798,10 +815,15 @@ func (c *Controller) handleUpdateSubnetStatus(key string) error {
return err
}
if util.CheckProtocol(subnet.Spec.CIDRBlock) == kubeovnv1.ProtocolDual {
return calcDualSubnetStatusIP(subnet, c)
err = calcDualSubnetStatusIP(subnet, c)
} else {
return calcSubnetStatusIP(subnet, c)
err = calcSubnetStatusIP(subnet, c)
}
if err != nil {
klog.Error(err)
return err
}
return nil
}

func (c *Controller) handleDeleteRoute(subnet *kubeovnv1.Subnet) error {
Expand All @@ -810,6 +832,7 @@ func (c *Controller) handleDeleteRoute(subnet *kubeovnv1.Subnet) error {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Errorf("failed to get vpc %s, %v", subnet.Spec.Vpc, err)
return err
}
return c.deleteStaticRoute(subnet.Spec.CIDRBlock, vpc.Status.Router)
Expand Down Expand Up @@ -925,6 +948,7 @@ func (c *Controller) handleDeleteSubnet(subnet *kubeovnv1.Subnet) error {

for _, vlan := range vlans {
if err = c.updateVlanStatusForSubnetDeletion(vlan, subnet.Name); err != nil {
klog.Errorf("failed to update status of vlan %s: %v", vlan.Name, err)
return err
}
}
Expand Down Expand Up @@ -1123,12 +1147,14 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
for _, pod := range pods {
if pod.Annotations[util.LogicalSwitchAnnotation] == subnet.Name && pod.Annotations[util.IpAddressAnnotation] != "" {
if err := c.deleteStaticRoute(pod.Annotations[util.IpAddressAnnotation], c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route, %v", err)
return err
}
}
}

if err := c.deleteStaticRoute(subnet.Spec.CIDRBlock, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route, %v", err)
return err
}

Expand Down Expand Up @@ -1183,10 +1209,12 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
subnet.Status.ActivateGateway = ""
bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Error(err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "")
if err != nil {
klog.Error(err)
return err
}
}
Expand All @@ -1198,6 +1226,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
}
for _, node := range nodes {
if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil {
klog.Errorf("failed to create port group for distributed subnet %s, %v", subnet.Name, err)
return err
}
if node.Annotations[util.AllocatedAnnotation] != "true" {
Expand Down Expand Up @@ -1320,9 +1349,14 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
subnet.Status.NotReady("NoReadyGateway", "")
bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Error(err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Error(err)
return err
}
return err
}

Expand Down Expand Up @@ -1454,6 +1488,9 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
gw = strings.TrimSpace(gw)
}
node, err := c.nodesLister.Get(gw)
if err != nil {
klog.Errorf("failed to get gw node %s, %v", gw, err)
}
if err == nil && nodeReady(node) {
newActivateNode = node.Name
nodeTunlIPAddr, err = getNodeTunlIP(node)
Expand All @@ -1469,6 +1506,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
subnet.Status.ActivateGateway = newActivateNode
bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Errorf("failed to get subnet %s status: %v", subnet.Name, err)
return err
}
if _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status"); err != nil {
Expand Down Expand Up @@ -1560,7 +1598,6 @@ func (c *Controller) reconcileVlan(subnet *kubeovnv1.Subnet) error {
}

func (c *Controller) reconcileU2OInterconnectionIP(subnet *kubeovnv1.Subnet) error {

needCalcIP := false
klog.Infof("reconcile underlay subnet %s to overlay interconnection with U2OInterconnection %v U2OInterconnectionIP %s ",
subnet.Name, subnet.Spec.U2OInterconnection, subnet.Status.U2OInterconnectionIP)
Expand Down Expand Up @@ -1711,10 +1748,15 @@ func calcDualSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error {
subnet.Status.V6UsingIPs = usingIPs
bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Errorf("failed to marshal subnet %s status, %v", subnet.Name, err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
return err
if err != nil {
klog.Errorf("failed to patch subnet %s status, %v", subnet.Name, err)
return err
}
return nil
}

func calcSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error {
Expand Down Expand Up @@ -1796,10 +1838,15 @@ func calcSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error {

bytes, err := subnet.Status.Bytes()
if err != nil {
klog.Errorf("failed to marshal subnet %s status, %v", subnet.Name, err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
return err
if err != nil {
klog.Errorf("failed to patch subnet %s status, %v", subnet.Name, err)
return err
}
return nil
}

func isOvnSubnet(subnet *kubeovnv1.Subnet) bool {
Expand Down Expand Up @@ -2181,7 +2228,6 @@ func (c *Controller) deletePolicyRouteByGatewayType(subnet *kubeovnv1.Subnet, ga
}

func (c *Controller) addPolicyRouteForU2OInterconn(subnet *kubeovnv1.Subnet) error {

var v4Gw, v6Gw string
for _, gw := range strings.Split(subnet.Spec.Gateway, ",") {
switch util.CheckProtocol(gw) {
Expand Down Expand Up @@ -2295,7 +2341,6 @@ func (c *Controller) addPolicyRouteForU2OInterconn(subnet *kubeovnv1.Subnet) err
}

func (c *Controller) deletePolicyRouteForU2OInterconn(subnet *kubeovnv1.Subnet) error {

results, err := c.ovnLegacyClient.CustomFindEntity("Logical_Router_Policy", []string{"_uuid", "match", "priority"},
"external_ids:isU2ORoutePolicy=\"true\"",
fmt.Sprintf("external_ids:vendor=\"%s\"", util.CniTypeName),
Expand Down Expand Up @@ -2347,7 +2392,6 @@ func (c *Controller) addCustomVPCPolicyRoutesForSubnet(subnet *kubeovnv1.Subnet)
}

func (c *Controller) deleteCustomVPCPolicyRoutesForSubnet(subnet *kubeovnv1.Subnet) error {

for _, cidr := range strings.Split(subnet.Spec.CIDRBlock, ",") {
af := 4
if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/vpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ func (c *Controller) addLoadBalancer(vpc string) (*VpcLoadBalancer, error) {
}

func (c *Controller) handleAddOrUpdateVpc(key string) error {
klog.Infof("handle add or update vpc %s", key)

// get latest vpc info
cachedVpc, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().Get(context.Background(), key, metav1.GetOptions{})
if err != nil {
Expand Down
Loading
Loading