diff --git a/flepimop/gempyor_pkg/src/gempyor/batch.py b/flepimop/gempyor_pkg/src/gempyor/batch.py index 5d4b7ca3c..78d97e4ea 100644 --- a/flepimop/gempyor_pkg/src/gempyor/batch.py +++ b/flepimop/gempyor_pkg/src/gempyor/batch.py @@ -31,6 +31,7 @@ from .logging import get_script_logger from .utils import _format_cli_options, _git_checkout, _git_head, _shutil_which, config from .shared_cli import ( + MEMORY_MB, NONNEGATIVE_DURATION, cli, config_files_argument, @@ -970,7 +971,7 @@ def _submit_scenario_job( ), click.Option( param_decls=["--memory", "memory"], - type=click.IntRange(min=1), + type=MEMORY_MB, default=None, help="Override for the amount of memory per node to use in MB.", ), @@ -1002,7 +1003,59 @@ def _submit_scenario_job( ) @click.pass_context def _click_submit(ctx: click.Context = mock_context, **kwargs: Any) -> None: - """Submit batch jobs""" + """ + Submit batch inference jobs. + + This CLI tool makes it a breeze to submit batch inference jobs. In general, this + tool does: + + 1. Determine information about the job such as the inference method, batch system, + job size/time limit, and resources. + 2. Write metadata about the batch job to a 'manifest.json' file. + 3. Loop over the outcome/seir scenario modifiers and generate/submit batch jobs for + each of those. + 4. Checkout a new branch in the project directory to preserve the outputs. + + To get a better understanding of this tool you can use the `--dry-run` flag which + will complete all of steps described above except for submitting the jobs. Or if you + would like to test run the batch scripts without submitting to slurm or AWS you can + use the `--local` flag which will run the "batch" job locally (only use for small + test jobs). + + Here are some common examples to get started: + + 1. Assuming you are in your clone of the `HopkinsIDD/flepimop_sample` repository + this command will do a dry run of the sample inference configuration with two + populations. 
+ ```bash + $ flepimop submit --jobs 4 \ # With legacy inference jobs refers to chains + --simulations 20 \ # The number of iterations per a chain + --blocks 1 \ # The number of consecutive blocks to run + --slurm \ # Use slurm, shorthand for `--batch-system slurm` + --project-path $(pwd) \ # Manually specify the project path + --email me@college.edu \ # Use slurm's built-in email alerts for the job + --skip-checkout \ # Do not create a new branch in the project repo + -vvv \ # Use the max verbosity + --dry-run \ # Do not actually submit the job via `sbatch`. + config_sample_2pop_inference.yml + ``` + + 2. Extending on example (1) let's make some small modifications to submit a "real" + job. + ```bash + $ flepimop submit --jobs 10 \ # See before + --simulations 400 \ # See before + --blocks 1 \ # See before + --slurm \ # See before + --partition dedicated_slurm_partition \ # Submit to a particular slurm partition + --simulation-time 30s \ # Specify the time limit per simulation + --initial-time 5min \ # Specify the initial time limit + --conda-env custom-flepimop-env \ # Specify a custom conda env to use + --cpus 4 \ # Use 4 CPUs per a job + -vvv \ # Use the max verbosity + config_sample_2pop_inference.yml + ``` + """ # Generic setup now = datetime.now(timezone.utc) logger = get_script_logger(__name__, kwargs["verbosity"]) @@ -1073,28 +1126,22 @@ def _click_submit(ctx: click.Context = mock_context, **kwargs: Any) -> None: logger.info("Setting a total job time limit of %s minutes", job_time_limit.format()) # Job resources + memory = None if kwargs["memory"] is None else math.ceil(kwargs["memory"]) + if memory != kwargs["memory"]: + logger.warning( + "The requested memory of %.3fMB has been rounded up to %uMB for submission", + kwargs["memory"], + memory, + ) job_resources = JobResources.from_presets( job_size, inference_method, nodes=kwargs["nodes"], cpus=kwargs["cpus"], - memory=kwargs["memory"], + memory=memory, ) logger.info("Requesting the resources %s for this 
job.", job_resources) - # Outcome/seir modifier scenarios - outcome_modifiers_scenarios = ( - cfg["outcome_modifiers"]["scenarios"].as_str_seq() - if cfg["outcome_modifiers"].exists() - and cfg["outcome_modifiers"]["scenarios"].exists() - else [None] - ) - seir_modifiers_scenarios = ( - cfg["seir_modifiers"]["scenarios"].as_str_seq() - if cfg["seir_modifiers"].exists() and cfg["seir_modifiers"]["scenarios"].exists() - else [None] - ) - # Restart/continuation location # TODO: Implement this @@ -1119,6 +1166,19 @@ def _click_submit(ctx: click.Context = mock_context, **kwargs: Any) -> None: "Dumped the final config for this batch submission to %s", config_out.absolute() ) + # Outcome/seir modifier scenarios + outcome_modifiers_scenarios = ( + cfg["outcome_modifiers"]["scenarios"].as_str_seq() + if cfg["outcome_modifiers"].exists() + and cfg["outcome_modifiers"]["scenarios"].exists() + else [None] + ) + seir_modifiers_scenarios = ( + cfg["seir_modifiers"]["scenarios"].as_str_seq() + if cfg["seir_modifiers"].exists() and cfg["seir_modifiers"]["scenarios"].exists() + else [None] + ) + # Loop over the scenarios for outcome_modifiers_scenario, seir_modifiers_scenario in product( outcome_modifiers_scenarios, seir_modifiers_scenarios diff --git a/flepimop/gempyor_pkg/src/gempyor/shared_cli.py b/flepimop/gempyor_pkg/src/gempyor/shared_cli.py index 805de36c4..d47d6de96 100644 --- a/flepimop/gempyor_pkg/src/gempyor/shared_cli.py +++ b/flepimop/gempyor_pkg/src/gempyor/shared_cli.py @@ -199,6 +199,53 @@ def convert( NONNEGATIVE_DURATION = DurationParamType(nonnegative=True) +class MemoryParamType(click.ParamType): + name = "memory" + _units = { + "kb": 1024.0**1.0, + "k": 1024.0**1.0, + "mb": 1024.0**2.0, + "m": 1024.0**2.0, + "gb": 1024.0**3.0, + "g": 1024.0**3.0, + "t": 1024.0**4.0, + "tb": 1024.0**4.0, + } + + def __init__(self, unit: str) -> None: + super().__init__() + if (unit := unit.lower()) not in self._units.keys(): + raise ValueError( + f"The `unit` given is not valid, 
given '{unit}' and " + f"must be one of: {', '.join(self._units.keys())}." + ) + self._unit = unit + self._regex = re.compile( + rf"^(([0-9]+)?(\.[0-9]+)?)({'|'.join(self._units.keys())})?$", + flags=re.IGNORECASE, + ) + + def convert( + self, value: Any, param: click.Parameter | None, ctx: click.Context | None + ) -> float: + value = str(value).strip() + if (m := self._regex.match(value)) is None: + self.fail(f"{value!r} is not a valid memory size.", param, ctx) + number, _, _, unit = m.groups() + unit = self._unit if unit is None else unit.lower() + if unit == self._unit: + return float(number) + return (self._units[unit] * float(number)) / ( + self._units[self._unit] + ) + + +MEMORY_KB = MemoryParamType("kb") +MEMORY_MB = MemoryParamType("mb") +MEMORY_GB = MemoryParamType("gb") +MEMORY_TB = MemoryParamType("tb") + + def click_helpstring( params: click.Parameter | list[click.Parameter], ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: diff --git a/flepimop/gempyor_pkg/tests/shared_cli/test_memory_param_type_class.py b/flepimop/gempyor_pkg/tests/shared_cli/test_memory_param_type_class.py new file mode 100644 index 000000000..1db89a5a7 --- /dev/null +++ b/flepimop/gempyor_pkg/tests/shared_cli/test_memory_param_type_class.py @@ -0,0 +1,56 @@ +import random +from typing import Any + +from click.exceptions import BadParameter +import pytest + +from gempyor.shared_cli import MemoryParamType + + +@pytest.mark.parametrize("unit", ("Nope", "NO CHANCE", "wrong", "bb")) +def test_invalid_unit_value_error(unit: str) -> None: + with pytest.raises( + ValueError, + match=( + "^The `unit` given is not valid, given " + f"'{unit.lower()}' and must be one of:.*.$" + ), + ): + MemoryParamType(unit) + + +@pytest.mark.parametrize("value", ("1..2MB", "3.4cb", "56.abc", "-1GB")) +def test_invalid_value_bad_parameter(value: Any) -> None: + memory = MemoryParamType("mb") + with pytest.raises(BadParameter, match="^.* is not a valid memory size.$"): + memory.convert(value, None, None) + + 
+@pytest.mark.parametrize("unit", MemoryParamType._units.keys()) +@pytest.mark.parametrize( + "number", + [random.randint(1, 1000) for _ in range(3)] # int + + [random.random() for _ in range(3)] # float without numbers left of decimal + + [ + random.randint(1, 25) + random.random() for _ in range(3) + ], # float with numbers left of the decimal +) +def test_convert_acts_as_identity(unit: str, number: int) -> None: + memory = MemoryParamType(unit) + assert memory.convert(f"{number}{unit}".lstrip("0"), None, None) == number + assert memory.convert(f"{number}{unit.upper()}".lstrip("0"), None, None) == number + + +@pytest.mark.parametrize( + ("unit", "value", "expected"), + ( + ("gb", "1.2gb", 1.2), + ("kb", "1mb", 1024.0), + ("gb", "30mb", 30.0 / 1024.0), + ("kb", "2tb", 2.0 * (1024.0**3.0)), + ("mb", "0.1gb", 0.1 * 1024.0), + ), +) +def test_exact_results_for_select_inputs(unit: str, value: Any, expected: float) -> None: + memory = MemoryParamType(unit) + assert memory.convert(value, None, None) == expected