This repository has been archived by the owner on Nov 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
main.go
96 lines (85 loc) · 2.19 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package main
import (
"context"
"flag"
"os"
"os/signal"
"sort"
"time"
"github.com/miekg/xds/pkg/cache"
"github.com/miekg/xds/pkg/log"
"github.com/miekg/xds/pkg/server"
)
// Command-line flags. Each variable points at the parsed value once
// flag.Parse has been called.
var (
	// addr is the address handed to the management server.
	addr = flag.String("addr", ":18000", "management server address")
	// conf is the directory the cluster configuration is parsed from.
	conf = flag.String("conf", ".", "cluster configuration directory")
	// debug turns on debug logging when set.
	debug = flag.Bool("debug", false, "enable debug logging")
	// nodeID is the node ID; it defaults to "test-id".
	nodeID = flag.String("nodeID", "test-id", "Node ID")
)
// main parses the command-line flags, loads the clusters found in the -conf
// directory into a fresh cache, starts the periodic configuration re-reader
// and the xDS management server in their own goroutines, and then blocks
// until the process receives an interrupt signal. On interrupt it stops the
// re-reader, cancels the server context and exits with status 1.
func main() {
	flag.Parse()
	if *debug {
		log.D.Set()
	}

	clusters, err := parseClusters(*conf)
	if err != nil {
		log.Fatal(err)
	}

	// Seed the cache with every cluster parsed from the configuration directory.
	config := cache.New()
	for _, cl := range clusters {
		config.Insert(cl)
	}
	log.Infof("Initialized cache with 'v1' of %d clusters parsed from directory: %q", len(clusters), *conf)

	// Every 10s rereadConfig looks through the config directory for new or
	// changed clusters; closing stop makes it return.
	stop := make(chan bool)
	go rereadConfig(config, *conf, stop)

	ctx, cancel := context.WithCancel(context.Background())
	srv := server.NewServer(ctx, config)
	go RunManagementServer(ctx, srv, *addr) // start the xDS server

	// signal.Notify never blocks while delivering, so with an unbuffered
	// channel a signal that arrives when nobody is receiving is dropped.
	// A buffer of one is sufficient for a single notification.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)

	<-sig
	close(stop)
	cancel()
	os.Exit(1)
}
// rereadConfig re-parses the cluster configuration under path every 10
// seconds and brings config up to date: clusters whose name is not yet in the
// cache are inserted, and clusters whose metadata hash differs from the cached
// copy are re-inserted. A parse error is logged as a warning and that round is
// skipped. The function returns once stop is closed or receives a value.
func rereadConfig(config *cache.Cluster, path string, stop <-chan bool) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		// Wait for either the stop signal or the next tick.
		select {
		case <-stop:
			return
		case <-ticker.C:
		}

		parsed, err := parseClusters(path)
		if err != nil {
			log.Warningf("Error reparsing clusters: %s", err)
			continue
		}

		// NOTE(review): the binary search below assumes config.All returns the
		// cluster names in ascending order — confirm against the cache package.
		names := config.All()
		for _, fresh := range parsed {
			pos := sort.Search(len(names), func(j int) bool { return names[j] >= fresh.Name })
			known := pos < len(names) && names[pos] == fresh.Name
			if !known {
				// new cluster
				log.Infof("Found new cluster in %q, adding cluster %q", path, fresh.Name)
				config.Insert(fresh)
				continue
			}

			// Already cached: only re-insert when the metadata hash changed.
			// The second result of Retrieve is not used.
			existing, _ := config.Retrieve(names[pos])
			if cache.HashFromMetadata(existing) != cache.HashFromMetadata(fresh) {
				log.Infof("cluster in %q updated, re-inserting cluster %q", path, fresh.Name)
				config.Insert(fresh)
			}
		}
	}
}