Skip to content

Commit

Permalink
Fix workflows running from gui's tools panel not working
Browse files Browse the repository at this point in the history
This commit fixes the issue where most of the legacy workflows would not work due to ert_config not being passed to workflow_runner in run_workflow_widget.
This commit refactors the workflow_runner/plugin_runner to not use
ert_config directly; and instead only pass `ert_config.runpaths`
  • Loading branch information
jonathan-eq committed Feb 4, 2025
1 parent 7c27a9c commit a59107f
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/ert/gui/tools/export/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def run_export(self, parameters: list[Any]) -> None:

export_job_runner = WorkflowJobRunner(self.export_job)
user_warn = export_job_runner.run(
fixtures={"storage": self._notifier.storage, "ert_config": self.config},
fixtures={"storage": self._notifier.storage},
arguments=parameters,
)
if export_job_runner.hasFailed():
Expand Down
12 changes: 5 additions & 7 deletions src/ert/gui/tools/plugins/plugin_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,25 @@

from _ert.threading import ErtThread
from ert.config import CancelPluginException
from ert.runpaths import Runpaths
from ert.workflow_runner import WorkflowJobRunner

from .process_job_dialog import ProcessJobDialog

if TYPE_CHECKING:
from ert.config import ErtConfig
from ert.storage import LocalStorage

from .plugin import Plugin


class PluginRunner:
def __init__(
self, plugin: Plugin, ert_config: ErtConfig, storage: LocalStorage
self, plugin: Plugin, run_paths: Runpaths, storage: LocalStorage
) -> None:
super().__init__()
self.ert_config = ert_config
self.run_paths = run_paths
self.storage = storage
self.__plugin = plugin

self.__plugin_finished_callback: Callable[[], None] = lambda: None

