diff --git a/helpers/postgresql.py b/helpers/postgresql.py index db75660d..37366080 100644 --- a/helpers/postgresql.py +++ b/helpers/postgresql.py @@ -12,6 +12,7 @@ def __init__(self, config): self.name = config["name"] self.host, self.port = config["listen"].split(":") self.data_dir = config["data_dir"] + self.version_file = os.path.join(self.data_dir, 'PG_VERSION') self.replication = config["replication"] self.config = config @@ -65,6 +66,24 @@ def query(self, sql): def data_directory_empty(self): return not os.path.exists(self.data_dir) or os.listdir(self.data_dir) == [] + def version_file_exists(self): + return not self.data_directory_empty() and os.path.isfile(self.version_file) + + def postgres_version(self): + if self.version_file_exists(): + try: + with open(self.version_file) as f: + return f.read().strip() + except Exception: + logger.exception('Failed to read PG_VERSION from %s', self._data_dir) + return 0 + + def wal_name(self): + return 'wal' if self.postgres_version >= 10 else 'xlog' + + def lsn_name(self): + return 'lsn' if self.postgres_version >= 10 else 'location' + def initialize(self): if os.system("initdb %s" % self.initdb_options()) == 0: # start Postgres without options to setup replication user indepedent of other system settings @@ -155,7 +174,7 @@ def is_healthiest_node(self, state_store): member_conn = psycopg2.connect(member["address"]) member_conn.autocommit = True member_cursor = member_conn.cursor() - member_cursor.execute("SELECT %s - (pg_last_xlog_replay_location() - '0/000000'::pg_lsn) AS bytes;" % self.xlog_position()) + member_cursor.execute("SELECT %s - (pg_last_%s_replay_%s() - '0/000000'::pg_lsn) AS bytes;" % (self.xlog_position(), self.wal_name(), self.lsn_name()) ) xlog_diff = member_cursor.fetchone()[0] logger.info([self.name, member["hostname"], xlog_diff]) if xlog_diff < 0: @@ -219,10 +238,10 @@ def create_replication_user(self): self.query("CREATE USER \"%s\" WITH REPLICATION ENCRYPTED PASSWORD '%s';" % (self.replication["username"], self.replication["password"])) def xlog_position(self): - return self.query("SELECT pg_last_xlog_replay_location() - '0/0000000'::pg_lsn;").fetchone()[0] + return self.query("SELECT pg_last_%s_replay_%s() - '0/0000000'::pg_lsn;" % (self.wal_name(), self.lsn_name())).fetchone()[0] def last_operation(self): if self.is_leader(): - return self.query("SELECT pg_current_xlog_location() - '0/00000'::pg_lsn;").fetchone()[0] + return self.query("SELECT pg_current_%s_%s() - '0/00000'::pg_lsn;" % (self.wal_name(), self.lsn_name())).fetchone()[0] else: return self.xlog_position()