Skip to content

Commit

Permalink
Use queue options directly from ert
Browse files Browse the repository at this point in the history
  • Loading branch information
oyvindeide committed Jan 10, 2025
1 parent 7b2bedf commit dd77cf3
Show file tree
Hide file tree
Showing 20 changed files with 266 additions and 510 deletions.
8 changes: 4 additions & 4 deletions src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def driver_options(self) -> dict[str, Any]:

@pydantic.dataclasses.dataclass
class LocalQueueOptions(QueueOptions):
name: Literal[QueueSystem.LOCAL] = QueueSystem.LOCAL
name: Literal[QueueSystem.LOCAL, "local", "LOCAL"] = "local"

@property
def driver_options(self) -> dict[str, Any]:
Expand All @@ -101,7 +101,7 @@ def driver_options(self) -> dict[str, Any]:

@pydantic.dataclasses.dataclass
class LsfQueueOptions(QueueOptions):
name: Literal[QueueSystem.LSF] = QueueSystem.LSF
name: Literal[QueueSystem.LSF, "lsf", "LSF"] = "lsf"
bhist_cmd: NonEmptyString | None = None
bjobs_cmd: NonEmptyString | None = None
bkill_cmd: NonEmptyString | None = None
Expand All @@ -124,7 +124,7 @@ def driver_options(self) -> dict[str, Any]:

@pydantic.dataclasses.dataclass
class TorqueQueueOptions(QueueOptions):
name: Literal[QueueSystem.TORQUE] = QueueSystem.TORQUE
name: Literal[QueueSystem.TORQUE, "torque", "TORQUE"] = "torque"
qsub_cmd: NonEmptyString | None = None
qstat_cmd: NonEmptyString | None = None
qdel_cmd: NonEmptyString | None = None
Expand All @@ -145,7 +145,7 @@ def driver_options(self) -> dict[str, Any]:

@pydantic.dataclasses.dataclass
class SlurmQueueOptions(QueueOptions):
name: Literal[QueueSystem.SLURM] = QueueSystem.SLURM
name: Literal[QueueSystem.SLURM, "SLURM", "slurm"] = "slurm"
sbatch: NonEmptyString = "sbatch"
scancel: NonEmptyString = "scancel"
scontrol: NonEmptyString = "scontrol"
Expand Down
24 changes: 22 additions & 2 deletions src/everest/config/everest_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
from argparse import ArgumentParser
from copy import copy
from functools import cached_property
from io import StringIO
from itertools import chain
Expand Down Expand Up @@ -215,7 +216,7 @@ class EverestConfig(BaseModelWithPropertySupport): # type: ignore
""",
)
server: ServerConfig | None = Field(
default=None,
default_factory=ServerConfig,
description="""Defines Everest server settings, i.e., which queue system,
queue name and queue options are used for the everest server.
The main reason for changing this section is situations where everest
Expand Down Expand Up @@ -250,6 +251,25 @@ class EverestConfig(BaseModelWithPropertySupport): # type: ignore
config_path: Path = Field()
model_config = ConfigDict(extra="forbid")

@model_validator(mode="after")
def validate_queue_system(self) -> Self: # pylint: disable=E0213
if self.server is None:
self.server = ServerConfig(queue_system=copy(self.simulator.queue_system))
elif self.server.queue_system is None:
self.server.queue_system = copy(self.simulator.queue_system)
if (
str(self.simulator.queue_system.name).lower() == "local"
and str(self.server.queue_system.name).lower()
!= str(self.simulator.queue_system.name).lower()
):
raise ValueError(
f"The simulator is using local as queue system "
f"while the everest server is using {self.server.queue_system.name}. "
f"If the simulator is using local, so must the everest server."
)
self.server.queue_system.max_running = 1
return self

@model_validator(mode="after")
def validate_forward_model_job_name_installed(self) -> Self: # pylint: disable=E0213
install_jobs = self.install_jobs
Expand Down Expand Up @@ -745,7 +765,7 @@ def with_defaults(cls, **kwargs):
"model": {"realizations": [0]},
}

return EverestConfig.model_validate({**defaults, **kwargs})
return cls.model_validate({**defaults, **kwargs})

@staticmethod
def lint_config_dict(config: dict) -> list["ErrorDetails"]:
Expand Down
13 changes: 0 additions & 13 deletions src/everest/config/has_ert_queue_options.py

This file was deleted.

