diff --git a/bidscoin/bcoin.py b/bidscoin/bcoin.py
index 7baa9e32..26be81bc 100755
--- a/bidscoin/bcoin.py
+++ b/bidscoin/bcoin.py
@@ -31,10 +31,7 @@
 yaml = YAML()
 yaml.representer.ignore_aliases = lambda *data: True    # Expand aliases (https://stackoverflow.com/questions/58091449/disabling-alias-for-yaml-file-in-python)

-# Define custom logging levels
-BCDEBUG, BCDEBUG_LEVEL = 'BCDEBUG', 11      # NB: using the standard debug mode will generate may debug messages from imports
-VERBOSE, VERBOSE_LEVEL = 'VERBOSE', 15
-SUCCESS, SUCCESS_LEVEL = 'SUCCESS', 25
+LOGGER = logging.getLogger(__name__)


 class TqdmUpTo(tqdm):
@@ -53,39 +50,76 @@ def update_to(self, b=1, bsize=1, tsize=None):
         self.update(b * bsize - self.n)     # will also set self.n = b * bsize


-class CustomLogger(logging.Logger):
-    """Extend the Logger class to add custom methods for the new levels"""
+def drmaa_nativespec(specs: str, session) -> str:
+    """
+    Converts (CLI default) native Torque walltime and memory specifications to the DRMAA implementation (currently only Slurm is supported)

-    def bcdebug(self, message, *args, **kwargs):
-        """Custom BIDSCOIN DEBUG messages"""
-        if self.isEnabledFor(BCDEBUG_LEVEL):
-            self._log(BCDEBUG_LEVEL, message, args, **kwargs)
+    :param specs:   Native Torque walltime and memory specifications, e.g. '-l walltime=00:10:00,mem=2gb'
+    :param session: The DRMAA session
+    :return:        The converted native specifications
+    """

-    def verbose(self, message, *args, **kwargs):
-        """Custom BIDSCOIN VERBOSE messages"""
-        if self.isEnabledFor(VERBOSE_LEVEL):
-            self._log(VERBOSE_LEVEL, message, args, **kwargs)
+    jobmanager: str = session.drmaaImplementation
+
+    if '-l ' in specs and 'pbs' not in jobmanager.lower():

-    def success(self, message, *args, **kwargs):
-        """Custom BIDSCOIN SUCCESS messages"""
-        if self.isEnabledFor(SUCCESS_LEVEL):
-            self._log(SUCCESS_LEVEL, message, args, **kwargs)
+        if 'slurm' in jobmanager.lower():
+            specs = (specs.replace('-l ', '')
+                          .replace(',', ' ')
+                          .replace('walltime', '--time')
+                          .replace('mem', '--mem')
+                          .replace('gb','000'))
+        else:
+            LOGGER.warning(f"Default `--cluster` native specifications are not (yet) provided for {jobmanager}. Please add them to your command if you get DRMAA errors")
+            specs = ''

+    return specs.strip()

-# Get a logger from the custom logger class
-logging.setLoggerClass(CustomLogger)
-LOGGER = logging.getLogger(__name__)
+
+def synchronize(pbatch, jobids: list, wait: int=15):
+    """
+    Shows tqdm progress bars for queued and running DRMAA jobs. Waits until all jobs have finished +
+    some extra wait time to give NAS systems the opportunity to fully synchronize
+
+    :param pbatch: The DRMAA session
+    :param jobids: The job ids
+    :param wait:   The extra wait time for the NAS
+    :return:
+    """
+
+    with logging_redirect_tqdm():
+
+        qbar = tqdm(total=len(jobids), desc='Queued ', unit='job', leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]')
+        rbar = tqdm(total=len(jobids), desc='Running', unit='job', leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]', colour='green')
+        done = 0
+        while done < len(jobids):
+            jobs   = [pbatch.jobStatus(jobid) for jobid in jobids]
+            done   = sum(status in ('done', 'failed', 'undetermined') for status in jobs)
+            qbar.n = sum(status == 'queued_active' for status in jobs)
+            rbar.n = sum(status == 'running' for status in jobs)
+            qbar.refresh(), rbar.refresh()
+            time.sleep(2)
+        qbar.close(), rbar.close()
+
+    failedjobs = [jobid for jobid in jobids if pbatch.jobStatus(jobid)=='failed']
+    if failedjobs:
+        LOGGER.error(f"{len(failedjobs)} HPC jobs failed to run:\n{failedjobs}\nThis may well be due to an underspecified `--cluster` input option (e.g. not enough memory)")
+
+    # Give NAS systems some time to fully synchronize
+    for t in tqdm(range(wait*100), desc='synchronizing', leave=False, bar_format='{l_bar}{bar}| [{elapsed}]'):
+        time.sleep(.01)


 def setup_logging(logfile: Path=Path()):
     """
     Set up the logging framework:
-    1) Add custom logging levels: 'bcdebug', 'verbose', and 'success'.
-    2) Add a console stream handler for generating terminal output.
-    3) Optionally add file handlers for normal log and warning/error log if logfile is provided.
+    1) Add a 'bcdebug', 'verbose' and a 'success' logging level
+    2) Add a console streamhandler
+    3) If logfile then add a normal log and a warning/error filehandler

-    :param logfile: Path to the logfile. If none, logging is console-only
-    """
+    :param logfile: Name of the logfile
+    :return:
+    """

     # Set the default formats
     if DEBUG:
@@ -96,17 +130,36 @@ def setup_logging(logfile: Path=Path()):
         cfmt = '%(levelname)s | %(message)s'
     datefmt = '%Y-%m-%d %H:%M:%S'

-    # Add custom log levels to logging
-    logging.addLevelName(BCDEBUG_LEVEL, BCDEBUG)
-    logging.addLevelName(VERBOSE_LEVEL, VERBOSE)
-    logging.addLevelName(SUCCESS_LEVEL, SUCCESS)
-
-    # Get the root logger and set the appropriate level
+    # Add a BIDScoin debug logging level = 11 (NB: using the standard debug mode will generate may debug messages from imports)
+    logging.BCDEBUG = 11
+    logging.addLevelName(logging.BCDEBUG, 'BCDEBUG')
+    logging.__all__ += ['BCDEBUG'] if 'BCDEBUG' not in logging.__all__ else []
+    def bcdebug(self, message, *args, **kws):
+        if self.isEnabledFor(logging.BCDEBUG): self._log(logging.BCDEBUG, message, args, **kws)
+    logging.Logger.bcdebug = bcdebug
+
+    # Add a verbose logging level = 15
+    logging.VERBOSE = 15
+    logging.addLevelName(logging.VERBOSE, 'VERBOSE')
+    logging.__all__ += ['VERBOSE'] if 'VERBOSE' not in logging.__all__ else []
+    def verbose(self, message, *args, **kws):
+        if self.isEnabledFor(logging.VERBOSE): self._log(logging.VERBOSE, message, args, **kws)
+    logging.Logger.verbose = verbose
+
+    # Add a success logging level = 25
+    logging.SUCCESS = 25
+    logging.addLevelName(logging.SUCCESS, 'SUCCESS')
+    logging.__all__ += ['SUCCESS'] if 'SUCCESS' not in logging.__all__ else []
+    def success(self, message, *args, **kws):
+        if self.isEnabledFor(logging.SUCCESS): self._log(logging.SUCCESS, message, args, **kws)
+    logging.Logger.success = success
+
+    # Set the root logging level
     logger = logging.getLogger()
-    logger.setLevel(BCDEBUG_LEVEL if DEBUG else VERBOSE_LEVEL)
+    logger.setLevel('BCDEBUG' if DEBUG else 'VERBOSE')

     # Add the console streamhandler and bring some color to those boring logs! :-)
-    coloredlogs.install(level=BCDEBUG if DEBUG else VERBOSE if not logfile.name else 'INFO', fmt=cfmt, datefmt=datefmt)         # NB: Using tqdm sets the streamhandler level to 0, see: https://github.com/tqdm/tqdm/pull/1235
+    coloredlogs.install(level='BCDEBUG' if DEBUG else 'VERBOSE' if not logfile.name else 'INFO', fmt=cfmt, datefmt=datefmt)     # NB: Using tqdm sets the streamhandler level to 0, see: https://github.com/tqdm/tqdm/pull/1235
     coloredlogs.DEFAULT_LEVEL_STYLES['verbose']['color'] = 245      # = Gray

     if logfile.name:
@@ -115,7 +168,7 @@ def setup_logging(logfile: Path=Path()):
         logfile.parent.mkdir(parents=True, exist_ok=True)           # Create the log dir if it does not exist
         formatter  = logging.Formatter(fmt=fmt, datefmt=datefmt)
         loghandler = logging.FileHandler(logfile)
-        loghandler.setLevel(BCDEBUG)
+        loghandler.setLevel('BCDEBUG')
         loghandler.setFormatter(formatter)
         loghandler.set_name('loghandler')
         logger.addHandler(loghandler)
@@ -166,66 +219,6 @@ def reporterrors() -> str:
     return errors


-def drmaa_nativespec(specs: str, session) -> str:
-    """
-    Converts (CLI default) native Torque walltime and memory specifications to the DRMAA implementation (currently only Slurm is supported)
-
-    :param specs:   Native Torque walltime and memory specifications, e.g. '-l walltime=00:10:00,mem=2gb'
-    :param session: The DRMAA session
-    :return:        The converted native specifications
-    """
-
-    jobmanager: str = session.drmaaImplementation
-
-    if '-l ' in specs and 'pbs' not in jobmanager.lower():
-
-        if 'slurm' in jobmanager.lower():
-            specs = (specs.replace('-l ', '')
-                          .replace(',', ' ')
-                          .replace('walltime', '--time')
-                          .replace('mem', '--mem')
-                          .replace('gb','000'))
-        else:
-            LOGGER.warning(f"Default `--cluster` native specifications are not (yet) provided for {jobmanager}. Please add them to your command if you get DRMAA errors")
-            specs = ''
-
-    return specs.strip()
-
-
-def synchronize(pbatch, jobids: list, wait: int=15):
-    """
-    Shows tqdm progress bars for queued and running DRMAA jobs. Waits until all jobs have finished +
-    some extra wait time to give NAS systems the opportunity to fully synchronize
-
-    :param pbatch: The DRMAA session
-    :param jobids: The job ids
-    :param wait:   The extra wait time for the NAS
-    :return:
-    """
-
-    with logging_redirect_tqdm():
-
-        qbar = tqdm(total=len(jobids), desc='Queued ', unit='job', leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]')
-        rbar = tqdm(total=len(jobids), desc='Running', unit='job', leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]', colour='green')
-        done = 0
-        while done < len(jobids):
-            jobs   = [pbatch.jobStatus(jobid) for jobid in jobids]
-            done   = sum(status in ('done', 'failed', 'undetermined') for status in jobs)
-            qbar.n = sum(status == 'queued_active' for status in jobs)
-            rbar.n = sum(status == 'running' for status in jobs)
-            qbar.refresh(), rbar.refresh()
-            time.sleep(2)
-        qbar.close(), rbar.close()
-
-    failedjobs = [jobid for jobid in jobids if pbatch.jobStatus(jobid)=='failed']
-    if failedjobs:
-        LOGGER.error(f"{len(failedjobs)} HPC jobs failed to run:\n{failedjobs}\nThis may well be due to an underspecified `--cluster` input option (e.g. not enough memory)")
-
-    # Give NAS systems some time to fully synchronize
-    for t in tqdm(range(wait*100), desc='synchronizing', leave=False, bar_format='{l_bar}{bar}| [{elapsed}]'):
-        time.sleep(.01)
-
-
 def list_executables(show: bool=False) -> list:
     """
     :param show: Print the installed console scripts if True
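
Reviewer note (not part of the diff): after setup_logging() has patched the three extra levels onto logging.Logger, they are used like the built-in ones. A minimal sketch, assuming an installed BIDScoin; the logfile path and the messages are made up:

    import logging
    from pathlib import Path
    from bidscoin.bcoin import setup_logging

    setup_logging(Path('bids/code/bidscoin/bidscoiner.log'))    # hypothetical logfile location
    logger = logging.getLogger(__name__)

    logger.bcdebug('Cached attribute lookup')    # level 11, just above stdlib DEBUG (10)
    logger.verbose('Processing sub-01')          # level 15, between DEBUG and INFO (20)
    logger.success('sub-01 converted')           # level 25, between INFO and WARNING (30)

Pinning BCDEBUG at 11 instead of reusing DEBUG means that enabling it does not also unleash the many DEBUG messages that imported libraries emit at level 10, which is the rationale given in the inline comment.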
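Reviewer note on drmaa_nativespec(): under a Slurm DRMAA implementation the replace chain rewrites the Torque-style default option by option, so '-l walltime=00:10:00,mem=2gb' comes out as '--time=00:10:00 --mem=2000' (the 'gb' -> '000' step converts gigabytes to the plain megabyte count that Slurm's --mem expects). A sketch with a stand-in session object; a real drmaa.Session reports drmaaImplementation itself, and its exact wording is an assumption here:

    from bidscoin.bcoin import drmaa_nativespec

    class FakeSession:                          # stand-in for a drmaa.Session, illustration only
        drmaaImplementation = 'slurm 22.05'     # hypothetical implementation string

    print(drmaa_nativespec('-l walltime=00:10:00,mem=2gb', FakeSession()))
    # --time=00:10:00 --mem=2000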
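Reviewer note on synchronize(): a rough end-to-end sketch with drmaa-python; the jobscript name and the number of jobs are placeholders, not how bidscoiner actually assembles its job templates:

    import drmaa
    from bidscoin.bcoin import drmaa_nativespec, synchronize

    with drmaa.Session() as pbatch:
        jt                     = pbatch.createJobTemplate()
        jt.remoteCommand       = 'my_jobscript.sh'      # placeholder payload
        jt.nativeSpecification = drmaa_nativespec('-l walltime=00:10:00,mem=2gb', pbatch)
        jobids                 = [pbatch.runJob(jt) for _ in range(3)]
        synchronize(pbatch, jobids)                     # blocks with the queued/running bars, then waits ~15s extra for the NAS
        pbatch.deleteJobTemplate(jt)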