diff --git a/pg_view/__init__.py b/pg_view/__init__.py
index babef24..a394387 100644
--- a/pg_view/__init__.py
+++ b/pg_view/__init__.py
@@ -55,9 +55,6 @@ def parse_args():
                       action='store', dest='tick', type='int', default=1)
     parser.add_option('-o', '--output-method', help='send output to the following source', action='store',
                       default=OUTPUT_METHOD.curses, dest='output_method')
-    parser.add_option('-V', '--use-version',
-                      help='version of the instance to monitor (in case it can\'t be autodetected)',
-                      action='store', dest='version', type='float')
     parser.add_option('-l', '--log-file', help='direct log output to the file', action='store',
                       dest='log_file')
     parser.add_option('-R', '--reset-output', help='clear screen after each tick', action='store_true', default=False,
@@ -190,7 +187,6 @@ def main():
     clusters = []

     config = read_configuration(options.config_file) if options.config_file else None
-    dbversion = None
     # configuration file takes priority over the rest of database connection information sources.
     if config:
         for instance in config:
@@ -221,29 +217,24 @@ def main():
         # get all PostgreSQL instances
         for result_work_dir, data in postmasters.items():
-            (ppid, dbversion, dbname) = data
-            # if user requested a specific database name and version - don't try to connect to others
+            (ppid, version, dbname) = data
+            # if the user requested a specific database, don't try to connect to others
             if options.instance:
                 if dbname != options.instance or not result_work_dir or not ppid:
                     continue
-                if options.version is not None and dbversion != options.version:
-                    continue
             try:
                 conndata = detect_db_connection_arguments(
-                    result_work_dir, ppid, dbversion, options.username, options.dbname)
+                    result_work_dir, ppid, version, options.username, options.dbname)
                 if conndata is None:
                     continue
                 host = conndata['host']
                 port = conndata['port']
                 conn = build_connection(host, port, options.username, options.dbname)
-                pgcon = psycopg2.connect(**conn)
+                psycopg2.connect(**conn).close()  # test if we can connect
+                desc = make_cluster_desc(name=dbname, version=version, workdir=result_work_dir, conn=conn)
+                clusters.append(desc)
             except Exception as e:
                 logger.error('PostgreSQL exception {0}'.format(e))
-                pgcon = None
-            if pgcon:
-                desc = make_cluster_desc(name=dbname, version=dbversion, workdir=result_work_dir,
-                                         pid=ppid, pgcon=pgcon, conn=conn)
-                clusters.append(desc)
     collectors = []
     groups = {}
     try:
@@ -255,10 +246,7 @@ def main():

         # initialize the disks stat collector process and create an exchange queue
         q = JoinableQueue(1)
-        work_directories = [cl['wd'] for cl in clusters if 'wd' in cl]
-        dbversion = dbversion or clusters[0]['ver']
-
-        collector = DetachedDiskStatCollector(q, work_directories, dbversion)
+        collector = DetachedDiskStatCollector(q, clusters)
         collector.start()
         consumer = DiskCollectorConsumer(q)

@@ -266,8 +254,8 @@ def main():
         collectors.append(SystemStatCollector())
         collectors.append(MemoryStatCollector())
         for cl in clusters:
-            part = PartitionStatCollector(cl['name'], cl['ver'], cl['wd'], consumer)
-            pg = PgstatCollector(cl['pgcon'], cl['reconnect'], cl['pid'], cl['name'], cl['ver'], options.pid)
+            part = PartitionStatCollector(cl['name'], cl['version'], cl['wd'], consumer)
+            pg = PgstatCollector(cl['name'], cl['reconnect'], options.pid)
             groupname = cl['wd']
             groups[groupname] = {'pg': pg, 'partitions': part}
             collectors.append(part)
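The descriptor that `main()` now builds no longer carries a live `pgcon` or the postmaster pid; connectivity is verified once and a `reconnect` closure is stored instead. A minimal sketch of that shape (the inline `postmaster.pid` read is a stand-in for the real pid lookup in `db_client`):

```python
import psycopg2

def make_cluster_desc_sketch(name, version, workdir, conn):
    # Sketch only: nothing stateful is cached in the descriptor; collectors
    # call reconnect() whenever they need a fresh (pgcon, postmaster_pid) pair.
    def reconnect():
        pgcon = psycopg2.connect(**conn)
        with open('{0}/postmaster.pid'.format(workdir)) as fp:
            pid = int(fp.readline())  # the first line of postmaster.pid holds the pid
        return pgcon, pid

    return {'name': name, 'version': version, 'wd': workdir, 'reconnect': reconnect}
```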
diff --git a/pg_view/collectors/partition_collector.py b/pg_view/collectors/partition_collector.py
index c0802be..851de79 100644
--- a/pg_view/collectors/partition_collector.py
+++ b/pg_view/collectors/partition_collector.py
@@ -22,10 +22,10 @@ class PartitionStatCollector(StatCollector):
     XLOG_NAME = 'xlog'
     BLOCK_SIZE = 1024

-    def __init__(self, dbname, dbversion, work_directory, consumer):
+    def __init__(self, dbname, version, work_directory, consumer):
         super(PartitionStatCollector, self).__init__(ticks_per_refresh=1)
         self.dbname = dbname
-        self.dbver = dbversion
+        self.version = version
         self.queue_consumer = consumer
         self.work_directory = work_directory
         self.df_list_transformation = [{'out': 'dev', 'in': 0, 'fn': self._dereference_dev_name},
@@ -130,7 +130,7 @@ def __init__(self, dbname, dbversion, work_directory, consumer):
         self.postinit()

     def ident(self):
-        return '{0} ({1}/{2})'.format(super(PartitionStatCollector, self).ident(), self.dbname, self.dbver)
+        return '{0} ({1}/{2})'.format(super(PartitionStatCollector, self).ident(), self.dbname, self.version)

     @staticmethod
     def _dereference_dev_name(devname):
@@ -163,7 +163,7 @@ def refresh(self):
         self._do_refresh([result[PartitionStatCollector.DATA_NAME], result[PartitionStatCollector.XLOG_NAME]])

     @staticmethod
-    def calculate_time_until_full(colname, prev, cur):
+    def calculate_time_until_full(_, prev, cur):
         # both should be expressed in common units, guaranteed by BLOCK_SIZE
         if (cur.get('path_size', 0) > 0 and
                 prev.get('path_size', 0) > 0 and
@@ -178,8 +178,8 @@ def get_io_data(pnames):
         result = {}
         found = 0  # stop if we found records for all partitions
         total = len(pnames)
+        fp = None
         try:
-            fp = None
             fp = open(PartitionStatCollector.DISK_STAT_FILE, 'rU')
             for l in fp:
                 elements = l.split()
@@ -208,22 +208,21 @@ class DetachedDiskStatCollector(Process):
     OLD_WAL_SUBDIR = '/pg_xlog/'
     WAL_SUBDIR = '/pg_wal/'
-    NEW_WAL_SINCE = 10.0
+    NEW_WAL_SINCE = 100000

-    def __init__(self, q, work_directories, db_version):
+    def __init__(self, q, clusters):
         super(DetachedDiskStatCollector, self).__init__()
-        self.work_directories = work_directories
         self.q = q
         self.daemon = True
-        self.db_version = db_version
+        self.clusters = clusters
         self.df_cache = {}

-    @property
-    def wal_directory(self):
+    @staticmethod
+    def wal_directory(version):
         """ Since PostgreSQL 10 the WAL directory has been renamed, so we need to
            pick the actual WAL directory based on the server version.
         """
-        if self.db_version < DetachedDiskStatCollector.NEW_WAL_SINCE:
+        if version < DetachedDiskStatCollector.NEW_WAL_SINCE:
            return DetachedDiskStatCollector.OLD_WAL_SUBDIR
        else:
            return DetachedDiskStatCollector.WAL_SUBDIR
""" - if self.db_version < DetachedDiskStatCollector.NEW_WAL_SINCE: + if version < DetachedDiskStatCollector.NEW_WAL_SINCE: return DetachedDiskStatCollector.OLD_WAL_SUBDIR else: return DetachedDiskStatCollector.WAL_SUBDIR @@ -234,32 +233,36 @@ def run(self): self.q.join() result = {} self.df_cache = {} - for wd in self.work_directories: - du_data = self.get_du_data(wd) - df_data = self.get_df_data(wd) - result[wd] = [du_data, df_data] + for cluster in self.clusters: + work_directory = cluster['wd'] + wal_directory = self.wal_directory(cluster['version']) + du_data = self.get_du_data(work_directory, wal_directory) + df_data = self.get_df_data(work_directory, wal_directory) + result[work_directory] = [du_data, df_data] self.q.put(result) time.sleep(consts.TICK_LENGTH) - def get_du_data(self, wd): + def get_du_data(self, work_directory, wal_directory): data_size = 0 xlog_size = 0 result = {'data': [], 'xlog': []} try: - data_size = self.run_du(wd, BLOCK_SIZE) - xlog_size = self.run_du(wd + self.wal_directory, BLOCK_SIZE) + data_size = self.run_du(work_directory, BLOCK_SIZE) + xlog_size = self.run_du(work_directory + wal_directory, BLOCK_SIZE) except Exception as e: logger.error('Unable to read free space information for the pg_xlog and data directories for the directory\ - {0}: {1}'.format(wd, e)) + {0}: {1}'.format(work_directory, e)) else: # XXX: why do we pass the block size there? - result['data'] = str(data_size), wd - result['xlog'] = str(xlog_size), wd + self.wal_directory + result['data'] = str(data_size), work_directory + result['xlog'] = str(xlog_size), work_directory + wal_directory return result @staticmethod - def run_du(pathname, block_size=BLOCK_SIZE, exclude=['lost+found']): + def run_du(pathname, block_size=BLOCK_SIZE, exclude=None): + if exclude == None: + exclude = ["lost+found"] size = 0 folders = [pathname] root_dev = os.lstat(pathname).st_dev @@ -285,13 +288,13 @@ def run_du(pathname, block_size=BLOCK_SIZE, exclude=['lost+found']): size += st.st_size return long(size / block_size) - def get_df_data(self, work_directory): + def get_df_data(self, work_directory, wal_directory): """ Retrive raw data from df (transformations are performed via df_list_transformation) """ result = {'data': [], 'xlog': []} # obtain the device names data_dev = self.get_mounted_device(self.get_mount_point(work_directory)) - xlog_dev = self.get_mounted_device(self.get_mount_point(work_directory + self.wal_directory)) + xlog_dev = self.get_mounted_device(self.get_mount_point(work_directory + wal_directory)) if data_dev not in self.df_cache: data_vfs = os.statvfs(work_directory) self.df_cache[data_dev] = data_vfs @@ -299,7 +302,7 @@ def get_df_data(self, work_directory): data_vfs = self.df_cache[data_dev] if xlog_dev not in self.df_cache: - xlog_vfs = os.statvfs(work_directory + self.wal_directory) + xlog_vfs = os.statvfs(work_directory + wal_directory) self.df_cache[xlog_dev] = xlog_vfs else: xlog_vfs = self.df_cache[xlog_dev] @@ -353,7 +356,7 @@ def get_mounted_device(pathname): @staticmethod def get_mount_point(pathname): """Get the mounlst point of the filesystem containing pathname""" - + mount_point = None pathname = os.path.normcase(os.path.realpath(pathname)) parent_device = path_device = os.stat(pathname).st_dev while parent_device == path_device: diff --git a/pg_view/collectors/pg_collector.py b/pg_view/collectors/pg_collector.py index 1779dd0..96b127b 100644 --- a/pg_view/collectors/pg_collector.py +++ b/pg_view/collectors/pg_collector.py @@ -6,7 +6,7 @@ from 
diff --git a/pg_view/collectors/pg_collector.py b/pg_view/collectors/pg_collector.py
index 1779dd0..96b127b 100644
--- a/pg_view/collectors/pg_collector.py
+++ b/pg_view/collectors/pg_collector.py
@@ -6,7 +6,7 @@
 from pg_view.collectors.base_collector import StatCollector
 from pg_view.loggers import logger
 from pg_view.models.outputs import COLSTATUS, COLALIGN
-from pg_view.utils import MEM_PAGE_SIZE, dbversion_as_float
+from pg_view.utils import MEM_PAGE_SIZE

 if sys.hexversion >= 0x03000000:
     long = int
@@ -20,22 +20,17 @@ class PgstatCollector(StatCollector):

     STATM_FILENAME = '/proc/{0}/statm'

-    def __init__(self, pgcon, reconnect, pid, dbname, dbver, always_track_pids):
+    def __init__(self, dbname, reconnect, always_track_pids):
         super(PgstatCollector, self).__init__()
-        self.postmaster_pid = pid
-        self.pgcon = pgcon
-        self.reconnect = reconnect
+        self.reconnect_fn = reconnect
         self.pids = []
         self.rows_diff = []
         self.rows_diff_output = []
-        # figure out our backend pid
-        self.connection_pid = pgcon.get_backend_pid()
-        self.max_connections = self._get_max_connections()
+        # figure out the connection invariants
+        self.connect_to_postgres()
         self.recovery_status = self._get_recovery_status()
         self.always_track_pids = always_track_pids
         self.dbname = dbname
-        self.dbver = dbver
-        self.server_version = pgcon.get_parameter_status('server_version')
         self.filter_aux_processes = True
         self.total_connections = 0
         self.active_connections = 0
@@ -240,7 +235,7 @@ def idle_format_fn(self, text):
         if not r:
             return text
         else:
-            if self.dbver >= 9.2:
+            if self.pgversion >= 90200:
                 return 'idle in transaction for ' + StatCollector.time_pretty_print(int(r.group(1)))
             else:
                 return 'idle in transaction ' + StatCollector.time_pretty_print(int(r.group(1))) \
@@ -256,7 +251,7 @@ def query_status_fn(self, row, col):
         return {-1: COLSTATUS.cs_ok}

     def ident(self):
-        return '{0} ({1}/{2})'.format('postgres', self.dbname, self.dbver)
+        return '{0} ({1}/{2})'.format('postgres', self.dbname, self.pgversion)

     @staticmethod
     def _get_psinfo(cmdline):
@@ -293,6 +288,16 @@ def ncurses_filter_row(self, row):
         else:
             return False

+    def connect_to_postgres(self):
+        self.pgcon, self.postmaster_pid = self.reconnect_fn()
+        self._read_connection_invariants(self.pgcon)
+
+    def _read_connection_invariants(self, pgcon):
+        self.connection_pid = pgcon.get_backend_pid()
+        self.max_connections = self._get_max_connections(pgcon)
+        self.pgversion = int(pgcon.server_version)
+        self.server_version_as_string = pgcon.get_parameter_status('server_version')
+
     def refresh(self):
         """ Reads data from /proc and PostgreSQL stats """
         result = []
@@ -302,11 +307,7 @@ def refresh(self):
             if not self.pgcon:
                 # if we've lost the connection, try to reconnect and
                 # re-initialize all connection invariants
-                self.pgcon, self.postmaster_pid = self.reconnect()
-                self.connection_pid = self.pgcon.get_backend_pid()
-                self.max_connections = self._get_max_connections()
-                self.dbver = dbversion_as_float(self.pgcon)
-                self.server_version = self.pgcon.get_parameter_status('server_version')
+                self.connect_to_postgres()
             stat_data = self._read_pg_stat_activity()
         except psycopg2.OperationalError as e:
             logger.info("failed to query the server: {}".format(e))
@@ -409,10 +410,10 @@ def _get_memory_usage(self, pid):
             uss = (long(statm[1]) - long(statm[2])) * MEM_PAGE_SIZE
         return uss

-    def _get_max_connections(self):
+    def _get_max_connections(self, pgcon):
         """ Read max connections from the database """

-        cur = self.pgcon.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
+        cur = pgcon.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
         cur.execute('show max_connections')
         result = cur.fetchone()
         cur.close()
@@ -435,7 +436,7 @@ def _read_pg_stat_activity(self):
         # the pg_stat_activity format has been changed in 9.2, avoiding ambiguous meanings for some columns.
         # since it makes more sense than the previous layout, we 'cast' the former versions to 9.2
-        if self.dbver < 9.2:
+        if self.pgversion < 90200:
             cur.execute("""
                 SELECT datname,
                        procpid as pid,
@@ -472,8 +473,8 @@ def _read_pg_stat_activity(self):
                      AND other.granted = 't'
                 WHERE procpid != pg_backend_pid()
                 GROUP BY 1,2,3,4,5,6,7,9
-            """)
-        elif self.dbver < 9.6:
+            """)
+        elif self.pgversion < 90600:
             cur.execute("""
                 SELECT datname,
                        a.pid as pid,
@@ -559,7 +560,7 @@ def ncurses_produce_prefix(self):
         if self.pgcon:
             return "{dbname} {version} {role} connections: {conns} of {max_conns} allocated, {active_conns} active\n". \
                 format(dbname=self.dbname,
-                       version=self.server_version,
+                       version=self.server_version_as_string,
                        role=self.recovery_status,
                        conns=self.total_connections,
                        max_conns=self.max_connections,
@@ -567,7 +568,7 @@ def ncurses_produce_prefix(self):
         else:
             return "{dbname} {version} (offline)\n". \
                 format(dbname=self.dbname,
-                       version=self.server_version)
+                       version=self.server_version_as_string)

     @staticmethod
     def process_sort_key(process):
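The collector now takes its connection invariants straight from psycopg2: `connection.server_version` is already an integer in this encoding, so the old `dbversion_as_float` round-trip is unnecessary. Roughly (the connection parameters are placeholders):

```python
import psycopg2

pgcon = psycopg2.connect(host='localhost', port=5432, dbname='postgres')
print(pgcon.server_version)                          # e.g. 90603 for PostgreSQL 9.6.3
print(pgcon.get_parameter_status('server_version'))  # e.g. '9.6.3', kept only for display
print(pgcon.get_backend_pid())                       # pid of our own backend
if pgcon.server_version >= 90200:
    print('pg_stat_activity has the 9.2+ column layout')
```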
diff --git a/pg_view/models/db_client.py b/pg_view/models/db_client.py
index a779e16..cba2638 100644
--- a/pg_view/models/db_client.py
+++ b/pg_view/models/db_client.py
@@ -6,7 +6,7 @@
 from pg_view.loggers import logger
 from pg_view.models.parsers import ProcNetParser
-from pg_view.utils import STAT_FIELD, dbversion_as_float
+from pg_view.utils import STAT_FIELD, postgres_major_version_to_int


 def read_postmaster_pid(work_directory, dbname):
@@ -81,10 +81,11 @@ def detect_db_connection_arguments(work_directory, pid, version, username, dbname):
        next reading the postgresql.conf if necessary and, at last,
     """
     conn_args = detect_with_proc_net(pid)
-    if not conn_args:
+    if not conn_args and version >= 90100:
         # if we failed to detect the arguments via the /proc/net/ readings,
         # perhaps we'll get better luck with just peeking into postmaster.pid.
-        conn_args = detect_with_postmaster_pid(work_directory, version)
+        # Only try this for PostgreSQL 9.1 and above; earlier versions don't contain enough data.
+        conn_args = detect_with_postmaster_pid(work_directory)
     if not conn_args:
         logger.error('unable to detect connection parameters for the PostgreSQL cluster at {0}'.format(
             work_directory))
@@ -110,7 +111,7 @@ def establish_user_defined_connection(instance, conn, clusters):
         logger.error('PostgreSQL exception: {0}'.format(e))
         return None
     # get the database version from the pgcon properties
-    dbver = dbversion_as_float(pgcon)
+    pg_version = int(pgcon.server_version)
     cur = pgcon.cursor()
     cur.execute('show data_directory')
     work_directory = cur.fetchone()[0]
@@ -132,13 +133,12 @@ def establish_user_defined_connection(instance, conn, clusters):
         pgcon.close()
         return True
     # now we have all components to create a cluster descriptor
-    desc = make_cluster_desc(name=instance, version=dbver, workdir=work_directory,
-                             pid=pid, pgcon=pgcon, conn=conn)
+    desc = make_cluster_desc(name=instance, version=pg_version, workdir=work_directory, conn=conn)
     clusters.append(desc)
     return True


-def make_cluster_desc(name, version, workdir, pid, pgcon, conn):
+def make_cluster_desc(name, version, workdir, conn):
     """Create cluster descriptor, complete with the reconnect function."""

     def reconnect():
@@ -148,10 +148,8 @@ def reconnect():

     return {
         'name': name,
-        'ver': version,
+        'version': version,
         'wd': workdir,
-        'pid': pid,
-        'pgcon': pgcon,
         'reconnect': reconnect
     }

@@ -231,8 +229,7 @@ def get_postmasters_directories():
         try:
             fp = open(PG_VERSION_FILENAME, 'rU')
             val = fp.read().strip()
-            if val is not None and len(val) >= 2:
-                version = float(val)
+            version = postgres_major_version_to_int(val)
         except os.error as e:
             logger.error(
                 'unable to read version number from PG_VERSION directory {0}, have to skip it'.format(pg_dir))
             continue
         except ValueError:
             logger.error('PG_VERSION doesn\'t contain a valid version number: {0}'.format(val))
             continue
+        except Exception as e:
+            logger.error("could not parse postgres version number: {0}".format(e))
+            continue
         else:
             dbname = get_dbname_from_path(pg_dir)
             postmasters[pg_dir] = [pid, version, dbname]
@@ -269,12 +268,9 @@ def fetch_socket_inodes_for_process(pid):
     return inodes


-def detect_with_postmaster_pid(work_directory, version):
-
-    # PostgreSQL 9.0 doesn't have enough data
+def detect_with_postmaster_pid(work_directory):
     result = {}
-    if version is None or version == 9.0:
-        return None
     PID_FILE = '{0}/postmaster.pid'.format(work_directory)

     # try to access the socket directory
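`get_postmasters_directories` now runs the raw contents of a cluster's `PG_VERSION` file (a bare major version such as `9.6` or `10`) through `postgres_major_version_to_int` instead of `float()`, keeping the whole code base on the integer encoding. A sketch of that path, with an illustrative data-directory argument:

```python
from pg_view.utils import postgres_major_version_to_int

def version_from_data_dir(data_dir):
    # PG_VERSION contains only the major version, e.g. '9.6' or '10'
    with open('{0}/PG_VERSION'.format(data_dir)) as fp:
        val = fp.read().strip()
    return postgres_major_version_to_int(val)  # '9.6' -> 90600, '10' -> 100000
```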
diff --git a/pg_view/utils.py b/pg_view/utils.py
index 6d7f3d5..7b6819b 100644
--- a/pg_view/utils.py
+++ b/pg_view/utils.py
@@ -20,7 +20,6 @@ def enum(**enums):
 MEM_PAGE_SIZE = resource.getpagesize()
 OUTPUT_METHOD = enum(console='console', json='json', curses='curses')

-
 def get_valid_output_methods():
     result = []
     for key in OUTPUT_METHOD.__dict__.keys():
@@ -96,7 +95,47 @@ def process_groups(groups):
         part.ncurses_set_prefix(pg.ncurses_produce_prefix())


-def dbversion_as_float(pgcon):
-    version_num = pgcon.server_version
-    version_num /= 100
-    return float('{0}.{1}'.format(version_num / 100, version_num % 100))
+# The version parsing is shamelessly stolen from https://github.com/zalando/patroni/blob/master/patroni/postgresql.py
+def postgres_version_to_int(pg_version):
+    """ Convert a server_version string to an integer
+
+    >>> postgres_version_to_int('9.5.3')
+    90503
+    >>> postgres_version_to_int('9.3.13')
+    90313
+    >>> postgres_version_to_int('10.1')
+    100001
+    >>> postgres_version_to_int('10')
+    Traceback (most recent call last):
+        ...
+    Exception: Invalid PostgreSQL format: X.Y or X.Y.Z is accepted: 10
+    >>> postgres_version_to_int('a.b.c')
+    Traceback (most recent call last):
+        ...
+    Exception: Invalid PostgreSQL version: a.b.c
+    """
+    components = pg_version.split('.')
+
+    if len(components) < 2 or len(components) > 3:
+        raise Exception("Invalid PostgreSQL format: X.Y or X.Y.Z is accepted: {0}".format(pg_version))
+    if len(components) == 2:
+        # new-style version numbers, i.e. 10.1 becomes 100001
+        components.insert(1, '0')
+    try:
+        result = [c if int(c) > 10 else '0{0}'.format(c) for c in components]
+        result = int(''.join(result))
+    except ValueError:
+        raise Exception("Invalid PostgreSQL version: {0}".format(pg_version))
+    return result
+
+
+def postgres_major_version_to_int(pg_version):
+    """ Convert a major version string, as found in PG_VERSION, to an integer
+
+    >>> postgres_major_version_to_int('10')
+    100000
+    >>> postgres_major_version_to_int('9.6')
+    90600
+    """
+    return postgres_version_to_int(pg_version + '.0')
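As a quick sanity check, the encoding these helpers produce matches PostgreSQL's own `server_version_num` convention (two zero-padded groups after the major part):

```python
from pg_view.utils import postgres_version_to_int, postgres_major_version_to_int

assert postgres_version_to_int('9.5.3') == 90503   # 9 * 10000 + 5 * 100 + 3
assert postgres_version_to_int('10.1') == 100001   # 10 * 10000 + 1
assert postgres_major_version_to_int('9.6') == 90600
assert postgres_major_version_to_int('10') == 100000
```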