70 changes: 47 additions & 23 deletions src/everest/config/server_config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import json
import os
from typing import Literal
from typing import Any

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

from ert.config.queue_config import (
LocalQueueOptions,
LsfQueueOptions,
SlurmQueueOptions,
TorqueQueueOptions,
)
from ert.plugins import ErtPluginManager

from ..strings import (
CERTIFICATE_DIR,
Expand All @@ -11,10 +19,9 @@
SERVER_STATUS,
SESSION_DIR,
)
from .has_ert_queue_options import HasErtQueueOptions


class ServerConfig(BaseModel, HasErtQueueOptions): # type: ignore
class ServerConfig(BaseModel): # type: ignore
name: str | None = Field(
None,
description="""Specifies which queue to use.
Expand All @@ -27,30 +34,47 @@ class ServerConfig(BaseModel, HasErtQueueOptions): # type: ignore
as RMS and Eclipse.
""",
) # Corresponds to queue name
exclude_host: str | None = Field(
"",
description="""Comma separated list of nodes that should be
excluded from the slurm run""",
)
include_host: str | None = Field(
"",
description="""Comma separated list of nodes that
should be included in the slurm run""",
)
options: str | None = Field(
None,
description="""Used to specify options to LSF.
Examples to set memory requirement is:
* rusage[mem=1000]""",
)
queue_system: Literal["lsf", "local", "slurm"] | None = Field(
None,
description="Defines which queue system the everest server runs on.",
queue_system: (
LocalQueueOptions
| LsfQueueOptions
| SlurmQueueOptions
| TorqueQueueOptions
| None
) = Field(
default=None,
description="Defines which queue system the everest submits jobs to",
discriminator="name",
)
model_config = ConfigDict(
extra="forbid",
)

@field_validator("queue_system", mode="before")
@classmethod
def default_local_queue(cls, v):
if v is None:
return v
elif "activate_script" not in v and ErtPluginManager().activate_script():
v["activate_script"] = ErtPluginManager().activate_script()
return v

@model_validator(mode="before")
@classmethod
def check_old_config(cls, data: Any) -> Any:
if isinstance(data, dict):
queue_system = data.get("queue_system")
queue_systems = {
"lsf": LsfQueueOptions,
"torque": TorqueQueueOptions,
"slurm": SlurmQueueOptions,
"local": LocalQueueOptions,
}
if isinstance(queue_system, str) and queue_system in queue_systems:
raise ValueError(
f"Queue system configuration has changed, valid options for {queue_system} are: {list(queue_systems[queue_system].__dataclass_fields__.keys())}"
)
return data

@staticmethod
def get_server_url(output_dir: str) -> str:
"""Return the url of the server.
Expand Down
145 changes: 51 additions & 94 deletions src/everest/config/simulator_config.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
import warnings
from typing import Literal
from typing import Any

from pydantic import BaseModel, Field, NonNegativeInt, PositiveInt, field_validator
from pydantic import (
BaseModel,
Field,
NonNegativeInt,
PositiveInt,
field_validator,
model_validator,
)

from .has_ert_queue_options import HasErtQueueOptions
from ert.config.queue_config import (
LocalQueueOptions,
LsfQueueOptions,
SlurmQueueOptions,
TorqueQueueOptions,
)
from ert.plugins import ErtPluginManager

simulator_example = {"queue_system": {"name": "local", "max_running": 3}}


class SimulatorConfig(BaseModel, HasErtQueueOptions, extra="forbid"): # type: ignore
name: str | None = Field(default=None, description="Specifies which queue to use")
cores: PositiveInt | None = Field(
default=None,
description="""Defines the number of simultaneously running forward models.
When using queue system lsf, this corresponds to number of nodes used at one
time, whereas when using the local queue system, cores refers to the number of
cores you want to use on your system.
This number is specified in Ert as MAX_RUNNING.
""",
)
class SimulatorConfig(BaseModel, extra="forbid"): # type: ignore
cores_per_node: PositiveInt | None = Field(
default=None,
description="""defines the number of CPUs when running
Expand All @@ -35,32 +35,24 @@ class SimulatorConfig(BaseModel, HasErtQueueOptions, extra="forbid"): # type: i
description="Whether the batch folder for a successful simulation "
"needs to be deleted.",
)
exclude_host: str | None = Field(
"",
description="""Comma separated list of nodes that should be
excluded from the slurm run.""",
)
include_host: str | None = Field(
"",
description="""Comma separated list of nodes that
should be included in the slurm run""",
)
max_runtime: NonNegativeInt | None = Field(
default=None,
description="""Maximum allowed running time of a forward model. When
set, a job is only allowed to run for max_runtime seconds.
A value of 0 means unlimited runtime.
""",
)
options: str | None = Field(
queue_system: (
LocalQueueOptions
| LsfQueueOptions
| SlurmQueueOptions
| TorqueQueueOptions
| None
) = Field(
default=None,
description="""Used to specify options to LSF.
Examples to set memory requirement is:
* rusage[mem=1000]""",
)
queue_system: Literal["lsf", "local", "slurm", "torque"] | None = Field(
default="local",
description="Defines which queue system the everest server runs on.",
description="Defines which queue system the everest submits jobs to",
discriminator="name",
validate_default=True,
)
resubmit_limit: NonNegativeInt | None = Field(
default=None,
Expand All @@ -73,38 +65,6 @@ class SimulatorConfig(BaseModel, HasErtQueueOptions, extra="forbid"): # type: i
resumbit_limit defines the number of times we will resubmit a failing forward model.
If not specified, a default value of 1 will be used.""",
)
sbatch: str | None = Field(
default=None,
description="sbatch executable to be used by the slurm queue interface.",
)
scancel: str | None = Field(
default=None,
description="scancel executable to be used by the slurm queue interface.",
)
scontrol: str | None = Field(
default=None,
description="scontrol executable to be used by the slurm queue interface.",
)
sacct: str | None = Field(
default=None,
description="sacct executable to be used by the slurm queue interface.",
)
squeue: str | None = Field(
default=None,
description="squeue executable to be used by the slurm queue interface.",
)
server: str | None = Field(
default=None,
description="Name of LSF server to use. This option is deprecated and no longer required",
)
slurm_timeout: int | None = Field(
default=None,
description="Timeout for cached status used by the slurm queue interface",
)
squeue_timeout: int | None = Field(
default=None,
description="Timeout for cached status used by the slurm queue interface.",
)
enable_cache: bool = Field(
default=False,
description="""Enable forward model result caching.
Expand All @@ -118,32 +78,29 @@ class SimulatorConfig(BaseModel, HasErtQueueOptions, extra="forbid"): # type: i
the most common use of a standard optimization with a continuous
optimizer.""",
)
qsub_cmd: str | None = Field(default="qsub", description="The submit command")
qstat_cmd: str | None = Field(default="qstat", description="The query command")
qdel_cmd: str | None = Field(default="qdel", description="The kill command")
cluster_label: str | None = Field(
default=None,
description="The name of the cluster you are running simulations in.",
)
keep_qsub_output: int | None = Field(
default=0,
description="Set to 1 to keep error messages from qsub. Usually only to be used if somethign is seriously wrong with the queue environment/setup.",
)
submit_sleep: float | None = Field(
default=0.5,
description="To avoid stressing the TORQUE/PBS system you can instruct the driver to sleep for every submit request. The argument to the SUBMIT_SLEEP is the number of seconds to sleep for every submit, which can be a fraction like 0.5",
)
project_code: str | None = Field(
default=None,
description="String identifier used to map hardware resource usage to a project or account. The project or account does not have to exist.",
)

@field_validator("server")
@field_validator("queue_system", mode="before")
@classmethod
def default_local_queue(cls, v):
if v is None:
return LocalQueueOptions(max_running=8)
elif "activate_script" not in v and ErtPluginManager().activate_script():
v["activate_script"] = ErtPluginManager().activate_script()
return v

@model_validator(mode="before")
@classmethod
def validate_server(cls, server): # pylint: disable=E0213
if server is not None and server:
warnings.warn(
"The simulator server property was deprecated and is no longer needed",
DeprecationWarning,
stacklevel=1,
)
def check_old_config(cls, data: Any) -> Any:
if isinstance(data, dict):
queue_system = data.get("queue_system")
queue_systems = {
"lsf": LsfQueueOptions,
"torque": TorqueQueueOptions,
"slurm": SlurmQueueOptions,
"local": LocalQueueOptions,
}
if isinstance(queue_system, str) and queue_system in queue_systems:
raise ValueError(
f"Queue system configuration has changed, valid options for {queue_system} are: {list(queue_systems[queue_system].__dataclass_fields__.keys())}"
)
return data
Loading

0 comments on commit dd77cf3

Please sign in to comment.