diff --git a/.gitignore b/.gitignore index 7f65b20..2e3b08d 100644 --- a/.gitignore +++ b/.gitignore @@ -59,6 +59,7 @@ nosetests.xml coverage.xml *.cover .hypothesis/ +tmp/ # Translations *.mo @@ -113,3 +114,6 @@ ENV/ # mypy .mypy_cache/ + +# Development scripts/nbs +develop.ipynb diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..cf3585f --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,37 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +## [0.4.2] - 2024.01.26 + +### Fixed + +- liftoff-procs on Linux machines + +### Removed + +- functional tests as not all were passing on Linux machines (temporary solution) + +## [0.4.0] - 2023.11.16 + +### Added + +- Code for Windows compatibility in liftoff: launch_run +- Unit tests for liftoff, liftoff-prepare, liftoff-status, liftoff-procs, liftoff-abort +- New CLI flag --skip-confirmation for abort + +### Changed + +- String manipulation for path replaced with os.path.join +- Some Linux-terminal-specific functions replaced with cross-platform Python equivalents + +### Fixed + +- Some string formatting issues +- Path building in 'prepare.py' for cross compatibility with Windows +- Some typos diff --git a/TODO.md b/TODO.md index c0eed8a..6a16358 100644 --- a/TODO.md +++ b/TODO.md @@ -13,4 +13,4 @@ ## 3. 
liftoff - - [ ] Implement server / client \ No newline at end of file + - [ ] Implement server / client diff --git a/liftoff/__init__.py b/liftoff/__init__.py index e7d1862..2b3071d 100644 --- a/liftoff/__init__.py +++ b/liftoff/__init__.py @@ -2,6 +2,7 @@ """ import os.path +from pathlib import Path import yaml from .common.dict_utils import dict_to_namespace from .common.options_parser import OptionParser @@ -27,6 +28,7 @@ def parse_opts(): opts = dict_to_namespace(config_data) if not hasattr(opts, "out_dir"): raise RuntimeError("No out_dir in config file.") - if not os.path.isdir(opts.out_dir): # pylint: disable=no-member - raise RuntimeError("Out dir does not exist.") + out_dir_path = Path(opts.out_dir) + if not out_dir_path.is_dir(): # pylint: disable=no-member + raise RuntimeError(f"Out {opts.out_dir} dir does not exist.") return opts diff --git a/liftoff/abort.py b/liftoff/abort.py index 3b5b235..bd1c095 100644 --- a/liftoff/abort.py +++ b/liftoff/abort.py @@ -4,6 +4,7 @@ from argparse import Namespace import os import subprocess +import psutil from termcolor import colored as clr from .common.experiment_info import is_experiment @@ -11,127 +12,123 @@ def parse_options() -> Namespace: - """ Parse command line arguments and liftoff configuration. - """ + """Parse command line arguments and liftoff configuration.""" - opt_parser = OptionParser("liftoff-abort", ["pid", "results_path"]) + opt_parser = OptionParser("liftoff-abort", ["pid", "results_path", "skip_confirmation"]) return opt_parser.parse_args() def ask_user(): - answer = str(input("\nAre you sure you want to kill them? (y/n)")).lower().strip() + answer = input("\nAre you sure you want to kill them? 
(Y/n) ").lower().strip() - if answer and answer[0] == "y": + # If the answer is empty or 'yes' (or just 'y'), return True + if answer == "" or answer.startswith("y"): return True - elif answer and answer[0] == "n": - return False - return ask_user() + # Otherwise, default to False + return False def running_children(session_id): - """ Gets running processes with a specific session-id. - TODO: check more details. + """Gets running processes with a specific session-id. + TODO: check more details. """ - escaped_sid = session_id.replace("-", r"\-") - cmd = ( - f"for p in " - f"`pgrep -f '\\-\\-session\\-id {escaped_sid:s}'`" - f"; do COLUMNS=0 ps -p $p -o pid,ppid,cmd h; done" - ) - result = subprocess.run( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True - ) - if result.stderr: - raise Exception(result.stderr.decode("utf-8")) - + """ Gets running processes with a specific session-id.""" + session_id_flag = f"--session-id {session_id}" pids = [] - for line1 in result.stdout.decode("utf-8").split("\n"): - if not line1: + + for proc in psutil.process_iter(["pid", "ppid", "cmdline"]): + try: + cmdline = proc.info["cmdline"] + # Ensure cmdline is a list before joining + if isinstance(cmdline, list): + cmdline_str = " ".join(cmdline) + else: + continue # Skip this process if cmdline is not a list + + # Check if the session id flag is in the command line arguments + if session_id_flag in cmdline_str: + # Check if the process has not crashed + if not any(".__crash" in arg for arg in cmdline): + pids.append(proc.info["pid"]) + except (psutil.NoSuchProcess, psutil.AccessDenied): continue - pid, fake_ppid, *other = line1.split() - pid, fake_ppid = int(pid), int(fake_ppid) - if fake_ppid != 1: - good = not any(".__crash" in p for p in other) - if good: - pids.append(pid) return pids -def abort_experiment(ppid, results_path): - """ Here we search for running pids. 
- """ - - cmd = f"COLUMNS=0 ps -o cmd= -f {ppid:d}" - result = subprocess.run( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True - ) - if result.stderr: - raise Exception(result.stderr.decode("utf-8")) - - (lcmd,) = result.stdout.decode("utf-8").strip().split("\n") +def abort_experiment(ppid, results_path, skip_confirmation=False): + """Terminate a running experiment and its subprocesses.""" + try: + parent_process = psutil.Process(ppid) + except psutil.NoSuchProcess: + print("No process found with PID", ppid) + return + # Check if the parent process is part of an experiment found = False - for arg in lcmd.split(): + for arg in parent_process.cmdline(): if is_experiment(arg): found = True break if not found: - print(lcmd) + print(f"Found pid {ppid}, but it's not the main experiment:") + print(" ".join(parent_process.cmdline())) return - experiment_name = None + # Find the experiment name and session ID + experiment_name, session_id = None, None found = False with os.scandir(results_path) as fit: for entry in fit: if not is_experiment(entry.path): continue experiment_name = entry.name - with os.scandir(entry.path) as fit2: - for entry2 in fit2: - if entry2.name.startswith(".__"): - with open(entry2.path) as hndlr: - try: - candidate_pid = int(hndlr.readline().strip()) - if candidate_pid == ppid: - found = True - session_id = entry2.name[3:] - break - except ValueError: - pass + for entry2 in os.scandir(entry.path): + if entry2.name.startswith(".__"): + with open(entry2.path) as hndlr: + try: + candidate_pid = int(hndlr.readline().strip()) + if candidate_pid == ppid: + found = True + session_id = entry2.name[3:] + break + except ValueError: + pass if found: break + if not found: print("Couldn't find the process you want to kill.") - print( - "Run", clr("liftoff-procs", attrs=["bold"]), "to see running liftoffs.", - ) - return - - pids = running_children(session_id) - nrunning = clr(f"{len(pids):d}", color="blue", attrs=["bold"]) - cppid = 
clr(f"{ppid:5d}", color="red", attrs=["bold"]) - name = clr(f"{experiment_name:s}::{session_id:s}", attrs=["bold"]) - - print(f"\nWill kill {nrunning:s} subprocesses from {name} ({cppid:s}).") - if not ask_user(): return + # Get the running child processes pids = running_children(session_id) + print( + f"\nWill kill {len(pids)} subprocesses from {experiment_name}::{session_id} (PID: {ppid})." + ) - cmd = f"kill {ppid:d} " + " ".join([str(p) for p in pids]) - - result = subprocess.run(cmd, stderr=subprocess.PIPE, shell=True) - if result.stderr: - raise Exception(result.stderr.decode("utf-8")) - + # We might want to skip the confirmation + # (when running the process from script) + if not skip_confirmation: + if not ask_user(): + return + + # Attempt to terminate the parent process and its children + try: + for pid in pids: + child_proc = psutil.Process(pid) + child_proc.terminate() # or child_proc.kill() for a forceful termination + parent_process.terminate() + except Exception as e: + print(f"Error terminating processes: {e}") + + # ( ͡° ͜ʖ ͡°) print("The eagle is down! Mission accomplished.") def abort(): - """ Main function. - """ + """Main function.""" opts = parse_options() - abort_experiment(opts.pid, opts.results_path) + abort_experiment(opts.pid, opts.results_path, opts.skip_confirmation) diff --git a/liftoff/common/local_info.py b/liftoff/common/local_info.py index 0b7d8db..35390b5 100644 --- a/liftoff/common/local_info.py +++ b/liftoff/common/local_info.py @@ -9,7 +9,8 @@ def version(): """Q: Is there a better way to have some unique source for the version? - A: Yes. + A: Yes. + Proposal: Use poetry instead of setup.py, will solve this too. 
""" from ..version import __version__ as version diff --git a/liftoff/common/options_parser.py b/liftoff/common/options_parser.py index 1358773..d932973 100644 --- a/liftoff/common/options_parser.py +++ b/liftoff/common/options_parser.py @@ -4,7 +4,7 @@ TODO: change to class methods to common methods if there is no need to call those functions outside an instance of OptionParser. """ - +import os from argparse import ArgumentParser, Namespace from typing import List import uuid @@ -22,8 +22,9 @@ def __init__(self, name, arguments: List[str]) -> None: self.arg_parser = ArgumentParser(name) self.arguments = [str(arg) for arg in arguments] - for arg in self.arguments: - getattr(self, f"_add_{arg:s}")() + if self.arguments: + for arg in self.arguments: + getattr(self, f"_add_{arg:s}")() def parse_args(self, args: List[str] = None, strict: bool = True) -> Namespace: """ Parses command-line arguments and completes options with values @@ -95,6 +96,14 @@ def _add_do(self) -> None: dest="do", help="Apply the actions (do not only simulate).", ) + + def _add_skip_confirmation(self) -> None: + self.arg_parser.add_argument( + "--skip-confirmation", + action="store_true", + dest="skip_confirmation", + help="Skip user confirmation before executing.", + ) def _add_crashed_only(self) -> None: self.arg_parser.add_argument( @@ -199,7 +208,7 @@ def _add_procs_no(self) -> None: def _add_results_path(self) -> None: default_value = self.liftoff_config.get("results_path") if default_value is None: - default_value = "./results" + default_value = os.path.join(".", "results") default_value = str(default_value) self.arg_parser.add_argument( "--results-path", diff --git a/liftoff/liftoff.py b/liftoff/liftoff.py index b4f686a..871120e 100644 --- a/liftoff/liftoff.py +++ b/liftoff/liftoff.py @@ -8,6 +8,9 @@ import sys import time import traceback +import psutil +import platform +import threading from argparse import Namespace from functools import partial from importlib import import_module @@ 
-37,7 +40,7 @@ def __init__(self, opts): elif len(opts.per_gpu) == 1: self.per_gpu = {g: int(opts.per_gpu[0]) for g in opts.gpus} else: - raise ValueError("Strage per_gpu values. {opts.per_gpu}") + raise ValueError(f"Strange per_gpu values. {opts.per_gpu}") else: self.per_gpu = None @@ -49,6 +52,7 @@ def process_commands(self, commands: List[str]): """Here we process some commands we got from god knows where that might change the way we want to allocate resources. """ + pass def free(self, gpu=None): """Here we inform that some process ended, maybe on a specific gpu.""" @@ -154,16 +158,31 @@ def parse_options() -> Namespace: def get_command_for_pid(pid: int) -> str: """Returns the command for a pid if that process exists.""" try: - result = subprocess.run( - f"ps -p {pid:d} -o cmd h", stdout=subprocess.PIPE, shell=True - ) - return result.stdout.decode("utf-8").strip() - except subprocess.CalledProcessError as _e: + if platform.system() == "Windows": + process = psutil.Process(pid) + return " ".join(process.cmdline()) + else: + result = subprocess.run( + f"ps -p {pid:d} -o cmd h", + stdout=subprocess.PIPE, + shell=True, + check=True, + ) + return result.stdout.decode("utf-8").strip() + except ( + subprocess.CalledProcessError, + psutil.NoSuchProcess, + psutil.AccessDenied, + psutil.ZombieProcess, + ): return "" def still_active(pid: int, cmd: str) -> bool: """Checks if a subprocess is still active.""" + if isinstance(cmd, list): + cmd = " ".join(cmd) # Convert list to string + os_cmd = get_command_for_pid(pid) return cmd in os_cmd @@ -183,69 +202,179 @@ def lock_file(lock_path: str, session_id: str) -> bool: return False +def run_experiment_in_subprocess_windows( + py_cmd, + env_vars, + start_path, + out_path, + err_path, + end_path, + crash_path, + pid_dict, + pid_event, +): + """ + Function to execute the experiment subprocess in a separate thread. + Used for Windows based systems to emulate the pre-existing Unix behaviour. 
+ """ + proc = None + try: + with open(start_path, "w") as f: + f.write(str(int(time.time()))) + + with open(err_path, "w") as err_file, open(out_path, "w") as out_file: + proc = subprocess.Popen( + py_cmd, stdout=out_file, stderr=err_file, env=env_vars, shell=True + ) + + pid_dict["pid"] = proc.pid + pid_event.set() + + # Wait for the process to complete + proc.wait() + + # Write to the end file or crash file based on the process return code + if proc.returncode != 0: + with open(crash_path, "w") as f: + f.write(str(int(time.time()))) + print( + f"Process {proc.pid} terminated unexpectedly with return code {proc.returncode}" + ) + else: + with open(end_path, "w") as f: + f.write(str(int(time.time()))) + + except Exception as e: + print(f"An error occurred in run_experiment: {e}") + if proc and proc.poll() is None: + proc.terminate() # Safely terminate the subprocess if it's still running + if not pid_event.is_set(): + pid_dict['pid'] = None + pid_event.set() + + finally: + # This block ensures that the thread will terminate + # even if an exception occurs + if proc and proc.poll() is None: + proc.terminate() + + def launch_run( # pylint: disable=bad-continuation - run_path, py_script, session_id, gpu=None, do_nohup=True, optim=False, end_by=None + run_path, + py_script, + session_id, + gpu=None, + do_nohup=True, + optim=False, + end_by=None, ): """Here we launch a run from an experiment. This might be the most important function here. 
""" + # Common path setup err_path = os.path.join(run_path, "err") out_path = os.path.join(run_path, "out") - wrap_err_path = os.path.join(run_path, "nohup.err" if do_nohup else "sh.err") - wrap_out_path = os.path.join(run_path, "nohup.out" if do_nohup else "sh.out") cfg_path = os.path.join(run_path, "cfg.yaml") start_path = os.path.join(run_path, ".__start") end_path = os.path.join(run_path, ".__end") crash_path = os.path.join(run_path, ".__crash") - env_vars = "" + + flags = "-u -OO" if optim else "-u" with open(cfg_path) as handler: title = yaml.load(handler, Loader=yaml.SafeLoader)["title"] - if gpu is not None: - env_vars = f"CUDA_VISIBLE_DEVICES={gpu} {env_vars:s}" + # Determine the platform + platform_is_windows = platform.system() == "Windows" - if end_by is not None: - env_vars += f" ENDBY={end_by}" + # Unix-specific command setup + if platform_is_windows: + env_vars = os.environ.copy() - flags = "-u -OO" if optim else "-u" + if gpu is not None: + env_vars["CUDA_VISIBLE_DEVICES"] = str(gpu) + if end_by is not None: + env_vars["ENDBY"] = str(end_by) - py_cmd = f"python {flags} {py_script:s} {cfg_path:s} --session-id {session_id}" - - if do_nohup: - cmd = ( - f" date +%s 1> {start_path:s} 2>/dev/null &&" - f" nohup sh -c '{env_vars:s} {py_cmd:s}" - f" 2>{err_path:s} 1>{out_path:s}" - f" && date +%s > {end_path:s}" - f" || date +%s > {crash_path:s}'" - f" 1> {wrap_out_path} 2> {wrap_err_path}" - f" & echo $!" 
+ py_cmd = ( + [sys.executable, flags] + + py_script.split() + + [cfg_path, "--session-id", session_id] ) + py_cmd_str = " ".join(map(str, py_cmd)) + + print(f"[{time.strftime(time.ctime())}] Command to be run:\n{py_cmd_str:s}") + sys.stdout.flush() + + pid_dict = {} + pid_event = threading.Event() + + # Launch the subprocess in a separate thread + experiment_thread = threading.Thread( + target=run_experiment_in_subprocess_windows, + args=( + py_cmd, + env_vars, + start_path, + out_path, + err_path, + end_path, + crash_path, + pid_dict, + pid_event, + ), + ) + experiment_thread.start() + + # Wait for the PID to be set + pid_event.wait() + + pid = pid_dict.get("pid") + else: - cmd = ( - f" date +%s 1> {start_path:s} 2>/dev/null &&" - f" sh -c '{env_vars:s} {py_cmd:s}" - f" 2>{err_path:s} 1>{out_path:s}" - f" && date +%s > {end_path:s}" - f" || date +%s > {crash_path:s}'" - f" 1>{wrap_out_path} 2>{wrap_err_path}" - f" & echo $!" + wrap_err_path = os.path.join(run_path, "nohup.err" if do_nohup else "sh.err") + wrap_out_path = os.path.join(run_path, "nohup.out" if do_nohup else "sh.out") + env_vars = "" + + if gpu is not None: + env_vars = f"CUDA_VISIBLE_DEVICES={gpu} {env_vars:s}" + if end_by is not None: + env_vars += f" ENDBY={end_by}" + + cmd_prefix = f" date +%s 1> {start_path:s} 2>/dev/null &&" + + py_cmd = f"python {flags} {py_script:s} {cfg_path:s} --session-id {session_id}" + main_cmd = ( + f"'{env_vars:s} {py_cmd:s}" + f" 2>{err_path:s} 1>{out_path:s} && date +%s > {end_path:s}" + f" || date +%s > {crash_path:s}' 1> {wrap_out_path} 2> {wrap_err_path} & echo $!" 
) - print(f"[{time.strftime(time.ctime())}] Command to be run:\n{cmd:s}") - sys.stdout.flush() + # Construct the full command based on 'do_nohup' + if do_nohup: + cmd = f"{cmd_prefix} nohup sh -c {main_cmd}" + else: + cmd = f"{cmd_prefix} sh -c {main_cmd}" - proc = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True - ) - (out, err) = proc.communicate() - err = err.decode("utf-8").strip() - if err: - print(f"[{time.strftime(time.ctime())}] Some error: {clr(err, 'red'):s}.") - pid = int(out.decode("utf-8").strip()) - print(f"[{time.strftime(time.ctime())}] New PID is {pid:d}.") + print(f"[{time.strftime(time.ctime())}] Command to be run:\n{cmd:s}") + sys.stdout.flush() + + print(f"[{time.strftime(time.ctime())}] Command to be run:\n{cmd:s}") + sys.stdout.flush() + + proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True + ) + + (out, err) = proc.communicate() + err = err.decode("utf-8").strip() + if err: + print(f"[{time.strftime(time.ctime())}] Some error: {clr(err, 'red'):s}.") + pid = int(out.decode("utf-8").strip()) + + print(f"[{time.strftime(time.ctime())}] New PID is {pid}.") sys.stdout.flush() + return pid, gpu, title, py_cmd @@ -394,10 +523,8 @@ def launch_experiment(opts): def systime_to(timestamp_file_path: str) -> None: """Write current system time to a file.""" - cmd = f"date +%s 1> {timestamp_file_path:s}" - proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, shell=True) - (_, err) = proc.communicate() - return err.decode("utf-8").strip() + with open(timestamp_file_path, "w") as file: + file.write(str(int(time.time()))) def wrapper(function: Callable[[Namespace], None], args: Namespace) -> None: @@ -495,6 +622,10 @@ def check_opts_integrity(opts): def launch() -> None: """Main function.""" opts = parse_options() + + if (not os.path.isdir(opts.config_path)) and (not os.path.isfile(opts.config_path)): + raise FileNotFoundError(f"Cannot find path: {opts.config_path}") + if 
is_experiment(opts.config_path): opts.experiment_path = opts.config_path check_opts_integrity(opts) diff --git a/liftoff/locker.py b/liftoff/locker.py index 83d2c77..a63246c 100644 --- a/liftoff/locker.py +++ b/liftoff/locker.py @@ -133,7 +133,7 @@ def change_experiment_lock_status(opts, unlock=False): if not opts.do: print( - "\nThis was just a simultation. Rerun with", + "\nThis was just a simulation. Rerun with", clr("--do", attrs=["bold"]), "to lock/unlock the experiment for real.", ) diff --git a/liftoff/prepare.py b/liftoff/prepare.py index abb33e9..e6cfa28 100644 --- a/liftoff/prepare.py +++ b/liftoff/prepare.py @@ -374,7 +374,7 @@ def prepare_experiment(opts): name = opts.name while True: timestamp = f"{datetime.now():{opts.timestamp_fmt:s}}" - full_name = f"{timestamp:s}_{name:s}/" + full_name = f"{timestamp:s}_{name:s}" experiment_path = os.path.join(opts.results_path, full_name) if not os.path.exists(experiment_path): break @@ -490,7 +490,7 @@ def prepare_experiment(opts): run_cfg["run_id"] = run_id run_cfg["cfg_id"] = crt_cfg_id run_cfg["title"] = title - dir_name = os.path.basename(opts.experiment_path.rstrip("/")) + dir_name = os.path.basename(os.path.normpath(opts.experiment_path)) run_cfg["full_title"] = f"{dir_name}_{title}" \ if title not in dir_name else dir_name @@ -530,7 +530,7 @@ def prepare_experiment(opts): if not opts.do: print( - "\nThis was just a simultation. Rerun with", + "\nThis was just a simulation. Rerun with", clr("--do", attrs=["bold"]), "to prepare experiment for real.", ) diff --git a/liftoff/proc_info.py b/liftoff/proc_info.py index a632431..b75d619 100644 --- a/liftoff/proc_info.py +++ b/liftoff/proc_info.py @@ -1,114 +1,190 @@ -""" Here we implement liftoff-procs and liftoff-abort +""" Here we implement liftoff-procs. 
""" from argparse import Namespace -import os.path -import subprocess +import os, sys +import psutil +import re from termcolor import colored as clr from .common.options_parser import OptionParser def parse_options() -> Namespace: - """ Parse command line arguments and liftoff configuration. - """ + """Parse command line arguments and liftoff configuration.""" opt_parser = OptionParser( - "liftoff-status", ["experiment", "all", "timestamp_fmt", "results_path", "do"], + "liftoff-procs", + ["experiment", "all", "timestamp_fmt", "results_path", "do"], ) return opt_parser.parse_args() -def get_running_liftoffs(experiment: str, results_path: str): - """ Get the running liftoff processes. - """ - - cmd = ( - "COLUMNS=0 pgrep liftoff" - " | xargs -r -n 1 grep " - f"--files-with-matches {results_path:s}/*/.__* -e" - ) - result = subprocess.run( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True - ) - if result.stderr: - raise Exception(result.stderr.decode("utf-8")) - +def is_liftoff_main_process(cmdline): + """Check if the process is a main liftoff process.""" + cmd_str = " ".join(cmdline).lower() + if sys.platform.startswith('win'): + return 'liftoff.exe' in cmd_str + else: + # Regex pattern to match 'liftoff' not followed by '-procs' + pattern = r'/liftoff(?!\-procs)\b' + return re.search(pattern, cmd_str) is not None + +def extract_session_id(cmdline): + """Extract session ID from the command line arguments.""" + cmdline_str = " ".join(cmdline) # Convert cmdline list to a single string + try: + # Split the command line string by spaces and search for '--session-id' + parts = cmdline_str.split() + if "--session-id" in parts: + index = parts.index("--session-id") + return parts[index + 1] if index < len(parts) - 1 else None + except ValueError: + pass + return None + + +def extract_experiment_name(cmdline): + """Extract experiment name from the command line arguments.""" + cmdline_str = " ".join(cmdline) + try: + # Find the part of the command line string 
that ends with '.yaml' + yaml_path = next( + (part for part in cmdline_str.split() if part.endswith(".yaml")), None + ) + if yaml_path: + path_parts = yaml_path.split(os.path.sep) + main_experiment = path_parts[-4] + sub_experiment_1 = path_parts[-3] + sub_experiment_2 = path_parts[-2] + return main_experiment, f"{sub_experiment_1}{os.path.sep}{sub_experiment_2}" + except IndexError: + # Handle cases where the path does not have the expected number of parts + print(f"Warning: Unexpected path format for experiment in cmdline: {cmdline}") + return None, None + + +def extract_common_path(cmdline): + # Assuming the path is the fourth argument in the main Liftoff process command line + # Adjust the index based on your actual command line structure + if len(cmdline) > 3: + path = cmdline[3] + # Normalize the path to ensure consistency + return os.path.normpath(path) + return None + +def extract_subprocess_path(cmdline, common_path): + # Extracting the path from subprocess command line + for arg in cmdline: + if common_path in arg: + # Extract the segment of the path that matches the common_path + path_segment = arg.split(common_path)[-1] + return os.path.normpath(common_path + path_segment) + return None + +def get_running_liftoffs(): running = {} - - for session_path in result.stdout.decode("utf-8").split("\n"): - if not session_path: + main_process_pids = set() + path_to_main_pid = {} + unique_subprocess_keys = set() # To store unique combinations of path and experiment name + + # First pass: Identify all liftoff main processes and map paths + for proc in psutil.process_iter(["pid", "cmdline"]): + try: + cmdline = proc.cmdline() + if is_liftoff_main_process(cmdline): + main_process_pids.add(proc.pid) + running[proc.pid] = { + "procs": [], + "experiment": None, + } + if not sys.platform.startswith('win'): # Linux/WSL + common_path = extract_common_path(cmdline) + if common_path: + path_to_main_pid[common_path] = proc.pid + except (psutil.NoSuchProcess, 
psutil.AccessDenied): continue - with open(session_path) as hndlr: - ppid = int(hndlr.readline().strip()) - experiment_full_name = os.path.basename(os.path.dirname(session_path)) - - if experiment is not None and experiment not in experiment_full_name: + # Second pass: Associate subprocesses + for proc in psutil.process_iter(["pid", "cmdline"]): + try: + cmdline = proc.cmdline() + proc_pid = proc.pid + if proc_pid not in main_process_pids: # Exclude main process PID + parent_pid = None + + if sys.platform.startswith('win'): + # Windows: use parent PID for association + parent_pid = proc.ppid() if proc.ppid() in main_process_pids else None + + if parent_pid: + experiment_name, sub_experiment_name = extract_experiment_name(cmdline) + unique_key = (proc_pid, sub_experiment_name) # Unique key for Windows + + if unique_key not in unique_subprocess_keys: + subprocess_info = { + "pid": proc_pid, + "experiment_name": sub_experiment_name, + } + running[parent_pid]["procs"].append(subprocess_info) + unique_subprocess_keys.add(unique_key) + + if not running[parent_pid]["experiment"] and experiment_name: + running[parent_pid]["experiment"] = experiment_name + + else: + # Linux/WSL: focus on 'sh -c' processes + if 'sh' in cmdline and '-c' in cmdline: + for common_path, pid in path_to_main_pid.items(): + subprocess_path = extract_subprocess_path(cmdline, common_path) + if subprocess_path and subprocess_path.startswith(common_path): + parent_pid = pid + break + + if parent_pid: + experiment_name, sub_experiment_name = extract_experiment_name(cmdline) + unique_key = (subprocess_path, sub_experiment_name) + if unique_key not in unique_subprocess_keys: + subprocess_info = { + "pid": proc_pid, + "experiment_name": sub_experiment_name, + } + running[parent_pid]["procs"].append(subprocess_info) + unique_subprocess_keys.add(unique_key) + + if not running[parent_pid]["experiment"] and experiment_name: + running[parent_pid]["experiment"] = experiment_name + + except (psutil.NoSuchProcess, 
psutil.AccessDenied): continue - proc_group = dict({}) - session_id = os.path.basename(session_path)[3:] - - escaped_sid = session_id.replace("-", r"\-") - cmd = ( - f"for p in " - f"`pgrep -f '\\-\\-session\\-id {escaped_sid:s}'`" - f"; do COLUMNS=0 ps -p $p -o pid,ppid,cmd h; done" - ) - result = subprocess.run( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True - ) - if result.stderr: - raise Exception(result.stderr.decode("utf-8")) - - pids = [] - print(result.stdout.decode("utf-8").split("\n")) - for line1 in result.stdout.decode("utf-8").split("\n"): - if not line1: - continue - - pid, fake_ppid, *other = line1.split() - pid, fake_ppid = int(pid), int(fake_ppid) - if fake_ppid != 1: - cfg, good = "", True - for part in other: - if part.endswith("cfg.yaml"): - cfg = ( - os.path.basename(os.path.dirname(os.path.dirname(part))) - + "/" - + os.path.basename(os.path.dirname(part)) - ) - elif ".__crash" in part: - good = False - break - if good: - pids.append((pid, cfg)) - - proc_group["session"] = session_id - proc_group["ppid"] = ppid - proc_group["procs"] = pids - - running.setdefault(experiment_full_name, []).append(proc_group) - return running def display_procs(running): - """ Display the running liftoff processes. 
- """ - for experiment_name, details in running.items(): - print(clr(experiment_name, attrs=["bold"])) - for info in details: - nrunning = clr(f"{len(info['procs']):d}", color="blue", attrs=["bold"]) - ppid = clr(f"{info['ppid']:5d}", color="red", attrs=["bold"]) - print(f" {ppid:s}" f" :: {info['session']:s}" f" :: {nrunning:s} running") - for pid, name in info["procs"]: - print(f" - {pid:5d} :: {name:s}") + """Display the running liftoff processes.""" + if running: + for main_pid, main_proc_info in running.items(): + # Format the main process ID + main_pid_formatted = clr(f"{main_pid:5d}", color="red", attrs=["bold"]) + experiment_name = main_proc_info.get("experiment", "N/A") + + # Print the main process information + print(f"{main_pid_formatted} :: {experiment_name} :: {len(main_proc_info['procs'])} running") + + # Iterate and display each subprocess + for subproc in main_proc_info["procs"]: + sub_pid = subproc.get("pid") + session_id = subproc.get("session_id", "N/A") + sub_experiment_name = subproc.get("experiment_name", "N/A") + + # Format the subprocess information + sub_pid_formatted = clr(f"{sub_pid:5d}", color="blue", attrs=["bold"]) + print(f" - {sub_pid_formatted} :: {sub_experiment_name}") + else: + print("No running liftoff processes.") def procs() -> None: - """ Entry point for liftoff-procs. - """ + """Entry point for liftoff-procs.""" - opts = parse_options() - display_procs(get_running_liftoffs(opts.experiment, opts.results_path)) + display_procs(get_running_liftoffs()) diff --git a/liftoff/sanitizer.py b/liftoff/sanitizer.py index ff3c749..ee53866 100644 --- a/liftoff/sanitizer.py +++ b/liftoff/sanitizer.py @@ -144,7 +144,7 @@ def clean_experiment(opts): if not opts.do: print( - "\nThis was just a simultation. Rerun with", + "\nThis was just a simulation. 
Rerun with", clr("--do", attrs=["bold"]), "to clean the experiment for real.", ) diff --git a/pyproject.toml b/pyproject.toml index 75f53d1..ee2b330 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,4 +27,8 @@ exclude = ''' | tests/data | profiling )/ -''' \ No newline at end of file +''' + +[tool.poetry.group.dev.dependencies] +pytest = "^7.4.1" +pytest-cov = "^4.1.0" \ No newline at end of file diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..452f476 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,14 @@ +[pytest] +testpaths = + tests + +norecursedirs = + dist + build + .vscode + *.egg-info + +addopts = + --doctest-modules + --cov=liftoff + --cov-report term-missing diff --git a/setup.py b/setup.py index 8506aeb..a52ebc1 100644 --- a/setup.py +++ b/setup.py @@ -6,12 +6,11 @@ from setuptools import setup, find_packages -VERSION = "0.3.3" # single source of truth +VERSION = "0.4.2" # single source of truth print("-- Installing liftoff " + VERSION) with open("./liftoff/version.py", "w") as f: f.write("__version__ = '{}'\n".format(VERSION)) - setup( name="liftoff", version=VERSION, @@ -40,6 +39,7 @@ "tabulate", "termcolor", "pyperclip", + "psutil", ], zip_safe=False, )