Skip to content

Commit

Permalink
feat(pool): periodically check IP health (exists in vpc)
Browse files Browse the repository at this point in the history
Check IP health in two conditions:

- Every 10 mins, select an IP from the pool that has not been checked
  for the longest time for checking.
- Check the cooldown IP before adding it to the pool.

The check method is to call VPC using the IP and the current node MAC
address. If the IP exists, it is considered healthy.
  • Loading branch information
fioncat committed Jun 11, 2024
1 parent 495b156 commit 27583f1
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 168 deletions.
31 changes: 18 additions & 13 deletions cmd/cnivpctl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,13 @@ type PoolRecord struct {
Recycled bool `json:"recycled" yaml:"recycled"`
RecycledTime int64 `json:"recycled_time" yaml:"recycled_time"`

CheckHealthTime int64 `json:"check_health_time" yaml:"check_health_time"`

Cooldown bool `json:"cooldown" yaml:"cooldown"`
}

func (r *PoolRecord) Titles() []string {
return []string{"IP", "RECYCLED", "COOLDOWN", "AGE"}
return []string{"IP", "RECYCLED", "COOLDOWN", "HEALTH_AGE", "AGE"}
}

func (r *PoolRecord) Row() []string {
Expand All @@ -233,6 +235,7 @@ func (r *PoolRecord) Row() []string {
r.IP,
recycled,
fmt.Sprint(r.Cooldown),
getAge(r.CheckHealthTime),
getAge(r.CreateTime),
}
}
Expand Down Expand Up @@ -261,22 +264,24 @@ func ListPool(nodes []*Node) ([]*PoolRecord, error) {
wp.SingleFlight(func() {
for _, ip := range resp.Pool {
records = append(records, &PoolRecord{
IP: ip.VPCIP,
Node: node.Name,
CreateTime: ip.CreateTime,
Recycled: ip.Recycled,
RecycledTime: ip.RecycleTime,
Cooldown: false,
IP: ip.VPCIP,
Node: node.Name,
CreateTime: ip.CreateTime,
Recycled: ip.Recycled,
RecycledTime: ip.RecycleTime,
CheckHealthTime: ip.CheckHealthTime,
Cooldown: false,
})
}
for _, ip := range resp.Cooldown {
records = append(records, &PoolRecord{
IP: ip.VPCIP,
Node: node.Name,
CreateTime: ip.CreateTime,
Recycled: ip.Recycled,
RecycledTime: ip.RecycleTime,
Cooldown: true,
IP: ip.VPCIP,
Node: node.Name,
CreateTime: ip.CreateTime,
Recycled: ip.Recycled,
RecycledTime: ip.RecycleTime,
CheckHealthTime: ip.CheckHealthTime,
Cooldown: true,
})
}
})
Expand Down
74 changes: 74 additions & 0 deletions pkg/ipamd/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (s *ipamServer) ipPoolWatermarkManager() {
cooldownTk := time.Tick(time.Second * time.Duration(CooldownPeriodSeconds))
recycleStatusTk := time.Tick(10 * time.Minute)
healthSize := 0
checkIpHealthTk := time.Tick(10 * time.Minute)
for {
select {
case <-tk:
Expand Down Expand Up @@ -279,6 +280,12 @@ func (s *ipamServer) ipPoolWatermarkManager() {
// Recycle the cooldown IP to pool
s.recycleCooldownIP()

case <-checkIpHealthTk:
err := s.checkIPHealth()
if err != nil {
ulog.Errorf("Check ip health error: %v", err)
}

case <-recycleStatusTk:
err := s.recycleStatus()
if err != nil {
Expand Down Expand Up @@ -483,6 +490,21 @@ func (s *ipamServer) recycleCooldownIP() {
return
}

network := kv.Value.Network
exists, err := s.checkSecondaryIpExist(network.VPCIP, s.hostMacAddr)
if err != nil {
ulog.Errorf("Check cooldown ip %v from vpc error: %v", network.VPCIP, err)
err = s.cooldownDB.Put(kv.Key, kv.Value)
if err != nil {
ulog.Errorf("Put ip %v back to cooldown pool after checking health error: %v, it will leak", network.VPCIP, err)
}
return
}
if !exists {
ulog.Warnf("The cooldwon ip %s is not exist in vpc, ignore it", network.VPCIP)
continue
}

if s.putIpToPool(kv.Value.Network) {
ulog.Infof("Recycle cooldown ip %s to pool", kv.Value.Network.VPCIP)
}
Expand Down Expand Up @@ -624,6 +646,58 @@ func (s *ipamServer) usedIP() (map[string]struct{}, error) {
return used, nil
}

func (s *ipamServer) checkIPHealth() error {
kvs, err := s.poolDB.List()
if err != nil {
return err
}

// Let's pick an IP that hasn't been checked for the longest time.
var toCheck *database.KeyValue[rpc.PodNetwork]
for _, kv := range kvs {
if toCheck == nil {
toCheck = kv
continue
}

if kv.Value.CheckHealthTime < toCheck.Value.CheckHealthTime {
toCheck = kv
}
}

if toCheck == nil {
ulog.Infof("No ip in pool, skip checking health")
return nil
}

ulog.Infof("Check health for ip %s", toCheck.Value.VPCIP)
exists, err := s.checkSecondaryIpExist(toCheck.Value.VPCIP, s.hostMacAddr)
if err != nil {
return fmt.Errorf("failed to check ip health for %s, vpc api error: %v", toCheck.Value.VPCIP, err)
}

if !exists {
// This ip has already been removed in VPC, it should not be assigned to Pod.
// So it should be removed from pool immediately.
ulog.Warnf("The ip %s is not exist in vpc, remove it in pool", toCheck.Value.VPCIP)
err = s.poolDB.Delete(toCheck.Key)
if err != nil {
return fmt.Errorf("failed to remove not exists ip %s from pool: %v", toCheck.Value.VPCIP, err)
}

return nil
}

ulog.Infof("The ip %s is exists in VPC", toCheck.Value.VPCIP)

toCheck.Value.CheckHealthTime = time.Now().Unix()
err = s.poolDB.Put(toCheck.Key, toCheck.Value)
if err != nil {
return fmt.Errorf("failed to update check health time for ip %s: %v", toCheck.Value.VPCIP, err)
}
return nil
}

func (s *ipamServer) putIpToPool(ip *rpc.PodNetwork) bool {
err := s.poolDB.Put(getReservedIPKey(ip), ip)
if err != nil {
Expand Down
Loading

0 comments on commit 27583f1

Please sign in to comment.