self.__result = None
Expand All @@ -35,17 +34,16 @@ def __init__(
def run(self) -> None:
try:
plugin = self.__plugin

arguments = plugin.getArguments(
fixtures={"storage": self.storage, "ert_config": self.ert_config}
fixtures={"storage": self.storage, "run_paths": self.run_paths}
)
dialog = ProcessJobDialog(plugin.getName(), plugin.getParentWindow())
dialog.setObjectName("process_job_dialog")

dialog.cancelConfirmed.connect(self.cancel)
fixtures = {
k: getattr(self, k)
for k in ["storage", "ert_config"]
for k in ["storage", "run_paths"]
if getattr(self, k)
}
workflow_job_thread = ErtThread(
Expand Down
10 changes: 9 additions & 1 deletion src/ert/gui/tools/plugins/plugins_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from PyQt6.QtWidgets import QMenu

from ert.gui.tools import Tool
from ert.runpaths import Runpaths

from .plugin_runner import PluginRunner

Expand Down Expand Up @@ -35,8 +36,15 @@ def __init__(
self.__plugins = {}

self.menu = QMenu("&Plugins")
runpaths = Runpaths(
jobname_format=ert_config.model_config.jobname_format_string,
runpath_format=ert_config.model_config.runpath_format_string,
filename=str(ert_config.runpath_file),
substitutions=ert_config.substitutions,
eclbase=ert_config.model_config.eclbase_format_string,
)
for plugin in plugin_handler:
plugin_runner = PluginRunner(plugin, ert_config, notifier.storage)
plugin_runner = PluginRunner(plugin, runpaths, notifier.storage)
plugin_runner.setPluginFinishedCallback(self.trigger)

self.__plugins[plugin] = plugin_runner
Expand Down
8 changes: 8 additions & 0 deletions src/ert/gui/tools/workflows/run_workflow_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from _ert.threading import ErtThread
from ert.gui.ertwidgets import EnsembleSelector
from ert.gui.tools.workflows.workflow_dialog import WorkflowDialog
from ert.runpaths import Runpaths
from ert.workflow_runner import WorkflowRunner

if TYPE_CHECKING:
Expand Down Expand Up @@ -128,6 +129,13 @@ def startWorkflow(self) -> None:
workflow,
storage=self.storage,
ensemble=self.source_ensemble_selector.currentData(),
run_paths=Runpaths(
jobname_format=self.config.model_config.jobname_format_string,
runpath_format=self.config.model_config.runpath_format_string,
filename=str(self.config.runpath_file),
substitutions=self.config.substitutions,
eclbase=self.config.model_config.eclbase_format_string,
),
)
self._workflow_runner.run()

Expand Down
7 changes: 1 addition & 6 deletions src/ert/plugins/hook_implementations/workflows/csv_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import pandas as pd

from ert import ErtScript, LibresFacade
from ert.config import ErtConfig
from ert.storage import Storage


def loadDesignMatrix(filename: str) -> pd.DataFrame:
Expand Down Expand Up @@ -52,16 +50,13 @@ def getDescription() -> str:

def run(
self,
ert_config: ErtConfig,
storage: Storage,
workflow_args: Sequence[str],
) -> str:
output_file = workflow_args[0]
ensemble_data_as_json = None if len(workflow_args) < 2 else workflow_args[1]
design_matrix_path = None if len(workflow_args) < 3 else workflow_args[2]
_ = True if len(workflow_args) < 4 else workflow_args[3]
drop_const_cols = False if len(workflow_args) < 5 else workflow_args[4]
facade = LibresFacade(ert_config)

ensemble_data_as_dict = (
json.loads(ensemble_data_as_json) if ensemble_data_as_json else {}
Expand Down Expand Up @@ -96,7 +91,7 @@ def run(
if not design_matrix_data.empty:
ensemble_data = ensemble_data.join(design_matrix_data, how="outer")

misfit_data = facade.load_all_misfit_data(ensemble)
misfit_data = LibresFacade.load_all_misfit_data(ensemble)
if not misfit_data.empty:
ensemble_data = ensemble_data.join(misfit_data, how="outer")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from ert.exceptions import StorageError

if TYPE_CHECKING:
from ert.config import ErtConfig
from ert.storage import Ensemble


Expand All @@ -23,17 +22,14 @@ class ExportMisfitDataJob(ErtScript):
((response_value - observation_data) / observation_std)**2
"""

def run(
self, ert_config: ErtConfig, ensemble: Ensemble, workflow_args: list[Any]
) -> None:
def run(self, ensemble: Ensemble, workflow_args: list[Any]) -> None:
target_file = "misfit.hdf" if not workflow_args else workflow_args[0]

realizations = ensemble.get_realization_list_with_responses()

from ert import LibresFacade # noqa: PLC0415 (circular import)

facade = LibresFacade(ert_config)
misfit = facade.load_all_misfit_data(ensemble)
misfit = LibresFacade.load_all_misfit_data(ensemble)
if len(realizations) == 0 or misfit.empty:
raise StorageError("No responses loaded")
misfit.columns = pd.Index([val.split(":")[1] for val in misfit.columns])
Expand Down
22 changes: 11 additions & 11 deletions src/ert/plugins/hook_implementations/workflows/export_runpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

from ert.config.ert_script import ErtScript
from ert.runpaths import Runpaths
from ert.storage import Ensemble
from ert.validation import rangestring_to_list

if TYPE_CHECKING:
from ert.config import ErtConfig
pass


class ExportRunpathJob(ErtScript):
Expand All @@ -33,20 +34,19 @@ class ExportRunpathJob(ErtScript):
file.
"""

def run(self, ert_config: ErtConfig, workflow_args: list[Any]) -> None:
def run(
self, run_paths: Runpaths, ensemble: Ensemble, workflow_args: list[Any]
) -> None:
args = " ".join(workflow_args).split() # Make sure args is a list of words
run_paths = Runpaths(
jobname_format=ert_config.model_config.jobname_format_string,
runpath_format=ert_config.model_config.runpath_format_string,
filename=str(ert_config.runpath_file),
substitutions=ert_config.substitutions,
eclbase=ert_config.model_config.eclbase_format_string,
)
print("VROOOOOOM ->>>>")
assert ensemble
iter = ensemble.iteration
reals = ensemble.ensemble_size
run_paths.write_runpath_list(
*self.get_ranges(
args,
ert_config.analysis_config.num_iterations,
ert_config.model_config.num_realizations,
iter,
reals,
)
)

Expand Down
15 changes: 8 additions & 7 deletions src/ert/run_models/base_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,11 +677,12 @@ def validate_successful_realizations_count(self) -> None:
def run_workflows(
self,
runtime: HookRuntime,
storage: Storage | None = None,
ensemble: Ensemble | None = None,
) -> None:
for workflow in self._hooked_workflows[runtime]:
WorkflowRunner(workflow, storage, ensemble).run_blocking()
WorkflowRunner(
workflow, self._storage, ensemble, self.run_paths
).run_blocking()

def _evaluate_and_postprocess(
self,
Expand All @@ -703,7 +704,7 @@ def _evaluate_and_postprocess(
context_env=self._context_env,
)

self.run_workflows(HookRuntime.PRE_SIMULATION, self._storage, ensemble)
self.run_workflows(HookRuntime.PRE_SIMULATION, ensemble)
successful_realizations = self.run_ensemble_evaluator(
run_args,
ensemble,
Expand All @@ -729,7 +730,7 @@ def _evaluate_and_postprocess(
f"{self.ensemble_size - num_successful_realizations}"
)
logger.info(f"Experiment run finished in: {self.get_runtime()}s")
self.run_workflows(HookRuntime.POST_SIMULATION, self._storage, ensemble)
self.run_workflows(HookRuntime.POST_SIMULATION, ensemble)

return num_successful_realizations

Expand Down Expand Up @@ -802,8 +803,8 @@ def update(
prior_ensemble=prior,
)
if prior.iteration == 0:
self.run_workflows(HookRuntime.PRE_FIRST_UPDATE, self._storage, prior)
self.run_workflows(HookRuntime.PRE_UPDATE, self._storage, prior)
self.run_workflows(HookRuntime.PRE_FIRST_UPDATE, prior)
self.run_workflows(HookRuntime.PRE_UPDATE, prior)
try:
smoother_update(
prior,
Expand All @@ -825,5 +826,5 @@ def update(
"Update algorithm failed for iteration:"
f"{posterior.iteration}. The following error occurred: {e}"
) from e
self.run_workflows(HookRuntime.POST_UPDATE, self._storage, prior)
self.run_workflows(HookRuntime.POST_UPDATE, prior)
return posterior
4 changes: 2 additions & 2 deletions src/ert/run_models/ensemble_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def run_experiment(
raise ErtRunError(str(exc)) from exc

if not restart:
self.run_workflows(HookRuntime.PRE_EXPERIMENT)
self.experiment = self._storage.create_experiment(
name=self.experiment_name,
parameters=(
Expand All @@ -109,6 +108,7 @@ def run_experiment(
name=self.ensemble_name,
ensemble_size=self.ensemble_size,
)
self.run_workflows(HookRuntime.PRE_EXPERIMENT, self.ensemble)
else:
self.active_realizations = self._create_mask_from_failed_realizations()

Expand Down Expand Up @@ -143,7 +143,7 @@ def run_experiment(
self.ensemble,
evaluator_server_config,
)
self.run_workflows(HookRuntime.POST_EXPERIMENT)
self.run_workflows(HookRuntime.POST_EXPERIMENT, self.ensemble)

@classmethod
def name(cls) -> str:
Expand Down
4 changes: 2 additions & 2 deletions src/ert/run_models/ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def run_experiment(
) -> None:
self.log_at_startup()
self.restart = restart
self.run_workflows(HookRuntime.PRE_EXPERIMENT)
ensemble_format = self.target_ensemble_format
experiment = self._storage.create_experiment(
parameters=self._parameter_configuration,
Expand All @@ -89,6 +88,7 @@ def run_experiment(
ensemble_size=self.ensemble_size,
name=ensemble_format % 0,
)
self.run_workflows(HookRuntime.PRE_EXPERIMENT, prior)
self.set_env_key("_ERT_ENSEMBLE_ID", str(prior.id))
prior_args = create_run_arguments(
self.run_paths,
Expand Down Expand Up @@ -120,7 +120,7 @@ def run_experiment(
posterior,
evaluator_server_config,
)
self.run_workflows(HookRuntime.POST_EXPERIMENT)
self.run_workflows(HookRuntime.POST_EXPERIMENT, posterior)

@classmethod
def name(cls) -> str:
Expand Down
9 changes: 5 additions & 4 deletions src/ert/workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from concurrent.futures import Future
from typing import TYPE_CHECKING, Any, Self

from ert.config import ErtConfig, ErtScript, ExternalErtScript, Workflow, WorkflowJob
from ert.config import ErtScript, ExternalErtScript, Workflow, WorkflowJob
from ert.runpaths import Runpaths

if TYPE_CHECKING:
from ert.storage import Ensemble, Storage
Expand Down Expand Up @@ -109,12 +110,12 @@ def __init__(
workflow: Workflow,
storage: Storage | None = None,
ensemble: Ensemble | None = None,
ert_config: ErtConfig | None = None,
run_paths: Runpaths | None = None,
) -> None:
self.__workflow = workflow
self.storage = storage
self.ensemble = ensemble
self.ert_config = ert_config
self.run_paths = run_paths

self.__workflow_result: bool | None = None
self._workflow_executor = futures.ThreadPoolExecutor(max_workers=1)
Expand Down Expand Up @@ -152,7 +153,7 @@ def run_blocking(self) -> None:
self.__running = True
fixtures = {
k: getattr(self, k)
for k in ["storage", "ensemble", "ert_config"]
for k in ["storage", "ensemble", "run_paths"]
if getattr(self, k)
}

Expand Down

0 comments on commit a59107f

Please sign in to comment.