forked from compose/governor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
governor.py
executable file
·72 lines (59 loc) · 2.2 KB
/
governor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python
import sys, os, yaml, time, urllib2, atexit
import logging
from helpers.etcd import Etcd
from helpers.postgresql import Postgresql
from helpers.ha import Ha
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.INFO)
f = open(sys.argv[1], "r")
config = yaml.load(f.read())
f.close()
etcd = Etcd(config["etcd"])
postgresql = Postgresql(config["postgresql"])
ha = Ha(postgresql, etcd)
# stop postgresql on script exit
def stop_postgresql():
postgresql.stop()
atexit.register(stop_postgresql)
# wait for etcd to be available
etcd_ready = False
while not etcd_ready:
try:
etcd.touch_member(postgresql.name, postgresql.connection_string)
etcd_ready = True
except urllib2.URLError:
logging.info("waiting on etcd")
time.sleep(5)
# is data directory empty?
if postgresql.data_directory_empty():
# racing to initialize
if etcd.race("/initialize", postgresql.name):
postgresql.initialize()
etcd.take_leader(postgresql.name)
postgresql.start()
else:
synced_from_leader = False
while not synced_from_leader:
leader = etcd.current_leader()
if not leader:
time.sleep(5)
continue
if postgresql.sync_from_leader(leader):
postgresql.write_recovery_conf(leader)
postgresql.start()
synced_from_leader = True
else:
time.sleep(5)
else:
postgresql.follow_no_leader()
postgresql.start()
while True:
logging.info(ha.run_cycle())
# create replication slots
if postgresql.is_leader():
for node in etcd.get_client_path("/members?recursive=true")["node"]["nodes"]:
member = node["key"].split('/')[-1]
if member != postgresql.name:
postgresql.query("DO LANGUAGE plpgsql $$DECLARE somevar VARCHAR; BEGIN SELECT slot_name INTO somevar FROM pg_replication_slots WHERE slot_name = '%(slot)s' LIMIT 1; IF NOT FOUND THEN PERFORM pg_create_physical_replication_slot('%(slot)s'); END IF; END$$;" % {"slot": member})
etcd.touch_member(postgresql.name, postgresql.connection_string)
time.sleep(config["loop_wait"])