Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

Commit

Permalink
Merge pull request compose#34 from compose/module-support
Browse files Browse the repository at this point in the history
make governor a module
  • Loading branch information
Winslett committed Mar 14, 2016
2 parents e5267c1 + 48da1cb commit 5c070ed
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 61 deletions.
Empty file added __init__.py
Empty file.
127 changes: 66 additions & 61 deletions governor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,13 @@

logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=LOG_LEVEL)

f = open(sys.argv[1], "r")
config = yaml.load(f.read())
f.close()

etcd = Etcd(config["etcd"])
postgresql = Postgresql(config["postgresql"])
ha = Ha(postgresql, etcd)

# stop postgresql on script exit
def stop_postgresql():
def stop_postgresql(postgresql):
postgresql.stop()
atexit.register(stop_postgresql)

# wait for etcd to be available
def wait_for_etcd(message):
def wait_for_etcd(message, etcd, postgresql):
etcd_ready = False
while not etcd_ready:
try:
Expand All @@ -35,60 +27,73 @@ def wait_for_etcd(message):
logging.info("waiting on etcd: %s" % message)
time.sleep(5)

logging.info("Governor Starting up")
def run(config):
etcd = Etcd(config["etcd"])
postgresql = Postgresql(config["postgresql"])
ha = Ha(postgresql, etcd)

atexit.register(stop_postgresql, postgresql)
logging.info("Governor Starting up")
# is data directory empty?
if postgresql.data_directory_empty():
logging.info("Governor Starting up: Empty Data Dir")
# racing to initialize
wait_for_etcd("cannot initialize member without ETCD")
if etcd.race("/initialize", postgresql.name):
logging.info("Governor Starting up: Initialisation Race ... WON!!!")
logging.info("Governor Starting up: Initialise Postgres")
postgresql.initialize()
logging.info("Governor Starting up: Initialise Complete")
etcd.take_leader(postgresql.name)
if postgresql.data_directory_empty():
logging.info("Governor Starting up: Empty Data Dir")
# racing to initialize
wait_for_etcd("cannot initialize member without ETCD", etcd, postgresql)
if etcd.race("/initialize", postgresql.name):
logging.info("Governor Starting up: Initialisation Race ... WON!!!")
logging.info("Governor Starting up: Initialise Postgres")
postgresql.initialize()
logging.info("Governor Starting up: Initialise Complete")
etcd.take_leader(postgresql.name)
logging.info("Governor Starting up: Starting Postgres")
postgresql.start()
else:
logging.info("Governor Starting up: Initialisation Race ... LOST")
logging.info("Governor Starting up: Sync Postgres from Leader")
synced_from_leader = False
while not synced_from_leader:
leader = etcd.current_leader()
if not leader:
time.sleep(5)
continue
if postgresql.sync_from_leader(leader):
logging.info("Governor Starting up: Sync Completed")
postgresql.write_recovery_conf(leader)
logging.info("Governor Starting up: Starting Postgres")
postgresql.start()
synced_from_leader = True
else:
time.sleep(5)
else:
logging.info("Governor Starting up: Existing Data Dir")
postgresql.follow_no_leader()
logging.info("Governor Starting up: Starting Postgres")
postgresql.start()
else:
logging.info("Governor Starting up: Initialisation Race ... LOST")
logging.info("Governor Starting up: Sync Postgres from Leader")
synced_from_leader = False
while not synced_from_leader:
leader = etcd.current_leader()
if not leader:
time.sleep(5)
continue
if postgresql.sync_from_leader(leader):
logging.info("Governor Starting up: Sync Completed")
postgresql.write_recovery_conf(leader)
logging.info("Governor Starting up: Starting Postgres")
postgresql.start()
synced_from_leader = True
else:
time.sleep(5)
else:
logging.info("Governor Starting up: Existing Data Dir")
postgresql.follow_no_leader()
logging.info("Governor Starting up: Starting Postgres")
postgresql.start()

wait_for_etcd("running in readonly mode; cannot participate in cluster HA without etcd")
logging.info("Governor Running: Starting Running Loop")
while True:
try:
logging.info("Governor Running: %s" % ha.run_cycle())
wait_for_etcd("running in readonly mode; cannot participate in cluster HA without etcd", etcd, postgresql)
logging.info("Governor Running: Starting Running Loop")
while True:
try:
logging.info("Governor Running: %s" % ha.run_cycle())

# create replication slots
if postgresql.is_leader():
logging.info("Governor Running: I am the Leader")
for node in etcd.get_client_path("/members?recursive=true")["node"]["nodes"]:
member = node["key"].split('/')[-1]
if member != postgresql.name:
postgresql.query("DO LANGUAGE plpgsql $$DECLARE somevar VARCHAR; BEGIN SELECT slot_name INTO somevar FROM pg_replication_slots WHERE slot_name = '%(slot)s' LIMIT 1; IF NOT FOUND THEN PERFORM pg_create_physical_replication_slot('%(slot)s'); END IF; END$$;" % {"slot": member})
etcd.touch_member(postgresql.name, postgresql.connection_string)
# create replication slots
if postgresql.is_leader():
logging.info("Governor Running: I am the Leader")
for node in etcd.get_client_path("/members?recursive=true")["node"]["nodes"]:
member = node["key"].split('/')[-1]
if member != postgresql.name:
postgresql.query("DO LANGUAGE plpgsql $$DECLARE somevar VARCHAR; BEGIN SELECT slot_name INTO somevar FROM pg_replication_slots WHERE slot_name = '%(slot)s' LIMIT 1; IF NOT FOUND THEN PERFORM pg_create_physical_replication_slot('%(slot)s'); END IF; END$$;" % {"slot": member})
etcd.touch_member(postgresql.name, postgresql.connection_string)

time.sleep(config["loop_wait"])
except urllib2.URLError:
logging.warning("Lost connection to etcd, setting no leader and waiting on etcd")
postgresql.follow_no_leader()
wait_for_etcd("running in readonly mode; cannot participate in cluster HA without etcd")
time.sleep(config["loop_wait"])
except urllib2.URLError:
logging.info("Lost connection to etcd, setting no leader and waiting on etcd")
postgresql.follow_no_leader()
wait_for_etcd("running in readonly mode; cannot participate in cluster HA without etcd", etcd, postgresql)

if __name__ == "__main__":
f = open(sys.argv[1], "r")
config = yaml.load(f.read())
f.close()

run(config)

0 comments on commit 5c070ed

Please sign in to comment.