This repository has been archived by the owner on Jun 29, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtopology.go
159 lines (132 loc) · 3.79 KB
/
topology.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package rdsmysql
import (
"sort"
"sync"
"time"
)
// topologyOpts are the options needed to create topology state.
// It bundles the timing parameters, the lifecycle callbacks, the logger and
// the clock that a topology consults.
type topologyOpts struct {
// MaxTimeLeaving is the max time a server may stay in the leaving state.
// ExecuteOnLeaveIfNeeded calls OnLeave for servers that exceeded it.
MaxTimeLeaving time.Duration
// OnLeave is called by ExecuteOnLeaveIfNeeded with the ID of a server that
// has been in the leaving state longer than MaxTimeLeaving.
OnLeave func(id string)
// OnFailed is called by MarkFailed with the ID of the server that was marked
// failed. It is skipped when nil.
OnFailed func(id string)
// OnFound is called by SetAvailableFromReplicaHostStatus for each server ID
// it is given. It is skipped when nil.
OnFound func(id string)
// FailDuration specifies for how long after MarkFailed a server is excluded
// from the result of GetAvailable.
FailDuration time.Duration
// Logger receives the key/value messages the topology emits for debugging
// purposes. newTopology substitutes a defaultLogger when it is nil.
Logger Logger
// Now returns the current time. The topology reads the clock only through
// this function.
Now func() time.Time
}
// topology manages the states of available servers: which servers were last
// reported, which were returned by the previous GetAvailable call, which are
// leaving and which were recently marked failed.
type topology struct {
// opts holds the configuration passed to newTopology.
opts topologyOpts
// values from information_schema.replica_host_status, stored as a set of
// server IDs by SetAvailableFromReplicaHostStatus
availableReplicaHostStatus map[string]bool
// time of the most recent SetAvailableFromReplicaHostStatus call
availableReplicaHostStatusUpdated time.Time
// set of server IDs returned by the previous GetAvailable call
lastAvailable map[string]bool
// map[server_id]startedLeaving
leaving map[string]time.Time
// When a node is marked as failed, it acts as if it is not returned from the initial list of replicas for FailDuration. OnLeave is called after MaxTimeLeaving if MaxTimeLeaving < FailDuration.
// map[server_id]lastFailed
failed map[string]time.Time
// mu guards the maps and the timestamp above; the methods lock it before
// touching them.
mu sync.Mutex
}
// newTopology creates a new topology state manager configured by opts.
//
// All internal maps are initialised so the returned value is ready for use.
// Missing optional dependencies are defaulted: a nil opts.Logger becomes a
// defaultLogger and a nil opts.Now becomes time.Now.
func newTopology(opts topologyOpts) *topology {
	// opts is a by-value copy, so filling in defaults does not touch the
	// caller's struct.
	if opts.Logger == nil {
		opts.Logger = &defaultLogger{}
	}
	if opts.Now == nil {
		// Every method starts by calling s.opts.Now(); without a default a
		// forgotten clock would panic on first use.
		opts.Now = time.Now
	}

	return &topology{
		opts:                       opts,
		availableReplicaHostStatus: map[string]bool{},
		lastAvailable:              map[string]bool{},
		leaving:                    map[string]time.Time{},
		failed:                     map[string]time.Time{},
	}
}
// GetAvailable returns the sorted IDs of the hosts that are usable right now:
// every host from the latest replica host status set that has not been marked
// failed within the last FailDuration. The returned slice is never nil.
//
// As a side effect the result is reconciled with the previous call: newly
// seen hosts are logged, hosts that reappeared are dropped from the leaving
// set, and hosts that disappeared are recorded as leaving as of now.
func (s *topology) GetAvailable() []string {
	now := s.opts.Now()

	s.mu.Lock()
	defer s.mu.Unlock()

	// A host still counts as failed while its last failure is newer than this.
	failCutoff := now.Add(-s.opts.FailDuration)

	usable := map[string]bool{}
	added := []string{}
	for host := range s.availableReplicaHostStatus {
		if s.failed[host].After(failCutoff) {
			continue
		}
		usable[host] = true
		if !s.lastAvailable[host] {
			added = append(added, host)
		}
	}
	sort.Strings(added)
	if len(added) != 0 {
		s.opts.Logger.Log("msg", "new hosts available", "hostname", added)
	}

	// Hosts that were on their way out but are usable again stop leaving.
	for host := range usable {
		if _, wasLeaving := s.leaving[host]; wasLeaving {
			s.opts.Logger.Log("msg", "host is no longer leaving", "hostname", host)
			delete(s.leaving, host)
		}
	}

	// Hosts returned by the previous call but missing now start leaving.
	for host := range s.lastAvailable {
		if usable[host] {
			continue
		}
		s.leaving[host] = now
		s.opts.Logger.Log("msg", "host is leaving", "hostname", host)
	}

	s.lastAvailable = usable

	hosts := make([]string, 0, len(usable))
	for host := range usable {
		hosts = append(hosts, host)
	}
	sort.Strings(hosts)
	return hosts
}
// ExecuteOnLeaveIfNeeded calls the OnLeave callback for every replica that has
// been in the leaving state longer than MaxTimeLeaving and then removes that
// replica from the leaving set.
//
// A nil OnLeave is tolerated (the host is still logged and removed), matching
// the nil checks applied to OnFound and OnFailed. OnLeave runs while the
// topology mutex is held, so it must not call back into the topology. The
// mutex is released with defer so a panicking callback cannot leave it locked.
func (s *topology) ExecuteOnLeaveIfNeeded() {
	now := s.opts.Now()

	s.mu.Lock()
	defer s.mu.Unlock()

	// Hosts that started leaving before this instant have exceeded their grace
	// period.
	deadline := now.Add(-s.opts.MaxTimeLeaving)
	for host, startedLeaving := range s.leaving {
		if !startedLeaving.Before(deadline) {
			continue
		}
		s.opts.Logger.Log("msg", "host left", "hostname", host)
		if s.opts.OnLeave != nil {
			s.opts.OnLeave(host)
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(s.leaving, host)
	}
}
// SetAvailableFromReplicaHostStatus replaces the stored set of replica hosts
// with the IDs in current and records the time of the update.
//
// When OnFound is configured it is invoked once for every entry of current on
// each call, while the topology mutex is held. The hostname argument is not
// used by this method.
func (s *topology) SetAvailableFromReplicaHostStatus(hostname string, current []string) {
	updatedAt := s.opts.Now()

	s.mu.Lock()
	notify := s.opts.OnFound
	s.availableReplicaHostStatus = make(map[string]bool, len(current))
	for _, id := range current {
		s.availableReplicaHostStatus[id] = true
		if notify != nil {
			notify(id)
		}
	}
	s.availableReplicaHostStatusUpdated = updatedAt
	s.mu.Unlock()
}
// MarkFailed records host as failed at the current time and places it in the
// leaving set with the same timestamp. Afterwards OnFailed is invoked with
// host if the callback is configured; it runs after the mutex is released.
func (s *topology) MarkFailed(host string) {
	s.opts.Logger.Log("msg", "marking host as failed", "hostname", host)

	failedAt := s.opts.Now()

	s.mu.Lock()
	s.failed[host], s.leaving[host] = failedAt, failedAt
	s.mu.Unlock()

	if onFailed := s.opts.OnFailed; onFailed != nil {
		onFailed(host)
	}
}