Skip to content

Commit

Permalink
Revert "Subclassed logging.Logger to add the bcdebug, verbose, and su…
Browse files Browse the repository at this point in the history
…ccess methods directly to the logger instance"

This reverts commit a8be713.
  • Loading branch information
marcelzwiers committed Oct 25, 2024
1 parent b273603 commit 46b652c
Showing 1 changed file with 88 additions and 95 deletions.
183 changes: 88 additions & 95 deletions bidscoin/bcoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
yaml = YAML()
yaml.representer.ignore_aliases = lambda *data: True # Expand aliases (https://stackoverflow.com/questions/58091449/disabling-alias-for-yaml-file-in-python)

# Define custom logging levels
BCDEBUG, BCDEBUG_LEVEL = 'BCDEBUG', 11 # NB: using the standard debug mode will generate may debug messages from imports
VERBOSE, VERBOSE_LEVEL = 'VERBOSE', 15
SUCCESS, SUCCESS_LEVEL = 'SUCCESS', 25
LOGGER = logging.getLogger(__name__)


class TqdmUpTo(tqdm):
Expand All @@ -53,39 +50,76 @@ def update_to(self, b=1, bsize=1, tsize=None):
self.update(b * bsize - self.n) # will also set self.n = b * bsize


class CustomLogger(logging.Logger):
"""Extend the Logger class to add custom methods for the new levels"""
def drmaa_nativespec(specs: str, session) -> str:
"""
Converts (CLI default) native Torque walltime and memory specifications to the DRMAA implementation (currently only Slurm is supported)
def bcdebug(self, message, *args, **kwargs):
"""Custom BIDSCOIN DEBUG messages"""
if self.isEnabledFor(BCDEBUG_LEVEL):
self._log(BCDEBUG_LEVEL, message, args, **kwargs)
:param specs: Native Torque walltime and memory specifications, e.g. '-l walltime=00:10:00,mem=2gb'
:param session: The DRMAA session
:return: The converted native specifications
"""

def verbose(self, message, *args, **kwargs):
"""Custom BIDSCOIN VERBOSE messages"""
if self.isEnabledFor(VERBOSE_LEVEL):
self._log(VERBOSE_LEVEL, message, args, **kwargs)
jobmanager: str = session.drmaaImplementation

if '-l ' in specs and 'pbs' not in jobmanager.lower():

def success(self, message, *args, **kwargs):
"""Custom BIDSCOIN SUCCESS messages"""
if self.isEnabledFor(SUCCESS_LEVEL):
self._log(SUCCESS_LEVEL, message, args, **kwargs)
if 'slurm' in jobmanager.lower():
specs = (specs.replace('-l ', '')
.replace(',', ' ')
.replace('walltime', '--time')
.replace('mem', '--mem')
.replace('gb','000'))
else:
LOGGER.warning(f"Default `--cluster` native specifications are not (yet) provided for {jobmanager}. Please add them to your command if you get DRMAA errors")
specs = ''

return specs.strip()

# Get a logger from the custom logger class
logging.setLoggerClass(CustomLogger)
LOGGER = logging.getLogger(__name__)

def synchronize(pbatch, jobids: list, wait: int=15):
"""
Shows tqdm progress bars for queued and running DRMAA jobs. Waits until all jobs have finished +
some extra wait time to give NAS systems the opportunity to fully synchronize
:param pbatch: The DRMAA session
:param jobids: The job ids
:param wait: The extra wait time for the NAS
:return:
"""

with logging_redirect_tqdm():

qbar = tqdm(total=len(jobids), desc='Queued ', unit='job', leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]')
rbar = tqdm(total=len(jobids), desc='Running', unit='job', leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]', colour='green')
done = 0
while done < len(jobids):
jobs = [pbatch.jobStatus(jobid) for jobid in jobids]
done = sum(status in ('done', 'failed', 'undetermined') for status in jobs)
qbar.n = sum(status == 'queued_active' for status in jobs)
rbar.n = sum(status == 'running' for status in jobs)
qbar.refresh(), rbar.refresh()
time.sleep(2)
qbar.close(), rbar.close()

failedjobs = [jobid for jobid in jobids if pbatch.jobStatus(jobid)=='failed']
if failedjobs:
LOGGER.error(f"{len(failedjobs)} HPC jobs failed to run:\n{failedjobs}\nThis may well be due to an underspecified `--cluster` input option (e.g. not enough memory)")

# Give NAS systems some time to fully synchronize
for t in tqdm(range(wait*100), desc='synchronizing', leave=False, bar_format='{l_bar}{bar}| [{elapsed}]'):
time.sleep(.01)


