From 843f4fdee343e3a797d9a11981aeacc2814df7ab Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Wed, 15 Jan 2025 21:15:43 +0100 Subject: [PATCH] Revert "Store everest results in ERT storage" Revert oopsie --- docs/everest/cli.rst | 73 +- src/ert/run_models/everest_run_model.py | 42 +- src/everest/__init__.py | 6 + src/everest/api/everest_data_api.py | 242 ++--- src/everest/bin/config_branch_script.py | 26 +- src/everest/bin/everexport_script.py | 27 +- src/everest/bin/utils.py | 37 +- src/everest/config/__init__.py | 2 + src/everest/config/everest_config.py | 22 + src/everest/detached/__init__.py | 77 +- src/everest/detached/jobs/everserver.py | 30 + src/everest/everest_storage.py | 908 ------------------ src/everest/export.py | 365 +++++++ tests/everest/conftest.py | 2 + .../entry_points/test_config_branch_entry.py | 42 +- tests/everest/entry_points/test_everexport.py | 311 ++++++ .../functional/test_main_everest_entry.py | 23 +- .../config_multiobj.yml/snapshot.json | 20 +- .../test_egg_snapshot/best_controls | 4 - .../best_objective_gradients_csv | 13 - .../test_egg_snapshot/best_objectives_csv | 4 - tests/everest/test_detached.py | 78 -- tests/everest/test_egg_simulation.py | 34 +- tests/everest/test_everlint.py | 17 + tests/everest/test_everserver.py | 10 +- tests/everest/test_export.py | 353 +++++++ tests/everest/test_math_func.py | 117 +++ tests/everest/test_simulator_cache.py | 5 + 28 files changed, 1593 insertions(+), 1297 deletions(-) delete mode 100644 src/everest/everest_storage.py create mode 100644 tests/everest/entry_points/test_everexport.py delete mode 100644 tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_controls delete mode 100644 tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objective_gradients_csv delete mode 100644 tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objectives_csv create mode 100644 tests/everest/test_export.py diff --git a/docs/everest/cli.rst b/docs/everest/cli.rst index 8229da37c56..bdd699e1db5 100644 --- a/docs/everest/cli.rst +++ b/docs/everest/cli.rst @@ -52,7 +52,78 @@ Using again the command `everest monitor config_file.yml`, will reattach to the Everest `export` ================ -The everest export has been removed. All data is now always exported to the optimization output directory. +.. argparse:: + :module: everest.bin.everexport_script + :func: _build_args_parser + :prog: everexport_entry + + +The everest export functionality is configured in the export section of the config file. +The following represents an export section a config file set with default values. + +.. code-block:: yaml + + export: + skip_export: False + keywords: + batches: + discard_gradient: True # Export only non-gradient simulations + discard_rejected: True # Export only increased merit simulations + csv_output_filepath: everest_output_folder/config_file.csv + +When the export command `everest export config_file.yml` is run with a config file that does not define an export section default values will be used, a `config_file.csv` file in the Everest output folder will be created. +By default Everest exports only non-gradient with increased merit simulations when no config section is defined in the config file. +The file will contain optimization data for all the optimization batches and the available eclipse keywords (if a data file is available) for only the non-gradient simulations and the simulations that increase merit. 
+ +**Examples** + +* Export only non-gradient simulation using the following export section in the config file + +.. code-block:: yaml + + export: + discard_rejected: False + +* Export only increased merit simulation using the following export section in the config file + +.. code-block:: yaml + + export: + discard_gradient: False + + +* Export only a list of available batches even if they are gradient batches and if no export section is defined. + + everest export config_file.yml --batches 0 2 4 + +The command above is equivalent to having the following export section defined in the config file `config_file.yml`. + +.. code-block:: yaml + + export: + batches: [0, 2, 4] + +* Exporting just a specific list of eclipse keywords requires the following export section defined in the config file. + +.. code-block:: yaml + + export: + keywords: ['FOIP', 'FOPT'] + +* Skip export by adding the following section in the config file. + +.. code-block:: yaml + + export: + skip_export: True + +* Export will also be skipped if an empty list of batches is defined in the export section. + +.. code-block:: yaml + + export: + batches: [] + ============== Everest `lint` diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index b768ee7a286..f05ac95ff8c 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -9,18 +9,21 @@ import shutil from collections import defaultdict from collections.abc import Callable +from dataclasses import dataclass from enum import IntEnum from pathlib import Path from types import TracebackType from typing import TYPE_CHECKING, Any, Protocol import numpy as np +import seba_sqlite.sqlite_storage from numpy import float64 from numpy._typing import NDArray from ropt.enums import EventType, OptimizerExitCode from ropt.evaluator import EvaluatorContext, EvaluatorResult from ropt.plan import BasicOptimizer from ropt.plan import Event as OptimizerEvent +from seba_sqlite import SqliteStorage from typing_extensions import TypedDict from _ert.events import EESnapshot, EESnapshotUpdate, Event @@ -29,7 +32,6 @@ from ert.runpaths import Runpaths from ert.storage import open_storage from everest.config import EverestConfig -from everest.everest_storage import EverestStorage, OptimalResult from everest.optimizer.everest2ropt import everest2ropt from everest.simulator.everest_to_ert import everest_to_ert_config from everest.strings import EVEREST @@ -68,6 +70,24 @@ class OptimizerCallback(Protocol): def __call__(self) -> str | None: ... +@dataclass +class OptimalResult: + batch: int + controls: list[Any] + total_objective: float + + @staticmethod + def from_seba_optimal_result( + o: seba_sqlite.sqlite_storage.OptimalResult | None = None, + ) -> OptimalResult | None: + if o is None: + return None + + return OptimalResult( + batch=o.batch, controls=o.controls, total_objective=o.total_objective + ) + + class EverestExitCode(IntEnum): COMPLETED = 1 TOO_FEW_REALIZATIONS = 2 @@ -186,21 +206,23 @@ def run_experiment( # Initialize the ropt optimizer: optimizer = self._create_optimizer() - self.ever_storage = EverestStorage( - output_dir=Path(self._everest_config.optimization_output_dir), - ) - self.ever_storage.observe_optimizer( - optimizer, - Path(self._everest_config.optimization_output_dir) - / "dakota" - / "OPT_DEFAULT.out", + # The SqliteStorage object is used to store optimization results from + # Seba in an sqlite database. It reacts directly to events emitted by + # Seba and is not called by Everest directly. 
The stored results are + # accessed by Everest via separate SebaSnapshot objects. + # This mechanism is outdated and not supported by the ropt package. It + # is retained for now via the seba_sqlite package. + seba_storage = SqliteStorage( # type: ignore + optimizer, self._everest_config.optimization_output_dir ) # Run the optimization: optimizer_exit_code = optimizer.run().exit_code # Extract the best result from the storage. - self._result = self.ever_storage.get_optimal_result() + self._result = OptimalResult.from_seba_optimal_result( + seba_storage.get_optimal_result() # type: ignore + ) if self._exit_code is None: match optimizer_exit_code: diff --git a/src/everest/__init__.py b/src/everest/__init__.py index f9646f32c68..b4c85c59128 100644 --- a/src/everest/__init__.py +++ b/src/everest/__init__.py @@ -21,12 +21,18 @@ __version__ = "0.0.0" from everest import detached, jobs, templates, util +from everest.bin.utils import export_to_csv, export_with_progress from everest.config_keys import ConfigKeys +from everest.export import MetaDataColumnNames, filter_data __author__ = "Equinor ASA and TNO" __all__ = [ "ConfigKeys", + "MetaDataColumnNames", "detached", + "export_to_csv", + "export_with_progress", + "filter_data", "jobs", "load", "templates", diff --git a/src/everest/api/everest_data_api.py b/src/everest/api/everest_data_api.py index 85ee49e9fb4..d3b3d804fab 100644 --- a/src/everest/api/everest_data_api.py +++ b/src/everest/api/everest_data_api.py @@ -1,66 +1,46 @@ -from pathlib import Path +from collections import OrderedDict -import polars import polars as pl -from ropt.enums import ConstraintType +from seba_sqlite.snapshot import SebaSnapshot from ert.storage import open_storage -from everest.config import EverestConfig -from everest.everest_storage import EverestStorage +from everest.config import EverestConfig, ServerConfig +from everest.detached import ServerStatus, everserver_status class EverestDataAPI: def __init__(self, config: EverestConfig, filter_out_gradient=True): self._config = config output_folder = config.optimization_output_dir - self._ever_storage = EverestStorage(Path(output_folder)) - self._ever_storage.read_from_output_dir() + self._snapshot = SebaSnapshot(output_folder).get_snapshot(filter_out_gradient) @property def batches(self): - return sorted( - b.batch_id - for b in self._ever_storage.data.batches - if b.batch_objectives is not None - ) + batch_ids = list({opt.batch_id for opt in self._snapshot.optimization_data}) + return sorted(batch_ids) @property def accepted_batches(self): - return sorted( - b.batch_id for b in self._ever_storage.data.batches if b.is_improvement + batch_ids = list( + {opt.batch_id for opt in self._snapshot.optimization_data if opt.merit_flag} ) + return sorted(batch_ids) @property def objective_function_names(self): - return sorted( - self._ever_storage.data.objective_functions["objective_name"] - .unique() - .to_list() - ) + return [fnc.name for fnc in self._snapshot.metadata.objectives.values()] @property def output_constraint_names(self): - return ( - sorted( - self._ever_storage.data.nonlinear_constraints["constraint_name"] - .unique() - .to_list() - ) - if self._ever_storage.data.nonlinear_constraints is not None - else [] - ) + return [fnc.name for fnc in self._snapshot.metadata.constraints.values()] def input_constraint(self, control): - # Note: This function is weird, its existence is probably not well-justified - # consider removing! 
- initial_values = self._ever_storage.data.controls - control_spec = initial_values.filter( - pl.col("control_name") == control - ).to_dicts()[0] - return { - "min": control_spec.get("lower_bounds"), - "max": control_spec.get("upper_bounds"), - } + controls = [ + con + for con in self._snapshot.metadata.controls.values() + if con.name == control + ] + return {"min": controls[0].min_value, "max": controls[0].max_value} def output_constraint(self, constraint): """ @@ -70,128 +50,106 @@ def output_constraint(self, constraint): "right_hand_side" is a constant real number that indicates the constraint bound/target. """ - - constraint_dict = self._ever_storage.data.nonlinear_constraints.to_dicts()[0] + constraints = [ + con + for con in self._snapshot.metadata.constraints.values() + if con.name == constraint + ] return { - "type": ConstraintType(constraint_dict["constraint_type"]).name.lower(), - "right_hand_side": constraint_dict["constraint_rhs_value"], + "type": constraints[0].constraint_type, + "right_hand_side": constraints[0].rhs_value, } @property def realizations(self): - return sorted( - self._ever_storage.data.batches[0] - .realization_objectives["realization"] - .unique() - .to_list() + return list( + OrderedDict.fromkeys( + int(sim.realization) for sim in self._snapshot.simulation_data + ) ) @property def simulations(self): - return sorted( - self._ever_storage.data.batches[0] - .realization_objectives["simulation_id"] - .unique() - .to_list() + return list( + OrderedDict.fromkeys( + [int(sim.simulation) for sim in self._snapshot.simulation_data] + ) ) @property def control_names(self): - return sorted( - self._ever_storage.data.controls["control_name"].unique().to_list() - ) + return [con.name for con in self._snapshot.metadata.controls.values()] @property def control_values(self): - all_control_names = self._ever_storage.data.controls["control_name"].to_list() - new = [] - for batch in self._ever_storage.data.batches: - if batch.realization_controls is None: - continue - - for controls_dict in batch.realization_controls.to_dicts(): - for name in all_control_names: - new.append( - { - "control": name, - "batch": batch.batch_id, - "value": controls_dict[name], - } - ) - - return new + controls = [con.name for con in self._snapshot.metadata.controls.values()] + return [ + {"control": con, "batch": sim.batch, "value": sim.controls[con]} + for sim in self._snapshot.simulation_data + for con in controls + if con in sim.controls + ] @property def objective_values(self): return [ - b for b in self._ever_storage.data.batches if b.batch_objectives is not None + { + "function": objective.name, + "batch": sim.batch, + "realization": sim.realization, + "simulation": sim.simulation, + "value": sim.objectives[objective.name], + "weight": objective.weight, + "norm": objective.normalization, + } + for sim in self._snapshot.simulation_data + for objective in self._snapshot.metadata.objectives.values() + if objective.name in sim.objectives ] @property def single_objective_values(self): - batch_datas = polars.concat( - [ - b.batch_objectives.select( - c for c in b.batch_objectives.columns if c != "merit_value" - ).with_columns( - polars.lit(1 if b.is_improvement else 0).alias("accepted") - ) - for b in self._ever_storage.data.batches - if b.realization_controls is not None - ] - ) - objectives = self._ever_storage.data.objective_functions - objective_names = objectives["objective_name"].unique().to_list() - - for o in objectives.to_dicts(): - batch_datas = batch_datas.with_columns( - 
polars.col(o["objective_name"]) * o["weight"] * o["normalization"] - ) - - columns = [ - "batch", - "objective", - "accepted", - *(objective_names if len(objective_names) > 1 else []), + single_obj = [ + { + "batch": optimization_el.batch_id, + "objective": optimization_el.objective_value, + "accepted": optimization_el.merit_flag, + } + for optimization_el in self._snapshot.optimization_data ] - - return ( - batch_datas.rename( - {"total_objective_value": "objective", "batch_id": "batch"} - ) - .select(columns) - .to_dicts() - ) + metadata = { + func.name: {"weight": func.weight, "norm": func.normalization} + for func in self._snapshot.metadata.functions.values() + if func.function_type == func.FUNCTION_OBJECTIVE_TYPE + } + if len(metadata) == 1: + return single_obj + objectives = [] + for name, values in self._snapshot.expected_objectives.items(): + for idx, val in enumerate(values): + factor = metadata[name]["weight"] * metadata[name]["norm"] + if len(objectives) > idx: + objectives[idx].update({name: val * factor}) + else: + objectives.append({name: val * factor}) + for idx, obj in enumerate(single_obj): + obj.update(objectives[idx]) + + return single_obj @property def gradient_values(self): - all_batch_data = [ - b.batch_objective_gradient - for b in self._ever_storage.data.batches - if b.batch_objective_gradient is not None and b.is_improvement - ] - if not all_batch_data: - return [] - - all_info = polars.concat(all_batch_data) - objective_columns = [ - c - for c in all_info.drop(["batch_id", "control_name"]).columns - if not c.endswith(".total") + return [ + { + "batch": optimization_el.batch_id, + "function": function, + "control": control, + "value": value, + } + for optimization_el in self._snapshot.optimization_data + for function, info in optimization_el.gradient_info.items() + for control, value in info.items() ] - return ( - all_info.select("batch_id", "control_name", *objective_columns) - .unpivot( - on=objective_columns, - index=["batch_id", "control_name"], - variable_name="function", - value_name="value", - ) - .rename({"control_name": "control", "batch_id": "batch"}) - .sort(by=["batch", "control"]) - .select(["batch", "function", "control", "value"]) - .to_dicts() - ) def summary_values(self, batches=None, keys=None): if batches is None: @@ -222,13 +180,16 @@ def summary_values(self, batches=None, keys=None): summary = summary.with_columns( pl.Series("batch", [batch_id] * summary.shape[0]) ) - - realization_map = ( - self._ever_storage.data.simulation_to_geo_realization_map - ) + # The realization ID as defined by Everest must be + # retrieved via the seba snapshot. 
+ realization_map = { + sim.simulation: sim.realization + for sim in self._snapshot.simulation_data + if sim.batch == batch_id + } realizations = pl.Series( "realization", - [realization_map.get(int(sim)) for sim in summary["simulation"]], + [realization_map.get(str(sim)) for sim in summary["simulation"]], ) realizations = realizations.cast(pl.Int64, strict=False) summary = summary.with_columns(realizations) @@ -240,3 +201,12 @@ def summary_values(self, batches=None, keys=None): @property def output_folder(self): return self._config.output_dir + + @property + def everest_csv(self): + status_path = ServerConfig.get_everserver_status_path(self._config.output_dir) + state = everserver_status(status_path) + if state["status"] == ServerStatus.completed: + return self._config.export_path + else: + return None diff --git a/src/everest/bin/config_branch_script.py b/src/everest/bin/config_branch_script.py index ac42c88e717..b53453eb859 100644 --- a/src/everest/bin/config_branch_script.py +++ b/src/everest/bin/config_branch_script.py @@ -1,15 +1,16 @@ import argparse from copy import deepcopy as copy from functools import partial -from pathlib import Path +from os.path import exists, join from typing import Any from ruamel.yaml import YAML +from seba_sqlite.database import Database as seba_db +from seba_sqlite.snapshot import SebaSnapshot from everest.config import EverestConfig from everest.config_file_loader import load_yaml from everest.config_keys import ConfigKeys as CK -from everest.everest_storage import EverestStorage def _yaml_config(file_path: str, parser) -> tuple[str, dict[str, Any] | None]: @@ -45,19 +46,10 @@ def _build_args_parser(): def opt_controls_by_batch(optimization_dir, batch): - storage = EverestStorage(Path(optimization_dir)) - storage.read_from_output_dir() - - control_names = storage.data.controls["control_name"] - batch_data = next((b for b in storage.data.batches if b.batch_id == batch), None) - - if batch_data: - # All geo-realizations should have the same unperturbed control values per batch - # hence it does not matter which realization we select the controls for - return batch_data.realization_controls.select( - control_names.to_list() - ).to_dicts()[0] - + snapshot = SebaSnapshot(optimization_dir) + for opt_data in snapshot.get_optimization_data(): + if opt_data.batch_id == batch: + return opt_data.controls return None @@ -100,6 +92,10 @@ def config_branch_entry(args=None): options = parser.parse_args(args) optimization_dir, yml_config = options.input_config + db_path = join(optimization_dir, seba_db.FILENAME) + if not exists(db_path): + parser.error(f"Optimization source {db_path} not found") + opt_controls = opt_controls_by_batch(optimization_dir, options.batch) if opt_controls is None: parser.error(f"Batch {options.batch} not present in optimization data") diff --git a/src/everest/bin/everexport_script.py b/src/everest/bin/everexport_script.py index 500e59f02b1..a7f0e15e60d 100755 --- a/src/everest/bin/everexport_script.py +++ b/src/everest/bin/everexport_script.py @@ -4,7 +4,10 @@ import logging from functools import partial +from everest import export_to_csv, export_with_progress from everest.config import EverestConfig +from everest.config.export_config import ExportConfig +from everest.export import check_for_errors from everest.strings import EVEREST @@ -19,9 +22,27 @@ def everexport_entry(args=None): config = options.config_file - logger.info("Everexport deprecation warning seen") - print( - f"Everexport is deprecated, optimization results already exist @ 
{config.optimization_output_dir}" + # Turn into .export once + # explicit None is disallowed + if config.export is None: + config.export = ExportConfig() + + if options.batches is not None: + batch_list = [int(item) for item in options.batches] + config.export.batches = batch_list + + err_msgs, export_ecl = check_for_errors( + config=config.export, + optimization_output_path=config.optimization_output_dir, + storage_path=config.storage_dir, + data_file_path=config.model.data_file, + ) + for msg in err_msgs: + logger.warning(msg) + + export_to_csv( + data_frame=export_with_progress(config, export_ecl), + export_path=config.export_path, ) diff --git a/src/everest/bin/utils.py b/src/everest/bin/utils.py index ef73ce988ec..19971b9adfe 100644 --- a/src/everest/bin/utils.py +++ b/src/everest/bin/utils.py @@ -8,8 +8,10 @@ import colorama from colorama import Fore +from pandas import DataFrame from ert.resources import all_shell_script_fm_steps +from everest.config import EverestConfig from everest.detached import ( OPT_PROGRESS_ID, SIM_PROGRESS_ID, @@ -18,9 +20,41 @@ get_opt_status, start_monitor, ) +from everest.export import export_data from everest.simulator import JOB_FAILURE, JOB_RUNNING, JOB_SUCCESS from everest.strings import EVEREST +try: + from progressbar import AdaptiveETA, Bar, Percentage, ProgressBar, Timer +except ImportError: + ProgressBar = None # type: ignore + + +def export_with_progress(config: EverestConfig, export_ecl=True): + logging.getLogger(EVEREST).info("Exporting results to csv ...") + if ProgressBar is not None: + widgets = [Percentage(), " ", Bar(), " ", Timer(), " ", AdaptiveETA()] + with ProgressBar(max_value=1, widgets=widgets) as bar: + return export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + export_ecl=export_ecl, + progress_callback=bar.update, + ) + return export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + export_ecl=export_ecl, + ) + + +def export_to_csv(data_frame: DataFrame, export_path: str) -> None: + os.makedirs(os.path.dirname(export_path), exist_ok=True) + data_frame.to_csv(export_path, sep=";", index=False) + logging.getLogger(EVEREST).info(f"Data exported to {export_path}") + def handle_keyboard_interrupt(signal, frame, options): print("\n" + "=" * 80) @@ -301,5 +335,6 @@ def report_on_previous_run( f"Optimization completed.\n" "\nTo re-run the optimization use command:\n" f" `everest run --new-run {config_file}`\n" - f"Results are stored in {optimization_output_dir}" + "To export the results use command:\n" + f" `everest export {config_file}`" ) diff --git a/src/everest/config/__init__.py b/src/everest/config/__init__.py index 4f722f14f7d..4ce9c72f7e5 100644 --- a/src/everest/config/__init__.py +++ b/src/everest/config/__init__.py @@ -6,6 +6,7 @@ from .cvar_config import CVaRConfig from .environment_config import EnvironmentConfig from .everest_config import EverestConfig, EverestValidationError +from .export_config import ExportConfig from .input_constraint_config import InputConstraintConfig from .install_data_config import InstallDataConfig from .install_job_config import InstallJobConfig @@ -28,6 +29,7 @@ "EnvironmentConfig", "EverestConfig", "EverestValidationError", + "ExportConfig", "InputConstraintConfig", "InstallDataConfig", "InstallJobConfig", diff --git a/src/everest/config/everest_config.py b/src/everest/config/everest_config.py index 69ece8aedec..f41f00fb736 
100644 --- a/src/everest/config/everest_config.py +++ b/src/everest/config/everest_config.py @@ -722,6 +722,28 @@ def function_aliases(self) -> dict[str, str]: aliases[f"{constraint.name}:upper"] = constraint.name return aliases + @property + def export_path(self): + """Returns the export file path. If not file name is provide the default + export file name will have the same name as the config file, with the '.csv' + extension.""" + + export = self.export + + output_path = None + if export is not None: + output_path = export.csv_output_filepath + + if output_path is None: + output_path = "" + + full_file_path = os.path.join(self.output_dir, output_path) + if output_path: + return full_file_path + else: + default_export_file = f"{os.path.splitext(self.config_file)[0]}.csv" + return os.path.join(full_file_path, default_export_file) + def to_dict(self) -> dict: the_dict = self.model_dump(exclude_none=True) diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py index ca3855de805..60a3c1ab96f 100644 --- a/src/everest/detached/__init__.py +++ b/src/everest/detached/__init__.py @@ -11,15 +11,15 @@ from pathlib import Path from typing import Literal -import polars import requests +from seba_sqlite.exceptions import ObjectNotFoundError +from seba_sqlite.snapshot import SebaSnapshot from ert.scheduler import create_driver from ert.scheduler.driver import Driver, FailedSubmit from ert.scheduler.event import StartedEvent from everest.config import EverestConfig, ServerConfig from everest.config_keys import ConfigKeys as CK -from everest.everest_storage import EverestStorage from everest.strings import ( EVEREST_SERVER_CONFIG, OPT_PROGRESS_ENDPOINT, @@ -124,63 +124,32 @@ def wait_for_server(output_dir: str, timeout: int) -> None: def get_opt_status(output_folder): - """Return a dictionary with optimization information retrieved from storage""" - if not Path(output_folder).exists() or not os.listdir(output_folder): + """Retrieve a seba database snapshot and return a dictionary with + optimization information.""" + if not os.path.exists(os.path.join(output_folder, "seba.db")): return {} - - storage = EverestStorage(Path(output_folder)) try: - storage.read_from_output_dir() - except FileNotFoundError: - # Optimization output dir exists and not empty, but still missing - # actual stored results + seba_snapshot = SebaSnapshot(output_folder) + except ObjectNotFoundError: return {} - - objective_names = storage.data.objective_functions["objective_name"].to_list() - control_names = storage.data.controls["control_name"].to_list() - - expected_objectives = polars.concat( - [ - b.batch_objectives.select(objective_names) - for b in storage.data.batches - if b.batch_objectives is not None - ] - ).to_dict(as_series=False) - - expected_total_objective = [ - b.batch_objectives["total_objective_value"].item() - for b in storage.data.batches - if b.batch_objectives is not None - ] - - improvement_batches = [b.batch_id for b in storage.data.batches if b.is_improvement] - - cli_monitor_data = { - "batches": [ - b.batch_id - for b in storage.data.batches - if b.realization_controls is not None and b.batch_objectives is not None - ], - "controls": [ - b.realization_controls.select(control_names).to_dicts()[0] - for b in storage.data.batches - if b.realization_controls is not None - ], - "objective_value": expected_total_objective, - "expected_objectives": expected_objectives, - } + snapshot = seba_snapshot.get_snapshot(filter_out_gradient=True) + + cli_monitor_data = {} + if 
snapshot.optimization_data: + cli_monitor_data = { + "batches": [item.batch_id for item in snapshot.optimization_data], + "controls": [item.controls for item in snapshot.optimization_data], + "objective_value": [ + item.objective_value for item in snapshot.optimization_data + ], + "expected_objectives": snapshot.expected_objectives, + } return { - "objective_history": expected_total_objective, - "control_history": polars.concat( - [ - b.realization_controls.select(control_names) - for b in storage.data.batches - if b.realization_controls is not None - ] - ).to_dict(as_series=False), - "objectives_history": expected_objectives, - "accepted_control_indices": improvement_batches, + "objective_history": snapshot.expected_single_objective, + "control_history": snapshot.optimization_controls, + "objectives_history": snapshot.expected_objectives, + "accepted_control_indices": snapshot.increased_merit_indices, "cli_monitor_data": cli_monitor_data, } diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index 2725350465a..a8a9402911a 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -34,8 +34,10 @@ from ert.config.parsing.queue_system import QueueSystem from ert.ensemble_evaluator import EvaluatorServerConfig from ert.run_models.everest_run_model import EverestExitCode, EverestRunModel +from everest import export_to_csv, export_with_progress from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, get_opt_status, update_everserver_status +from everest.export import check_for_errors from everest.plugins.everest_plugin_manager import EverestPluginManager from everest.simulator import JOB_FAILURE from everest.strings import ( @@ -331,6 +333,34 @@ def main(): ) return + try: + # Exporting data + update_everserver_status(status_path, ServerStatus.exporting_to_csv) + + if config.export is not None: + err_msgs, export_ecl = check_for_errors( + config=config.export, + optimization_output_path=config.optimization_output_dir, + storage_path=config.storage_dir, + data_file_path=config.model.data_file, + ) + for msg in err_msgs: + logging.getLogger(EVEREST).warning(msg) + else: + export_ecl = True + + export_to_csv( + data_frame=export_with_progress(config, export_ecl), + export_path=config.export_path, + ) + except: + update_everserver_status( + status_path, + ServerStatus.failed, + message=traceback.format_exc(), + ) + return + update_everserver_status(status_path, ServerStatus.completed, message=message) diff --git a/src/everest/everest_storage.py b/src/everest/everest_storage.py deleted file mode 100644 index 60842b24c3a..00000000000 --- a/src/everest/everest_storage.py +++ /dev/null @@ -1,908 +0,0 @@ -from __future__ import annotations - -import json -import logging -import os -from dataclasses import dataclass, field -from pathlib import Path -from typing import ( - Any, - TypedDict, - cast, -) - -import numpy as np -import polars -from ropt.enums import EventType -from ropt.plan import BasicOptimizer, Event -from ropt.results import FunctionResults, GradientResults, convert_to_maximize - -logger = logging.getLogger(__name__) - - -@dataclass -class OptimalResult: - batch: int - controls: list[Any] - total_objective: float - - -def try_read_df(path: Path) -> polars.DataFrame | None: - return polars.read_parquet(path) if path.exists() else None - - -@dataclass -class BatchDataFrames: - batch_id: int - realization_controls: polars.DataFrame - batch_objectives: 
polars.DataFrame | None - realization_objectives: polars.DataFrame | None - batch_constraints: polars.DataFrame | None - realization_constraints: polars.DataFrame | None - batch_objective_gradient: polars.DataFrame | None - perturbation_objectives: polars.DataFrame | None - batch_constraint_gradient: polars.DataFrame | None - perturbation_constraints: polars.DataFrame | None - is_improvement: bool | None = False - - @property - def existing_dataframes(self) -> dict[str, polars.DataFrame]: - return { - k: cast(polars.DataFrame, getattr(self, k)) - for k in [ - "batch_objectives", - "batch_objective_gradient", - "batch_constraints", - "batch_constraint_gradient", - "realization_controls", - "realization_objectives", - "realization_constraints", - "perturbation_objectives", - "perturbation_constraints", - ] - if getattr(self, k) is not None - } - - -@dataclass -class EverestStorageDataFrames: - batches: list[BatchDataFrames] = field(default_factory=list) - controls: polars.DataFrame | None = None - objective_functions: polars.DataFrame | None = None - nonlinear_constraints: polars.DataFrame | None = None - realization_weights: polars.DataFrame | None = None - - @property - def simulation_to_geo_realization_map(self) -> dict[int, int]: - """ - Mapping from simulation ID to geo-realization - """ - dummy_df = next( - ( - b.realization_controls - for b in self.batches - if b.realization_controls is not None - ), - None, - ) - - if dummy_df is None: - return {} - - mapping = {} - for d in dummy_df.select("realization", "simulation_id").to_dicts(): - mapping[int(d["simulation_id"])] = int(d["realization"]) - - return mapping - - @property - def existing_dataframes(self) -> dict[str, polars.DataFrame]: - return { - k: cast(polars.DataFrame, getattr(self, k)) - for k in [ - "controls", - "objective_functions", - "nonlinear_constraints", - "realization_weights", - ] - if getattr(self, k) is not None - } - - def write_to_experiment( - self, experiment: _OptimizerOnlyExperiment, write_csv=False - ) -> None: - for df_name, df in self.existing_dataframes.items(): - with open( - experiment.optimizer_mount_point / f"{df_name}.json", - mode="w+", - encoding="utf-8", - ) as f: - f.write(json.dumps(df.to_dicts())) - - df.write_parquet(f"{experiment.optimizer_mount_point / df_name}.parquet") - - if write_csv: - df.write_csv(f"{experiment.optimizer_mount_point / df_name}.csv") - - for batch_data in self.batches: - ensemble = experiment.get_ensemble_by_name(f"batch_{batch_data.batch_id}") - with open( - ensemble.optimizer_mount_point / "batch.json", "w+", encoding="utf-8" - ) as f: - json.dump( - { - "batch_id": batch_data.batch_id, - "is_improvement": batch_data.is_improvement, - }, - f, - ) - - for df_key, df in batch_data.existing_dataframes.items(): - df.write_parquet(ensemble.optimizer_mount_point / f"{df_key}.parquet") - df.write_csv(ensemble.optimizer_mount_point / f"{df_key}.csv") - df.write_json(ensemble.optimizer_mount_point / f"{df_key}.json") - - def read_from_experiment(self, experiment: _OptimizerOnlyExperiment) -> None: - self.controls = polars.read_parquet( - experiment.optimizer_mount_point / "controls.parquet" - ) - self.objective_functions = polars.read_parquet( - experiment.optimizer_mount_point / "objective_functions.parquet" - ) - - if ( - experiment.optimizer_mount_point / "nonlinear_constraints.parquet" - ).exists(): - self.nonlinear_constraints = polars.read_parquet( - experiment.optimizer_mount_point / "nonlinear_constraints.parquet" - ) - - if (experiment.optimizer_mount_point / 
"realization_weights.parquet").exists(): - self.realization_weights = polars.read_parquet( - experiment.optimizer_mount_point / "realization_weights.parquet" - ) - - for ens in experiment.ensembles.values(): - with open(ens.optimizer_mount_point / "batch.json", encoding="utf-8") as f: - info = json.load(f) - - self.batches.append( - BatchDataFrames( - batch_id=info["batch_id"], - **{ - df_name: try_read_df( - Path(ens.optimizer_mount_point) / f"{df_name}.parquet" - ) - for df_name in [ - "batch_objectives", - "batch_objective_gradient", - "batch_constraints", - "batch_constraint_gradient", - "realization_controls", - "realization_objectives", - "realization_constraints", - "perturbation_objectives", - "perturbation_constraints", - ] - }, - is_improvement=info["is_improvement"], - ) - ) - - self.batches.sort(key=lambda b: b.batch_id) - - -class _OptimizerOnlyEnsemble: - def __init__(self, output_dir: Path) -> None: - self._output_dir = output_dir - - @property - def optimizer_mount_point(self) -> Path: - if not (self._output_dir / "optimizer").exists(): - Path.mkdir(self._output_dir / "optimizer", parents=True) - - return self._output_dir / "optimizer" - - -class _OptimizerOnlyExperiment: - """ - Mocks an ERT storage, if we want to store optimization results within the - ERT storage, we can use an ERT Experiment object with an optimizer_mount_point - property - """ - - def __init__(self, output_dir: Path) -> None: - self._output_dir = output_dir - self._ensembles = {} - - @property - def optimizer_mount_point(self) -> Path: - if not (self._output_dir / "optimizer").exists(): - Path.mkdir(self._output_dir / "optimizer", parents=True) - - return self._output_dir / "optimizer" - - @property - def ensembles(self) -> dict[str, _OptimizerOnlyEnsemble]: - return { - str(d): _OptimizerOnlyEnsemble(self._output_dir / "ensembles" / d) - for d in os.listdir(self._output_dir / "ensembles") - if "batch_" in d - } - - def get_ensemble_by_name(self, name: str) -> _OptimizerOnlyEnsemble: - if name not in self._ensembles: - self._ensembles[name] = _OptimizerOnlyEnsemble( - self._output_dir / "ensembles" / name - ) - - return self._ensembles[name] - - -@dataclass -class _EvaluationResults(TypedDict): - realization_controls: polars.DataFrame - batch_objectives: polars.DataFrame - realization_objectives: polars.DataFrame - batch_constraints: polars.DataFrame | None - realization_constraints: polars.DataFrame | None - - -@dataclass -class _GradientResults(TypedDict): - batch_objective_gradient: polars.DataFrame | None - perturbation_objectives: polars.DataFrame | None - batch_constraint_gradient: polars.DataFrame | None - perturbation_constraints: polars.DataFrame | None - - -@dataclass -class _MeritValue: - value: float - iter: int - - -class EverestStorage: - def __init__( - self, - output_dir: Path, - ) -> None: - self._control_ensemble_id = 0 - self._gradient_ensemble_id = 0 - - self._output_dir = output_dir - self._merit_file: Path | None = None - self.data = EverestStorageDataFrames() - - @staticmethod - def _rename_ropt_df_columns(df: polars.DataFrame) -> polars.DataFrame: - """ - Renames columns of a dataframe from ROPT to what will be displayed - to the user. 
- """ - scaled_cols = [c for c in df.columns if c.lower().startswith("scaled")] - if len(scaled_cols) > 0: - raise ValueError("Scaled columns should not be stored into Everest storage") - - renames = { - "objective": "objective_name", - "weighted_objective": "total_objective_value", - "variable": "control_name", - "variables": "control_value", - "objectives": "objective_value", - "constraints": "constraint_value", - "nonlinear_constraint": "constraint_name", - "perturbed_variables": "perturbed_control_value", - "perturbed_objectives": "perturbed_objective_value", - "perturbed_constraints": "perturbed_constraint_value", - "evaluation_ids": "simulation_id", - } - return df.rename({k: v for k, v in renames.items() if k in df.columns}) - - @staticmethod - def _enforce_dtypes(df: polars.DataFrame) -> polars.DataFrame: - dtypes = { - "batch_id": polars.UInt32, - "perturbation": polars.UInt32, - "realization": polars.UInt32, - # -1 is used as a value in simulator cache. - # thus we need signed, otherwise we could do unsigned - "simulation_id": polars.Int32, - "objective_name": polars.String, - "control_name": polars.String, - "constraint_name": polars.String, - "total_objective_value": polars.Float64, - "control_value": polars.Float64, - "objective_value": polars.Float64, - "constraint_value": polars.Float64, - "perturbed_control_value": polars.Float64, - "perturbed_objective_value": polars.Float64, - "perturbed_constraint_value": polars.Float64, - } - - existing_cols = set(df.columns) - unaccounted_cols = existing_cols - set(dtypes) - if len(unaccounted_cols) > 0: - raise KeyError( - f"Expected all keys to have a specified dtype, found {unaccounted_cols}" - ) - - df = df.cast( - { - colname: dtype - for colname, dtype in dtypes.items() - if colname in df.columns - } - ) - - return df - - def write_to_output_dir(self) -> None: - exp = _OptimizerOnlyExperiment(self._output_dir) - - # csv writing mostly for dev/debugging/quick inspection - self.data.write_to_experiment(exp, write_csv=True) - - def read_from_output_dir(self) -> None: - exp = _OptimizerOnlyExperiment(self._output_dir) - self.data.read_from_experiment(exp) - - def observe_optimizer( - self, - optimizer: BasicOptimizer, - merit_file: Path, - ) -> None: - # We only need this file if we are observing a running ROPT instance - # (using dakota backend) - self._merit_file = merit_file - - optimizer.add_observer( - EventType.START_OPTIMIZER_STEP, self._on_start_optimization - ) - optimizer.add_observer( - EventType.FINISHED_EVALUATION, self._on_batch_evaluation_finished - ) - optimizer.add_observer( - EventType.FINISHED_OPTIMIZER_STEP, - self._on_optimization_finished, - ) - - def _on_start_optimization(self, event: Event) -> None: - def _format_control_names(control_names): - converted_names = [] - for name in control_names: - converted = f"{name[0]}_{name[1]}" - if len(name) > 2: - converted += f"-{name[2]}" - converted_names.append(converted) - - return converted_names - - config = event.config - - # Note: We probably do not have to store - # all of this information, consider removing. 
- self.data.controls = polars.DataFrame( - { - "control_name": polars.Series( - _format_control_names(config.variables.names), dtype=polars.String - ), - "initial_value": polars.Series( - config.variables.initial_values, dtype=polars.Float64 - ), - "lower_bounds": polars.Series( - config.variables.lower_bounds, dtype=polars.Float64 - ), - "upper_bounds": polars.Series( - config.variables.upper_bounds, dtype=polars.Float64 - ), - } - ) - - self.data.objective_functions = polars.DataFrame( - { - "objective_name": config.objectives.names, - "weight": polars.Series( - config.objectives.weights, dtype=polars.Float64 - ), - "normalization": polars.Series( # Q: Is this correct? - [1.0 / s for s in config.objectives.scales], - dtype=polars.Float64, - ), - } - ) - - if config.nonlinear_constraints is not None: - self.data.nonlinear_constraints = polars.DataFrame( - { - "constraint_name": config.nonlinear_constraints.names, - "normalization": [ - 1.0 / s for s in config.nonlinear_constraints.scales - ], # Q: Is this correct? - "constraint_rhs_value": config.nonlinear_constraints.rhs_values, - "constraint_type": config.nonlinear_constraints.types, - } - ) - - self.data.realization_weights = polars.DataFrame( - { - "realization": polars.Series( - config.realizations.names, dtype=polars.UInt32 - ), - "weight": polars.Series( - config.realizations.weights, dtype=polars.Float64 - ), - } - ) - - def _store_function_results(self, results: FunctionResults) -> _EvaluationResults: - # We could select only objective values, - # but we select all to also get the constraint values (if they exist) - realization_objectives = polars.from_pandas( - results.to_dataframe( - "evaluations", - select=["objectives", "evaluation_ids"], - ).reset_index(), - ).select( - "batch_id", - "realization", - "objective", - "objectives", - "evaluation_ids", - ) - - if results.nonlinear_constraints is not None: - realization_constraints = polars.from_pandas( - results.to_dataframe( - "evaluations", - select=["constraints", "evaluation_ids"], - ).reset_index(), - ).select( - "batch_id", - "realization", - "evaluation_ids", - "nonlinear_constraint", - "constraints", - ) - - realization_constraints = self._rename_ropt_df_columns( - realization_constraints - ) - - batch_constraints = polars.from_pandas( - results.to_dataframe("nonlinear_constraints").reset_index() - ).select("batch_id", "nonlinear_constraint", "values", "violations") - - batch_constraints = batch_constraints.rename( - { - "nonlinear_constraint": "constraint_name", - "values": "constraint_value", - "violations": "constraint_violation", - } - ) - - constraint_names = batch_constraints["constraint_name"].unique().to_list() - - batch_constraints = batch_constraints.pivot( - on="constraint_name", - values=[ - "constraint_value", - "constraint_violation", - ], - separator=";", - ).rename( - { - **{f"constraint_value;{name}": name for name in constraint_names}, - **{ - f"constraint_violation;{name}": f"{name}.violation" - for name in constraint_names - }, - } - ) - - realization_constraints = realization_constraints.pivot( - values=["constraint_value"], on="constraint_name" - ) - else: - batch_constraints = None - realization_constraints = None - - batch_objectives = polars.from_pandas( - results.to_dataframe( - "functions", - select=["objectives", "weighted_objective"], - ).reset_index() - ).select("batch_id", "objective", "objectives", "weighted_objective") - - realization_controls = polars.from_pandas( - results.to_dataframe( - "evaluations", select=["variables", 
"evaluation_ids"] - ).reset_index() - ).select( - "batch_id", - "variable", - "realization", - "variables", - "evaluation_ids", - ) - - realization_controls = self._rename_ropt_df_columns(realization_controls) - realization_controls = self._enforce_dtypes(realization_controls) - - realization_controls = realization_controls.pivot( - on="control_name", - values=["control_value"], - separator=":", - ) - - batch_objectives = self._rename_ropt_df_columns(batch_objectives) - batch_objectives = self._enforce_dtypes(batch_objectives) - - realization_objectives = self._rename_ropt_df_columns(realization_objectives) - realization_objectives = self._enforce_dtypes(realization_objectives) - - batch_objectives = batch_objectives.pivot( - on="objective_name", - values=["objective_value"], - separator=":", - ) - - realization_objectives = realization_objectives.pivot( - values="objective_value", - index=[ - "batch_id", - "realization", - "simulation_id", - ], - columns="objective_name", - ) - - return { - "realization_controls": realization_controls, - "batch_objectives": batch_objectives, - "realization_objectives": realization_objectives, - "batch_constraints": batch_constraints, - "realization_constraints": realization_constraints, - } - - def _store_gradient_results(self, results: GradientResults) -> _GradientResults: - perturbation_objectives = polars.from_pandas( - results.to_dataframe("evaluations").reset_index() - ).select( - [ - "batch_id", - "variable", - "realization", - "perturbation", - "objective", - "variables", - "perturbed_variables", - "perturbed_objectives", - "perturbed_evaluation_ids", - *( - ["nonlinear_constraint", "perturbed_constraints"] - if results.evaluations.perturbed_constraints is not None - else [] - ), - ] - ) - perturbation_objectives = self._rename_ropt_df_columns(perturbation_objectives) - - if results.gradients is not None: - batch_objective_gradient = polars.from_pandas( - results.to_dataframe("gradients").reset_index() - ).select( - [ - "batch_id", - "variable", - "objective", - "weighted_objective", - "objectives", - *( - ["nonlinear_constraint", "constraints"] - if results.gradients.constraints is not None - else [] - ), - ] - ) - batch_objective_gradient = self._rename_ropt_df_columns( - batch_objective_gradient - ) - batch_objective_gradient = self._enforce_dtypes(batch_objective_gradient) - else: - batch_objective_gradient = None - - if results.evaluations.perturbed_constraints is not None: - perturbation_constraints = ( - perturbation_objectives[ - "batch_id", - "realization", - "perturbation", - "control_name", - "perturbed_control_value", - *[ - c - for c in perturbation_objectives.columns - if "constraint" in c.lower() - ], - ] - .pivot(on="constraint_name", values=["perturbed_constraint_value"]) - .pivot(on="control_name", values="perturbed_control_value") - ) - - if batch_objective_gradient is not None: - batch_constraint_gradient = batch_objective_gradient[ - "batch_id", - "control_name", - *[ - c - for c in batch_objective_gradient.columns - if "constraint" in c.lower() - ], - ] - - batch_objective_gradient = batch_objective_gradient.drop( - [ - c - for c in batch_objective_gradient.columns - if "constraint" in c.lower() - ] - ).unique() - - batch_constraint_gradient = batch_constraint_gradient.pivot( - on="constraint_name", - values=["constraint_value"], - ) - else: - batch_constraint_gradient = None - - perturbation_objectives = perturbation_objectives.drop( - [ - c - for c in perturbation_objectives.columns - if "constraint" in c.lower() - ] - 
).unique() - else: - batch_constraint_gradient = None - perturbation_constraints = None - - perturbation_objectives = perturbation_objectives.drop( - "perturbed_evaluation_ids", "control_value" - ) - - perturbation_objectives = perturbation_objectives.pivot( - on="objective_name", values="perturbed_objective_value" - ) - - perturbation_objectives = perturbation_objectives.pivot( - on="control_name", values="perturbed_control_value" - ) - - if batch_objective_gradient is not None: - objective_names = ( - batch_objective_gradient["objective_name"].unique().to_list() - ) - batch_objective_gradient = batch_objective_gradient.pivot( - on="objective_name", - values=["objective_value", "total_objective_value"], - separator=";", - ).rename( - { - **{f"objective_value;{name}": name for name in objective_names}, - **{ - f"total_objective_value;{name}": f"{name}.total" - for name in objective_names - }, - } - ) - - return { - "batch_objective_gradient": batch_objective_gradient, - "perturbation_objectives": perturbation_objectives, - "batch_constraint_gradient": batch_constraint_gradient, - "perturbation_constraints": perturbation_constraints, - } - - def _on_batch_evaluation_finished(self, event: Event) -> None: - logger.debug("Storing batch results dataframes") - - converted_results = tuple( - convert_to_maximize(result) for result in event.results - ) - results: list[FunctionResults | GradientResults] = [] - - best_value = -np.inf - best_results = None - for item in converted_results: - if isinstance(item, GradientResults): - results.append(item) - if ( - isinstance(item, FunctionResults) - and item.functions is not None - and item.functions.weighted_objective > best_value - ): - best_value = item.functions.weighted_objective - best_results = item - - if best_results is not None: - results = [best_results, *results] - - batch_dicts = {} - for item in results: - if item.batch_id not in batch_dicts: - batch_dicts[item.batch_id] = {} - - if isinstance(item, FunctionResults): - eval_results = self._store_function_results(item) - batch_dicts[item.batch_id].update(eval_results) - - if isinstance(item, GradientResults): - gradient_results = self._store_gradient_results(item) - batch_dicts[item.batch_id].update(gradient_results) - - for batch_id, batch_dict in batch_dicts.items(): - self.data.batches.append( - BatchDataFrames( - batch_id=batch_id, - realization_controls=batch_dict.get("realization_controls"), - batch_objectives=batch_dict.get("batch_objectives"), - realization_objectives=batch_dict.get("realization_objectives"), - batch_constraints=batch_dict.get("batch_constraints"), - realization_constraints=batch_dict.get("realization_constraints"), - batch_objective_gradient=batch_dict.get("batch_objective_gradient"), - perturbation_objectives=batch_dict.get("perturbation_objectives"), - batch_constraint_gradient=batch_dict.get( - "batch_constraint_gradient" - ), - perturbation_constraints=batch_dict.get("perturbation_constraints"), - ) - ) - - def _on_optimization_finished(self, _) -> None: - logger.debug("Storing final results Everest storage") - - merit_values = self._get_merit_values() - if merit_values: - # NOTE: Batch 0 is always an "accepted batch", and "accepted batches" are - # batches with merit_flag , which means that it was an improvement - self.data.batches[0].is_improvement = True - - improvement_batches = [ - b for b in self.data.batches if b.batch_objectives is not None - ][1:] - for i, b in enumerate(improvement_batches): - merit_value = next( - (m.value for m in merit_values if 
(m.iter - 1) == i), None - ) - if merit_value is None: - continue - - b.batch_objectives = b.batch_objectives.with_columns( - polars.lit(merit_value).alias("merit_value") - ) - b.is_improvement = True - else: - max_total_objective = -np.inf - for b in self.data.batches: - if b.batch_objectives is not None: - total_objective = b.batch_objectives["total_objective_value"].item() - if total_objective > max_total_objective: - b.is_improvement = True - max_total_objective = total_objective - - self.write_to_output_dir() - - def get_optimal_result(self) -> OptimalResult | None: - # Only used in tests, but re-created to ensure - # same behavior as w/ old SEBA setup - has_merit = any( - "merit_value" in b.batch_objectives.columns - for b in self.data.batches - if b.batch_objectives is not None - ) - - def find_best_batch( - filter_by, sort_by - ) -> tuple[BatchDataFrames | None, dict | None]: - matching_batches = [b for b in self.data.batches if filter_by(b)] - - if not matching_batches: - return None, None - - matching_batches.sort(key=sort_by) - batch = matching_batches[0] - controls_dict = batch.realization_controls.drop( - [ - "batch_id", - "simulation_id", - "realization", - ] - ).to_dicts()[0] - - return batch, controls_dict - - if has_merit: - # Minimize merit - batch, controls_dict = find_best_batch( - filter_by=lambda b: ( - b.batch_objectives is not None - and "merit_value" in b.batch_objectives.columns - ), - sort_by=lambda b: b.batch_objectives.select( - polars.col("merit_value").min() - ).item(), - ) - - if batch is None: - return None - - return OptimalResult( - batch=batch.batch_id, - controls=controls_dict, - total_objective=batch.batch_objectives.select( - polars.col("total_objective_value").sample(n=1) - ).item(), - ) - else: - # Maximize objective - batch, controls_dict = find_best_batch( - filter_by=lambda b: b.batch_objectives is not None - and not b.batch_objectives.is_empty(), - sort_by=lambda b: -b.batch_objectives.select( - polars.col("total_objective_value").sample(n=1) - ).item(), - ) - - if batch is None: - return None - - return OptimalResult( - batch=batch.batch_id, - controls=controls_dict, - total_objective=batch.batch_objectives.select( - polars.col("total_objective_value") - ).item(), - ) - - def _get_merit_values(self) -> list[_MeritValue]: - # Read the file containing merit information. - # The file should contain the following table header - # Iter F(x) mu alpha Merit feval btracks Penalty - # :return: merit values indexed by the function evaluation number - - def _get_merit_fn_lines(merit_path: str) -> list[str]: - if os.path.isfile(merit_path): - with open(merit_path, errors="replace", encoding="utf-8") as reader: - lines = reader.readlines() - start_line_idx = -1 - for inx, line in enumerate(lines): - if "Merit" in line and "feval" in line: - start_line_idx = inx + 1 - if start_line_idx > -1 and line.startswith("="): - return lines[start_line_idx:inx] - if start_line_idx > -1: - return lines[start_line_idx:] - return [] - - def _parse_merit_line(merit_values_string: str) -> tuple[int, float] | None: - values = [] - for merit_elem in merit_values_string.split(): - try: - values.append(float(merit_elem)) - except ValueError: - for elem in merit_elem.split("0x")[1:]: - values.append(float.fromhex("0x" + elem)) - if len(values) == 8: - # Dakota starts counting at one, correct to be zero-based. 
- return int(values[5]) - 1, values[4] - return None - - merit_values = [] - if self._merit_file.exists(): - for line in _get_merit_fn_lines(self._merit_file): - value = _parse_merit_line(line) - if value is not None: - merit_values.append(_MeritValue(iter=value[0], value=value[1])) - - return merit_values diff --git a/src/everest/export.py b/src/everest/export.py index e69de29bb2d..318a2cef54c 100644 --- a/src/everest/export.py +++ b/src/everest/export.py @@ -0,0 +1,365 @@ +import os +import re +from enum import StrEnum +from typing import Any + +import pandas as pd +from pandas import DataFrame +from seba_sqlite.snapshot import SebaSnapshot + +from ert.storage import open_storage +from everest.config import ExportConfig +from everest.strings import STORAGE_DIR + + +class MetaDataColumnNames(StrEnum): + # NOTE: Always add a new column name to the list below! + BATCH = "batch" + REALIZATION = "realization" + REALIZATION_WEIGHT = "realization_weight" + SIMULATION = "simulation" + IS_GRADIENT = "is_gradient" + SUCCESS = "success" + START_TIME = "start_time" + END_TIME = "end_time" + SIM_AVERAGED_OBJECTIVE = "sim_avg_obj" + REAL_AVERAGED_OBJECTIVE = "real_avg_obj" + SIMULATED_DATE = "sim_date" + INCREASED_MERIT = "increased_merit" + + @classmethod + def get_all(cls): + return [ + cls.BATCH, + cls.REALIZATION, + cls.REALIZATION_WEIGHT, + cls.SIMULATION, + cls.IS_GRADIENT, + cls.SUCCESS, + cls.START_TIME, + cls.END_TIME, + cls.SIM_AVERAGED_OBJECTIVE, + cls.REAL_AVERAGED_OBJECTIVE, + cls.SIMULATED_DATE, + cls.INCREASED_MERIT, + ] + + +def filter_data(data: DataFrame, keyword_filters: set[str]): + filtered_columns = [] + + for col in data.columns: + for expr in keyword_filters: + expr = expr.replace("*", ".*") + if re.match(expr, col) is not None: + filtered_columns.append(col) + + return data[filtered_columns] + + +def available_batches(optimization_output_dir: str) -> set[int]: + snapshot = SebaSnapshot(optimization_output_dir).get_snapshot( + filter_out_gradient=False, batches=None + ) + return {data.batch for data in snapshot.simulation_data} + + +def export_metadata(config: ExportConfig | None, optimization_output_dir: str): + discard_gradient = True + discard_rejected = True + batches = None + + if config: + if config.discard_gradient is not None: + discard_gradient = config.discard_gradient + + if config.discard_rejected is not None: + discard_rejected = config.discard_rejected + + if config.batches: + # If user defined batches to export in the conf file, ignore previously + # discard gradient and discard rejected flags if defined and true + discard_rejected = False + discard_gradient = False + batches = config.batches + + snapshot = SebaSnapshot(optimization_output_dir).get_snapshot( + filter_out_gradient=discard_gradient, + batches=batches, + ) + + opt_data = snapshot.optimization_data_by_batch + metadata = [] + for data in snapshot.simulation_data: + # If export section not defined in the config file export only increased + # merit non-gradient simulation results + if ( + discard_rejected + and data.batch in opt_data + and opt_data[data.batch].merit_flag != 1 + ): + continue + + md_row: dict[str, Any] = { + MetaDataColumnNames.BATCH: data.batch, + MetaDataColumnNames.SIM_AVERAGED_OBJECTIVE: data.sim_avg_obj, + MetaDataColumnNames.IS_GRADIENT: data.is_gradient, + MetaDataColumnNames.REALIZATION: int(data.realization), + MetaDataColumnNames.START_TIME: data.start_time, + MetaDataColumnNames.END_TIME: data.end_time, + MetaDataColumnNames.SUCCESS: data.success, + 
MetaDataColumnNames.REALIZATION_WEIGHT: data.realization_weight, + MetaDataColumnNames.SIMULATION: int(data.simulation), + } + if data.objectives: + md_row.update(data.objectives) + if data.constraints: + md_row.update(data.constraints) + if data.controls: + md_row.update(data.controls) + + if not md_row[MetaDataColumnNames.IS_GRADIENT]: + if md_row[MetaDataColumnNames.BATCH] in opt_data: + opt = opt_data[md_row[MetaDataColumnNames.BATCH]] + md_row.update( + { + MetaDataColumnNames.REAL_AVERAGED_OBJECTIVE: opt.objective_value, + MetaDataColumnNames.INCREASED_MERIT: opt.merit_flag, + } + ) + for function, gradients in opt.gradient_info.items(): + for control, gradient_value in gradients.items(): + md_row.update( + {f"gradient-{function}-{control}": gradient_value} + ) + else: + print( + f"Batch {md_row[MetaDataColumnNames.BATCH]} has no available optimization data" + ) + metadata.append(md_row) + + return metadata + + +def get_internalized_keys( + config: ExportConfig, + storage_path: str, + optimization_output_path: str, + batch_ids: set[int] | None = None, +): + if batch_ids is None: + metadata = export_metadata(config, optimization_output_path) + batch_ids = {data[MetaDataColumnNames.BATCH] for data in metadata} + internal_keys: set = set() + with open_storage(storage_path, "r") as storage: + for batch_id in batch_ids: + experiments = [*storage.experiments] + assert len(experiments) == 1 + experiment = experiments[0] + + ensemble = experiment.get_ensemble_by_name(f"batch_{batch_id}") + if not internal_keys: + internal_keys = set( + ensemble.experiment.response_type_to_response_keys["summary"] + ) + else: + internal_keys = internal_keys.intersection( + set(ensemble.experiment.response_type_to_response_keys["summary"]) + ) + + return internal_keys + + +def check_for_errors( + config: ExportConfig, + optimization_output_path: str, + storage_path: str, + data_file_path: str | None, +): + """ + Checks for possible errors when attempting to export current optimization + case. + """ + export_ecl = True + export_errors: list[str] = [] + + if config.batches: + available_batches_ = available_batches(optimization_output_path) + for batch in set(config.batches).difference(available_batches_): + export_errors.append( + f"Batch {batch} not found in optimization " + "results. Skipping for current export." + ) + config.batches = list(set(config.batches).intersection(available_batches_)) + + if config.batches == []: + export_errors.append( + "No batches selected for export. Only optimization data will be exported." + ) + return export_errors, False + + if not data_file_path: + export_ecl = False + export_errors.append( + "No data file found in config.Only optimization data will be exported." + ) + + # If no user defined keywords are present it is no longer possible to check + # availability in internal storage + if config.keywords is None: + return export_errors, export_ecl + + if not config.keywords: + export_ecl = False + export_errors.append( + "No eclipse keywords selected for export. Only" + " optimization data will be exported." + ) + + internal_keys = get_internalized_keys( + config=config, + storage_path=storage_path, + optimization_output_path=optimization_output_path, + batch_ids=set(config.batches) if config.batches else None, + ) + + extra_keys = set(config.keywords).difference(set(internal_keys)) + if extra_keys: + export_ecl = False + export_errors.append( + f"Non-internalized ecl keys selected for export '{' '.join(extra_keys)}'." 
+ " in order to internalize missing keywords " + f"run 'everest load '. " + "Only optimization data will be exported." + ) + + return export_errors, export_ecl + + +def export_data( + export_config: ExportConfig | None, + output_dir: str, + data_file: str | None, + export_ecl=True, + progress_callback=lambda _: None, +): + """Export everest data into a pandas dataframe. If the config specifies + a data_file and @export_ecl is True, simulation data is included. When + exporting simulation data, only keywords matching elements in @ecl_keywords + are exported. Note that wildcards are allowed. + + @progress_callback will be called with a number between 0 and 1 indicating + the fraction of batches that has been loaded. + """ + + ecl_keywords = None + # If user exports with a config file that has the SKIP_EXPORT + # set to true export nothing + if export_config is not None: + if export_config.skip_export or export_config.batches == []: + return pd.DataFrame([]) + + ecl_keywords = export_config.keywords + optimization_output_dir = os.path.join( + os.path.abspath(output_dir), "optimization_output" + ) + metadata = export_metadata(export_config, optimization_output_dir) + if data_file is None or not export_ecl: + return pd.DataFrame(metadata) + + data = load_simulation_data( + output_path=output_dir, + metadata=metadata, + progress_callback=progress_callback, + ) + + if ecl_keywords is not None: + keywords = tuple(ecl_keywords) + # NOTE: Some of these keywords are necessary to export successfully, + # we should not leave this to the user + keywords += tuple(pd.DataFrame(metadata).columns) + keywords += tuple(MetaDataColumnNames.get_all()) + keywords_set = set(keywords) + data = filter_data(data, keywords_set) + + return data + + +def load_simulation_data( + output_path: str, metadata: list[dict], progress_callback=lambda _: None +): + """Export simulations to a pandas DataFrame + @output_path optimization output folder path. + @metadata is a one ora a list of dictionaries. Keys from the dictionary become + columns in the resulting dataframe. The values from the dictionary are + assigned to those columns for the corresponding simulation. + If a column is defined for some simulations but not for others, the value + for that column is set to NaN for simulations without it + + For instance, assume we have 2 simulations and + tags = [ {'geoid': 0, 'sim': 'ro'}, + {'geoid': 2, 'sim': 'pi', 'best': True }, + ] + And assume exporting each of the two simulations produces 3 rows. + The resulting dataframe will be something like + geoid sim best data... + 0 0 ro sim_0_row_0... + 1 0 ro sim_0_row_1... + 2 0 ro sim_0_row_2... + 3 2 pi True sim_1_row_0... + 4 2 pi True sim_2_row_0... + 5 2 pi True sim_3_row_0... 
+ """ + ens_path = os.path.join(output_path, STORAGE_DIR) + with open_storage(ens_path, "r") as storage: + # pylint: disable=unnecessary-lambda-assignment + def load_batch_by_id(): + experiments = [*storage.experiments] + + # Always assume 1 experiment per simulation/enspath, never multiple + assert len(experiments) == 1 + experiment = experiments[0] + + ensemble = experiment.get_ensemble_by_name(f"batch_{batch}") + realizations = ensemble.get_realization_list_with_responses() + try: + df_pl = ensemble.load_responses("summary", tuple(realizations)) + except (ValueError, KeyError): + return pd.DataFrame() + df_pl = df_pl.pivot( + on="response_key", index=["realization", "time"], sort_columns=True + ) + df_pl = df_pl.rename({"time": "Date", "realization": "Realization"}) + return ( + df_pl.to_pandas() + .set_index(["Realization", "Date"]) + .sort_values(by=["Date", "Realization"]) + ) + + batches = {elem[MetaDataColumnNames.BATCH] for elem in metadata} + batch_data = [] + for idx, batch in enumerate(batches): + progress_callback(float(idx) / len(batches)) + batch_data.append(load_batch_by_id()) + batch_data[-1][MetaDataColumnNames.BATCH] = batch + + for b in batch_data: + b.reset_index(inplace=True) + b.rename( + index=str, + inplace=True, + columns={ + "Realization": MetaDataColumnNames.SIMULATION, + "Date": MetaDataColumnNames.SIMULATED_DATE, + }, + ) + + data = pd.concat(batch_data, ignore_index=True, sort=False) + data = pd.merge( + left=data, + right=pd.DataFrame(metadata), + on=[MetaDataColumnNames.BATCH, MetaDataColumnNames.SIMULATION], + sort=False, + ) + + return data diff --git a/tests/everest/conftest.py b/tests/everest/conftest.py index d0ea5dd387d..41ef76c3e75 100644 --- a/tests/everest/conftest.py +++ b/tests/everest/conftest.py @@ -235,3 +235,5 @@ def mock_server(monkeypatch): monkeypatch.setattr(everserver, "_find_open_port", lambda *args, **kwargs: 42) monkeypatch.setattr(everserver, "_write_hostfile", MagicMock()) monkeypatch.setattr(everserver, "_everserver_thread", MagicMock()) + monkeypatch.setattr(everserver, "export_to_csv", MagicMock()) + monkeypatch.setattr(everserver, "export_with_progress", MagicMock()) diff --git a/tests/everest/entry_points/test_config_branch_entry.py b/tests/everest/entry_points/test_config_branch_entry.py index 191ea487897..c388125cbbc 100644 --- a/tests/everest/entry_points/test_config_branch_entry.py +++ b/tests/everest/entry_points/test_config_branch_entry.py @@ -2,10 +2,11 @@ from os.path import exists from pathlib import Path +from seba_sqlite.snapshot import SebaSnapshot + from everest.bin.config_branch_script import config_branch_entry from everest.config_file_loader import load_yaml from everest.config_keys import ConfigKeys as CK -from everest.everest_storage import EverestStorage def test_config_branch_entry(cached_example): @@ -27,21 +28,19 @@ def test_config_branch_entry(cached_example): assert len(new_controls) == len(old_controls) assert len(new_controls[0][CK.VARIABLES]) == len(old_controls[0][CK.VARIABLES]) - storage = EverestStorage(Path(path) / "everest_output" / "optimization_output") - storage.read_from_output_dir() + opt_controls = {} + + snapshot = SebaSnapshot(Path(path) / "everest_output" / "optimization_output") + for opt_data in snapshot._optimization_data(): + if opt_data.batch_id == 1: + opt_controls = opt_data.controls new_controls_initial_guesses = { var[CK.INITIAL_GUESS] for var in new_controls[0][CK.VARIABLES] } + opt_control_val_for_batch_id = {v for k, v in opt_controls.items()} - control_names = 
storage.data.controls["control_name"] - batch_1_info = next(b for b in storage.data.batches if b.batch_id == 1) - realization_control_vals_mean = batch_1_info.realization_controls.select( - *control_names - ).to_dicts()[0] - control_values = set(realization_control_vals_mean.values()) - - assert new_controls_initial_guesses == control_values + assert new_controls_initial_guesses == opt_control_val_for_batch_id def test_config_branch_preserves_config_section_order(cached_example): @@ -51,6 +50,15 @@ def test_config_branch_preserves_config_section_order(cached_example): assert exists("new_restart_config.yml") + opt_controls = {} + + snapshot = SebaSnapshot(Path(path) / "everest_output" / "optimization_output") + for opt_data in snapshot._optimization_data(): + if opt_data.batch_id == 1: + opt_controls = opt_data.controls + + opt_control_val_for_batch_id = {v for k, v in opt_controls.items()} + diff_lines = [] with ( open("config_advanced.yml", encoding="utf-8") as initial_config, @@ -72,15 +80,5 @@ def test_config_branch_preserves_config_section_order(cached_example): assert len(diff_lines) == 4 assert "-initial_guess:0.25" in diff_lines - - storage = EverestStorage(Path(path) / "everest_output" / "optimization_output") - storage.read_from_output_dir() - control_names = storage.data.controls["control_name"] - batch_1_info = next(b for b in storage.data.batches if b.batch_id == 1) - realization_control_vals_mean = batch_1_info.realization_controls.select( - *control_names - ).to_dicts()[0] - control_values = set(realization_control_vals_mean.values()) - - for control_val in control_values: + for control_val in opt_control_val_for_batch_id: assert f"+initial_guess:{control_val}" in diff_lines diff --git a/tests/everest/entry_points/test_everexport.py b/tests/everest/entry_points/test_everexport.py new file mode 100644 index 00000000000..31be878aab4 --- /dev/null +++ b/tests/everest/entry_points/test_everexport.py @@ -0,0 +1,311 @@ +import logging +import os +from pathlib import Path +from unittest.mock import patch + +import pandas as pd +import pytest + +from everest import ConfigKeys as CK +from everest import MetaDataColumnNames as MDCN +from everest.bin.everexport_script import everexport_entry +from everest.bin.utils import ProgressBar +from everest.config import EverestConfig, ExportConfig +from tests.everest.utils import ( + satisfy, + satisfy_callable, + satisfy_type, +) + +CONFIG_FILE_MINIMAL = "config_minimal.yml" + +CONFIG_FILE_MOCKED_TEST_CASE = "mocked_multi_batch.yml" + + +pytestmark = pytest.mark.xdist_group(name="starts_everest") + +TEST_DATA = pd.DataFrame( + columns=[ + MDCN.BATCH, + MDCN.SIMULATION, + MDCN.IS_GRADIENT, + MDCN.START_TIME, + ], + data=[ + [0, 0, False, 0.0], # First func evaluation on 2 realizations + [0, 1, False, 0.0], + [0, 2, True, 0.0], # First grad evaluation 2 perts per real + [0, 3, True, 0.1], + [0, 4, True, 0.1], + [0, 5, True, 0.1], + [1, 0, False, 0.3], + [1, 1, False, 0.32], + [2, 0, True, 0.5], + [2, 1, True, 0.5], + [2, 2, True, 0.5], + [2, 3, True, 0.6], + ], +) + + +def export_mock(config, export_ecl=True, progress_callback=lambda _: None): + progress_callback(1.0) + return TEST_DATA + + +def empty_mock(config, export_ecl=True, progress_callback=lambda _: None): + progress_callback(1.0) + return pd.DataFrame() + + +def validate_export_mock(**_): + return ([], True) + + +@patch("everest.bin.everexport_script.export_with_progress", side_effect=export_mock) +def test_everexport_entry_run(_, cached_example): + """Test running everexport with not 
flags""" + config_path, config_file, _ = cached_example("math_func/config_minimal.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + export_file_path = config.export_path + assert not os.path.isfile(export_file_path) + + everexport_entry([config_file]) + + assert os.path.isfile(export_file_path) + df = pd.read_csv(export_file_path, sep=";") + assert df.equals(TEST_DATA) + + +@patch("everest.bin.everexport_script.export_with_progress", side_effect=empty_mock) +def test_everexport_entry_empty(mocked_func, cached_example): + """Test running everexport with no data""" + # NOTE: When there is no data (ie, the optimization has not yet run) + # the current behavior is to create an empty .csv file. It is arguable + # whether that is really the desired behavior, but for now we assume + # it is and we test against that expected behavior. + config_path, config_file, _ = cached_example("math_func/config_minimal.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + export_file_path = config.export_path + assert not os.path.isfile(export_file_path) + + everexport_entry([CONFIG_FILE_MINIMAL]) + + assert os.path.isfile(export_file_path) + with open(export_file_path, encoding="utf-8") as f: + content = f.read() + assert not content.strip() + + +@patch( + "everest.bin.everexport_script.check_for_errors", + side_effect=validate_export_mock, +) +@patch("everest.bin.utils.export_data") +@pytest.mark.fails_on_macos_github_workflow +def test_everexport_entry_batches(mocked_func, validate_export_mock, cached_example): + """Test running everexport with the --batches flag""" + _, config_file, _ = cached_example("math_func/config_minimal.yml") + everexport_entry([config_file, "--batches", "0", "2"]) + + def check_export_batches(config): + batches = (config.batches if config is not None else None) or False + return set(batches) == {0, 2} + + if ProgressBar: # different calls if ProgressBar available or not + mocked_func.assert_called_once_with( + export_config=satisfy(check_export_batches), + output_dir=satisfy_type(str), + data_file=None, + export_ecl=True, + progress_callback=satisfy_callable(), + ) + else: + mocked_func.assert_called_once() + + +@patch("everest.bin.everexport_script.export_to_csv") +def test_everexport_entry_no_export(mocked_func, cached_example): + """Test running everexport on config file with skip_export flag + set to true""" + + config_path, config_file, _ = cached_example("math_func/config_minimal.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + config.export = ExportConfig(skip_export=True) + # Add export section to config file and set run_export flag to false + export_file_path = config.export_path + assert not os.path.isfile(export_file_path) + + everexport_entry([CONFIG_FILE_MINIMAL]) + # Check export to csv is called even if the skip_export entry is in the + # config file + mocked_func.assert_called_once() + + +@patch("everest.bin.everexport_script.export_to_csv") +def test_everexport_entry_empty_export(mocked_func, cached_example): + """Test running everexport on config file with empty export section""" + _, config_file, _ = cached_example("math_func/config_minimal.yml") + + # Add empty export section to config file + with open(config_file, "a", encoding="utf-8") as f: + f.write(f"{CK.EXPORT}:\n") + + everexport_entry([config_file]) + # Check export to csv is called even if export section is empty + mocked_func.assert_called_once() + + +@patch("everest.bin.utils.export_data") 
+@pytest.mark.fails_on_macos_github_workflow +def test_everexport_entry_no_usr_def_ecl_keys(mocked_func, cached_example): + """Test running everexport with config file containing only the + keywords label without any list of keys""" + + _, config_file, _ = cached_example( + "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" + ) + + # Add export section to config file and set run_export flag to false + with open(config_file, "a", encoding="utf-8") as f: + f.write(f"{CK.EXPORT}:\n {CK.KEYWORDS}:") + + everexport_entry([config_file]) + + def condition(config): + batches = config.batches if config is not None else None + keys = config.keywords if config is not None else None + + return batches is None and keys is None + + if ProgressBar: + mocked_func.assert_called_once_with( + export_config=satisfy(condition), + output_dir=satisfy_type(str), + data_file=satisfy_type(str), + export_ecl=True, + progress_callback=satisfy_callable(), + ) + else: + mocked_func.assert_called_once() + + +@patch("everest.bin.utils.export_data") +@pytest.mark.fails_on_macos_github_workflow +def test_everexport_entry_internalized_usr_def_ecl_keys(mocked_func, cached_example): + """Test running everexport with config file containing a key in the + list of user defined ecl keywords, that has been internalized on + a previous run""" + + _, config_file, _ = cached_example( + "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" + ) + user_def_keys = ["FOPT"] + + # Add export section to config file and set run_export flag to false + with open(config_file, "a", encoding="utf-8") as f: + f.write(f"{CK.EXPORT}:\n {CK.KEYWORDS}: {user_def_keys}") + + everexport_entry([config_file]) + + def condition(config): + batches = config.batches if config is not None else None + keys = config.keywords if config is not None else None + + return batches is None and keys == user_def_keys + + if ProgressBar: + mocked_func.assert_called_once_with( + export_config=satisfy(condition), + output_dir=satisfy_type(str), + data_file=satisfy_type(str), + export_ecl=True, + progress_callback=satisfy_callable(), + ) + else: + mocked_func.assert_called_once() + + +@patch("everest.bin.utils.export_data") +@pytest.mark.fails_on_macos_github_workflow +def test_everexport_entry_non_int_usr_def_ecl_keys(mocked_func, caplog, cached_example): + """Test running everexport when config file contains non internalized + ecl keys in the user defined keywords list""" + + _, config_file, _ = cached_example( + "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" + ) + + non_internalized_key = "KEY" + user_def_keys = ["FOPT", non_internalized_key] + + # Add export section to config file and set run_export flag to false + with open(config_file, "a", encoding="utf-8") as f: + f.write(f"{CK.EXPORT}:\n {CK.KEYWORDS}: {user_def_keys}") + + with caplog.at_level(logging.DEBUG): + everexport_entry([config_file]) + + assert ( + f"Non-internalized ecl keys selected for export '{non_internalized_key}'" + in "\n".join(caplog.messages) + ) + + def condition(config): + batches = config.batches if config is not None else None + keys = config.keywords if config is not None else None + + return batches is None and keys == user_def_keys + + if ProgressBar: + mocked_func.assert_called_once_with( + export_config=satisfy(condition), + output_dir=satisfy_type(str), + data_file=satisfy_type(str), + export_ecl=False, + progress_callback=satisfy_callable(), + ) + else: + mocked_func.assert_called_once() + + 
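
The everexport entry-point tests above assert on the exact keyword arguments passed to the patched ``export_data`` by using the ``satisfy``, ``satisfy_type`` and ``satisfy_callable`` helpers imported from ``tests.everest.utils``. Their implementation is not part of this patch; the sketch below only illustrates the idea of such predicate-based argument matchers, and its names and behaviour are assumptions rather than the project's actual helpers.

.. code-block:: python

    # Illustrative sketch only: the real helpers live in tests/everest/utils
    # and may be implemented differently.
    class satisfy:
        """Matcher that compares equal when the predicate accepts the other value."""

        def __init__(self, predicate):
            self.predicate = predicate

        def __eq__(self, other):
            return bool(self.predicate(other))


    def satisfy_type(expected_type):
        # Accept any value of the given type, e.g. satisfy_type(str) for output_dir.
        return satisfy(lambda value: isinstance(value, expected_type))


    def satisfy_callable():
        # Accept any callable, e.g. the progress_callback handed to export_data.
        return satisfy(callable)

With matchers of this shape, ``mocked_func.assert_called_once_with(...)`` can check the form of an argument (a batch filter, a string path, a callable) without pinning down its exact value.
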
+@patch("everest.bin.utils.export_data") +@pytest.mark.fails_on_macos_github_workflow +def test_everexport_entry_not_available_batches(mocked_func, caplog, cached_example): + """Test running everexport when config file contains non existing + batch numbers in the list of user defined batches""" + + _, config_file, _ = cached_example( + "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" + ) + + na_batch = 42 + user_def_batches = [0, na_batch] + + # Add export section to config file and set run_export flag to false + with open(config_file, "a", encoding="utf-8") as f: + f.write(f"{CK.EXPORT}:\n {CK.BATCHES}: {user_def_batches}") + + with caplog.at_level(logging.DEBUG): + everexport_entry([config_file]) + + assert ( + f"Batch {na_batch} not found in optimization results." + f" Skipping for current export" in "\n".join(caplog.messages) + ) + + def condition(config): + batches = config.batches if config is not None else None + keys = config.keywords if config is not None else None + return batches == [0] and keys is None + + if ProgressBar: + mocked_func.assert_called_once_with( + export_config=satisfy(condition), + output_dir=satisfy_type(str), + data_file=satisfy_type(str), + export_ecl=True, + progress_callback=satisfy_callable(), + ) + else: + mocked_func.assert_called_once() diff --git a/tests/everest/functional/test_main_everest_entry.py b/tests/everest/functional/test_main_everest_entry.py index 6ea31c965fc..028248e0d5a 100644 --- a/tests/everest/functional/test_main_everest_entry.py +++ b/tests/everest/functional/test_main_everest_entry.py @@ -1,9 +1,9 @@ import os -from pathlib import Path from textwrap import dedent import pytest from ruamel.yaml import YAML +from seba_sqlite.snapshot import SebaSnapshot from tests.everest.utils import ( capture_streams, skipif_no_everest_models, @@ -13,7 +13,6 @@ from everest.bin.main import start_everest from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, everserver_status -from everest.everest_storage import EverestStorage WELL_ORDER = "everest/model/config.yml" @@ -57,15 +56,14 @@ def test_everest_entry_run(cached_example): assert status["status"] == ServerStatus.completed - storage = EverestStorage(Path(config.optimization_output_dir)) - storage.read_from_output_dir() - optimal = storage.get_optimal_result() + snapshot = SebaSnapshot(config.optimization_output_dir).get_snapshot() - assert optimal.controls["point_x"] == pytest.approx(0.5, abs=0.05) - assert optimal.controls["point_y"] == pytest.approx(0.5, abs=0.05) - assert optimal.controls["point_z"] == pytest.approx(0.5, abs=0.05) + best_settings = snapshot.optimization_data[-1] + assert best_settings.controls["point_x"] == pytest.approx(0.5, abs=0.05) + assert best_settings.controls["point_y"] == pytest.approx(0.5, abs=0.05) + assert best_settings.controls["point_z"] == pytest.approx(0.5, abs=0.05) - assert optimal.total_objective == pytest.approx(0.0, abs=0.0005) + assert best_settings.objective_value == pytest.approx(0.0, abs=0.0005) with capture_streams(): start_everest(["everest", "monitor", config_file]) @@ -96,10 +94,9 @@ def test_everest_entry_monitor_no_run(cached_example): def test_everest_main_export_entry(cached_example): # Setup command line arguments _, config_file, _ = cached_example("math_func/config_minimal.yml") - with capture_streams() as (out, _): - start_everest(["everest", "export", str(config_file)]) - - assert "Everexport is deprecated" in out.getvalue() + with capture_streams(): + 
start_everest(["everest", "export", config_file]) + assert os.path.exists(os.path.join("everest_output", "config_minimal.csv")) @pytest.mark.integration_test diff --git a/tests/everest/snapshots/test_api_snapshots/test_api_snapshots/config_multiobj.yml/snapshot.json b/tests/everest/snapshots/test_api_snapshots/test_api_snapshots/config_multiobj.yml/snapshot.json index c5c346ba775..2670e0de601 100644 --- a/tests/everest/snapshots/test_api_snapshots/test_api_snapshots/config_multiobj.yml/snapshot.json +++ b/tests/everest/snapshots/test_api_snapshots/test_api_snapshots/config_multiobj.yml/snapshot.json @@ -69,27 +69,27 @@ }, { "batch": 0, - "control": "point_x", - "function": "distance_q", - "value": -3.00456477 + "control": "point_y", + "function": "distance_p", + "value": 0.98866227 }, { "batch": 0, - "control": "point_y", + "control": "point_z", "function": "distance_p", - "value": 0.98866227 + "value": 1.00465235 }, { "batch": 0, - "control": "point_y", + "control": "point_x", "function": "distance_q", - "value": -3.011388 + "value": -3.00456477 }, { "batch": 0, - "control": "point_z", - "function": "distance_p", - "value": 1.00465235 + "control": "point_y", + "function": "distance_q", + "value": -3.011388 }, { "batch": 0, diff --git a/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_controls b/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_controls deleted file mode 100644 index efa2c369beb..00000000000 --- a/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_controls +++ /dev/null @@ -1,4 +0,0 @@ -result_id,batch_id,realization,simulation_id,well_rate_PROD1-1,well_rate_PROD2-1,well_rate_PROD3-1,well_rate_PROD4-1,well_rate_INJECT1-1,well_rate_INJECT2-1,well_rate_INJECT3-1,well_rate_INJECT4-1,well_rate_INJECT5-1,well_rate_INJECT6-1,well_rate_INJECT7-1,well_rate_INJECT8-1 -2,1,0,0,0.1999058451019366,0.6000415374802492,0.19994822906594492,0.5999121404696927,0.5999333812347782,0.5998249609455343,0.5999478780872213,0.6000766587484829,0.5997483050379139,0.5997645538239165,0.5999464128453291,0.6001158013106983 -2,1,1,1,0.1999058451019366,0.6000415374802492,0.19994822906594492,0.5999121404696927,0.5999333812347782,0.5998249609455343,0.5999478780872213,0.6000766587484829,0.5997483050379139,0.5997645538239165,0.5999464128453291,0.6001158013106983 -2,1,2,2,0.1999058451019366,0.6000415374802492,0.19994822906594492,0.5999121404696927,0.5999333812347782,0.5998249609455343,0.5999478780872213,0.6000766587484829,0.5997483050379139,0.5997645538239165,0.5999464128453291,0.6001158013106983 diff --git a/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objective_gradients_csv b/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objective_gradients_csv deleted file mode 100644 index c3c9b602ce8..00000000000 --- a/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objective_gradients_csv +++ /dev/null @@ -1,13 +0,0 @@ -result_id,batch_id,control_name,rf,rf.total -3,1,well_rate_PROD1-1,-0.0001652263441330399,-0.0001652263441330399 -3,1,well_rate_PROD2-1,-0.00017471993459638647,-0.00017471993459638647 -3,1,well_rate_PROD3-1,-0.0006246697489175857,-0.0006246697489175857 -3,1,well_rate_PROD4-1,-0.00041696791141070507,-0.00041696791141070507 -3,1,well_rate_INJECT1-1,0.00014572715946017988,0.00014572715946017988 -3,1,well_rate_INJECT2-1,0.000020160145178916144,0.000020160145178916144 -3,1,well_rate_INJECT3-1,0.00011211576845746024,0.00011211576845746024 
-3,1,well_rate_INJECT4-1,-0.00039013326453636804,-0.00039013326453636804 -3,1,well_rate_INJECT5-1,0.000015273711834717528,0.000015273711834717528 -3,1,well_rate_INJECT6-1,0.00022688941880139667,0.00022688941880139667 -3,1,well_rate_INJECT7-1,-0.000017986172009426353,-0.000017986172009426353 -3,1,well_rate_INJECT8-1,-0.00007249824942858863,-0.00007249824942858863 diff --git a/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objectives_csv b/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objectives_csv deleted file mode 100644 index 8be2ccb0adb..00000000000 --- a/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objectives_csv +++ /dev/null @@ -1,4 +0,0 @@ -result_id,batch_id,realization,perturbation,rf,well_rate_PROD1-1,well_rate_PROD2-1,well_rate_PROD3-1,well_rate_PROD4-1,well_rate_INJECT1-1,well_rate_INJECT2-1,well_rate_INJECT3-1,well_rate_INJECT4-1,well_rate_INJECT5-1,well_rate_INJECT6-1,well_rate_INJECT7-1,well_rate_INJECT8-1 -3,1,0,0,0.17690399289131165,0.21284489964814637,0.6909564325624082,0.38431070691176333,0.8356643180772911,0.521629534106952,0.593979014614453,0.6040659148639875,0.7933463643524403,0.5993493412129899,0.5212494525431188,0.6070894345108306,0.6519904009594275 -3,1,1,0,0.175477996468544,0.298444266061598,0.7489971930595409,0.012799873895175784,0.6900597174834636,0.48925234184495137,0.542695688308217,0.6807768824317812,0.7317670260000283,0.6644455051643829,0.5000562156945711,0.6616537223014831,0.6921270651437204 -3,1,2,0,0.17582400143146515,0.43332587738243267,0.6304305725596401,0.254408502582991,0.4256930350912209,0.5920584096483588,0.5533444899078019,0.5039440678184369,0.5579961940071504,0.6274756974975367,0.49923076114351883,0.6427740436133934,0.5885310395649261 diff --git a/tests/everest/test_detached.py b/tests/everest/test_detached.py index f0351626d59..9f828dd57da 100644 --- a/tests/everest/test_detached.py +++ b/tests/everest/test_detached.py @@ -3,7 +3,6 @@ from pathlib import Path from unittest.mock import MagicMock, patch -import numpy as np import pytest import requests @@ -26,7 +25,6 @@ PROXY, ServerStatus, everserver_status, - get_opt_status, server_is_running, start_server, stop_server, @@ -339,79 +337,3 @@ async def server_running(): driver = await start_server(everest_config, debug=True) final_state = await server_running() assert final_state.returncode == 0 - - -def test_get_opt_status(cached_example): - _, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(config_file) - - opts = get_opt_status(config.optimization_output_dir) - - assert np.allclose( - opts["objective_history"], [-2.3333, -2.3335, -2.0000], atol=1e-4 - ) - - assert np.allclose( - opts["control_history"]["point_x"], - [0.0, -0.004202181916184627, -0.0021007888698514315], - atol=1e-4, - ) - assert np.allclose( - opts["control_history"]["point_y"], - [0.0, -0.011298196942730383, -0.0056482862617779715], - atol=1e-4, - ) - assert np.allclose( - opts["control_history"]["point_z"], [0.0, 1.0, 0.4999281115746754], atol=1e-4 - ) - - assert np.allclose( - opts["objectives_history"]["distance_p"], - [-0.75, -0.7656459808349609, -0.5077850222587585], - atol=1e-4, - ) - assert np.allclose( - opts["objectives_history"]["distance_q"], - [-4.75, -4.703639984130859, -4.476789951324463], - atol=1e-4, - ) - - assert opts["accepted_control_indices"] == [0, 2] - - cmond = opts["cli_monitor_data"] - - assert cmond["batches"] == [0, 1, 2] - assert cmond["controls"][0]["point_x"] == 0.0 - assert 
cmond["controls"][0]["point_y"] == 0.0 - assert cmond["controls"][0]["point_z"] == 0.0 - - assert np.allclose( - cmond["controls"][1]["point_x"], -0.004202181916184627, atol=1e-4 - ) - assert np.allclose( - cmond["controls"][1]["point_y"], -0.011298196942730383, atol=1e-4 - ) - assert np.allclose(cmond["controls"][1]["point_z"], 1.0, atol=1e-4) - assert np.allclose( - cmond["controls"][2]["point_x"], -0.0021007888698514315, atol=1e-4 - ) - assert np.allclose( - cmond["controls"][2]["point_y"], -0.0056482862617779715, atol=1e-4 - ) - assert np.allclose(cmond["controls"][2]["point_z"], 0.4999281115746754, atol=1e-4) - - assert np.allclose( - cmond["objective_value"], - [-2.333333333333333, -2.333525975545247, -2.000048339366913], - atol=1e-4, - ) - assert np.allclose( - cmond["expected_objectives"]["distance_p"], - [-0.75, -0.7656459808349609, -0.5077850222587585], - atol=1e-4, - ) - assert np.allclose( - cmond["expected_objectives"]["distance_q"], - [-4.75, -4.703639984130859, -4.476789951324463], - atol=1e-4, - ) diff --git a/tests/everest/test_egg_simulation.py b/tests/everest/test_egg_simulation.py index 4e5c5672d59..ce55463f374 100644 --- a/tests/everest/test_egg_simulation.py +++ b/tests/everest/test_egg_simulation.py @@ -1,8 +1,7 @@ -import io import json import os +import sys -import polars import pytest from ert.config import ErtConfig @@ -12,6 +11,7 @@ from everest.config import EverestConfig from everest.config.export_config import ExportConfig from everest.config_keys import ConfigKeys +from everest.export import export_data from everest.simulator.everest_to_ert import _everest_to_ert_config_dict from tests.everest.utils import ( everest_default_jobs, @@ -708,23 +708,15 @@ def sweetcallbackofmine(self, *args, **kwargs): run_model.run_experiment(evaluator_server_config) assert cbtracker.called - best_batch = [b for b in run_model.ever_storage.data.batches if b.is_improvement][ - -1 - ] - - def _df_to_string(df: polars.DataFrame): - strbuf = io.StringIO() - schema = df.schema - df.with_columns( - polars.col(c) for c in df.columns if schema[c] == polars.Float32 - ).write_csv(strbuf) - - return strbuf.getvalue() - best_objectives_csv = _df_to_string(best_batch.perturbation_objectives) - best_objective_gradients_csv = _df_to_string(best_batch.batch_objective_gradient) - best_controls = _df_to_string(best_batch.realization_controls) - - snapshot.assert_match(best_controls, "best_controls") - snapshot.assert_match(best_objectives_csv, "best_objectives_csv") - snapshot.assert_match(best_objective_gradients_csv, "best_objective_gradients_csv") + data = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + snapshot.assert_match( + data.drop(columns=["TCPUDAY", "start_time", "end_time"], axis=1) + .round(6) + .to_csv(), + f"egg-py{sys.version_info.major}{sys.version_info.minor}.csv", + ) diff --git a/tests/everest/test_everlint.py b/tests/everest/test_everlint.py index 2c79dba3915..4aa2bb4c729 100644 --- a/tests/everest/test_everlint.py +++ b/tests/everest/test_everlint.py @@ -202,6 +202,23 @@ def test_bool_validation(value, valid, min_config, tmp_path, monkeypatch): EverestConfig(**min_config) +@pytest.mark.parametrize( + "path_val, valid", [("my_file", True), ("my_folder/", False), ("my_folder", False)] +) +def test_export_filepath_validation(min_config, tmp_path, monkeypatch, path_val, valid): + monkeypatch.chdir(tmp_path) + Path("my_file").touch() + Path("my_folder").mkdir() + min_config["export"] 
= {"csv_output_filepath": path_val} + expectation = ( + does_not_raise() + if valid + else pytest.raises(ValidationError, match="Invalid type") + ) + with expectation: + EverestConfig(**min_config) + + def test_invalid_wells(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) Path("my_file").touch() diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py index 07b5de64fb0..4075e3fbc41 100644 --- a/tests/everest/test_everserver.py +++ b/tests/everest/test_everserver.py @@ -5,11 +5,12 @@ from pathlib import Path from unittest.mock import patch +from seba_sqlite.snapshot import SebaSnapshot + from ert.run_models.everest_run_model import EverestExitCode from everest.config import EverestConfig, OptimizationConfig, ServerConfig from everest.detached import ServerStatus, everserver_status from everest.detached.jobs import everserver -from everest.everest_storage import EverestStorage from everest.simulator import JOB_FAILURE, JOB_SUCCESS from everest.strings import OPT_FAILURE_REALIZATIONS, SIM_PROGRESS_ENDPOINT @@ -214,11 +215,12 @@ def test_everserver_status_max_batch_num( # The server should complete without error. assert status["status"] == ServerStatus.completed - storage = EverestStorage(Path(config.optimization_output_dir)) - storage.read_from_output_dir() # Check that there is only one batch. - assert {b.batch_id for b in storage.data.batches} == {0} + snapshot = SebaSnapshot(config.optimization_output_dir).get_snapshot( + filter_out_gradient=False, batches=None + ) + assert {data.batch for data in snapshot.simulation_data} == {0} @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"]) diff --git a/tests/everest/test_export.py b/tests/everest/test_export.py new file mode 100644 index 00000000000..4686748df4a --- /dev/null +++ b/tests/everest/test_export.py @@ -0,0 +1,353 @@ +import os +import shutil +from pathlib import Path + +import pandas as pd +import pytest + +from everest import filter_data +from everest.bin.utils import export_with_progress +from everest.config import EverestConfig +from everest.config.export_config import ExportConfig +from everest.export import check_for_errors, export_data + +CONFIG_FILE = "config_multiobj.yml" +DATA = pd.DataFrame( + { + "WOPT:WELL0": range(4), + "MONKEY": 4 * [0], + "WCON:WELL1": 4 * [14], + "GOPT:GROUP0": [5, 6, 2, 1], + "WOPT:WELL1": range(4), + } +) + +pytestmark = pytest.mark.xdist_group(name="starts_everest") + + +def assertEqualDataFrames(x, y): + assert set(x.columns) == set(y.columns) + for col in x.columns: + assert list(x[col]) == list(y[col]) + + +def test_filter_no_wildcard(): + keywords = ["MONKEY", "Dr. 
MONKEY", "WOPT:WELL1"] + assertEqualDataFrames(DATA[["MONKEY", "WOPT:WELL1"]], filter_data(DATA, keywords)) + + +def test_filter_leading_wildcard(): + keywords = ["*:WELL1"] + assertEqualDataFrames( + DATA[["WCON:WELL1", "WOPT:WELL1"]], filter_data(DATA, keywords) + ) + + +def test_filter_trailing_wildcard(): + keywords = ["WOPT:*", "MONKEY"] + assertEqualDataFrames( + DATA[["MONKEY", "WOPT:WELL0", "WOPT:WELL1"]], + filter_data(DATA, keywords), + ) + + +def test_filter_double_wildcard(): + keywords = ["*OPT:*0"] + assertEqualDataFrames( + DATA[["WOPT:WELL0", "GOPT:GROUP0"]], filter_data(DATA, keywords) + ) + + +def test_export_only_non_gradient_with_increased_merit(cached_example, snapshot): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + # Default export functionality when no export section is defined + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + + # Test that the default export functionality generated data frame + # contains only non gradient simulations + snapshot.assert_match( + df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), "export.csv" + ) + + +def test_export_only_non_gradient(cached_example, snapshot): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + + # Add export section to config + config.export = ExportConfig(discard_rejected=False) + + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + + snapshot.assert_match( + df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), "export.csv" + ) + + +def test_export_only_increased_merit(cached_example, snapshot): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + + # Add export section to config + config.export = ExportConfig(discard_gradient=False) + + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + + snapshot.assert_match( + df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), + "export.csv", + ) + + +def test_export_all_batches(cached_example, snapshot): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + + # Add export section to config + config.export = ExportConfig(discard_gradient=False, discard_rejected=False) + + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + + snapshot.assert_match( + df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), "export.csv" + ) + + +def test_export_only_give_batches(cached_example, snapshot): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + + # Add export section to config + config.export = ExportConfig(discard_gradient=True, batches=[2]) + + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + + snapshot.assert_match( + df.drop(["start_time", "end_time"], 
axis=1).round(4).to_csv(), "export.csv" + ) + + +def test_export_batches_progress(cached_example, snapshot): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + + # Add export section to config + config.export = ExportConfig(discard_gradient=True, batches=[2]) + + df = export_with_progress(config) + # Check only simulations from given batches are present in export + # drop non-deterministic columns + df = df.drop(["start_time", "end_time", "simulation"], axis=1) + df = df.sort_values(by=["realization", "batch", "sim_avg_obj"]) + + snapshot.assert_match(df.round(4).to_csv(index=False), "export.csv") + + +def test_export_nothing_for_empty_batch_list(cached_example): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + + # Add discard gradient flag to config file + config.export = ExportConfig( + discard_gradient=True, discard_rejected=True, batches=[] + ) + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + + # Check export returns empty data frame + assert df.empty + + +def test_export_nothing(cached_example): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + + # Add discard gradient flag to config file + config.export = ExportConfig( + skip_export=True, discard_gradient=True, discard_rejected=True, batches=[3] + ) + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + + # Check export returns empty data frame + assert df.empty + + +def test_get_export_path(cached_example): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + + # Test default export path when no csv_output_filepath is defined + expected_export_path = os.path.join( + config.output_dir, CONFIG_FILE.replace(".yml", ".csv") + ) + assert expected_export_path == config.export_path + + # Test export path when csv_output_filepath is an absolute path + new_export_folderpath = os.path.join(config.output_dir, "new/folder") + new_export_filepath = os.path.join( + new_export_folderpath, CONFIG_FILE.replace(".yml", ".csv") + ) + + config.export = ExportConfig(csv_output_filepath=new_export_filepath) + + expected_export_path = new_export_filepath + assert expected_export_path == config.export_path + + # Test export path when csv_output_filepath is a relative path + config.export.csv_output_filepath = os.path.join( + "new/folder", CONFIG_FILE.replace(".yml", ".csv") + ) + assert expected_export_path == config.export_path + + # Test export when file does not contain an extension. 
+ config_file_no_extension = os.path.splitext(os.path.basename(CONFIG_FILE))[0] + shutil.copy(CONFIG_FILE, config_file_no_extension) + new_config = EverestConfig.load_file(config_file_no_extension) + expected_export_path = os.path.join( + new_config.output_dir, f"{config_file_no_extension}.csv" + ) + assert expected_export_path == new_config.export_path + + +def test_validate_export(cached_example): + config_path, config_file, _ = cached_example( + "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" + ) + config = EverestConfig.load_file(Path(config_path) / config_file) + + def check_error(expected_error, reported_errors): + expected_error_msg, expected_export_ecl = expected_error + error_list, export_ecl = reported_errors + # If no error was message provided the list of errors + # should also be empty + if not expected_error_msg: + assert len(error_list) == 0 + assert expected_export_ecl == export_ecl + else: + found = False + for error in error_list: + if expected_error_msg in error: + found = True + break + assert found + assert expected_export_ecl == export_ecl + + # Test error when user defines an empty list for the eclipse keywords + config.export = ExportConfig() + config.export.keywords = [] + errors, export_ecl = check_for_errors( + config=config.export, + optimization_output_path=config.optimization_output_dir, + storage_path=config.storage_dir, + data_file_path=config.model.data_file, + ) + check_error( + expected_error=("No eclipse keywords selected for export", False), + reported_errors=(errors, export_ecl), + ) + + # Test error when user defines an empty list for the eclipse keywords + # and empty list of for batches to export + config.export.batches = [] + errors, export_ecl = check_for_errors( + config=config.export, + optimization_output_path=config.optimization_output_dir, + storage_path=config.storage_dir, + data_file_path=config.model.data_file, + ) + check_error( + expected_error=("No batches selected for export.", False), + reported_errors=(errors, export_ecl), + ) + + # Test export validator outputs no errors when the config file contains + # only keywords that represent a subset of already internalized keys + config.export.keywords = ["FOPT"] + config.export.batches = None + errors, export_ecl = check_for_errors( + config=config.export, + optimization_output_path=config.optimization_output_dir, + storage_path=config.storage_dir, + data_file_path=config.model.data_file, + ) + check_error(expected_error=("", True), reported_errors=(errors, export_ecl)) + + non_int_key = "STANGE_KEY" + config.export.keywords = [non_int_key, "FOPT"] + errors, export_ecl = check_for_errors( + config=config.export, + optimization_output_path=config.optimization_output_dir, + storage_path=config.storage_dir, + data_file_path=config.model.data_file, + ) + + check_error( + ( + f"Non-internalized ecl keys selected for export '{non_int_key}'.", + False, + ), + (errors, export_ecl), + ) + + # Test that validating the export spots non-valid batches and removes + # them from the list of batches selected for export. + non_valid_batch = 42 + config.export = ExportConfig(batches=[0, non_valid_batch]) + errors, export_ecl = check_for_errors( + config=config.export, + optimization_output_path=config.optimization_output_dir, + storage_path=config.storage_dir, + data_file_path=config.model.data_file, + ) + check_error( + ( + f"Batch {non_valid_batch} not found in optimization results. 
Skipping for" + " current export", + True, + ), + (errors, export_ecl), + ) + assert config.export.batches == [0] + + +def test_export_gradients(cached_example, snapshot): + config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(Path(config_path) / config_file) + + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + + snapshot.assert_match( + df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), "export.csv" + ) diff --git a/tests/everest/test_math_func.py b/tests/everest/test_math_func.py index ffde0b7d2e5..c5de0df03f9 100644 --- a/tests/everest/test_math_func.py +++ b/tests/everest/test_math_func.py @@ -1,10 +1,16 @@ +import itertools import os +import numpy as np +import pandas as pd import pytest import yaml from ert.run_models.everest_run_model import EverestRunModel +from everest import ConfigKeys as CK from everest.config import EverestConfig, InputConstraintConfig +from everest.config.export_config import ExportConfig +from everest.export import export_data from everest.util import makedirs_if_needed CONFIG_FILE_MULTIOBJ = "config_multiobj.yml" @@ -32,6 +38,68 @@ def test_math_func_multiobj( (-0.5 * (2.0 / 3.0) * 1.5) + (-4.5 * (1.0 / 3.0) * 1.0), abs=0.01 ) + # Test conversion to pandas DataFrame + if config.export is None: + config.export = ExportConfig(discard_rejected=False) + + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + ok_evals = df[(df["is_gradient"] == 0) & (df["success"] == 1)] + + # Three points in this case are increasing the merit + assert len(ok_evals[ok_evals["increased_merit"] == 1]) == 2 + + first = ok_evals.iloc[0] + best = ok_evals.iloc[-1] + assert first["point_x"] == 0 + assert first["point_y"] == 0 + assert first["point_z"] == 0 + assert first["distance_p"] == -(0.5 * 0.5 * 3) + assert first["distance_q"] == -(1.5 * 1.5 * 2 + 0.5 * 0.5) + assert first["sim_avg_obj"] == (-0.75 * (2.0 / 3.0) * 1.5) + ( + -4.75 * (1.0 / 3.0) * 1.0 + ) + + assert best["point_x"] == pytest.approx(x) + assert best["point_y"] == pytest.approx(y) + assert best["point_z"] == pytest.approx(z) + assert best["sim_avg_obj"] == pytest.approx(run_model.result.total_objective) + + test_space = itertools.product( + (first, best), + ( + ("distance_p", 2.0 / 3, 1.5), + ("distance_q", 1.0 / 3, 1), + ), + ) + for row, (obj_name, weight, norm) in test_space: + assert row[obj_name] * norm == row[obj_name + "_norm"] + assert row[obj_name] * weight * norm == pytest.approx( + row[obj_name + "_weighted_norm"] + ) + + assert first["realization_weight"] == 1.0 + assert best["realization_weight"] == 1.0 + + # check exported sim_avg_obj against dakota_tabular + dt = pd.read_csv( + os.path.join(config.optimization_output_dir, "dakota", "dakota_tabular.dat"), + sep=" +", + engine="python", + ) + dt.sort_values(by=["%eval_id"], inplace=True) + ok_evals = ok_evals.sort_values(by=["batch"]) + for a, b in zip( + dt["obj_fn"], # pylint: disable=unsubscriptable-object + ok_evals["sim_avg_obj"], + strict=False, + ): + # Opposite, because ropt negates values before passing to dakota + assert -a == pytest.approx(b) + @pytest.mark.integration_test def test_math_func_advanced( @@ -63,6 +131,55 @@ def test_math_func_advanced( expected_opt = -(w[0] * (dist_0) + w[1] * (dist_1)) assert expected_opt == pytest.approx(run_model.result.total_objective, abs=0.001) 
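
The math-function tests combine each objective value with its configured weight and normalisation factor before comparing against ``sim_avg_obj`` and ``total_objective``. The following is a minimal sketch of that aggregation, using only the numbers that already appear in the assertions above; it is not the optimizer's actual implementation.

.. code-block:: python

    # Reproduces the arithmetic asserted for the first evaluation of
    # config_multiobj.yml: distance_p = -0.75 (weight 2/3, norm 1.5) and
    # distance_q = -4.75 (weight 1/3, norm 1.0).
    def weighted_total_objective(values, weights, norms):
        return sum(v * w * n for v, w, n in zip(values, weights, norms))


    total = weighted_total_objective(
        [-0.75, -4.75], [2.0 / 3.0, 1.0 / 3.0], [1.5, 1.0]
    )
    assert total == (-0.75 * (2.0 / 3.0) * 1.5) + (-4.75 * (1.0 / 3.0) * 1.0)
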
+ # Test conversion to pandas DataFrame + df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + ok_evals = df[(df["is_gradient"] == 0) & (df["success"] == 1)] + + ok_evals_0 = ok_evals[ok_evals["realization"] == 0] + best_0 = ok_evals_0.iloc[-1] + assert best_0[f"point_{point_names[0]}"] == pytest.approx(x0) + assert best_0[f"point_{point_names[1]}"] == pytest.approx(x1) + assert best_0[f"point_{point_names[2]}"] == pytest.approx(x2) + assert best_0["distance"] == pytest.approx(-dist_0, abs=0.001) + assert best_0["real_avg_obj"] == pytest.approx( + run_model.result.total_objective, abs=0.001 + ) + assert best_0["realization_weight"] == 0.25 + + ok_evals_1 = ok_evals[ok_evals["realization"] == 2] + best_1 = ok_evals_1.iloc[-1] + assert best_1[f"point_{point_names[0]}"] == pytest.approx(x0) + assert best_1[f"point_{point_names[1]}"] == pytest.approx(x1) + assert best_1[f"point_{point_names[2]}"] == pytest.approx(x2) + assert best_1["distance"] == pytest.approx(-dist_1, abs=0.001) + assert best_1["real_avg_obj"] == pytest.approx( + run_model.result.total_objective, abs=0.001 + ) + assert best_1["realization_weight"] == 0.75 + + # check functionality of export batch filtering + if CK.EXPORT not in config: + config.export = ExportConfig() + + exp_nunique = 2 + batches_list = [0, 2] + config.export.batches = batches_list + + batch_filtered_df = export_data( + export_config=config.export, + output_dir=config.output_dir, + data_file=config.model.data_file if config.model else None, + ) + n_unique_batches = batch_filtered_df["batch"].nunique() + unique_batches = np.sort(batch_filtered_df["batch"].unique()).tolist() + + assert exp_nunique == n_unique_batches + assert batches_list == unique_batches + @pytest.mark.integration_test def test_remove_run_path( diff --git a/tests/everest/test_simulator_cache.py b/tests/everest/test_simulator_cache.py index 816cd4d943a..14765b49b14 100644 --- a/tests/everest/test_simulator_cache.py +++ b/tests/everest/test_simulator_cache.py @@ -1,3 +1,5 @@ +from pathlib import Path + import numpy as np from ert.config import QueueSystem @@ -39,6 +41,9 @@ def new_call(*args): # Now do another run, where the functions should come from the cache: n_evals = 0 + # If we want to do another run, the seba database must be made new: + Path("everest_output/optimization_output/seba.db").unlink() + # The batch_id was used as a stopping criterion, so it must be reset: run_model._batch_id = 0
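
For reference, the keyword filtering restored in ``src/everest/export.py`` (and exercised by the ``test_filter_*`` tests above) treats ``*`` in an export keyword as a wildcard: each pattern is turned into a regular expression and a column is exported when at least one pattern matches it. Below is a condensed, self-contained sketch of that behaviour; the real ``filter_data`` appends one column entry per matching pattern, which is simplified away here.

.. code-block:: python

    import re

    import pandas as pd


    def filter_columns(data: pd.DataFrame, keyword_filters: set[str]) -> pd.DataFrame:
        # '*' acts as a wildcard; re.match anchors at the start of the column name.
        kept = [
            col
            for col in data.columns
            if any(re.match(expr.replace("*", ".*"), col) for expr in keyword_filters)
        ]
        return data[kept]


    frame = pd.DataFrame({"WOPT:WELL0": [1], "WCON:WELL1": [2], "MONKEY": [3]})
    print(filter_columns(frame, {"WOPT:*", "MONKEY"}).columns.tolist())
    # ['WOPT:WELL0', 'MONKEY']
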