-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathwatcher.go
247 lines (223 loc) · 7.18 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/rest"
"istio.io/client-go/pkg/apis/networking/v1beta1"
"istio.io/client-go/pkg/clientset/versioned"
versionedclient "istio.io/client-go/pkg/clientset/versioned"
)
// Watcher consumes a watch on Istio WorkloadEntry objects and records the
// observed workload addresses in a ConfigMap entry holding a JSON list of
// target groups.
type Watcher struct {
// istioClient is the Istio clientset used to open the WorkloadEntry watch.
istioClient *versioned.Clientset
// k8sClient is the core Kubernetes clientset used to list pods and to read
// and write the ConfigMap.
k8sClient *kubernetes.Clientset
// namespace is "" when the Watcher is constructed (WorkloadEntries are
// watched in all namespaces). Start replaces an empty value with the
// namespace discovered for the Prometheus pods, and the ConfigMap is read
// from and written to that namespace.
namespace string
// Watch is the WorkloadEntry watch whose result channel Start drains.
Watch watch.Interface
// requiredTerminations tracks the event-loop goroutine so shutdown can wait
// for it to finish.
requiredTerminations sync.WaitGroup
// sdFileName is the key inside the ConfigMap's Data under which the JSON
// target list is stored.
sdFileName string
}
// NewWatcher builds a Watcher from restConfig. It creates an Istio clientset
// and a Kubernetes clientset, then opens a watch on WorkloadEntries in all
// namespaces. Any failure along the way terminates the process through
// log.Fatalf, so a returned Watcher is always fully initialised.
func NewWatcher(restConfig *rest.Config) *Watcher {
	// An empty namespace selects WorkloadEntries from every namespace.
	const allNamespaces = ""

	istioClientSet, err := versionedclient.NewForConfig(restConfig)
	if err != nil {
		log.Fatalf("Failed to create istio client: %s", err)
	}

	kubeClientSet, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		log.Fatalf("Failed to create k8s client: %s", err)
	}

	workloadEntries := istioClientSet.NetworkingV1beta1().WorkloadEntries(allNamespaces)
	wleWatch, err := workloadEntries.Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.Fatalf("Failed to get Workload Entry watch: %v", err)
	}

	log.Println("workload entry watcher created")
	return &Watcher{
		istioClient: istioClientSet,
		k8sClient:   kubeClientSet,
		namespace:   allNamespaces,
		Watch:       wleWatch,
		sdFileName:  "staticConfigurations.json",
	}
}
// Start begins processing WorkloadEntry events and returns immediately.
//
// If the Watcher has no namespace yet, the namespace of the Prometheus pods is
// discovered first; failure to do so terminates the process. A goroutine then
// drains the watch's result channel and, for every WorkloadEntry event,
// rewrites the JSON target list stored in the ConfigMap:
//
//   - watch.Deleted removes the target group containing "<address>:15020";
//   - any other event type appends a new target group for "<address>:15020"
//     unless that target is already present.
//
// Events whose object is not a *v1beta1.WorkloadEntry are logged and skipped.
// Receiving from stop (typically by closing it) stops the watch, which ends
// the goroutine once the result channel is closed.
func (w *Watcher) Start(stop <-chan struct{}) {
	if w.namespace == "" {
		var err error
		w.namespace, err = discoverPromNamespace(w.k8sClient)
		if err != nil {
			log.Fatalf("Failed to find prometheus deployment namespace: %v\n", err)
		}
	}

	// Register with the WaitGroup before the goroutine is started; calling Add
	// from inside the goroutine races with the Wait issued during shutdown.
	w.requiredTerminations.Add(1)
	go func() {
		defer w.requiredTerminations.Done()

		for event := range w.Watch.ResultChan() {
			// Skip anything that is not a WorkloadEntry before touching the
			// cluster; wle would be nil and must not be dereferenced.
			wle, ok := event.Object.(*v1beta1.WorkloadEntry)
			if !ok {
				log.Print("unexpected type")
				continue
			}

			// get the static configurations
			fileSDConfig, err := w.getOrCreatePromSDConfigMap(w.k8sClient)
			if err != nil {
				log.Fatalf("get or create config map failed: %v\n", err)
			}
			// An existing ConfigMap without any data has a nil map; writing
			// into it further down would panic.
			if fileSDConfig.Data == nil {
				fileSDConfig.Data = make(map[string]string)
			}

			var staticConfigurations []map[string][]string
			// A freshly created ConfigMap stores an empty string, which is not
			// valid JSON and simply means "no targets yet".
			if raw := fileSDConfig.Data[w.sdFileName]; raw != "" {
				if err := json.Unmarshal([]byte(raw), &staticConfigurations); err != nil {
					log.Println("static configuration json generation failed")
				}
			}
			staticConfigurations = dedupConfig(staticConfigurations)

			// Targets are stored as "<address>:15020", so both the add and the
			// delete path must use this form.
			targetAddr := fmt.Sprintf("%s:15020", wle.Spec.Address)

			switch event.Type {
			case watch.Deleted:
				log.Printf("handle deleted workload %s", wle.Spec.Address)
				toDelete := -1
			outer:
				for i, target := range staticConfigurations {
					for _, ip := range target["targets"] {
						if ip == targetAddr {
							toDelete = i
							break outer
						}
					}
				}
				if toDelete < 0 {
					// Nothing registered for this workload: leave the stored
					// configuration untouched instead of dropping an unrelated
					// entry.
					log.Printf("VM workload %s is not registered\n", wle.ObjectMeta.Name)
					continue
				}
				staticConfigurations = append(staticConfigurations[:toDelete], staticConfigurations[toDelete+1:]...)
				log.Printf("Deleted VM workload %s\n", wle.ObjectMeta.Name)
			default: // add or update
				if isDuplicate(staticConfigurations, targetAddr) {
					log.Printf("VM workload %s exists\n", wle.ObjectMeta.Name)
				} else {
					log.Printf("handle update workload %s", wle.Spec.Address)
					newTarget := map[string][]string{"targets": {targetAddr}}
					staticConfigurations = append(staticConfigurations, newTarget)
					log.Printf("Registered VM workload %s \n", wle.ObjectMeta.Name)
				}
			}

			// assign the updated static configurations to the config map
			marshaled, err := json.Marshal(staticConfigurations)
			if err != nil {
				// Do not overwrite the stored configuration with a partial or
				// empty payload.
				log.Printf("update static configuration json failed: %v", err)
				continue
			}
			fileSDConfig.Data[w.sdFileName] = string(marshaled)
			if err := updatePromSDConfigMap(w.k8sClient, fileSDConfig, w.namespace); err != nil {
				log.Printf("update config map failed: %v\n", err)
			}
		}
	}()

	w.waitForShutdown(stop)
}
// waitForShutdown spawns a goroutine that blocks until a receive on stop
// succeeds (for example because the channel was closed), then stops the
// WorkloadEntry watch and waits on requiredTerminations. The method itself
// returns immediately.
func (w *Watcher) waitForShutdown(stop <-chan struct{}) {
	shutdown := func() {
		<-stop
		w.Watch.Stop()
		w.requiredTerminations.Wait()
	}
	go shutdown()
}
// getOrCreatePromSDConfigMap returns the "file-sd-config" ConfigMap from the
// Watcher's namespace. When the lookup fails for any reason, a new ConfigMap
// is created whose data holds an empty string under the Watcher's sdFileName
// key. The error from the create call, if any, is returned with a nil
// ConfigMap.
func (w *Watcher) getOrCreatePromSDConfigMap(client *kubernetes.Clientset) (*v1.ConfigMap, error) {
	const cmName = "file-sd-config"
	configMaps := client.CoreV1().ConfigMaps(w.namespace)

	// Fast path: the ConfigMap is already there.
	if existing, err := configMaps.Get(context.TODO(), cmName, metav1.GetOptions{}); err == nil {
		return existing, nil
	}

	fresh := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: cmName},
		Data:       map[string]string{w.sdFileName: ""},
	}
	created, err := configMaps.Create(context.TODO(), fresh, metav1.CreateOptions{})
	if err != nil {
		return nil, err
	}
	return created, nil
}
// discoverPromNamespace lists pods labelled app=prometheus across all
// namespaces and returns the namespace of the first pod in the result.
//
// It returns an error when the list call fails or when no matching pod
// exists, rather than indexing into an empty result.
func discoverPromNamespace(client *kubernetes.Clientset) (string, error) {
	selector := labels.Set{"app": "prometheus"}.String()
	listOptions := metav1.ListOptions{LabelSelector: selector}

	// An empty namespace queries pods in every namespace.
	podsList, err := client.CoreV1().Pods("").List(context.TODO(), listOptions)
	if err != nil {
		return "", fmt.Errorf("listing pods with selector %q: %w", selector, err)
	}
	if len(podsList.Items) == 0 {
		return "", fmt.Errorf("no pods found with selector %q", selector)
	}

	promNamespace := podsList.Items[0].Namespace
	log.Printf("discover prometheus deployment in namespace %s\n", promNamespace)
	return promNamespace, nil
}
// WaitSignal blocks until the process receives SIGINT or SIGTERM and then
// closes stop.
func (w *Watcher) WaitSignal(stop chan struct{}) {
	// Buffer of one so a signal delivered before the receive is not dropped.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	defer close(stop)
	<-sigCh
}
// updatePromSDConfigMap writes fileSDConfig back to the cluster by updating
// the ConfigMap in namespace ns. It returns the error reported by the update
// call, or nil on success.
func updatePromSDConfigMap(client *kubernetes.Clientset, fileSDConfig *v1.ConfigMap, ns string) error {
	configMaps := client.CoreV1().ConfigMaps(ns)
	_, err := configMaps.Update(context.TODO(), fileSDConfig, metav1.UpdateOptions{})
	return err
}
// isDuplicate reports whether newTarget already appears in the "targets" list
// of any entry in existing. Only the "targets" key is inspected, and the
// comparison is an exact string match.
func isDuplicate(existing []map[string][]string, newTarget string) bool {
	for i := range existing {
		addrs := existing[i]["targets"]
		for j := 0; j < len(addrs); j++ {
			if addrs[j] == newTarget {
				return true
			}
		}
	}
	return false
}
// dedupConfig returns the entries of values, in order, whose "targets" contain
// no address that was already seen earlier in the scan (including earlier in
// the same entry). An entry with any repeated address is dropped as a whole,
// but its previously unseen addresses are still recorded as seen. The result
// is nil when no entry is kept.
func dedupConfig(values []map[string][]string) []map[string][]string {
	seen := make(map[string]struct{})
	var kept []map[string][]string
	for _, group := range values {
		hasRepeat := false
		for _, addr := range group["targets"] {
			if _, known := seen[addr]; known {
				hasRepeat = true
				continue
			}
			seen[addr] = struct{}{}
		}
		if hasRepeat {
			continue
		}
		kept = append(kept, group)
	}
	return kept
}