def setup_logging(logfile: Path=Path()):
"""
Set up the logging framework:
1) Add custom logging levels: 'bcdebug', 'verbose', and 'success'.
2) Add a console stream handler for generating terminal output.
3) Optionally add file handlers for normal log and warning/error log if logfile is provided.
1) Add a 'bcdebug', 'verbose' and a 'success' logging level
2) Add a console streamhandler
3) If logfile then add a normal log and a warning/error filehandler
:param logfile: Path to the logfile. If none, logging is console-only
"""
:param logfile: Name of the logfile
:return:
"""

# Set the default formats
if DEBUG:
Expand All @@ -96,17 +130,36 @@ def setup_logging(logfile: Path=Path()):
cfmt = '%(levelname)s | %(message)s'
datefmt = '%Y-%m-%d %H:%M:%S'

# Add custom log levels to logging
logging.addLevelName(BCDEBUG_LEVEL, BCDEBUG)
logging.addLevelName(VERBOSE_LEVEL, VERBOSE)
logging.addLevelName(SUCCESS_LEVEL, SUCCESS)

# Get the root logger and set the appropriate level
# Add a BIDScoin debug logging level = 11 (NB: using the standard debug mode will generate may debug messages from imports)
logging.BCDEBUG = 11
logging.addLevelName(logging.BCDEBUG, 'BCDEBUG')
logging.__all__ += ['BCDEBUG'] if 'BCDEBUG' not in logging.__all__ else []
def bcdebug(self, message, *args, **kws):
if self.isEnabledFor(logging.BCDEBUG): self._log(logging.BCDEBUG, message, args, **kws)
logging.Logger.bcdebug = bcdebug

# Add a verbose logging level = 15
logging.VERBOSE = 15
logging.addLevelName(logging.VERBOSE, 'VERBOSE')
logging.__all__ += ['VERBOSE'] if 'VERBOSE' not in logging.__all__ else []
def verbose(self, message, *args, **kws):
if self.isEnabledFor(logging.VERBOSE): self._log(logging.VERBOSE, message, args, **kws)
logging.Logger.verbose = verbose

# Add a success logging level = 25
logging.SUCCESS = 25
logging.addLevelName(logging.SUCCESS, 'SUCCESS')
logging.__all__ += ['SUCCESS'] if 'SUCCESS' not in logging.__all__ else []
def success(self, message, *args, **kws):
if self.isEnabledFor(logging.SUCCESS): self._log(logging.SUCCESS, message, args, **kws)
logging.Logger.success = success

# Set the root logging level
logger = logging.getLogger()
logger.setLevel(BCDEBUG_LEVEL if DEBUG else VERBOSE_LEVEL)
logger.setLevel('BCDEBUG' if DEBUG else 'VERBOSE')

# Add the console streamhandler and bring some color to those boring logs! :-)
coloredlogs.install(level=BCDEBUG if DEBUG else VERBOSE if not logfile.name else 'INFO', fmt=cfmt, datefmt=datefmt) # NB: Using tqdm sets the streamhandler level to 0, see: https://github.com/tqdm/tqdm/pull/1235
coloredlogs.install(level='BCDEBUG' if DEBUG else 'VERBOSE' if not logfile.name else 'INFO', fmt=cfmt, datefmt=datefmt) # NB: Using tqdm sets the streamhandler level to 0, see: https://github.com/tqdm/tqdm/pull/1235
coloredlogs.DEFAULT_LEVEL_STYLES['verbose']['color'] = 245 # = Gray

if logfile.name:
Expand All @@ -115,7 +168,7 @@ def setup_logging(logfile: Path=Path()):
logfile.parent.mkdir(parents=True, exist_ok=True) # Create the log dir if it does not exist
formatter = logging.Formatter(fmt=fmt, datefmt=datefmt)
loghandler = logging.FileHandler(logfile)
loghandler.setLevel(BCDEBUG)
loghandler.setLevel('BCDEBUG')
loghandler.setFormatter(formatter)
loghandler.set_name('loghandler')
logger.addHandler(loghandler)
Expand Down Expand Up @@ -166,66 +219,6 @@ def reporterrors() -> str:
return errors


def drmaa_nativespec(specs: str, session) -> str:
"""
Converts (CLI default) native Torque walltime and memory specifications to the DRMAA implementation (currently only Slurm is supported)
:param specs: Native Torque walltime and memory specifications, e.g. '-l walltime=00:10:00,mem=2gb'
:param session: The DRMAA session
:return: The converted native specifications
"""

jobmanager: str = session.drmaaImplementation

if '-l ' in specs and 'pbs' not in jobmanager.lower():

if 'slurm' in jobmanager.lower():
specs = (specs.replace('-l ', '')
.replace(',', ' ')
.replace('walltime', '--time')
.replace('mem', '--mem')
.replace('gb','000'))
else:
LOGGER.warning(f"Default `--cluster` native specifications are not (yet) provided for {jobmanager}. Please add them to your command if you get DRMAA errors")
specs = ''

return specs.strip()


def synchronize(pbatch, jobids: list, wait: int=15):
"""
Shows tqdm progress bars for queued and running DRMAA jobs. Waits until all jobs have finished +
some extra wait time to give NAS systems the opportunity to fully synchronize
:param pbatch: The DRMAA session
:param jobids: The job ids
:param wait: The extra wait time for the NAS
:return:
"""

with logging_redirect_tqdm():

qbar = tqdm(total=len(jobids), desc='Queued ', unit='job', leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]')
rbar = tqdm(total=len(jobids), desc='Running', unit='job', leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]', colour='green')
done = 0
while done < len(jobids):
jobs = [pbatch.jobStatus(jobid) for jobid in jobids]
done = sum(status in ('done', 'failed', 'undetermined') for status in jobs)
qbar.n = sum(status == 'queued_active' for status in jobs)
rbar.n = sum(status == 'running' for status in jobs)
qbar.refresh(), rbar.refresh()
time.sleep(2)
qbar.close(), rbar.close()

failedjobs = [jobid for jobid in jobids if pbatch.jobStatus(jobid)=='failed']
if failedjobs:
LOGGER.error(f"{len(failedjobs)} HPC jobs failed to run:\n{failedjobs}\nThis may well be due to an underspecified `--cluster` input option (e.g. not enough memory)")

# Give NAS systems some time to fully synchronize
for t in tqdm(range(wait*100), desc='synchronizing', leave=False, bar_format='{l_bar}{bar}| [{elapsed}]'):
time.sleep(.01)


def list_executables(show: bool=False) -> list:
"""
:param show: Print the installed console scripts if True
Expand Down

0 comments on commit 46b652c

Please sign in to comment.