Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(local): handle local launcher #116

Merged
merged 13 commits into from
Mar 7, 2025
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
antares-timeseries-generation==0.1.7
antares-study-version==1.0.9

absl-py~=1.4.0
click~=8.1.7
configparser~=5.0.2
numpy~=1.26.4
pandas~=2.2.2
pandas-stubs~=2.2.2
psutil~=7.0.0
types-psutil~=7.0.0.20250218
pydantic~=2.7.1
pytest~=7.2.1
python-dateutil~=2.9.0
Expand Down
22 changes: 22 additions & 0 deletions src/antares/craft/model/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from enum import Enum
from typing import Any, Optional

from antares.study.version import SolverVersion


class Solver(Enum):
COIN = "coin"
Expand Down Expand Up @@ -56,6 +58,26 @@ def to_api(self) -> dict[str, Any]:

return data

def to_local(self, solver_version: SolverVersion) -> list[str]:
args = []

if self.nb_cpu:
args += ["--force-parallel", str(self.nb_cpu)]

if not self.unzip_output:
args.append("-z")

if self.output_suffix:
args += ["-n", self.output_suffix]

if solver_version >= SolverVersion.parse("9.2") or self.solver != Solver.SIRIUS:
args += ["--use-ortools", " --ortools-solver", self.solver.value]

if self.presolve:
args += ["--solver-parameters", "PRESOLVE 1"]

return args


