diff --git a/python/TestHarness/OutputInterface.py b/python/TestHarness/OutputInterface.py index b6e54a7b2548..5f4613d9e4a0 100644 --- a/python/TestHarness/OutputInterface.py +++ b/python/TestHarness/OutputInterface.py @@ -7,16 +7,20 @@ #* Licensed under LGPL 2.1, please see LICENSE for details #* https://www.gnu.org/licenses/lgpl-2.1.html +import contextlib import os import json +import threading class OutputInterface: """ Helper class for writing output to either memory or a file """ - def __init__(self): + def __init__(self, locking=False): # The in-memory output, if any self.output = '' # The path to write output to, if any self.separate_output_path = None + # Thread lock for the output (if enabled) + self.output_lock = threading.Lock() if locking else None class BadOutputException(Exception): """ Exception that is thrown when bad output is detected """ @@ -25,14 +29,24 @@ def __init__(self, errors): message = 'Bad output detected: ' + ', '.join(errors) super().__init__(message) + def getOutputLock(self): + """ + Gets the thread lock for this system, if any. + + This is safe to use in a with statement even if locking + is not enabled. + """ + return self.output_lock if self.output_lock else contextlib.suppress() + def setSeparateOutputPath(self, separate_output_path): """ Sets the path for writing output to """ self.separate_output_path = separate_output_path # If we have any dangling output, write it - if self.output: - self.setOutput(self.output) - self.output = '' + with self.getOutputLock(): + if self.output: + self.setOutput(self.output) + self.output = '' def getSeparateOutputFilePath(self) -> str: """ Gets the path that this output is writing to, if any """ @@ -40,9 +54,10 @@ def getSeparateOutputFilePath(self) -> str: def hasOutput(self) -> bool: """ Whether or not this object has any content written """ - if self.separate_output_path: - return os.path.isfile(self.separate_output_path) - return len(self.output) > 0 + with self.getOutputLock(): + if self.separate_output_path: + return os.path.isfile(self.separate_output_path) + return len(self.output) > 0 def getOutput(self, sanitize: bool = True) -> str: """ @@ -56,46 +71,48 @@ def getOutput(self, sanitize: bool = True) -> str: on before the output is used. 
""" output = '' - if self.separate_output_path: - try: - output = open(self.separate_output_path, 'r').read() - except FileNotFoundError: - pass - else: - output = self.output - - if sanitize: - _, sanitize_failures = self._sanitizeOutput(output) - if sanitize_failures: - raise self.BadOutputException(sanitize_failures) - - return output + with self.getOutputLock(): + if self.separate_output_path: + try: + output = open(self.separate_output_path, 'r').read() + except FileNotFoundError: + pass + else: + output = self.output + + if sanitize: + _, sanitize_failures = self._sanitizeOutput(output) + if sanitize_failures: + raise self.BadOutputException(sanitize_failures) + + return output def setOutput(self, output: str): """ Sets the output given some output string """ - if not output: - return - if self.separate_output_path: - open(self.separate_output_path, 'w').write(output) - else: - self.output = output + with self.getOutputLock(): + if self.separate_output_path: + open(self.separate_output_path, 'w').write(output) + else: + self.output = output def appendOutput(self, output: str): """ Appends to the output """ if not output: return - if self.separate_output_path: - open(self.separate_output_path, 'a').write(output) - else: - self.output += output + with self.getOutputLock(): + if self.separate_output_path: + open(self.separate_output_path, 'a').write(output) + else: + self.output += output def clearOutput(self): """ Clears the output """ - if self.separate_output_path: - if os.path.exists(self.separate_output_path): - os.remove(self.separate_output_path) - else: - self.output = '' + with self.getOutputLock(): + if self.separate_output_path: + if os.path.exists(self.separate_output_path): + os.remove(self.separate_output_path) + else: + self.output = '' @staticmethod def _sanitizeOutput(output): diff --git a/python/TestHarness/TestHarness.py b/python/TestHarness/TestHarness.py index a4f010664964..27b3963aa7da 100644 --- a/python/TestHarness/TestHarness.py +++ b/python/TestHarness/TestHarness.py @@ -729,6 +729,10 @@ def cleanup(self): summary += fatal_error print(util.colorText(summary, "", html=True, colored=self.options.colored, code=self.options.code)) else: + # Fill summary footer + summary = '' + + # Number of tests, their status, and timing num_nonzero_timing = sum(1 if float(tup[0].getTiming()) > 0 else 0 for tup in self.test_table) if num_nonzero_timing > 0: timing_max = max(float(tup[0].getTiming()) for tup in self.test_table) @@ -739,6 +743,13 @@ def cleanup(self): summary = f'Ran {self.num_passed + self.num_failed} tests in {stats["time_total"]:.1f} seconds.' summary += f' Average test time {timing_avg:.1f} seconds,' summary += f' maximum test time {timing_max:.1f} seconds.' + # Memory usage, if available + max_memory = [tup[0].getMaxMemoryUsage() for tup in self.test_table if tup[0].getMaxMemoryUsage() not in [None, 0]] + if max_memory: + max_max_memory = max(max_memory) + avg_max_memory = sum(max_memory) / len(max_memory) + summary += f'\nEstimated maximum test memory usage maximum {util.humanMemory(max_max_memory)}, ' + summary += f'average {util.humanMemory(avg_max_memory)}.' 
print(summary) # Get additional results from the scheduler @@ -809,6 +820,25 @@ def cleanup(self): print(str(group[0]).ljust((self.options.term_cols - (len(group[1]) + 4)), ' '), f'[{group[1]}s]') print('\n') + if self.options.largest_jobs: + valued_tups = [tup for tup in self.test_table if tup[0].getMaxMemoryUsage() not in [None, 0]] + sorted_tups = sorted(valued_tups, key=lambda tup: tup[0].getMaxMemoryUsage(), reverse=True) + + print('\n%d largest jobs:' % self.options.largest_jobs) + print(('-' * (self.options.term_cols))) + + # Copy the current options and force timing to be true so that + # we get memory when we call formatResult() below + options_with_timing = copy.deepcopy(self.options) + options_with_timing.timing = True + + for tup in sorted_tups[0:self.options.largest_jobs]: + job = tup[0] + if not job.isSkip() and job.getMaxMemoryUsage() > 0: + print(util.formatResult(job, options_with_timing, caveats=True)) + if len(sorted_tups) == 0: + print('No jobs were completed or no jobs contained resource usage.') + all_jobs = self.scheduler.retrieveJobs() # Gather and print the jobs with race conditions after the jobs are finished @@ -1065,6 +1095,7 @@ def parseCLArgs(self, argv): parser.add_argument('-l', '--load-average', action='store', type=float, dest='load', help='Do not run additional tests if the load average is at least LOAD') parser.add_argument('-t', '--timing', action='store_true', dest='timing', help='Report Timing information for passing tests') parser.add_argument('--longest-jobs', action='store', dest='longest_jobs', type=int, default=0, help='Print the longest running jobs upon completion') + parser.add_argument('--largest-jobs', action='store', dest='largest_jobs', type=int, default=0, help='Print the largest (by max memory usage) jobs upon completion') parser.add_argument('-s', '--scale', action='store_true', dest='scaling', help='Scale problems that have SCALE_REFINE set') parser.add_argument('-i', nargs=1, action='store', type=str, dest='input_file_name', help='The test specification file to look for (default: tests)') parser.add_argument('--libmesh_dir', nargs=1, action='store', type=str, dest='libmesh_dir', help='Currently only needed for bitten code coverage') @@ -1131,6 +1162,10 @@ def parseCLArgs(self, argv): hpcgroup.add_argument('--hpc-no-hold', nargs=1, action='store', type=bool, default=False, dest='hpc_no_hold', help='Do not pre-create hpc jobs to be held') hpcgroup.add_argument('--pbs-queue', nargs=1, action='store', dest='hpc_queue', type=str, metavar='', help='Submit jobs to the specified queue') + # Options for resource limits + resourcegroup = parser.add_argument_group('Resource Options', 'Options for controlling resource limits') + resourcegroup.add_argument('--max-memory', dest='max_memory', action='store', type=str, default=None, help='Set maximum memory allowed per slot (default none, ex: 100MB)') + # Try to find the terminal size if we can # Try/except here because the terminal size could fail w/o a display term_cols = None @@ -1142,7 +1177,7 @@ def parseCLArgs(self, argv): # Optionally load in the environment controlled values term_cols = int(os.getenv('MOOSE_TERM_COLS', term_cols)) - term_format = os.getenv('MOOSE_TERM_FORMAT', 'njcst') + term_format = os.getenv('MOOSE_TERM_FORMAT', 'njcsmt') # Terminal options termgroup = parser.add_argument_group('Terminal Options', 'Options for controlling the formatting of terminal output') @@ -1234,6 +1269,22 @@ def checkAndUpdateCLArgs(self): if not self.options.input_file_name: 
self.options.input_file_name = 'tests' + # Resource usage collection + has_psutil = True + try: + import psutil + except: + has_psutil = False + # Set max_memory in bytes if set + if self.options.max_memory is not None: + try: + self.options.max_memory = util.convertMemoryToBytes(self.options.max_memory) + except: + print(f'ERROR: Failed to parse --max-memory="{self.options.max_memory}"') + sys.exit(1) + if not has_psutil: + print(f'ERROR: --max-memory cannot be used because the python module "psutil" is not available') + def postRun(self, specs, timing): return diff --git a/python/TestHarness/runners/Runner.py b/python/TestHarness/runners/Runner.py index 89b0e36d0f32..a5a4e005319f 100644 --- a/python/TestHarness/runners/Runner.py +++ b/python/TestHarness/runners/Runner.py @@ -7,10 +7,14 @@ #* Licensed under LGPL 2.1, please see LICENSE for details #* https://www.gnu.org/licenses/lgpl-2.1.html -import os, json +import os, threading, time, traceback +from collections import namedtuple from TestHarness import OutputInterface, util class Runner(OutputInterface): + # Helper struct for storing information about sampled resource usage + ResourceUsage = namedtuple('ResourceUsage', 'time mem_bytes') + """ Base class for running a process via a command. @@ -19,7 +23,8 @@ class Runner(OutputInterface): or externally (i.e., PBS, slurm, etc on HPC) """ def __init__(self, job, options): - OutputInterface.__init__(self) + # Output is locking so that the resource thread can concurrently write + OutputInterface.__init__(self, locking=True) # The job that this runner is for self.job = job @@ -109,3 +114,47 @@ def readOutput(self, stream): if output and output[-1] != '\n': output += '\n' return output + + def getResourceUsage(self): + """ + To be overridden by derived Runners that support resource usage collection + + Should return a list of ResourceUsage objects + """ + return None + + def getMaxMemoryUsage(self): + """ + Get the max memory usage (in bytes) of the spawned process if it was + able to be captured + """ + resource_usage = self.getResourceUsage() + if not resource_usage: # runner doesn't support it + return None + max_mem = 0 + for usage in resource_usage: + max_mem = max(max_mem, usage.mem_bytes) + return max_mem + + def checkResourceUsage(self, usage): + """ + Checks the resource usage to ensure that it does not go over + limits. Will kill the job if so. 
+ + Usage should be a ResourceUsage object + """ + # Scale all of the requirements on a per-slot basis + slots = self.job.getSlots() + + # Check for memory overrun if max is set + if self.options.max_memory is not None: + allowed_mem = slots * self.options.max_memory + if usage.mem_bytes > allowed_mem: + usage_human = util.humanMemory(usage.mem_bytes) + allowed_human = util.humanMemory(allowed_mem) + output = util.outputHeader('Process killed due to resource oversubscription') + output += f'Memory usage {usage_human} exceeded {allowed_human}' + self.appendOutput(output) + self.job.setStatus(self.job.error, 'EXCEEDED MEM') + self.kill() + return diff --git a/python/TestHarness/runners/SubprocessRunner.py b/python/TestHarness/runners/SubprocessRunner.py index f4a4f9bc8d32..cd2a7e85a0c5 100644 --- a/python/TestHarness/runners/SubprocessRunner.py +++ b/python/TestHarness/runners/SubprocessRunner.py @@ -7,11 +7,10 @@ #* Licensed under LGPL 2.1, please see LICENSE for details #* https://www.gnu.org/licenses/lgpl-2.1.html -import os, platform, subprocess, shlex, time +import copy, os, platform, subprocess, shlex, time, threading from tempfile import SpooledTemporaryFile from signal import SIGTERM from TestHarness.runners.Runner import Runner -from TestHarness import util class SubprocessRunner(Runner): """ @@ -27,6 +26,11 @@ def __init__(self, job, options): # The underlying subprocess self.process = None + # List of resource usage over time, if enabled + self.resource_usage = None + # Lock for accessing self.resource_usage + self.resource_usage_lock = threading.Lock() + def spawn(self, timer): tester = self.job.getTester() use_shell = tester.specs["use_shell"] @@ -73,15 +77,56 @@ def spawn(self, timer): timer.start('runner_run') def wait(self, timer): - self.process.wait() + # If psutil is available, track resources over time for this job. 
+ # This replaces the traditional self.process.wait() call with + # a loop that polls for resources in an interval + try: + import psutil + except ModuleNotFoundError: + pass + else: + poll_time = 0.1 # sec + psutil_process = psutil.Process(self.process.pid) + self.resource_usage = [] + start_poll_time = time.time() + last_poll_time = None + while True: + if last_poll_time is not None: + sleep_time = poll_time - (time.time() - last_poll_time) + if sleep_time > 0: + time.sleep(sleep_time) + last_poll_time = time.time() + + # Get RSS memory usage for the top level process + # and all of its children + try: + total_memory = psutil_process.memory_info().rss + children = psutil_process.children(recursive=True) + except psutil.NoSuchProcess: + break + for child in children: + try: + total_memory += child.memory_info().rss + except psutil.NoSuchProcess: + pass + + # Store the usage and make sure that the usage + # doesn't oversubscribe any requirements + usage = self.ResourceUsage(time=last_poll_time - start_poll_time, + mem_bytes=total_memory) + self.checkResourceUsage(usage) + with self.resource_usage_lock: + self.resource_usage.append(usage) + + # Process has ended + if self.process.poll() is not None: + break + self.process.wait() timer.stop('runner_run') self.exit_code = self.process.poll() - - # This should have been cleared before the job started - if self.getRunOutput().hasOutput(): - raise Exception('Runner run output was not cleared') + self.process = None # Load combined output for file in [self.outfile, self.errfile]: @@ -99,6 +144,12 @@ def wait(self, timer): self.getRunOutput().appendOutput(output) + def getResourceUsage(self): + with self.resource_usage_lock: + if self.resource_usage is None: + return None + return copy.deepcopy(self.resource_usage) + def kill(self): if self.process is not None: try: diff --git a/python/TestHarness/schedulers/Job.py b/python/TestHarness/schedulers/Job.py index b0096eff259f..2caddc734a67 100644 --- a/python/TestHarness/schedulers/Job.py +++ b/python/TestHarness/schedulers/Job.py @@ -676,6 +676,12 @@ def getTiming(self): return self.timer.totalTime() return 0.0 + def getMaxMemoryUsage(self): + """ Return max memory usage of the runner process, if available """ + if self._runner is not None: + return self._runner.getMaxMemoryUsage() + return None + def getStatus(self): return self.job_status.getStatus() diff --git a/python/TestHarness/util.py b/python/TestHarness/util.py index 117a066cba14..7206f0bff1cc 100644 --- a/python/TestHarness/util.py +++ b/python/TestHarness/util.py @@ -307,6 +307,23 @@ def formatResult(job, options, result='', color=True, **kwargs): f_time = '[' + '{0: <6}'.format('%0.*fs' % (precision, actual)) + ']' formatCase(f_key, (f_time, None), formatted_results) + if str(f_key).lower() == 'm' and options.timing: + value = 0 + suffix = 'MB' + if not job.isSkip(): + max_mem = job.getMaxMemoryUsage() + if max_mem is None or max_mem == 0: + value = None + else: + value, suffix = humanMemory(max_mem, split=True) + if isinstance(value, (float, int)): + int_len = len(str(int(value))) + precision = min(3, max(0,(4-int_len))) + f_max_mem = f'[{value:<5.{precision}f}{suffix}]' + else: + f_max_mem = f'[?????{suffix}]' + formatCase(f_key, (f_max_mem, None), formatted_results) + # Decorate Caveats if job.getCaveats() and caveat_index is not None and 'caveats' in kwargs and kwargs['caveats']: caveats = ','.join(job.getCaveats()) @@ -890,3 +907,40 @@ def outputHeader(header, ending=True): begin_sep = '#' * 80 end_sep = f'{begin_sep}\n' if ending 
else '' return f'{begin_sep}\n{header}\n{end_sep}' + +# Conversions for memory sizes +byte_conversions = {'B': 1, + 'KB': 1024, + 'MB':1024**2, + 'GB':1024**3, + 'TB':1024**4} + +def convertMemoryToBytes(size_str: str) -> int: + """ + Converts the given size string (100B, 100MB, etc) + into an integer number of bytes + """ + search = re.fullmatch(r'(\d+(?:.\d+)?)([A-Z]+)', size_str) + if search is None: + raise ValueError(f'Failed to parse memory size from "{size_str}"') + value = search.group(1) + unit = search.group(2) + if unit not in byte_conversions: + raise ValueError(f'Unknown memory unit "{unit}"') + return float(value) * byte_conversions[unit] + +def humanMemory(bytes: int, digits=2, split=False) -> str: + """ + Convert the given size in bytes to a human readable memory value + + The split option returns the value and unit separately. + """ + value = f'{bytes}B' + for unit, conversion in byte_conversions.items(): + scaled_value = float(bytes) / float(conversion) + value = f'{scaled_value:.{digits}f}{unit}' + if scaled_value < 999: + break + if split: + return scaled_value, unit + return value
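
The `locking` flag added to `OutputInterface` exists so the new resource-polling thread and the main runner can write to the same output object concurrently; `getOutputLock()` hands back either the real `threading.Lock` or a no-op context manager (`contextlib.suppress()` with no arguments), so callers never have to branch on whether locking was requested. A minimal sketch of that contract, assuming the `TestHarness` package is importable; the writer/tag names below are illustrative only, not part of the harness:

import threading
from TestHarness import OutputInterface

plain = OutputInterface()               # no lock: getOutputLock() returns a no-op context manager
shared = OutputInterface(locking=True)  # a real threading.Lock guards every read/write

def writer(tag):
    # appendOutput() acquires the lock internally, so concurrent writers are safe
    for i in range(100):
        shared.appendOutput(f'{tag} line {i}\n')

threads = [threading.Thread(target=writer, args=(t,)) for t in ('a', 'b')]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# The same with-statement is valid even when locking is disabled
with plain.getOutputLock():
    snapshot = plain.output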
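
SubprocessRunner.wait() now replaces the blocking process.wait() with a poll loop whenever psutil is importable: roughly every 0.1 s it sums the RSS of the spawned process and all of its recursive children, records a ResourceUsage(time, mem_bytes) sample, and feeds the sample through checkResourceUsage(), which kills the job with an EXCEEDED MEM status once the total exceeds slots multiplied by the --max-memory limit. A standalone sketch of the sampling idea, assuming psutil is installed; the helper name sample_until_exit is illustrative and not part of the harness:

import subprocess, time
from collections import namedtuple

import psutil

ResourceUsage = namedtuple('ResourceUsage', 'time mem_bytes')

def sample_until_exit(cmd, poll_time=0.1):
    """Poll the RSS of a command's process tree until it exits."""
    process = subprocess.Popen(cmd)
    ps_process = psutil.Process(process.pid)
    samples = []
    start = time.time()
    while process.poll() is None:
        try:
            # RSS of the parent process plus all of its (recursive) children
            total = ps_process.memory_info().rss
            for child in ps_process.children(recursive=True):
                try:
                    total += child.memory_info().rss
                except psutil.NoSuchProcess:
                    pass
        except psutil.NoSuchProcess:
            break
        samples.append(ResourceUsage(time=time.time() - start, mem_bytes=total))
        time.sleep(poll_time)
    process.wait()
    return samples

# Peak memory (in bytes) of a short-lived command
peak = max((s.mem_bytes for s in sample_until_exit(['sleep', '1'])), default=0)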
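
The two helpers appended to util.py are the unit conversions behind both --max-memory parsing and the new reporting columns: convertMemoryToBytes() turns a size string such as "100MB" into bytes using binary (1024-based) multipliers, and humanMemory() walks the same table in the other direction, stopping at the first unit whose scaled value drops below 999. A small sketch of the expected behavior, assuming the TestHarness package is importable; the literal values are illustrative:

from TestHarness import util

# "100MB" -> 100 * 1024**2 bytes; decimal sizes such as "1.5GB" also parse
assert util.convertMemoryToBytes('100MB') == 100 * 1024**2

# 1536 MiB only drops below 999 once scaled to GB, so it renders with that unit
print(util.humanMemory(1536 * 1024**2))                  # 1.50GB
value, unit = util.humanMemory(1536 * 1024**2, split=True)
print(value, unit)                                       # 1.5 GB

On the command line the same plumbing is exercised with something like `./run_tests -t --max-memory 2GB --largest-jobs 5` (run_tests is the usual MOOSE driver name; adjust to however the harness is invoked locally): --max-memory sets the allowed memory per slot, so a job is killed once its sampled RSS exceeds slots times that limit, --largest-jobs prints the jobs with the highest sampled memory after the run, and the new "m" key in MOOSE_TERM_FORMAT (default now "njcsmt") adds a memory column to each result line when --timing is active.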