Remove EnKFMain and replace usage with fixtures
This removes the EnKFMain class, and replaces the remaining
usage with pytest-style fixtures. The current usage was mostly
as a facade to ErtConfig, and it held state for the workflows.
oyvindeide committed Jun 19, 2024
1 parent e04190d commit 3f3e682
Showing 57 changed files with 700 additions and 503 deletions.
2 changes: 2 additions & 0 deletions docs/getting_started/howto/plugin_system.rst
@@ -120,6 +120,8 @@ Implement the hook specification as follows to register the workflow job ``CSV_E
def installable_workflow_jobs() -> Dict[str, str]:
return {"<path_to_workflow_job_config_file>": "CSV_EXPORT"}
.. _legacy_ert_workflow_jobs:

2. **Using the legacy_ertscript_workflow hook**

The second approach does not require creating a workflow job configuration file up-front,
87 changes: 87 additions & 0 deletions docs/reference/workflows/workflows.rst
@@ -10,3 +10,90 @@ and go through all the realizations in one loop, forward model jobs run in paral
The executable invoked by the workflow job can be an executable you
have written yourself - in any language, or it can be an existing
Linux command like e.g. :code:`cp` or :code:`mv`.

Internal workflow jobs
======================

.. warning::
    Internal workflow jobs are under development and the API is subject to change.

Internal workflow jobs are a way to call custom Python scripts as workflows. To use
this, create a class that inherits from `ErtScript`:

.. code-block:: python

    from ert import ErtScript

    class MyJob(ErtScript):
        def run(self):
            print("Hello World")

ERT will initialize this class and call the `run` function when the workflow is called,
either through hooks or through the GUI or CLI.

The `run` function can be called with a number of arguments, depending on the context in which
the workflow is called. There are three distinct ways to call the `run` function:

1. If the `run` function uses `*args`, only the arguments from the user
   configuration are passed to the workflow:

.. code-block:: python

    class MyJob(ErtScript):
        def run(self, *args):
            print(f"Provided user arguments: {args}")

2. If the `run` function uses positional arguments, the user-configured arguments
   are passed in order and no fixtures are added:

.. warning::
    This is not recommended; you are advised to use either option 1 or option 3.

.. code-block:: python

    class MyJob(ErtScript):
        def run(self, my_arg_1, my_arg_2):
            print(f"Provided user arguments: {my_arg_1, my_arg_2}")

.. note::
    The name of the argument is not required to be `args`; that is just convention.

3. There are a number of specially named arguments the user can request, which give access to
   the internal state of the experiment that is running:

.. glossary::

    ert_config
        This gives access to the full configuration of the running experiment

    storage
        This gives access to the storage of the current session

    ensemble
        This gives access to the current ensemble, making it possible to load responses and parameters

    workflow_args
        This gives access to the arguments from the user configuration file

.. note::
    The current ensemble will depend on the context. For hooked workflows the ensemble will be:

    - `PRE_SIMULATION`: parameters and no responses in ensemble
    - `POST_SIMULATION`: parameters and responses in ensemble
    - `PRE_FIRST_UPDATE`/`PRE_UPDATE`: parameters and responses in ensemble
    - `POST_UPDATE`: parameters and responses in ensemble

    The ensemble will switch after the `POST_UPDATE` hook, and will move from prior -> posterior

.. code-block:: python

    from typing import List

    from ert import ErtScript
    from ert.config import ErtConfig
    from ert.storage import Ensemble, Storage

    class MyJob(ErtScript):
        def run(
            self,
            workflow_args: List,
            ert_config: ErtConfig,
            ensemble: Ensemble,
            storage: Storage,
        ):
            print(f"Provided user arguments: {workflow_args}")

For how to load internal workflow jobs into ERT, see: :ref:`installing workflows <legacy_ert_workflow_jobs>`
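
The three calling conventions described above can be illustrated with a small stand-alone sketch. It is not ERT's implementation: `call_run`, the example functions, and the placeholder fixture values are hypothetical, and the sketch only assumes that the choice is made by inspecting the signature of `run`, as the `initializeAndRun` change in this commit suggests.

```python
import inspect
from typing import Any, Callable, Dict, List


def call_run(run: Callable[..., Any], user_args: List[Any], fixtures: Dict[str, Any]) -> Any:
    """Hypothetical dispatcher: decide which arguments to pass to ``run``."""
    # The user-configured arguments are themselves exposed as a fixture.
    available = {**fixtures, "workflow_args": user_args}
    params = inspect.signature(run).parameters

    # Option 1: ``*args`` is present, so only the user arguments are passed.
    if any(p.kind == p.VAR_POSITIONAL for p in params.values()):
        return run(*user_args)

    # Option 3: every parameter name matches a fixture, so inject by name.
    if all(name in available for name in params):
        return run(*[available[name] for name in params])

    # Option 2: plain positional arguments, no fixtures are added.
    return run(*user_args)


def with_varargs(*args):
    return ("varargs", args)


def with_positional(my_arg_1, my_arg_2):
    return ("positional", my_arg_1, my_arg_2)


def with_fixtures(workflow_args, storage):
    return ("fixtures", workflow_args, storage)


# Placeholder strings stand in for the real ErtConfig, Storage and Ensemble objects.
fixtures = {"ert_config": "<config>", "storage": "<storage>", "ensemble": "<ensemble>"}

print(call_run(with_varargs, ["a", "b"], fixtures))
print(call_run(with_positional, ["a", "b"], fixtures))
print(call_run(with_fixtures, ["a", "b"], fixtures))
```

Matching on parameter names means a job only declares the fixtures it needs, in any order. The trade-off is that a misspelled fixture name falls through to the positional path rather than being injected.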
4 changes: 1 addition & 3 deletions src/ert/cli/main.py
@@ -12,7 +12,6 @@
from ert.cli.monitor import Monitor
from ert.cli.workflow import execute_workflow
from ert.config import ErtConfig, QueueSystem
from ert.enkf_main import EnKFMain
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.mode_definitions import (
ENSEMBLE_EXPERIMENT_MODE,
@@ -57,7 +56,6 @@ def run_cli(args: Namespace, plugin_manager: Optional[ErtPluginManager] = None)
for fm_step in ert_config.forward_model_steps:
logger.info("Config contains forward model step %s", fm_step.name)

ert = EnKFMain(ert_config)
if not ert_config.observation_keys and args.mode not in [
ENSEMBLE_EXPERIMENT_MODE,
TEST_RUN_MODE,
@@ -82,7 +80,7 @@ def run_cli(args: Namespace, plugin_manager: Optional[ErtPluginManager] = None)
storage = open_storage(ert_config.ens_path, "w")

if args.mode == WORKFLOW_MODE:
execute_workflow(ert, storage, args.name)
execute_workflow(ert_config, storage, args.name)
return

status_queue: queue.SimpleQueue[StatusEvents] = queue.SimpleQueue()
10 changes: 6 additions & 4 deletions src/ert/cli/workflow.py
@@ -6,19 +6,21 @@
from ert.job_queue import WorkflowRunner

if TYPE_CHECKING:
from ert.enkf_main import EnKFMain
from ert.config import ErtConfig
from ert.storage import Storage


def execute_workflow(ert: EnKFMain, storage: Storage, workflow_name: str) -> None:
def execute_workflow(
ert_config: ErtConfig, storage: Storage, workflow_name: str
) -> None:
logger = logging.getLogger(__name__)
try:
workflow = ert.ert_config.workflows[workflow_name]
workflow = ert_config.workflows[workflow_name]
except KeyError:
msg = "Workflow {} is not in the list of available workflows"
logger.error(msg.format(workflow_name))
return
runner = WorkflowRunner(workflow, ert, storage)
runner = WorkflowRunner(workflow, storage, ert_config=ert_config)
runner.run_blocking()
if not all(v["completed"] for v in runner.workflowReport().values()):
logger.error(f"Workflow {workflow_name} failed!")
4 changes: 3 additions & 1 deletion src/ert/config/ert_plugin.py
@@ -1,3 +1,5 @@
from __future__ import annotations

from abc import ABC
from typing import Any, List

@@ -9,7 +11,7 @@ class CancelPluginException(Exception):


class ErtPlugin(ErtScript, ABC):
def getArguments(self, parent: Any = None) -> List[Any]: # noqa: PLR6301
def getArguments(self, args: List[Any]) -> List[Any]: # noqa: PLR6301
return []

def getName(self) -> str:
94 changes: 71 additions & 23 deletions src/ert/config/ert_script.py
@@ -5,14 +5,18 @@
import logging
import sys
import traceback
import warnings
from abc import abstractmethod
from types import ModuleType
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Type
from types import MappingProxyType, ModuleType
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Type, Union

from typing_extensions import deprecated

if TYPE_CHECKING:
from ert.enkf_main import EnKFMain
from ert.config import ErtConfig
from ert.storage import Ensemble, Storage

Fixtures = Union[ErtConfig, Ensemble, Storage]
logger = logging.getLogger(__name__)


@@ -28,19 +32,17 @@ class ErtScript:

def __init__(
self,
ert: EnKFMain,
storage: Storage,
ensemble: Optional[Ensemble] = None,
) -> None:
self.__ert = ert
self.__storage = storage
self.__ensemble = ensemble

self.__is_cancelled = False
self.__failed = False
self._stdoutdata = ""
self._stderrdata = ""

# Deprecated:
self._ert = None
self._ensemble = None
self._storage = None

@abstractmethod
def run(self, *arg: Any, **kwarg: Any) -> Any:
"""
@@ -68,21 +70,30 @@ def stderrdata(self) -> str:
self._stderrdata = self._stderrdata.decode()
return self._stderrdata

def ert(self) -> "EnKFMain":
@deprecated("Use fixtures to the run function instead")
def ert(self) -> Optional[ErtConfig]:
logger.info(f"Accessing EnKFMain from workflow: {self.__class__.__name__}")
return self.__ert

@property
def storage(self) -> Storage:
return self.__storage
return self._ert

@property
def ensemble(self) -> Optional[Ensemble]:
return self.__ensemble
warnings.warn(
"The ensemble property is deprecated, use the fixture to the run function instead",
DeprecationWarning,
stacklevel=1,
)
logger.info(f"Accessing ensemble from workflow: {self.__class__.__name__}")
return self._ensemble

@ensemble.setter
def ensemble(self, ensemble: Ensemble) -> None:
self.__ensemble = ensemble
@property
def storage(self) -> Optional[Storage]:
warnings.warn(
"The storage property is deprecated, use the fixture to the run function instead",
DeprecationWarning,
stacklevel=1,
)
logger.info(f"Accessing storage from workflow: {self.__class__.__name__}")
return self._storage

def isCancelled(self) -> bool:
return self.__is_cancelled
@@ -102,7 +113,9 @@ def initializeAndRun(
self,
argument_types: List[Type[Any]],
argument_values: List[str],
fixtures: Optional[Dict[str, Any]] = None,
) -> Any:
fixtures = {} if fixtures is None else fixtures
arguments = []
for index, arg_value in enumerate(argument_values):
arg_type = argument_types[index] if index < len(argument_types) else str
@@ -111,8 +124,24 @@ def initializeAndRun(
arguments.append(arg_type(arg_value)) # type: ignore
else:
arguments.append(None)

fixtures["workflow_args"] = arguments
try:
func_args = inspect.signature(self.run).parameters
# If the user has specified *args, we skip injecting fixtures, and just
# pass the user configured arguments
if not any([p.kind == p.VAR_POSITIONAL for p in func_args.values()]):
try:
arguments = self.insert_fixtures(func_args, fixtures)
except ValueError as e:
# This is here for backwards compatibility, the user does not have *argv
# but positional arguments. Can not be mixed with using fixtures.
logger.warning(
f"Mixture of fixtures and positional arguments, err: {e}"
)
# Part of deprecation
self._ert = fixtures.get("ert_config")
self._ensemble = fixtures.get("ensemble")
self._storage = fixtures.get("storage")
return self.run(*arguments)
except AttributeError as e:
error_msg = str(e)
@@ -137,6 +166,25 @@ def initializeAndRun(
# Need to have unique modules in case of identical object naming in scripts
__module_count = 0

def insert_fixtures(
self,
func_args: MappingProxyType[str, inspect.Parameter],
fixtures: Dict[str, Fixtures],
) -> List[Any]:
arguments = []
errors = []
for val in func_args:
if val in fixtures:
arguments.append(fixtures[val])
else:
errors.append(val)
if errors:
raise ValueError(
f"Plugin: {self.__class__.__name__} misconfigured, arguments: {errors} "
f"not found in fixtures: {list(fixtures)}"
)
return arguments

def output_stack_trace(self, error: str = "") -> None:
stack_trace = error or "".join(traceback.format_exception(*sys.exc_info()))
sys.stderr.write(
@@ -150,7 +198,7 @@ def output_stack_trace(self, error: str = "") -> None:
@staticmethod
def loadScriptFromFile(
path: str,
) -> Callable[["EnKFMain", "Storage"], "ErtScript"]:
) -> Callable[[], "ErtScript"]:
module_name = f"ErtScriptModule_{ErtScript.__module_count}"
ErtScript.__module_count += 1

@@ -171,7 +219,7 @@ def loadScriptFromFile(
@staticmethod
def __findErtScriptImplementations(
module: ModuleType,
) -> Callable[["EnKFMain", "Storage"], "ErtScript"]:
) -> Callable[[], "ErtScript"]:
result = []
for _, member in inspect.getmembers(
module,
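
The deprecated `storage` and `ensemble` properties in this file warn on every access. A minimal stand-alone sketch of that pattern follows, assuming a hypothetical `Script` class and a placeholder string in place of a real `Storage` object. Unlike the diff, which passes `stacklevel=1`, the sketch uses `stacklevel=2` so the warning is attributed to the caller's line; that is a choice made for this example, not a claim about what ERT should do.

```python
import warnings
from typing import Optional


class Script:
    """Hypothetical stand-in for a script class with a deprecated property."""

    def __init__(self) -> None:
        self._storage: Optional[str] = None

    @property
    def storage(self) -> Optional[str]:
        warnings.warn(
            "The storage property is deprecated, use the fixture to the run function instead",
            DeprecationWarning,
            stacklevel=2,  # report the line that accessed the property
        )
        return self._storage


script = Script()
script._storage = "<storage>"  # placeholder value for illustration

# Record warnings explicitly; DeprecationWarning is often hidden by default filters.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    value = script.storage

print(value)
print([w.category.__name__ for w in caught])
```

Recording the warning this way is also how a test could check that existing workflow jobs still work while being told to migrate to fixtures.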
10 changes: 3 additions & 7 deletions src/ert/config/external_ert_script.py
@@ -3,18 +3,14 @@
import codecs
import sys
from subprocess import PIPE, Popen
from typing import TYPE_CHECKING, Any, Optional
from typing import Any, Optional

from .ert_script import ErtScript

if TYPE_CHECKING:
from ert.enkf_main import EnKFMain
from ert.storage import Storage


class ExternalErtScript(ErtScript):
def __init__(self, ert: EnKFMain, storage: Storage, executable: str):
super().__init__(ert, storage, None)
def __init__(self, executable: str):
super().__init__()

self.__executable = executable
self.__job: Optional[Popen[bytes]] = None
3 changes: 1 addition & 2 deletions src/ert/dark_storage/__init__.py
@@ -1,4 +1,3 @@
"""
Dark Storage is an API towards data provided by the legacy EnKFMain object and
the `storage/` directory.
Dark Storage is an API towards data provided the `storage/` directory.
"""