Skip to content

Commit

Permalink
Optimize AWS TargetGroup watch logic
Browse files Browse the repository at this point in the history
Signed-off-by: clarklee92 <[email protected]>
  • Loading branch information
clarklee92 committed Jun 4, 2024
1 parent 67ef94e commit 66e0570
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 35 deletions.
7 changes: 0 additions & 7 deletions cloudprovider/amazonswebservices/amazonswebservices.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package amazonswebservices

import (
"context"

log "k8s.io/klog/v2"

"github.com/openkruise/kruise-game/cloudprovider"
Expand Down Expand Up @@ -60,10 +58,5 @@ func (ap *Provider) registerPlugin(plugin cloudprovider.Plugin) {
}

func NewAmazonsWebServicesProvider() (cloudprovider.CloudProvider, error) {
err := startWatchTargetGroup(context.Background())
if err != nil {
return nil, err
}
log.Info("start to watch TargetGroups successfully")
return amazonsWebServicesProvider, nil
}
75 changes: 47 additions & 28 deletions cloudprovider/amazonswebservices/nlb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"sync"
"time"

ackv1alpha1 "github.com/aws-controllers-k8s/elbv2-controller/apis/v1alpha1"
"github.com/kr/pretty"
Expand Down Expand Up @@ -131,6 +132,7 @@ func (n *NlbPlugin) Alias() string {
func (n *NlbPlugin) Init(c client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error {
n.mutex.Lock()
defer n.mutex.Unlock()
startWatchTargetGroup(ctx)
nlbOptions, ok := options.(provideroptions.AmazonsWebServicesOptions)
if !ok {
return cperrors.ToPluginError(fmt.Errorf("failed to convert options to nlbOptions"), cperrors.InternalError)
Expand Down Expand Up @@ -562,7 +564,22 @@ func getACKTargetGroupARN(tg *ackv1alpha1.TargetGroup) (string, error) {
}
}

func startWatchTargetGroup(ctx context.Context) error {
func startWatchTargetGroup(ctx context.Context) {
go func() {
for {
if err := watchTargetGroup(ctx); err != nil {
log.Errorf("error watching TargetGroup: %v", err)
}
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 3): // Retry after a delay
}
}
}()
}

func watchTargetGroup(ctx context.Context) error {
kubeConfig, err := config.GetConfig()
if err != nil {
return fmt.Errorf("failed to get Kubernetes config: %v", err)
Expand All @@ -573,42 +590,44 @@ func startWatchTargetGroup(ctx context.Context) error {
}
utilruntime.Must(ackv1alpha1.AddToScheme(cw.Scheme()))
utilruntime.Must(elbv2api.AddToScheme(cw.Scheme()))
watcher, err := cw.Watch(ctx, &ackv1alpha1.TargetGroupList{})
watcher, err := cw.Watch(ctx, &ackv1alpha1.TargetGroupList{},
client.MatchingLabels{ResourceTagKey: ResourceTagValue})
if err != nil {
return fmt.Errorf("failed to watch TargetGroup: %v", err)
}
go func() {
for event := range watcher.ResultChan() {
if event.Type == watch.Modified || event.Type == watch.Added {
targetGroup, ok := event.Object.(*ackv1alpha1.TargetGroup)
if !ok {
log.Warning("Failed to convert event.Object to TargetGroup")
defer watcher.Stop()
log.Info("Start to watch TargetGroups successfully")
for event := range watcher.ResultChan() {
if event.Type == watch.Modified || event.Type == watch.Added {
targetGroup, ok := event.Object.(*ackv1alpha1.TargetGroup)
if !ok {
log.Warning("Failed to convert event.Object to TargetGroup")
continue
}
if targetGroup.Labels[AWSTargetGroupSyncStatus] == "false" {
targetGroupARN, err := getACKTargetGroupARN(targetGroup)
if err != nil {
continue
}
if targetGroup.Labels[AWSTargetGroupSyncStatus] == "false" {
targetGroupARN, err := getACKTargetGroupARN(targetGroup)
if err != nil {
continue
}
log.Infof("targetGroup sync request watched, start to sync %s/%s, ARN: %s",
targetGroup.GetNamespace(), targetGroup.GetName(), targetGroupARN)
err = syncListenerAndTargetGroupBinding(ctx, cw, targetGroup, &targetGroupARN)
if err != nil {
log.Errorf("syncListenerAndTargetGroupBinding by targetGroup %s error %v",
pretty.Sprint(targetGroup), err)
}
log.Infof("targetGroup sync request watched, start to sync %s/%s, ARN: %s",
targetGroup.GetNamespace(), targetGroup.GetName(), targetGroupARN)
err = syncListenerAndTargetGroupBinding(ctx, cw, targetGroup, &targetGroupARN)
if err != nil {
log.Errorf("syncListenerAndTargetGroupBinding by targetGroup %s error %v",
pretty.Sprint(targetGroup), err)
}

patch := client.RawPatch(types.MergePatchType,
[]byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"true"}}}`, AWSTargetGroupSyncStatus)))
err = cw.Patch(ctx, targetGroup, patch)
if err != nil {
log.Warningf("patch targetGroup %s %s error %v",
pretty.Sprint(targetGroup), AWSTargetGroupSyncStatus, err)
}
patch := client.RawPatch(types.MergePatchType,
[]byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"true"}}}`, AWSTargetGroupSyncStatus)))
err = cw.Patch(ctx, targetGroup, patch)
if err != nil {
log.Warningf("patch targetGroup %s %s error %v",
pretty.Sprint(targetGroup), AWSTargetGroupSyncStatus, err)
}
}
}
}()
}
log.Info("TargetGroups watcher channel closed, restarting watcher...")
return nil
}

Expand Down

0 comments on commit 66e0570

Please sign in to comment.