class JobStatus(Enum):
PENDING = "pending"
Expand Down
14 changes: 9 additions & 5 deletions src/antares/craft/model/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(
version: str,
services: StudyServices,
path: PurePath = PurePath("."),
solver_path: Optional[Path] = None,
):
self.name = name
self.version = version
Expand All @@ -64,6 +65,7 @@ def __init__(
self._links: dict[str, Link] = dict()
self._binding_constraints: dict[str, BindingConstraint] = dict()
self._outputs: dict[str, Output] = dict()
self._solver_path: Optional[Path] = solver_path

@property
def service(self) -> BaseStudyService:
Expand Down Expand Up @@ -283,7 +285,7 @@ def run_antares_simulation(self, parameters: Optional[AntaresSimulationParameter

Returns: A job representing the simulation task
"""
return self._run_service.run_antares_simulation(parameters)
return self._run_service.run_antares_simulation(parameters, self._solver_path)

def wait_job_completion(self, job: Job, time_out: int = 172800) -> None:
"""
Expand Down Expand Up @@ -365,16 +367,18 @@ def update_multiple_links(self, new_properties: Dict[str, LinkPropertiesUpdate])
# import mechanics, we need to use local imports to avoid circular dependencies.


def create_study_local(study_name: str, version: str, parent_directory: "Path") -> "Study":
def create_study_local(
study_name: str, version: str, parent_directory: "Path", solver_path: Optional[Path] = None
) -> "Study":
from antares.craft.service.local_services.factory import create_study_local

return create_study_local(study_name, version, parent_directory)
return create_study_local(study_name, version, parent_directory, solver_path)


def read_study_local(study_path: "Path") -> "Study":
def read_study_local(study_path: "Path", solver_path: Optional[Path] = None) -> "Study":
from antares.craft.service.local_services.factory import read_study_local

return read_study_local(study_path)
return read_study_local(study_path, solver_path)


def create_study_api(
Expand Down
5 changes: 4 additions & 1 deletion src/antares/craft/service/api_services/services/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# This file is part of the Antares project.
import time

from pathlib import Path
from typing import Any, Optional, cast

from antares.craft.api_conf.api_conf import APIconf
Expand Down Expand Up @@ -38,7 +39,9 @@ def __init__(self, config: APIconf, study_id: str):
self._wrapper = RequestWrapper(self.config.set_up_api_conf())

@override
def run_antares_simulation(self, parameters: Optional[AntaresSimulationParameters] = None) -> Job:
def run_antares_simulation(
self, parameters: Optional[AntaresSimulationParameters] = None, solver_path: Optional[Path] = None
) -> Job:
url = f"{self._base_url}/launcher/run/{self.study_id}"
try:
if parameters is not None:
Expand Down
4 changes: 3 additions & 1 deletion src/antares/craft/service/base_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,9 @@ def update_storage_matrix(self, storage: "STStorage", ts_name: "STStorageMatrixN

class BaseRunService(ABC):
@abstractmethod
def run_antares_simulation(self, parameters: Optional[AntaresSimulationParameters] = None) -> Job:
def run_antares_simulation(
self, parameters: Optional[AntaresSimulationParameters] = None, solver_path: Optional[Path] = None
) -> Job:
"""
Runs the Antares simulation.

Expand Down
11 changes: 9 additions & 2 deletions src/antares/craft/service/local_services/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import time

from pathlib import Path
from typing import Optional

from antares.craft.config.local_configuration import LocalConfiguration
from antares.craft.model.settings.study_settings import StudySettings
Expand Down Expand Up @@ -113,14 +114,17 @@ def _create_directory_structure(study_path: Path) -> None:
(study_path / subdirectory).mkdir(parents=True, exist_ok=True)


def create_study_local(study_name: str, version: str, parent_directory: Path) -> "Study":
def create_study_local(
study_name: str, version: str, parent_directory: Path, solver_path: Optional[Path] = None
) -> "Study":
"""
Create a directory structure for the study with empty files.

Args:
study_name: antares study name to be created
version: antares version for study
parent_directory: Local directory to store the study in.
solver_path: antares solver path to use to run simulations

Raises:
FileExistsError if the study already exists in the given location
Expand Down Expand Up @@ -166,6 +170,7 @@ def create_study_local(study_name: str, version: str, parent_directory: Path) ->
version=version,
services=create_local_services(config=local_config, study_name=study_name),
path=study_directory,
solver_path=solver_path,
)
# We need to create the file with default value
default_settings = StudySettings()
Expand All @@ -175,11 +180,12 @@ def create_study_local(study_name: str, version: str, parent_directory: Path) ->
return study


def read_study_local(study_directory: Path) -> "Study":
def read_study_local(study_directory: Path, solver_path: Optional[Path] = None) -> "Study":
"""
Read a study structure by returning a study object.
Args:
study_directory: antares study path to be read
solver_path: antares solver path to use to run simulations

Raises:
FileNotFoundError: If the provided directory does not exist.
Expand All @@ -202,6 +208,7 @@ def _directory_not_exists(local_path: Path) -> None:
version=study_params["version"],
services=create_local_services(config=local_config, study_name=study_params["caption"]),
path=study_directory,
solver_path=solver_path,
)
study.read_settings()
study.read_areas()
Expand Down
2 changes: 2 additions & 0 deletions src/antares/craft/service/local_services/services/area.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ def get_misc_gen_matrix(self, area_id: str) -> pd.DataFrame:
def read_areas(self) -> List[Area]:
local_path = self.config.local_path
areas_path = local_path / self.study_name / "input" / "areas"
if not areas_path.exists():
return []
areas = []
for element in areas_path.iterdir():
if element.is_dir():
Expand Down
98 changes: 94 additions & 4 deletions src/antares/craft/service/local_services/services/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,114 @@
# SPDX-License-Identifier: MPL-2.0
#
# This file is part of the Antares project.
import shutil
import subprocess

from datetime import datetime
from pathlib import Path
from typing import Any, Optional

import psutil

from antares.craft.config.local_configuration import LocalConfiguration
from antares.craft.model.simulation import AntaresSimulationParameters, Job
from antares.craft.exceptions.exceptions import AntaresSimulationRunningError, SimulationTimeOutError
from antares.craft.model.simulation import AntaresSimulationParameters, Job, JobStatus
from antares.craft.service.base_services import BaseRunService
from antares.study.version import SolverVersion
from typing_extensions import override


def _get_solver_version(solver_path: Path) -> SolverVersion:
solver_name = solver_path.name
solver_version_str = solver_name.removeprefix("antares-").removesuffix("-solver")
return SolverVersion.parse(solver_version_str)


class RunLocalService(BaseRunService):
def __init__(self, config: LocalConfiguration, study_name: str, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.config = config
self.study_name = study_name

@override
def run_antares_simulation(self, parameters: Optional[AntaresSimulationParameters] = None) -> Job:
raise NotImplementedError
def run_antares_simulation(
self, parameters: Optional[AntaresSimulationParameters] = None, solver_path: Optional[Path] = None
) -> Job:
if not solver_path:
raise AntaresSimulationRunningError(self.study_name, "No solver path was provided")

# Builds command line call
args = [str(solver_path)]
parameters = parameters or AntaresSimulationParameters()
solver_version = _get_solver_version(solver_path)
args += parameters.to_local(solver_version)
args += ["-i", str(self.config.study_path)]

# Launches the simulation
process = subprocess.Popen(
args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, universal_newlines=True, encoding="utf-8"
)

return Job(job_id=str(process.pid), status=JobStatus.RUNNING, parameters=parameters, output_id=None)

@override
def wait_job_completion(self, job: Job, time_out: int) -> None:
raise NotImplementedError
pid = int(job.job_id)
try:
process = psutil.Process(pid)

return_code = process.wait(timeout=time_out)
if return_code == 0:
self._handle_success(job)
else:
self._handle_failure(job)

except psutil.NoSuchProcess:
return self._handle_simulation_ending(job)
except psutil.TimeoutExpired:
raise SimulationTimeOutError(job.job_id, time_out)

def _handle_simulation_ending(self, job: Job) -> None:
output_id = self._find_most_recent_output(job.parameters.unzip_output)
if output_id.endswith(".zip"):
job.status = JobStatus.SUCCESS
job.output_id = output_id
else:
output_path = self.config.study_path / "output" / output_id
if (output_path / "execution_info.ini").exists():
job.status = JobStatus.SUCCESS
job.output_id = output_id
else:
job.status = JobStatus.FAILED
shutil.rmtree(output_path)

def _handle_success(self, job: Job) -> None:
job.status = JobStatus.SUCCESS
job.output_id = self._find_most_recent_output(job.parameters.unzip_output)

def _handle_failure(self, job: Job) -> None:
job.status = JobStatus.FAILED
output_id = self._find_most_recent_output(job.parameters.unzip_output)
shutil.rmtree(self.config.study_path / "output" / output_id)

def _find_most_recent_output(self, output_is_unzipped: bool) -> str:
output_path = self.config.study_path / "output"
all_outputs = output_path.iterdir()
zipped_outputs = []
unzipped_outputs = []
for output in all_outputs:
if output.name.endswith(".zip"):
zipped_outputs.append(output.name)
else:
unzipped_outputs.append(output.name)

end_of_date_pattern = 13
concerned_list = unzipped_outputs if output_is_unzipped else zipped_outputs
output_result_tuple: tuple[float, str] = (0, "")
for output_name in concerned_list:
output_date_as_str = output_name[:end_of_date_pattern]
output_date_as_datetime = datetime.strptime(output_date_as_str, "%Y%m%d-%H%M")
total_seconds = output_date_as_datetime.timestamp()
if total_seconds > output_result_tuple[0]:
output_result_tuple = (total_seconds, output_name)
return output_result_tuple[1]
84 changes: 84 additions & 0 deletions tests/integration/test_local_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright (c) 2024, RTE (https://www.rte-france.com)
#
# See AUTHORS.txt
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# SPDX-License-Identifier: MPL-2.0
#
# This file is part of the Antares project.
import pytest

import re

from pathlib import Path

from antares.craft import create_study_local, read_study_local
from antares.craft.exceptions.exceptions import AntaresSimulationRunningError
from antares.craft.model.hydro import HydroPropertiesUpdate
from antares.craft.model.simulation import AntaresSimulationParameters, JobStatus


def find_executable_path() -> Path:
solver_parent_path = (
[p for p in Path(__file__).parents if p.name == "antares_craft"][0]
/ "AntaresWebDesktop"
/ "AntaresWeb"
/ "antares_solver"
)
return list(solver_parent_path.glob("antares-*"))[0]


class TestLocalLauncher:
def test_error_case(self, tmp_path: Path):
study = create_study_local("test study", "880", tmp_path)
# Ensure it's impossible to run a study without giving a solver path at the instantiation
with pytest.raises(
AntaresSimulationRunningError,
match=re.escape("Could not run the simulation for study test study: No solver path was provided"),
):
study.run_antares_simulation()

solver_path = find_executable_path()
study = read_study_local(tmp_path / "test study", solver_path)

# Asserts running a simulation without areas fail and doesn't create an output file
job = study.run_antares_simulation()
study.wait_job_completion(job)
assert job.status == JobStatus.FAILED
assert job.parameters == AntaresSimulationParameters()
assert job.output_id is None
output_path = Path(study.path / "output")
assert list(output_path.iterdir()) == []

def test_lifecycle(self, tmp_path: Path):
solver_path = find_executable_path()
study = create_study_local("test study", "880", tmp_path, solver_path)
output_path = Path(study.path / "output")

# Simulation succeeds
area_1 = study.create_area("area_1")
area_1.hydro.update_properties(HydroPropertiesUpdate(reservoir_capacity=1)) # make the simulation succeeds
job = study.run_antares_simulation()
study.wait_job_completion(job)
assert job.status == JobStatus.SUCCESS
assert job.parameters == AntaresSimulationParameters()
outputs = list(output_path.iterdir())
assert len(outputs) == 1
output_id = outputs[0].name
assert job.output_id == output_id
assert not output_id.endswith(".zip")

# Runs simulation with parameters
simulation_parameters = AntaresSimulationParameters(unzip_output=False, output_suffix="test_integration")
second_job = study.run_antares_simulation(simulation_parameters)
study.wait_job_completion(second_job)
assert second_job.status == JobStatus.SUCCESS
assert second_job.parameters == simulation_parameters
outputs = list(output_path.iterdir())
assert len(outputs) == 2
second_output = [otp.name for otp in outputs if otp.name.endswith(".zip")][0]
assert second_job.output_id == second_output
assert second_output.endswith(".zip")
Loading