-
Notifications
You must be signed in to change notification settings - Fork 0
/
membership.go
135 lines (125 loc) · 3.63 KB
/
membership.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package discovery
import (
	"errors"
	"net"

	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
	"go.uber.org/zap"
)
// Membership wraps a Serf instance to provide service discovery and cluster
// membership. Create one with NewMembership, passing the Config for the local
// node and the Handler that should be told when other members join or leave.
type Membership struct {
// Config is embedded, so NodeName, BindAddr, Tags and StartJoinAddrs are
// promoted onto Membership.
Config
// handler receives the Join/Leave callbacks issued by eventHandler.
handler Handler
// serf is the underlying Serf instance created in setupSerf.
serf *serf.Serf
// events is the channel handed to Serf as its EventCh; eventHandler drains it.
events chan serf.Event
// logger is the global zap logger scoped to the name "membership".
logger *zap.Logger
}
// Config holds the settings setupSerf uses to configure the local Serf node.
type Config struct {
// NodeName is passed to Serf as this node's name.
NodeName string
// BindAddr is resolved with net.ResolveTCPAddr("tcp", ...); the resulting IP
// and port become the address Serf binds to for gossip.
BindAddr string
// Tags are handed to Serf and shared with the other members. eventHandler
// reads the "rpc_addr" tag of members that join.
Tags map[string]string
// StartJoinAddrs lists addresses of nodes already in the cluster. When it is
// nil, setupSerf does not attempt to join anyone.
StartJoinAddrs []string
}
// Handler is implemented by the component that needs to react to cluster
// membership changes. eventHandler invokes it for the non-local members
// carried by Serf join and leave events; errors it returns are logged by
// logError and are not propagated any further.
type Handler interface {
// Join is called with the joining member's name and the value of its
// "rpc_addr" tag.
Join(name, addr string) error
// Leave is called with the name of the member that left.
Leave(name string) error
}
// NewMembership builds a Membership from the given handler and config and
// sets up Serf for it. If Serf cannot be set up, the error is returned and the
// returned *Membership is nil.
func NewMembership(handler Handler, config Config) (*Membership, error) {
	membership := new(Membership)
	membership.Config = config
	membership.handler = handler
	membership.logger = zap.L().Named("membership")

	err := membership.setupSerf()
	if err != nil {
		return nil, err
	}
	return membership, nil
}
// setupSerf creates and configures the Serf instance for this node, starts the
// eventHandler goroutine that consumes Serf's events and, when StartJoinAddrs
// is non-empty, joins the existing cluster through those addresses.
//
// It returns an error if BindAddr cannot be resolved, if Serf cannot be
// created, or if the join fails. On a failed join the freshly created Serf
// instance is shut down before returning, because NewMembership discards the
// Membership on error and nothing else could release it afterwards.
func (m *Membership) setupSerf() (err error) {
	addr, err := net.ResolveTCPAddr("tcp", m.BindAddr)
	if err != nil {
		return err
	}

	config := serf.DefaultConfig()
	config.Init()
	// Serf listens on this address and port for gossiping.
	config.MemberlistConfig.BindAddr = addr.IP.String()
	config.MemberlistConfig.BindPort = addr.Port
	// The event channel receives Serf's events when a node joins or leaves
	// the cluster.
	m.events = make(chan serf.Event)
	config.EventCh = m.events
	// These tags are shared with the other nodes in the cluster to tell them
	// how to handle this node, e.g. voter vs. non-voter, RPC address, etc.
	config.Tags = m.Tags
	// The node name acts as the node's unique identifier across the Serf
	// cluster. Serf defaults it to the hostname.
	config.NodeName = m.Config.NodeName

	m.serf, err = serf.Create(config)
	if err != nil {
		return err
	}
	// Start draining events before joining so that events produced by the
	// join itself have a reader on the unbuffered channel.
	go m.eventHandler()

	// StartJoinAddrs holds the addresses of nodes already in the cluster.
	// A nil or empty list means there is nobody to join.
	if len(m.StartJoinAddrs) == 0 {
		return nil
	}
	if _, err = m.serf.Join(m.StartJoinAddrs, true); err != nil {
		// Best-effort cleanup: the join error is the one worth reporting, so a
		// secondary Shutdown error is deliberately dropped.
		// NOTE(review): the eventHandler goroutine stays parked on m.events,
		// since nothing in this file closes that channel.
		_ = m.serf.Shutdown()
		return err
	}
	return nil
}
// eventHandler consumes events from m.events until the channel is closed or
// the local node is seen leaving. Serf may coalesce several membership updates
// into a single event, so every member carried by an event is processed.
//
// On a join event each non-local member is passed to handler.Join along with
// its "rpc_addr" tag. On a leave event each member is passed to handler.Leave;
// meeting the local member there ends the handler altogether. Handler errors
// are logged through logError and never propagated. Every other event type is
// ignored.
func (m *Membership) eventHandler() {
	for event := range m.events {
		kind := event.EventType()
		if kind != serf.EventMemberJoin && kind != serf.EventMemberLeave {
			continue
		}
		joining := kind == serf.EventMemberJoin

		for _, peer := range event.(serf.MemberEvent).Members {
			if m.isLocal(peer) {
				if joining {
					// Our own join needs no handling.
					continue
				}
				// The local node is leaving: stop handling events entirely.
				return
			}

			if joining {
				if err := m.handler.Join(peer.Name, peer.Tags["rpc_addr"]); err != nil {
					m.logError(err, "failed to join", peer)
				}
				continue
			}
			if err := m.handler.Leave(peer.Name); err != nil {
				m.logError(err, "failed to leave", peer)
			}
		}
	}
}
// logError logs a handler failure for the given member, attaching the error,
// the member's name and its "rpc_addr" tag to the log entry.
//
// Errors that are, or wrap, raft.ErrNotLeader are logged at debug level, since
// they are expected on every node that is not the leader. All other errors are
// logged at error level.
func (m *Membership) logError(err error, msg string, member serf.Member) {
	log := m.logger.Error
	// errors.Is also matches when a Handler has wrapped ErrNotLeader with
	// extra context, which a plain == comparison would miss.
	if errors.Is(err, raft.ErrNotLeader) {
		log = m.logger.Debug
	}
	log(
		msg,
		zap.Error(err),
		zap.String("name", member.Name),
		zap.String("rpc_addr", member.Tags["rpc_addr"]),
	)
}
// isLocal reports whether member has the same name as the local member of the
// underlying Serf instance.
func (m *Membership) isLocal(member serf.Member) bool {
	self := m.serf.LocalMember()
	return member.Name == self.Name
}
// Members returns the member list reported by the underlying Serf instance.
func (m *Membership) Members() []serf.Member {
return m.serf.Members()
}
// Leave asks the underlying Serf instance to leave the cluster and returns
// whatever error Serf reports.
func (m *Membership) Leave() error {
return m.serf.Leave()
}