-
Notifications
You must be signed in to change notification settings - Fork 1
/
code.py
114 lines (93 loc) · 3.78 KB
/
code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import etcd3
import sys
import time
from threading import Event
# etcd key that the election is fought over: each server tries to create this
# key with its own name as the value, so the stored value names the current
# leader.
LEADER_KEY = "/leader"
def main(server_name):
    """Entry point: take part in leader elections forever.

    Connects to etcd on localhost:2379 and then loops: run an election,
    act as leader if it was won, otherwise block until the next election
    is needed.

    Args:
        server_name: Name this process writes into the leader key when it
            wins an election.
    """
    etcd = etcd3.client(host="localhost", port=2379)
    while True:
        elected, lease = leader_election(etcd, server_name)
        if not elected:
            print("I am a follower.")
            wait_for_next_election(etcd)
            continue
        print("I am the leader.")
        on_leadership_gained(lease)
def leader_election(client, server_name):
    """Run one election round by trying to claim the leader key.

    Every participant tries to put its own name under LEADER_KEY, but in a
    way that only works if the key does not exist (never created, deleted,
    or expired together with its lease).

    Args:
        client: etcd3 client used to grant the lease and run the insert.
        server_name: Value stored under LEADER_KEY if this process wins.

    Returns:
        A tuple ``(is_leader, lease)``. When ``is_leader`` is False the
        returned lease has already been revoked (best effort) and must not
        be used for anything.
    """
    print("New leader election happening.")
    # Create a lease before creating the key. If this client ever lets the
    # lease expire, every key attached to it expires as well. Here that
    # means: if the client fails to renew the lease for 5 seconds (network
    # partition, machine down), the leader key disappears.
    # https://help.compose.com/docs/etcd-using-etcd3-features#selection-leases
    lease = client.lease(5)
    is_leader = False
    try:
        # Try to create the key with our name as value. If that fails,
        # another server got there first.
        is_leader = try_insert(client, LEADER_KEY, server_name, lease)
    finally:
        if not is_leader:
            # We lost (or the insert raised): the lease guards nothing, so
            # release it now instead of leaving it on the server until its
            # TTL runs out.
            try:
                lease.revoke()
            except Exception as err:
                # Best effort only; the lease expires on its own anyway.
                print("Could not revoke unused lease:", repr(err))
    return is_leader, lease
def on_leadership_gained(lease):
    """Act as leader for as long as the lease can be kept alive.

    Repeatedly refreshes the lease and runs one unit of work. Leadership is
    only given up when the lease turns out to be expired, when an exception
    is raised, or when the process is interrupted.

    Args:
        lease: The etcd3 lease attached to the leader key.

    Returns:
        None, once leadership has been lost or given up. On Ctrl-C the
        process exits with status 1 instead of returning.
    """
    try:
        while True:
            print("Refreshing lease; still the leader")
            if _lease_expired(lease.refresh()):
                # The lease (and with it the leader key) is already gone, so
                # another server may be leader by now. Nothing to revoke.
                print("Lease expired; no longer the leader.")
                return
            print("Passing next stage")
            # Here we can add business logic
            do_work()
    except KeyboardInterrupt:
        print("\nRevoking lease, no longer the leader.")
        _revoke_quietly(lease)
        sys.exit(1)
    except Exception as err:
        # Most likely a client timeout, maybe from network issues. Try to
        # revoke the lease so another member can take over right away.
        print("Giving up leadership after error:", repr(err))
        _revoke_quietly(lease)
        return


def _lease_expired(keepalive_responses):
    """Return True if any keep-alive response reports a non-positive TTL.

    Assumes each response exposes the remaining time-to-live as ``TTL`` (as
    etcd's LeaseKeepAliveResponse does) and that an expired lease is reported
    as TTL 0 rather than as an error. Responses without that attribute are
    treated as healthy.
    """
    return any(
        getattr(response, "TTL", 1) <= 0
        for response in keepalive_responses or ()
    )


def _revoke_quietly(lease):
    """Revoke ``lease``, printing instead of raising if that fails."""
    try:
        lease.revoke()
    except Exception as err:
        # If etcd is unreachable the lease simply expires after its TTL.
        print("Could not revoke lease:", repr(err))
def wait_for_next_election(client):
    """Block until the leader key is gone and a new election should run.

    Registers a watch on LEADER_KEY and waits until the key is deleted or
    expires, the watch itself fails, or the key turns out to be missing
    already.

    Args:
        client: etcd3 client used to watch and read LEADER_KEY.

    Returns:
        None, once a new election should be attempted. On Ctrl-C the watch
        is cancelled and the process exits with status 1.
    """
    election_event = Event()

    def watch_callback(resp):
        # The watcher hands over an exception instead of a response when the
        # watch stream breaks. Wake up and retry rather than wait forever on
        # a watch that is no longer running.
        if isinstance(resp, Exception):
            print("Watch on leader key failed:", repr(resp))
            election_event.set()
            return
        # A deletion means the key expired or was deleted, so the
        # leadership is up for grabs.
        if any(isinstance(event, etcd3.events.DeleteEvent)
               for event in resp.events):
            print("LEADERSHIP CHANGE REQUIRED")
            election_event.set()

    watch_id = client.add_watch_callback(LEADER_KEY, watch_callback)
    try:
        # The key may have vanished between the lost election and the watch
        # registration; that deletion would never reach the callback. Check
        # once now that the watch is in place.
        current_leader, _ = client.get(LEADER_KEY)
        if current_leader is None:
            election_event.set()
        # Wait in short slices so Ctrl-C stays responsive while still
        # returning as soon as the event is set.
        while not election_event.wait(1):
            pass
    except KeyboardInterrupt:
        sys.exit(1)
    finally:
        # Always cancel the watch: on normal return, on exit and on error.
        client.cancel_watch(watch_id)
def try_insert(client, key, value, lease):
    """Create ``key`` with ``value`` only if the key does not exist yet.

    The put is attached to ``lease``, so the key is removed automatically
    when that lease expires. The write is guarded by a transaction that
    compares the key's version with 0.

    Args:
        client: etcd3 client that runs the transaction.
        key: Key to create.
        value: Value to store under ``key``.
        lease: Lease the new key is attached to.

    Returns:
        The transaction's success flag: truthy when the key was created,
        falsy when the comparison failed because the key already exists.
        No exception is raised for an existing key.
    """
    put_if_absent = client.transactions.put(key, value, lease)
    key_is_absent = client.transactions.version(key) == 0
    created, _responses = client.transaction(
        compare=[key_is_absent],
        success=[put_if_absent],
        failure=[],
    )
    return created
def do_work():
    """Placeholder for the leader's business logic; just sleeps one second."""
    pause_seconds = 1
    time.sleep(pause_seconds)
if __name__ == "__main__":
    # The first command-line argument is this server's name, i.e. the value
    # written to the leader key when it wins. Fail with a usage message
    # rather than an IndexError traceback when it is missing.
    if len(sys.argv) < 2:
        sys.exit("usage: {} SERVER_NAME".format(sys.argv[0]))
    server_name = sys.argv[1]
    main(server_name)