From 120f3ace4ae7dba76d7ef66195c2933c1af3515b Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Thu, 7 Nov 2024 10:14:45 +0100 Subject: [PATCH 01/13] (for easier reviewing) Paste in SEBA storage --- src/everest/everest_storage.py | 414 +++++++++++++++++++++++++++++++++ 1 file changed, 414 insertions(+) create mode 100644 src/everest/everest_storage.py diff --git a/src/everest/everest_storage.py b/src/everest/everest_storage.py new file mode 100644 index 00000000000..7b4e0585133 --- /dev/null +++ b/src/everest/everest_storage.py @@ -0,0 +1,414 @@ +import copy +import logging +import os +import sqlite3 +import time +from collections import namedtuple +from itertools import count +from pathlib import Path + +import numpy as np + +from ropt.enums import ConstraintType, EventType +from ropt.results import FunctionResults, GradientResults, convert_to_maximize +from .database import Database +from .snapshot import SebaSnapshot + +OptimalResult = namedtuple( + "OptimalResult", "batch, controls, total_objective, expected_objectives" +) + +logger = logging.getLogger(__name__) + + +def _convert_names(control_names): + converted_names = [] + for name in control_names: + converted = f"{name[0]}_{name[1]}" + if len(name) > 2: + converted += f"-{name[2]}" + converted_names.append(converted) + return converted_names + + +class EverestStorage: + # This implementation builds as much as possible on the older database and + # snapshot code, since it is meant for backwards compatibility, and should + # not be extended with new functionality. + + def __init__(self, optimizer, output_dir): + # Internal variables. + self._output_dir = output_dir + self._database = Database(output_dir) + self._control_ensemble_id = 0 + self._gradient_ensemble_id = 0 + self._simulator_results = None + self._merit_file = Path(output_dir) / "dakota" / "OPT_DEFAULT.out" + + # Connect event handlers. + self._set_event_handlers(optimizer) + + self._initialized = False + + @property + def file(self): + return self._database.location + + def _initialize(self, event): + if self._initialized: + return + self._initialized = True + + self._database.add_experiment( + name="optimization_experiment", start_time_stamp=time.time() + ) + + # Add configuration values. + config = event.config + for control_name, initial_value, lower_bound, upper_bound in zip( + _convert_names(config.variables.names), + config.variables.initial_values, + config.variables.lower_bounds, + config.variables.upper_bounds, + ): + self._database.add_control_definition( + control_name, initial_value, lower_bound, upper_bound + ) + + for name, weight, scale in zip( + config.objective_functions.names, + config.objective_functions.weights, + config.objective_functions.scales, + ): + self._database.add_function( + name=name, + function_type="OBJECTIVE", + weight=weight, + normalization=1.0 / scale, + ) + + if config.nonlinear_constraints is not None: + for name, scale, rhs_value, constraint_type in zip( + config.nonlinear_constraints.names, + config.nonlinear_constraints.scales, + config.nonlinear_constraints.rhs_values, + config.nonlinear_constraints.types, + ): + self._database.add_function( + name=name, + function_type="CONSTRAINT", + normalization=scale, + rhs_value=rhs_value, + constraint_type=ConstraintType(constraint_type).name.lower(), + ) + + for name, weight in zip( + config.realizations.names, + config.realizations.weights, + ): + self._database.add_realization(str(name), weight) + + def _add_batch(self, config, controls, perturbed_controls): + self._gradient_ensemble_id += 1 + self._control_ensemble_id = self._gradient_ensemble_id + control_names = _convert_names(config.variables.names) + for control_name, value in zip(control_names, controls): + self._database.add_control_value( + set_id=self._control_ensemble_id, + control_name=control_name, + value=value, + ) + if perturbed_controls is not None: + perturbed_controls = perturbed_controls.reshape( + perturbed_controls.shape[0], -1 + ) + self._gradient_ensemble_id = self._control_ensemble_id + for g_idx in range(perturbed_controls.shape[1]): + self._gradient_ensemble_id += 1 + for c_idx, c_name in enumerate(control_names): + self._database.add_control_value( + set_id=self._gradient_ensemble_id, + control_name=c_name, + value=perturbed_controls[c_idx, g_idx], + ) + + def _add_simulations(self, config, result): + self._gradient_ensemble_id = self._control_ensemble_id + simulation_index = count() + if isinstance(result, FunctionResults): + for realization_name in config.realizations.names: + self._database.add_simulation( + realization_name=str(realization_name), + set_id=self._control_ensemble_id, + sim_name=f"{result.batch_id}_{next(simulation_index)}", + is_gradient=False, + ) + if isinstance(result, GradientResults): + for realization_name in config.realizations.names: + for _ in range(config.gradient.number_of_perturbations): + self._gradient_ensemble_id += 1 + self._database.add_simulation( + realization_name=str(realization_name), + set_id=self._gradient_ensemble_id, + sim_name=f"{result.batch_id}_{next(simulation_index)}", + is_gradient=True, + ) + + def _add_simulator_results( + self, config, batch, objective_results, constraint_results + ): + if constraint_results is None: + results = objective_results + else: + results = np.vstack((objective_results, constraint_results)) + statuses = np.logical_and.reduce(np.isfinite(results), axis=0) + names = config.objective_functions.names + if config.nonlinear_constraints is not None: + names += config.nonlinear_constraints.names + + for sim_idx, status in enumerate(statuses): + sim_name = f"{batch}_{sim_idx}" + for function_idx, name in enumerate(names): + if status: + self._database.add_simulation_result( + sim_name, results[function_idx, sim_idx], name, 0 + ) + self._database.set_simulation_ended(sim_name, status) + + def _add_constraint_values(self, config, batch, constraint_values): + statuses = np.logical_and.reduce(np.isfinite(constraint_values), axis=0) + for sim_id, status in enumerate(statuses): + if status: + for idx, constraint_name in enumerate( + config.nonlinear_constraints.names + ): + # Note the time_index=0, the database supports storing + # multipel time-points, but we do not support that, so we + # use times_index=0. + self._database.update_simulation_result( + simulation_name=f"{batch}_{sim_id}", + function_name=constraint_name, + times_index=0, + value=constraint_values[idx, sim_id], + ) + + def _add_gradients(self, config, objective_gradients): + for grad_index, gradient in enumerate(objective_gradients): + for control_index, control_name in enumerate( + _convert_names(config.variables.names) + ): + self._database.add_gradient_result( + gradient[control_index], + config.objective_functions.names[grad_index], + 1, + control_name, + ) + + def _add_total_objective(self, total_objective): + self._database.add_calculation_result( + set_id=self._control_ensemble_id, + object_function_value=total_objective, + ) + + def _convert_constraints(self, config, constraint_results): + constraint_results = copy.deepcopy(constraint_results) + rhs_values = config.nonlinear_constraints.rhs_values + for idx, constraint_type in enumerate(config.nonlinear_constraints.types): + constraint_results[idx] -= rhs_values[idx] + if constraint_type == ConstraintType.LE: + constraint_results[idx] *= -1.0 + return constraint_results + + def _store_results(self, config, results): + if isinstance(results, FunctionResults): + objective_results = results.evaluations.objectives + objective_results = np.moveaxis(objective_results, -1, 0) + constraint_results = results.evaluations.constraints + if constraint_results is not None: + constraint_results = np.moveaxis(constraint_results, -1, 0) + else: + objective_results = None + constraint_results = None + + if isinstance(results, GradientResults): + perturbed_variables = results.evaluations.perturbed_variables + perturbed_variables = np.moveaxis(perturbed_variables, -1, 0) + perturbed_objectives = results.evaluations.perturbed_objectives + perturbed_objectives = np.moveaxis(perturbed_objectives, -1, 0) + perturbed_constraints = results.evaluations.perturbed_constraints + if perturbed_constraints is not None: + perturbed_constraints = np.moveaxis(perturbed_constraints, -1, 0) + else: + perturbed_variables = None + perturbed_objectives = None + perturbed_constraints = None + + self._add_batch(config, results.evaluations.variables, perturbed_variables) + self._add_simulations(config, results) + + # Convert back the simulation results to the legacy format: + if perturbed_objectives is not None: + perturbed_objectives = perturbed_objectives.reshape( + perturbed_objectives.shape[0], -1 + ) + if objective_results is None: + objective_results = perturbed_objectives + else: + objective_results = np.hstack((objective_results, perturbed_objectives)) + + if config.nonlinear_constraints is not None: + if perturbed_constraints is not None: + perturbed_constraints = perturbed_constraints.reshape( + perturbed_constraints.shape[0], -1 + ) + if constraint_results is None: + constraint_results = perturbed_constraints + else: + constraint_results = np.hstack( + (constraint_results, perturbed_constraints) + ) + # The legacy code converts all constraints to the form f(x) >=0: + constraint_results = self._convert_constraints(config, constraint_results) + + self._add_simulator_results( + config, results.batch_id, objective_results, constraint_results + ) + if config.nonlinear_constraints: + self._add_constraint_values(config, results.batch_id, constraint_results) + if isinstance(results, FunctionResults) and results.functions is not None: + self._add_total_objective(results.functions.weighted_objective) + if isinstance(results, GradientResults) and results.gradients is not None: + self._add_gradients(config, results.gradients.objectives) + + def _handle_finished_batch_event(self, event): + logger.debug("Storing batch results in the sqlite database") + + converted_results = tuple( + convert_to_maximize(result) for result in event.results) + results = [] + best_value = -np.inf + best_results = None + for item in converted_results: + if isinstance(item, GradientResults): + results.append(item) + if ( + isinstance(item, FunctionResults) + and item.functions is not None + and item.functions.weighted_objective > best_value + ): + best_value = item.functions.weighted_objective + best_results = item + if best_results is not None: + results = [best_results] + results + last_batch = -1 + for item in results: + if item.batch_id != last_batch: + self._database.add_batch() + self._store_results(event.config, item) + if item.batch_id != last_batch: + self._database.set_batch_ended + last_batch = item.batch_id + + self._database.set_batch_ended(time.time(), True) + + # Merit values are dakota specific, load them if the output file exists: + self._database.update_calculation_result(_get_merit_values(self._merit_file)) + + backup_data(self._database.location) + + def _handle_finished_event(self, event): + logger.debug("Storing final results in the sqlite database") + self._database.update_calculation_result(_get_merit_values(self._merit_file)) + self._database.set_experiment_ended(time.time()) + + def _set_event_handlers(self, optimizer): + optimizer.add_observer( + EventType.START_OPTIMIZER_STEP, self._initialize + ) + optimizer.add_observer( + EventType.FINISHED_EVALUATION, self._handle_finished_batch_event + ) + optimizer.add_observer( + EventType.FINISHED_OPTIMIZER_STEP, + self._handle_finished_event, + ) + + def get_optimal_result(self): + snapshot = SebaSnapshot(self._output_dir) + optimum = next( + ( + data + for data in reversed(snapshot.get_optimization_data()) + if data.merit_flag + ), + None, + ) + if optimum is None: + return None + objectives = snapshot.get_snapshot(batches=[optimum.batch_id]) + return OptimalResult( + batch=optimum.batch_id, + controls=optimum.controls, + total_objective=optimum.objective_value, + expected_objectives={ + name:value[0] for name, value in objectives.expected_objectives.items() + }, + ) + + +def backup_data(database_location) -> None: + src = sqlite3.connect(database_location) + dst = sqlite3.connect(database_location + ".backup") + with dst: + src.backup(dst) + src.close() + dst.close() + + +def _get_merit_fn_lines(merit_path): + if os.path.isfile(merit_path): + with open(merit_path, "r", errors="replace", encoding="utf-8") as reader: + lines = reader.readlines() + start_line_idx = -1 + for inx, line in enumerate(lines): + if "Merit" in line and "feval" in line: + start_line_idx = inx + 1 + if start_line_idx > -1 and line.startswith("="): + return lines[start_line_idx:inx] + if start_line_idx > -1: + return lines[start_line_idx:] + return [] + + +def _parse_merit_line(merit_values_string): + values = [] + for merit_elem in merit_values_string.split(): + try: + values.append(float(merit_elem)) + except ValueError: + for elem in merit_elem.split("0x")[1:]: + values.append(float.fromhex("0x" + elem)) + if len(values) == 8: + # Dakota starts counting at one, correct to be zero-based. + return int(values[5]) - 1, values[4] + return None + + +def _get_merit_values(merit_file): + # Read the file containing merit information. + # The file should contain the following table header + # Iter F(x) mu alpha Merit feval btracks Penalty + # :return: merit values indexed by the function evaluation number + # example: + # 0: merit_value_0 + # 1: merit_value_1 + # 2 merit_value_2 + # ... + # ] + merit_values = [] + if merit_file.exists(): + for line in _get_merit_fn_lines(merit_file): + value = _parse_merit_line(line) + if value is not None: + merit_values.append({"iter":value[0], "value":value[1]}) + return merit_values From 8b9e7b32e9d0fc8b0e16d77434b1757c5b2ebbec Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 10 Jan 2025 14:28:50 +0100 Subject: [PATCH 02/13] Add Everest storage --- src/ert/run_models/everest_run_model.py | 42 +- src/everest/api/everest_data_api.py | 226 +-- src/everest/everest_storage.py | 1209 ++++++++++++----- .../config_multiobj.yml/snapshot.json | 20 +- 4 files changed, 1011 insertions(+), 486 deletions(-) diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index f05ac95ff8c..b768ee7a286 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -9,21 +9,18 @@ import shutil from collections import defaultdict from collections.abc import Callable -from dataclasses import dataclass from enum import IntEnum from pathlib import Path from types import TracebackType from typing import TYPE_CHECKING, Any, Protocol import numpy as np -import seba_sqlite.sqlite_storage from numpy import float64 from numpy._typing import NDArray from ropt.enums import EventType, OptimizerExitCode from ropt.evaluator import EvaluatorContext, EvaluatorResult from ropt.plan import BasicOptimizer from ropt.plan import Event as OptimizerEvent -from seba_sqlite import SqliteStorage from typing_extensions import TypedDict from _ert.events import EESnapshot, EESnapshotUpdate, Event @@ -32,6 +29,7 @@ from ert.runpaths import Runpaths from ert.storage import open_storage from everest.config import EverestConfig +from everest.everest_storage import EverestStorage, OptimalResult from everest.optimizer.everest2ropt import everest2ropt from everest.simulator.everest_to_ert import everest_to_ert_config from everest.strings import EVEREST @@ -70,24 +68,6 @@ class OptimizerCallback(Protocol): def __call__(self) -> str | None: ... -@dataclass -class OptimalResult: - batch: int - controls: list[Any] - total_objective: float - - @staticmethod - def from_seba_optimal_result( - o: seba_sqlite.sqlite_storage.OptimalResult | None = None, - ) -> OptimalResult | None: - if o is None: - return None - - return OptimalResult( - batch=o.batch, controls=o.controls, total_objective=o.total_objective - ) - - class EverestExitCode(IntEnum): COMPLETED = 1 TOO_FEW_REALIZATIONS = 2 @@ -206,23 +186,21 @@ def run_experiment( # Initialize the ropt optimizer: optimizer = self._create_optimizer() - # The SqliteStorage object is used to store optimization results from - # Seba in an sqlite database. It reacts directly to events emitted by - # Seba and is not called by Everest directly. The stored results are - # accessed by Everest via separate SebaSnapshot objects. - # This mechanism is outdated and not supported by the ropt package. It - # is retained for now via the seba_sqlite package. - seba_storage = SqliteStorage( # type: ignore - optimizer, self._everest_config.optimization_output_dir + self.ever_storage = EverestStorage( + output_dir=Path(self._everest_config.optimization_output_dir), + ) + self.ever_storage.observe_optimizer( + optimizer, + Path(self._everest_config.optimization_output_dir) + / "dakota" + / "OPT_DEFAULT.out", ) # Run the optimization: optimizer_exit_code = optimizer.run().exit_code # Extract the best result from the storage. - self._result = OptimalResult.from_seba_optimal_result( - seba_storage.get_optimal_result() # type: ignore - ) + self._result = self.ever_storage.get_optimal_result() if self._exit_code is None: match optimizer_exit_code: diff --git a/src/everest/api/everest_data_api.py b/src/everest/api/everest_data_api.py index d3b3d804fab..e8e53eb3084 100644 --- a/src/everest/api/everest_data_api.py +++ b/src/everest/api/everest_data_api.py @@ -1,46 +1,65 @@ -from collections import OrderedDict +from pathlib import Path +import polars import polars as pl -from seba_sqlite.snapshot import SebaSnapshot +from ropt.enums import ConstraintType from ert.storage import open_storage from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, everserver_status +from everest.everest_storage import EverestStorage class EverestDataAPI: def __init__(self, config: EverestConfig, filter_out_gradient=True): self._config = config output_folder = config.optimization_output_dir - self._snapshot = SebaSnapshot(output_folder).get_snapshot(filter_out_gradient) + self._ever_storage = EverestStorage(Path(output_folder)) + self._ever_storage.read_from_output_dir() @property def batches(self): - batch_ids = list({opt.batch_id for opt in self._snapshot.optimization_data}) - return sorted(batch_ids) + return sorted( + b.batch_id + for b in self._ever_storage.data.batches + if b.batch_objectives is not None + ) @property def accepted_batches(self): - batch_ids = list( - {opt.batch_id for opt in self._snapshot.optimization_data if opt.merit_flag} + return sorted( + b.batch_id for b in self._ever_storage.data.batches if b.is_improvement ) - return sorted(batch_ids) @property def objective_function_names(self): - return [fnc.name for fnc in self._snapshot.metadata.objectives.values()] + return sorted( + self._ever_storage.data.objective_functions["objective_name"] + .unique() + .to_list() + ) @property def output_constraint_names(self): - return [fnc.name for fnc in self._snapshot.metadata.constraints.values()] + return ( + sorted( + self._ever_storage.data.nonlinear_constraints["constraint_name"] + .unique() + .to_list() + ) + if self._ever_storage.data.nonlinear_constraints is not None + else [] + ) def input_constraint(self, control): - controls = [ - con - for con in self._snapshot.metadata.controls.values() - if con.name == control - ] - return {"min": controls[0].min_value, "max": controls[0].max_value} + initial_values = self._ever_storage.data.controls + control_spec = initial_values.filter( + pl.col("control_name") == control + ).to_dicts()[0] + return { + "min": control_spec.get("lower_bounds"), + "max": control_spec.get("upper_bounds"), + } def output_constraint(self, constraint): """ @@ -50,106 +69,128 @@ def output_constraint(self, constraint): "right_hand_side" is a constant real number that indicates the constraint bound/target. """ - constraints = [ - con - for con in self._snapshot.metadata.constraints.values() - if con.name == constraint - ] + + constraint_dict = self._ever_storage.data.nonlinear_constraints.to_dicts()[0] return { - "type": constraints[0].constraint_type, - "right_hand_side": constraints[0].rhs_value, + "type": ConstraintType(constraint_dict["constraint_type"]).name.lower(), + "right_hand_side": constraint_dict["constraint_rhs_value"], } @property def realizations(self): - return list( - OrderedDict.fromkeys( - int(sim.realization) for sim in self._snapshot.simulation_data - ) + return sorted( + self._ever_storage.data.batches[0] + .realization_objectives["realization"] + .unique() + .to_list() ) @property def simulations(self): - return list( - OrderedDict.fromkeys( - [int(sim.simulation) for sim in self._snapshot.simulation_data] - ) + return sorted( + self._ever_storage.data.batches[0] + .realization_objectives["simulation_id"] + .unique() + .to_list() ) @property def control_names(self): - return [con.name for con in self._snapshot.metadata.controls.values()] + return sorted( + self._ever_storage.data.controls["control_name"].unique().to_list() + ) @property def control_values(self): - controls = [con.name for con in self._snapshot.metadata.controls.values()] - return [ - {"control": con, "batch": sim.batch, "value": sim.controls[con]} - for sim in self._snapshot.simulation_data - for con in controls - if con in sim.controls - ] + all_control_names = self._ever_storage.data.controls["control_name"].to_list() + new = [] + for batch in self._ever_storage.data.batches: + if batch.realization_controls is None: + continue + + for controls_dict in batch.realization_controls.to_dicts(): + for name in all_control_names: + new.append( + { + "control": name, + "batch": batch.batch_id, + "value": controls_dict[name], + } + ) + + return new @property def objective_values(self): return [ - { - "function": objective.name, - "batch": sim.batch, - "realization": sim.realization, - "simulation": sim.simulation, - "value": sim.objectives[objective.name], - "weight": objective.weight, - "norm": objective.normalization, - } - for sim in self._snapshot.simulation_data - for objective in self._snapshot.metadata.objectives.values() - if objective.name in sim.objectives + b for b in self._ever_storage.data.batches if b.batch_objectives is not None ] @property def single_objective_values(self): - single_obj = [ - { - "batch": optimization_el.batch_id, - "objective": optimization_el.objective_value, - "accepted": optimization_el.merit_flag, - } - for optimization_el in self._snapshot.optimization_data + batch_datas = polars.concat( + [ + b.batch_objectives.select( + c for c in b.batch_objectives.columns if c != "merit_value" + ).with_columns( + polars.lit(1 if b.is_improvement else 0).alias("accepted") + ) + for b in self._ever_storage.data.batches + if b.realization_controls is not None + ] + ) + objectives = self._ever_storage.data.objective_functions + objective_names = objectives["objective_name"].unique().to_list() + + for o in objectives.to_dicts(): + batch_datas = batch_datas.with_columns( + polars.col(o["objective_name"]) * o["weight"] * o["normalization"] + ) + + columns = [ + "batch", + "objective", + "accepted", + *(objective_names if len(objective_names) > 1 else []), ] - metadata = { - func.name: {"weight": func.weight, "norm": func.normalization} - for func in self._snapshot.metadata.functions.values() - if func.function_type == func.FUNCTION_OBJECTIVE_TYPE - } - if len(metadata) == 1: - return single_obj - objectives = [] - for name, values in self._snapshot.expected_objectives.items(): - for idx, val in enumerate(values): - factor = metadata[name]["weight"] * metadata[name]["norm"] - if len(objectives) > idx: - objectives[idx].update({name: val * factor}) - else: - objectives.append({name: val * factor}) - for idx, obj in enumerate(single_obj): - obj.update(objectives[idx]) - - return single_obj + + return ( + batch_datas.rename( + {"total_objective_value": "objective", "batch_id": "batch"} + ) + .select(columns) + .to_dicts() + ) @property def gradient_values(self): - return [ - { - "batch": optimization_el.batch_id, - "function": function, - "control": control, - "value": value, - } - for optimization_el in self._snapshot.optimization_data - for function, info in optimization_el.gradient_info.items() - for control, value in info.items() + all_batch_data = [ + b.batch_objective_gradient + for b in self._ever_storage.data.batches + if b.batch_objective_gradient is not None and b.is_improvement ] + if not all_batch_data: + return [] + + all_info = polars.concat(all_batch_data).drop("result_id") + objective_columns = [ + c + for c in all_info.drop(["batch_id", "control_name"]).columns + if not c.endswith(".total") + ] + return ( + all_info.select("batch_id", "control_name", *objective_columns) + .unpivot( + on=objective_columns, + index=["batch_id", "control_name"], + variable_name="function", + value_name="value", + ) + .rename({"control_name": "control", "batch_id": "batch"}) + .sort(by=["batch", "control"]) + .select(["batch", "function", "control", "value"]) + .to_dicts() + ) def summary_values(self, batches=None, keys=None): if batches is None: @@ -180,13 +221,10 @@ def summary_values(self, batches=None, keys=None): summary = summary.with_columns( pl.Series("batch", [batch_id] * summary.shape[0]) ) - # The realization ID as defined by Everest must be - # retrieved via the seba snapshot. - realization_map = { - sim.simulation: sim.realization - for sim in self._snapshot.simulation_data - if sim.batch == batch_id - } + + realization_map = ( + self._ever_storage.data.simulation_to_geo_realization_map + ) realizations = pl.Series( "realization", [realization_map.get(str(sim)) for sim in summary["simulation"]], diff --git a/src/everest/everest_storage.py b/src/everest/everest_storage.py index 7b4e0585133..96c7685e81e 100644 --- a/src/everest/everest_storage.py +++ b/src/everest/everest_storage.py @@ -1,414 +1,923 @@ -import copy +from __future__ import annotations + +import json import logging import os -import sqlite3 -import time -from collections import namedtuple -from itertools import count +from dataclasses import dataclass, field from pathlib import Path +from typing import ( + Any, + TypedDict, + cast, +) import numpy as np - -from ropt.enums import ConstraintType, EventType +import polars +from ropt.enums import EventType +from ropt.plan import BasicOptimizer, Event from ropt.results import FunctionResults, GradientResults, convert_to_maximize -from .database import Database -from .snapshot import SebaSnapshot - -OptimalResult = namedtuple( - "OptimalResult", "batch, controls, total_objective, expected_objectives" -) logger = logging.getLogger(__name__) -def _convert_names(control_names): - converted_names = [] - for name in control_names: - converted = f"{name[0]}_{name[1]}" - if len(name) > 2: - converted += f"-{name[2]}" - converted_names.append(converted) - return converted_names +@dataclass +class OptimalResult: + batch: int + controls: list[Any] + total_objective: float -class EverestStorage: - # This implementation builds as much as possible on the older database and - # snapshot code, since it is meant for backwards compatibility, and should - # not be extended with new functionality. +def try_read_df(path: Path) -> polars.DataFrame | None: + return polars.read_parquet(path) if path.exists() else None + + +@dataclass +class BatchDataFrames: + batch_id: int + realization_controls: polars.DataFrame + batch_objectives: polars.DataFrame | None + realization_objectives: polars.DataFrame | None + batch_constraints: polars.DataFrame | None + realization_constraints: polars.DataFrame | None + batch_objective_gradient: polars.DataFrame | None + perturbation_objectives: polars.DataFrame | None + batch_constraint_gradient: polars.DataFrame | None + perturbation_constraints: polars.DataFrame | None + is_improvement: bool | None = False + + @property + def existing_dataframes(self) -> dict[str, polars.DataFrame]: + return { + k: cast(polars.DataFrame, getattr(self, k)) + for k in [ + "batch_objectives", + "batch_objective_gradient", + "batch_constraints", + "batch_constraint_gradient", + "realization_controls", + "realization_objectives", + "realization_constraints", + "perturbation_objectives", + "perturbation_constraints", + ] + if getattr(self, k) is not None + } + + +@dataclass +class EverestStorageDataFrames: + batches: list[BatchDataFrames] = field(default_factory=list) + controls: polars.DataFrame | None = None + objective_functions: polars.DataFrame | None = None + nonlinear_constraints: polars.DataFrame | None = None + realization_weights: polars.DataFrame | None = None + + @property + def simulation_to_geo_realization_map(self) -> dict[str, str]: + """ + Mapping from simulation ID to geo-realization + """ + dummy_df = next( + ( + b.realization_controls + for b in self.batches + if b.realization_controls is not None + ), + None, + ) + + if dummy_df is None: + return {} + + mapping = {} + for d in dummy_df.select("realization", "simulation_id").to_dicts(): + # Currently we work with str, but should maybe not be done in future + mapping[str(d["simulation_id"])] = str(d["realization"]) + + return mapping + + @property + def existing_dataframes(self) -> dict[str, polars.DataFrame]: + return { + k: cast(polars.DataFrame, getattr(self, k)) + for k in [ + "controls", + "objective_functions", + "nonlinear_constraints", + "realization_weights", + ] + if getattr(self, k) is not None + } + + def write_to_experiment( + self, experiment: _OptimizerOnlyExperiment, write_csv=False + ) -> None: + for df_name, df in self.existing_dataframes.items(): + with open( + experiment.optimizer_mount_point / f"{df_name}.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(df.to_dicts())) + + df.write_parquet(f"{experiment.optimizer_mount_point / df_name}.parquet") + + if write_csv: + df.write_csv(f"{experiment.optimizer_mount_point / df_name}.csv") + + for batch_data in self.batches: + ensemble = experiment.get_ensemble_by_name(f"batch_{batch_data.batch_id}") + with open( + ensemble.optimizer_mount_point / "batch.json", "w+", encoding="utf-8" + ) as f: + json.dump( + { + "batch_id": batch_data.batch_id, + "is_improvement": batch_data.is_improvement, + }, + f, + ) + + for df_key, df in batch_data.existing_dataframes.items(): + df.write_parquet(ensemble.optimizer_mount_point / f"{df_key}.parquet") + df.write_csv(ensemble.optimizer_mount_point / f"{df_key}.csv") + df.write_json(ensemble.optimizer_mount_point / f"{df_key}.json") + + def read_from_experiment(self, experiment: _OptimizerOnlyExperiment) -> None: + self.controls = polars.read_parquet( + experiment.optimizer_mount_point / "controls.parquet" + ) + self.objective_functions = polars.read_parquet( + experiment.optimizer_mount_point / "objective_functions.parquet" + ) + + if ( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ).exists(): + self.nonlinear_constraints = polars.read_parquet( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ) + + if (experiment.optimizer_mount_point / "realization_weights.parquet").exists(): + self.realization_weights = polars.read_parquet( + experiment.optimizer_mount_point / "realization_weights.parquet" + ) + + for ens in experiment.ensembles.values(): + with open(ens.optimizer_mount_point / "batch.json", encoding="utf-8") as f: + info = json.load(f) + + self.batches.append( + BatchDataFrames( + batch_id=info["batch_id"], + **{ + df_name: try_read_df( + Path(ens.optimizer_mount_point) / f"{df_name}.parquet" + ) + for df_name in [ + "batch_objectives", + "batch_objective_gradient", + "batch_constraints", + "batch_constraint_gradient", + "realization_controls", + "realization_objectives", + "realization_constraints", + "perturbation_objectives", + "perturbation_constraints", + ] + }, + is_improvement=info["is_improvement"], + ) + ) + + self.batches.sort(key=lambda b: b.batch_id) + + +class _OptimizerOnlyEnsemble: + def __init__(self, output_dir: Path) -> None: + self._output_dir = output_dir + + @property + def optimizer_mount_point(self) -> Path: + if not (self._output_dir / "optimizer").exists(): + Path.mkdir(self._output_dir / "optimizer", parents=True) - def __init__(self, optimizer, output_dir): - # Internal variables. + return self._output_dir / "optimizer" + + +class _OptimizerOnlyExperiment: + """ + Mocks an ERT storage, if we want to store optimization results within the + ERT storage, we can use an ERT Experiment object with an optimizer_mount_point + property + """ + + def __init__(self, output_dir: Path) -> None: self._output_dir = output_dir - self._database = Database(output_dir) + self._ensembles = {} + + @property + def optimizer_mount_point(self) -> Path: + if not (self._output_dir / "optimizer").exists(): + Path.mkdir(self._output_dir / "optimizer", parents=True) + + return self._output_dir / "optimizer" + + @property + def ensembles(self) -> dict[str, _OptimizerOnlyEnsemble]: + return { + str(d): _OptimizerOnlyEnsemble(self._output_dir / "ensembles" / d) + for d in os.listdir(self._output_dir / "ensembles") + if "batch_" in d + } + + def get_ensemble_by_name(self, name: str) -> _OptimizerOnlyEnsemble: + if name not in self._ensembles: + self._ensembles[name] = _OptimizerOnlyEnsemble( + self._output_dir / "ensembles" / name + ) + + return self._ensembles[name] + + +@dataclass +class _EvaluationResults(TypedDict): + realization_controls: polars.DataFrame + batch_objectives: polars.DataFrame + realization_objectives: polars.DataFrame + batch_constraints: polars.DataFrame | None + realization_constraints: polars.DataFrame | None + + +@dataclass +class _GradientResults(TypedDict): + batch_objective_gradient: polars.DataFrame | None + perturbation_objectives: polars.DataFrame | None + batch_constraint_gradient: polars.DataFrame | None + perturbation_constraints: polars.DataFrame | None + + +@dataclass +class _MeritValue: + value: float + iter: int + + +class EverestStorage: + def __init__( + self, + output_dir: Path, + ) -> None: self._control_ensemble_id = 0 self._gradient_ensemble_id = 0 - self._simulator_results = None - self._merit_file = Path(output_dir) / "dakota" / "OPT_DEFAULT.out" - # Connect event handlers. - self._set_event_handlers(optimizer) + self._output_dir = output_dir + self._merit_file: Path | None = None + self.data = EverestStorageDataFrames() + + @staticmethod + def _rename_ropt_df_columns(df: polars.DataFrame) -> polars.DataFrame: + """ + Renames columns of a dataframe from ROPT to what will be displayed + to the user. + """ + scaled_cols = [c for c in df.columns if c.lower().startswith("scaled")] + if len(scaled_cols) > 0: + raise ValueError("Scaled columns should not be stored into Everest storage") + + renames = { + "objective": "objective_name", + "weighted_objective": "total_objective_value", + "variable": "control_name", + "variables": "control_value", + "objectives": "objective_value", + "constraints": "constraint_value", + "nonlinear_constraint": "constraint_name", + "perturbed_variables": "perturbed_control_value", + "perturbed_objectives": "perturbed_objective_value", + "perturbed_constraints": "perturbed_constraint_value", + "evaluation_ids": "simulation_id", + } + return df.rename({k: v for k, v in renames.items() if k in df.columns}) + + @staticmethod + def _enforce_dtypes(df: polars.DataFrame) -> polars.DataFrame: + dtypes = { + "batch_id": polars.UInt32, + "result_id": polars.UInt32, + "perturbation": polars.UInt32, + "realization": polars.UInt32, + # -1 is used as a value in simulator cache. + # thus we need signed, otherwise we could do unsigned + "simulation_id": polars.Int32, + "objective_name": polars.String, + "control_name": polars.String, + "constraint_name": polars.String, + "total_objective_value": polars.Float64, + "control_value": polars.Float64, + "objective_value": polars.Float64, + "constraint_value": polars.Float64, + "perturbed_control_value": polars.Float64, + "perturbed_objective_value": polars.Float64, + "perturbed_constraint_value": polars.Float64, + } + + existing_cols = set(df.columns) + unaccounted_cols = existing_cols - set(dtypes) + if len(unaccounted_cols) > 0: + raise KeyError( + f"Expected all keys to have a specified dtype, found {unaccounted_cols}" + ) - self._initialized = False + df = df.cast( + { + colname: dtype + for colname, dtype in dtypes.items() + if colname in df.columns + } + ) - @property - def file(self): - return self._database.location + return df + + def write_to_output_dir(self) -> None: + exp = _OptimizerOnlyExperiment(self._output_dir) + + # csv writing mostly for dev/debugging/quick inspection + self.data.write_to_experiment(exp, write_csv=True) - def _initialize(self, event): - if self._initialized: - return - self._initialized = True + def read_from_output_dir(self) -> None: + exp = _OptimizerOnlyExperiment(self._output_dir) + self.data.read_from_experiment(exp) - self._database.add_experiment( - name="optimization_experiment", start_time_stamp=time.time() + def observe_optimizer( + self, + optimizer: BasicOptimizer, + merit_file: Path, + ) -> None: + # We only need this file if we are observing a running ROPT instance + # (using dakota backend) + self._merit_file = merit_file + + optimizer.add_observer( + EventType.START_OPTIMIZER_STEP, self._on_start_optimization ) + optimizer.add_observer( + EventType.FINISHED_EVALUATION, self._on_batch_evaluation_finished + ) + optimizer.add_observer( + EventType.FINISHED_OPTIMIZER_STEP, + self._on_optimization_finished, + ) + + def _on_start_optimization(self, event: Event) -> None: + def _format_control_names(control_names): + converted_names = [] + for name in control_names: + converted = f"{name[0]}_{name[1]}" + if len(name) > 2: + converted += f"-{name[2]}" + converted_names.append(converted) + + return converted_names - # Add configuration values. config = event.config - for control_name, initial_value, lower_bound, upper_bound in zip( - _convert_names(config.variables.names), - config.variables.initial_values, - config.variables.lower_bounds, - config.variables.upper_bounds, - ): - self._database.add_control_definition( - control_name, initial_value, lower_bound, upper_bound + + # Note: We probably do not have to store + # all of this information, consider removing. + self.data.controls = polars.DataFrame( + { + "control_name": polars.Series( + _format_control_names(config.variables.names), dtype=polars.String + ), + "initial_value": polars.Series( + config.variables.initial_values, dtype=polars.Float64 + ), + "lower_bounds": polars.Series( + config.variables.lower_bounds, dtype=polars.Float64 + ), + "upper_bounds": polars.Series( + config.variables.upper_bounds, dtype=polars.Float64 + ), + } + ) + + self.data.objective_functions = polars.DataFrame( + { + "objective_name": config.objectives.names, + "weight": polars.Series( + config.objectives.weights, dtype=polars.Float64 + ), + "normalization": polars.Series( # Q: Is this correct? + [1.0 / s for s in config.objectives.scales], + dtype=polars.Float64, + ), + } + ) + + if config.nonlinear_constraints is not None: + self.data.nonlinear_constraints = polars.DataFrame( + { + "constraint_name": config.nonlinear_constraints.names, + "normalization": [ + 1.0 / s for s in config.nonlinear_constraints.scales + ], # Q: Is this correct? + "constraint_rhs_value": config.nonlinear_constraints.rhs_values, + "constraint_type": config.nonlinear_constraints.types, + } + ) + + self.data.realization_weights = polars.DataFrame( + { + "realization": polars.Series( + config.realizations.names, dtype=polars.UInt32 + ), + "weight": polars.Series( + config.realizations.weights, dtype=polars.Float64 + ), + } + ) + + def _store_function_results(self, results: FunctionResults) -> _EvaluationResults: + # We could select only objective values, + # but we select all to also get the constraint values (if they exist) + realization_objectives = polars.from_pandas( + results.to_dataframe( + "evaluations", + select=["objectives", "evaluation_ids"], + ).reset_index(), + ).select( + "result_id", + "batch_id", + "realization", + "objective", + "objectives", + "evaluation_ids", + ) + + if results.nonlinear_constraints is not None: + realization_constraints = polars.from_pandas( + results.to_dataframe( + "evaluations", + select=["constraints", "evaluation_ids"], + ).reset_index(), + ).select( + "result_id", + "batch_id", + "realization", + "evaluation_ids", + "nonlinear_constraint", + "constraints", ) - for name, weight, scale in zip( - config.objective_functions.names, - config.objective_functions.weights, - config.objective_functions.scales, - ): - self._database.add_function( - name=name, - function_type="OBJECTIVE", - weight=weight, - normalization=1.0 / scale, + realization_constraints = self._rename_ropt_df_columns( + realization_constraints ) - if config.nonlinear_constraints is not None: - for name, scale, rhs_value, constraint_type in zip( - config.nonlinear_constraints.names, - config.nonlinear_constraints.scales, - config.nonlinear_constraints.rhs_values, - config.nonlinear_constraints.types, - ): - self._database.add_function( - name=name, - function_type="CONSTRAINT", - normalization=scale, - rhs_value=rhs_value, - constraint_type=ConstraintType(constraint_type).name.lower(), - ) + batch_constraints = polars.from_pandas( + results.to_dataframe("nonlinear_constraints").reset_index() + ).select( + "result_id", "batch_id", "nonlinear_constraint", "values", "violations" + ) - for name, weight in zip( - config.realizations.names, - config.realizations.weights, - ): - self._database.add_realization(str(name), weight) - - def _add_batch(self, config, controls, perturbed_controls): - self._gradient_ensemble_id += 1 - self._control_ensemble_id = self._gradient_ensemble_id - control_names = _convert_names(config.variables.names) - for control_name, value in zip(control_names, controls): - self._database.add_control_value( - set_id=self._control_ensemble_id, - control_name=control_name, - value=value, + batch_constraints = batch_constraints.rename( + { + "nonlinear_constraint": "constraint_name", + "values": "constraint_value", + "violations": "constraint_violation", + } ) - if perturbed_controls is not None: - perturbed_controls = perturbed_controls.reshape( - perturbed_controls.shape[0], -1 + + constraint_names = batch_constraints["constraint_name"].unique().to_list() + + batch_constraints = batch_constraints.pivot( + on="constraint_name", + values=[ + "constraint_value", + "constraint_violation", + ], + separator=";", + ).rename( + { + **{f"constraint_value;{name}": name for name in constraint_names}, + **{ + f"constraint_violation;{name}": f"{name}.violation" + for name in constraint_names + }, + } + ) + + realization_constraints = realization_constraints.pivot( + values=["constraint_value"], on="constraint_name" ) - self._gradient_ensemble_id = self._control_ensemble_id - for g_idx in range(perturbed_controls.shape[1]): - self._gradient_ensemble_id += 1 - for c_idx, c_name in enumerate(control_names): - self._database.add_control_value( - set_id=self._gradient_ensemble_id, - control_name=c_name, - value=perturbed_controls[c_idx, g_idx], - ) - - def _add_simulations(self, config, result): - self._gradient_ensemble_id = self._control_ensemble_id - simulation_index = count() - if isinstance(result, FunctionResults): - for realization_name in config.realizations.names: - self._database.add_simulation( - realization_name=str(realization_name), - set_id=self._control_ensemble_id, - sim_name=f"{result.batch_id}_{next(simulation_index)}", - is_gradient=False, - ) - if isinstance(result, GradientResults): - for realization_name in config.realizations.names: - for _ in range(config.gradient.number_of_perturbations): - self._gradient_ensemble_id += 1 - self._database.add_simulation( - realization_name=str(realization_name), - set_id=self._gradient_ensemble_id, - sim_name=f"{result.batch_id}_{next(simulation_index)}", - is_gradient=True, - ) - - def _add_simulator_results( - self, config, batch, objective_results, constraint_results - ): - if constraint_results is None: - results = objective_results else: - results = np.vstack((objective_results, constraint_results)) - statuses = np.logical_and.reduce(np.isfinite(results), axis=0) - names = config.objective_functions.names - if config.nonlinear_constraints is not None: - names += config.nonlinear_constraints.names - - for sim_idx, status in enumerate(statuses): - sim_name = f"{batch}_{sim_idx}" - for function_idx, name in enumerate(names): - if status: - self._database.add_simulation_result( - sim_name, results[function_idx, sim_idx], name, 0 - ) - self._database.set_simulation_ended(sim_name, status) - - def _add_constraint_values(self, config, batch, constraint_values): - statuses = np.logical_and.reduce(np.isfinite(constraint_values), axis=0) - for sim_id, status in enumerate(statuses): - if status: - for idx, constraint_name in enumerate( - config.nonlinear_constraints.names - ): - # Note the time_index=0, the database supports storing - # multipel time-points, but we do not support that, so we - # use times_index=0. - self._database.update_simulation_result( - simulation_name=f"{batch}_{sim_id}", - function_name=constraint_name, - times_index=0, - value=constraint_values[idx, sim_id], - ) - - def _add_gradients(self, config, objective_gradients): - for grad_index, gradient in enumerate(objective_gradients): - for control_index, control_name in enumerate( - _convert_names(config.variables.names) - ): - self._database.add_gradient_result( - gradient[control_index], - config.objective_functions.names[grad_index], - 1, - control_name, - ) + batch_constraints = None + realization_constraints = None + + batch_objectives = polars.from_pandas( + results.to_dataframe( + "functions", + select=["objectives", "weighted_objective"], + ).reset_index() + ).select( + "result_id", "batch_id", "objective", "objectives", "weighted_objective" + ) - def _add_total_objective(self, total_objective): - self._database.add_calculation_result( - set_id=self._control_ensemble_id, - object_function_value=total_objective, + realization_controls = polars.from_pandas( + results.to_dataframe( + "evaluations", select=["variables", "evaluation_ids"] + ).reset_index() + ).select( + "result_id", + "batch_id", + "variable", + "realization", + "variables", + "evaluation_ids", ) - def _convert_constraints(self, config, constraint_results): - constraint_results = copy.deepcopy(constraint_results) - rhs_values = config.nonlinear_constraints.rhs_values - for idx, constraint_type in enumerate(config.nonlinear_constraints.types): - constraint_results[idx] -= rhs_values[idx] - if constraint_type == ConstraintType.LE: - constraint_results[idx] *= -1.0 - return constraint_results - - def _store_results(self, config, results): - if isinstance(results, FunctionResults): - objective_results = results.evaluations.objectives - objective_results = np.moveaxis(objective_results, -1, 0) - constraint_results = results.evaluations.constraints - if constraint_results is not None: - constraint_results = np.moveaxis(constraint_results, -1, 0) - else: - objective_results = None - constraint_results = None - - if isinstance(results, GradientResults): - perturbed_variables = results.evaluations.perturbed_variables - perturbed_variables = np.moveaxis(perturbed_variables, -1, 0) - perturbed_objectives = results.evaluations.perturbed_objectives - perturbed_objectives = np.moveaxis(perturbed_objectives, -1, 0) - perturbed_constraints = results.evaluations.perturbed_constraints - if perturbed_constraints is not None: - perturbed_constraints = np.moveaxis(perturbed_constraints, -1, 0) - else: - perturbed_variables = None - perturbed_objectives = None - perturbed_constraints = None + realization_controls = self._rename_ropt_df_columns(realization_controls) + realization_controls = self._enforce_dtypes(realization_controls) + + realization_controls = realization_controls.pivot( + on="control_name", + values=["control_value"], + separator=":", + ) + + batch_objectives = self._rename_ropt_df_columns(batch_objectives) + batch_objectives = self._enforce_dtypes(batch_objectives) + + realization_objectives = self._rename_ropt_df_columns(realization_objectives) + realization_objectives = self._enforce_dtypes(realization_objectives) - self._add_batch(config, results.evaluations.variables, perturbed_variables) - self._add_simulations(config, results) + batch_objectives = batch_objectives.pivot( + on="objective_name", + values=["objective_value"], + separator=":", + ) + + realization_objectives = realization_objectives.pivot( + values="objective_value", + index=[ + "result_id", + "batch_id", + "realization", + "simulation_id", + ], + columns="objective_name", + ) - # Convert back the simulation results to the legacy format: - if perturbed_objectives is not None: - perturbed_objectives = perturbed_objectives.reshape( - perturbed_objectives.shape[0], -1 + return { + "realization_controls": realization_controls, + "batch_objectives": batch_objectives, + "realization_objectives": realization_objectives, + "batch_constraints": batch_constraints, + "realization_constraints": realization_constraints, + } + + def _store_gradient_results(self, results: GradientResults) -> _GradientResults: + perturbation_objectives = polars.from_pandas( + results.to_dataframe("evaluations").reset_index() + ).select( + [ + "result_id", + "batch_id", + "variable", + "realization", + "perturbation", + "objective", + "variables", + "perturbed_variables", + "perturbed_objectives", + "perturbed_evaluation_ids", + *( + ["nonlinear_constraint", "perturbed_constraints"] + if results.evaluations.perturbed_constraints is not None + else [] + ), + ] + ) + perturbation_objectives = self._rename_ropt_df_columns(perturbation_objectives) + + if results.gradients is not None: + batch_objective_gradient = polars.from_pandas( + results.to_dataframe("gradients").reset_index() + ).select( + [ + "result_id", + "batch_id", + "variable", + "objective", + "weighted_objective", + "objectives", + *( + ["nonlinear_constraint", "constraints"] + if results.gradients.constraints is not None + else [] + ), + ] + ) + batch_objective_gradient = self._rename_ropt_df_columns( + batch_objective_gradient + ) + batch_objective_gradient = self._enforce_dtypes(batch_objective_gradient) + else: + batch_objective_gradient = None + + if results.evaluations.perturbed_constraints is not None: + perturbation_constraints = ( + perturbation_objectives[ + "result_id", + "batch_id", + "realization", + "perturbation", + "control_name", + "perturbed_control_value", + *[ + c + for c in perturbation_objectives.columns + if "constraint" in c.lower() + ], + ] + .pivot(on="constraint_name", values=["perturbed_constraint_value"]) + .pivot(on="control_name", values="perturbed_control_value") ) - if objective_results is None: - objective_results = perturbed_objectives - else: - objective_results = np.hstack((objective_results, perturbed_objectives)) - if config.nonlinear_constraints is not None: - if perturbed_constraints is not None: - perturbed_constraints = perturbed_constraints.reshape( - perturbed_constraints.shape[0], -1 + if batch_objective_gradient is not None: + batch_constraint_gradient = batch_objective_gradient[ + "result_id", + "batch_id", + "control_name", + *[ + c + for c in batch_objective_gradient.columns + if "constraint" in c.lower() + ], + ] + + batch_objective_gradient = batch_objective_gradient.drop( + [ + c + for c in batch_objective_gradient.columns + if "constraint" in c.lower() + ] + ).unique() + + batch_constraint_gradient = batch_constraint_gradient.pivot( + on="constraint_name", + values=["constraint_value"], ) - if constraint_results is None: - constraint_results = perturbed_constraints - else: - constraint_results = np.hstack( - (constraint_results, perturbed_constraints) - ) - # The legacy code converts all constraints to the form f(x) >=0: - constraint_results = self._convert_constraints(config, constraint_results) - - self._add_simulator_results( - config, results.batch_id, objective_results, constraint_results + else: + batch_constraint_gradient = None + + perturbation_objectives = perturbation_objectives.drop( + [ + c + for c in perturbation_objectives.columns + if "constraint" in c.lower() + ] + ).unique() + else: + batch_constraint_gradient = None + perturbation_constraints = None + + perturbation_objectives = perturbation_objectives.drop( + "perturbed_evaluation_ids", "control_value" ) - if config.nonlinear_constraints: - self._add_constraint_values(config, results.batch_id, constraint_results) - if isinstance(results, FunctionResults) and results.functions is not None: - self._add_total_objective(results.functions.weighted_objective) - if isinstance(results, GradientResults) and results.gradients is not None: - self._add_gradients(config, results.gradients.objectives) - def _handle_finished_batch_event(self, event): - logger.debug("Storing batch results in the sqlite database") + perturbation_objectives = perturbation_objectives.pivot( + on="objective_name", values="perturbed_objective_value" + ) + + perturbation_objectives = perturbation_objectives.pivot( + on="control_name", values="perturbed_control_value" + ) + + if batch_objective_gradient is not None: + objective_names = ( + batch_objective_gradient["objective_name"].unique().to_list() + ) + batch_objective_gradient = batch_objective_gradient.pivot( + on="objective_name", + values=["objective_value", "total_objective_value"], + separator=";", + ).rename( + { + **{f"objective_value;{name}": name for name in objective_names}, + **{ + f"total_objective_value;{name}": f"{name}.total" + for name in objective_names + }, + } + ) + + return { + "batch_objective_gradient": batch_objective_gradient, + "perturbation_objectives": perturbation_objectives, + "batch_constraint_gradient": batch_constraint_gradient, + "perturbation_constraints": perturbation_constraints, + } + + def _on_batch_evaluation_finished(self, event: Event) -> None: + logger.debug("Storing batch results dataframes") converted_results = tuple( - convert_to_maximize(result) for result in event.results) - results = [] + convert_to_maximize(result) for result in event.results + ) + results: list[FunctionResults | GradientResults] = [] + best_value = -np.inf best_results = None for item in converted_results: if isinstance(item, GradientResults): results.append(item) if ( - isinstance(item, FunctionResults) - and item.functions is not None - and item.functions.weighted_objective > best_value + isinstance(item, FunctionResults) + and item.functions is not None + and item.functions.weighted_objective > best_value ): best_value = item.functions.weighted_objective best_results = item + if best_results is not None: - results = [best_results] + results - last_batch = -1 - for item in results: - if item.batch_id != last_batch: - self._database.add_batch() - self._store_results(event.config, item) - if item.batch_id != last_batch: - self._database.set_batch_ended - last_batch = item.batch_id + results = [best_results, *results] - self._database.set_batch_ended(time.time(), True) + batch_dicts = {} + for item in results: + if item.batch_id not in batch_dicts: + batch_dicts[item.batch_id] = {} - # Merit values are dakota specific, load them if the output file exists: - self._database.update_calculation_result(_get_merit_values(self._merit_file)) + if isinstance(item, FunctionResults): + eval_results = self._store_function_results(item) + batch_dicts[item.batch_id].update(eval_results) - backup_data(self._database.location) + if isinstance(item, GradientResults): + gradient_results = self._store_gradient_results(item) + batch_dicts[item.batch_id].update(gradient_results) + + for batch_id, batch_dict in batch_dicts.items(): + self.data.batches.append( + BatchDataFrames( + batch_id=batch_id, + realization_controls=batch_dict.get("realization_controls"), + batch_objectives=batch_dict.get("batch_objectives"), + realization_objectives=batch_dict.get("realization_objectives"), + batch_constraints=batch_dict.get("batch_constraints"), + realization_constraints=batch_dict.get("realization_constraints"), + batch_objective_gradient=batch_dict.get("batch_objective_gradient"), + perturbation_objectives=batch_dict.get("perturbation_objectives"), + batch_constraint_gradient=batch_dict.get( + "batch_constraint_gradient" + ), + perturbation_constraints=batch_dict.get("perturbation_constraints"), + ) + ) - def _handle_finished_event(self, event): - logger.debug("Storing final results in the sqlite database") - self._database.update_calculation_result(_get_merit_values(self._merit_file)) - self._database.set_experiment_ended(time.time()) + def _on_optimization_finished(self, _) -> None: + logger.debug("Storing final results Everest storage") + + merit_values = self._get_merit_values() + if merit_values: + # NOTE: Batch 0 is always an "accepted batch", and "accepted batches" are + # batches with merit_flag , which means that it was an improvement + self.data.batches[0].is_improvement = True + + improvement_batches = [ + b for b in self.data.batches if b.batch_objectives is not None + ][1:] + for i, b in enumerate(improvement_batches): + merit_value = next( + (m.value for m in merit_values if (m.iter - 1) == i), None + ) + if merit_value is None: + continue - def _set_event_handlers(self, optimizer): - optimizer.add_observer( - EventType.START_OPTIMIZER_STEP, self._initialize - ) - optimizer.add_observer( - EventType.FINISHED_EVALUATION, self._handle_finished_batch_event - ) - optimizer.add_observer( - EventType.FINISHED_OPTIMIZER_STEP, - self._handle_finished_event, + b.batch_objectives = b.batch_objectives.with_columns( + polars.lit(merit_value).alias("merit_value") + ) + b.is_improvement = True + else: + max_total_objective = -np.inf + for b in self.data.batches: + if b.batch_objectives is not None: + total_objective = b.batch_objectives["total_objective_value"].item() + if total_objective > max_total_objective: + b.is_improvement = True + max_total_objective = total_objective + + self.write_to_output_dir() + + def get_optimal_result(self) -> OptimalResult | None: + # Only used in tests, but re-created to ensure + # same behavior as w/ old SEBA setup + has_merit = any( + "merit_value" in b.batch_objectives.columns + for b in self.data.batches + if b.batch_objectives is not None ) - def get_optimal_result(self): - snapshot = SebaSnapshot(self._output_dir) - optimum = next( - ( - data - for data in reversed(snapshot.get_optimization_data()) - if data.merit_flag - ), - None, - ) - if optimum is None: + def find_best_batch( + filter_by, sort_by + ) -> tuple[BatchDataFrames | None, dict | None]: + matching_batches = [b for b in self.data.batches if filter_by(b)] + + if not matching_batches: + return None, None + + matching_batches.sort(key=sort_by) + batch = matching_batches[0] + controls_dict = batch.realization_controls.drop( + [ + "result_id", + "batch_id", + "simulation_id", + "realization", + ] + ).to_dicts()[0] + + return batch, controls_dict + + if has_merit: + # Minimize merit + batch, controls_dict = find_best_batch( + filter_by=lambda b: ( + b.batch_objectives is not None + and "merit_value" in b.batch_objectives.columns + ), + sort_by=lambda b: b.batch_objectives.select( + polars.col("merit_value").min() + ).item(), + ) + + if batch is None: + return None + + return OptimalResult( + batch=batch.batch_id, + controls=controls_dict, + total_objective=batch.batch_objectives.select( + polars.col("total_objective_value").sample(n=1) + ).item(), + ) + else: + # Maximize objective + batch, controls_dict = find_best_batch( + filter_by=lambda b: b.batch_objectives is not None + and not b.batch_objectives.is_empty(), + sort_by=lambda b: -b.batch_objectives.select( + polars.col("total_objective_value").sample(n=1) + ).item(), + ) + + if batch is None: + return None + + return OptimalResult( + batch=batch.batch_id, + controls=controls_dict, + total_objective=batch.batch_objectives.select( + polars.col("total_objective_value") + ).item(), + ) + + def _get_merit_values(self) -> list[_MeritValue]: + # Read the file containing merit information. + # The file should contain the following table header + # Iter F(x) mu alpha Merit feval btracks Penalty + # :return: merit values indexed by the function evaluation number + + def _get_merit_fn_lines(merit_path: str) -> list[str]: + if os.path.isfile(merit_path): + with open(merit_path, errors="replace", encoding="utf-8") as reader: + lines = reader.readlines() + start_line_idx = -1 + for inx, line in enumerate(lines): + if "Merit" in line and "feval" in line: + start_line_idx = inx + 1 + if start_line_idx > -1 and line.startswith("="): + return lines[start_line_idx:inx] + if start_line_idx > -1: + return lines[start_line_idx:] + return [] + + def _parse_merit_line(merit_values_string: str) -> tuple[int, float] | None: + values = [] + for merit_elem in merit_values_string.split(): + try: + values.append(float(merit_elem)) + except ValueError: + for elem in merit_elem.split("0x")[1:]: + values.append(float.fromhex("0x" + elem)) + if len(values) == 8: + # Dakota starts counting at one, correct to be zero-based. + return int(values[5]) - 1, values[4] return None - objectives = snapshot.get_snapshot(batches=[optimum.batch_id]) - return OptimalResult( - batch=optimum.batch_id, - controls=optimum.controls, - total_objective=optimum.objective_value, - expected_objectives={ - name:value[0] for name, value in objectives.expected_objectives.items() - }, - ) + merit_values = [] + if self._merit_file.exists(): + for line in _get_merit_fn_lines(self._merit_file): + value = _parse_merit_line(line) + if value is not None: + merit_values.append(_MeritValue(iter=value[0], value=value[1])) -def backup_data(database_location) -> None: - src = sqlite3.connect(database_location) - dst = sqlite3.connect(database_location + ".backup") - with dst: - src.backup(dst) - src.close() - dst.close() - - -def _get_merit_fn_lines(merit_path): - if os.path.isfile(merit_path): - with open(merit_path, "r", errors="replace", encoding="utf-8") as reader: - lines = reader.readlines() - start_line_idx = -1 - for inx, line in enumerate(lines): - if "Merit" in line and "feval" in line: - start_line_idx = inx + 1 - if start_line_idx > -1 and line.startswith("="): - return lines[start_line_idx:inx] - if start_line_idx > -1: - return lines[start_line_idx:] - return [] - - -def _parse_merit_line(merit_values_string): - values = [] - for merit_elem in merit_values_string.split(): - try: - values.append(float(merit_elem)) - except ValueError: - for elem in merit_elem.split("0x")[1:]: - values.append(float.fromhex("0x" + elem)) - if len(values) == 8: - # Dakota starts counting at one, correct to be zero-based. - return int(values[5]) - 1, values[4] - return None - - -def _get_merit_values(merit_file): - # Read the file containing merit information. - # The file should contain the following table header - # Iter F(x) mu alpha Merit feval btracks Penalty - # :return: merit values indexed by the function evaluation number - # example: - # 0: merit_value_0 - # 1: merit_value_1 - # 2 merit_value_2 - # ... - # ] - merit_values = [] - if merit_file.exists(): - for line in _get_merit_fn_lines(merit_file): - value = _parse_merit_line(line) - if value is not None: - merit_values.append({"iter":value[0], "value":value[1]}) - return merit_values + return merit_values diff --git a/tests/everest/snapshots/test_api_snapshots/test_api_snapshots/config_multiobj.yml/snapshot.json b/tests/everest/snapshots/test_api_snapshots/test_api_snapshots/config_multiobj.yml/snapshot.json index 2670e0de601..c5c346ba775 100644 --- a/tests/everest/snapshots/test_api_snapshots/test_api_snapshots/config_multiobj.yml/snapshot.json +++ b/tests/everest/snapshots/test_api_snapshots/test_api_snapshots/config_multiobj.yml/snapshot.json @@ -69,27 +69,27 @@ }, { "batch": 0, - "control": "point_y", - "function": "distance_p", - "value": 0.98866227 + "control": "point_x", + "function": "distance_q", + "value": -3.00456477 }, { "batch": 0, - "control": "point_z", + "control": "point_y", "function": "distance_p", - "value": 1.00465235 + "value": 0.98866227 }, { "batch": 0, - "control": "point_x", + "control": "point_y", "function": "distance_q", - "value": -3.00456477 + "value": -3.011388 }, { "batch": 0, - "control": "point_y", - "function": "distance_q", - "value": -3.011388 + "control": "point_z", + "function": "distance_p", + "value": 1.00465235 }, { "batch": 0, From 3638a0b6c111f7dd209db689b3a7750508e03aa5 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 10 Jan 2025 13:58:16 +0100 Subject: [PATCH 03/13] Remove everexport --- docs/everest/cli.rst | 73 +--- src/everest/__init__.py | 6 - src/everest/api/everest_data_api.py | 12 +- src/everest/bin/everexport_script.py | 27 +- src/everest/bin/utils.py | 37 +- src/everest/config/__init__.py | 2 - src/everest/config/everest_config.py | 22 -- src/everest/detached/jobs/everserver.py | 30 -- src/everest/export.py | 365 ------------------ tests/everest/conftest.py | 2 - tests/everest/entry_points/test_everexport.py | 311 --------------- .../functional/test_main_everest_entry.py | 7 +- tests/everest/test_everlint.py | 17 - tests/everest/test_export.py | 353 ----------------- tests/everest/test_math_func.py | 117 ------ 15 files changed, 10 insertions(+), 1371 deletions(-) delete mode 100644 tests/everest/entry_points/test_everexport.py delete mode 100644 tests/everest/test_export.py diff --git a/docs/everest/cli.rst b/docs/everest/cli.rst index bdd699e1db5..8229da37c56 100644 --- a/docs/everest/cli.rst +++ b/docs/everest/cli.rst @@ -52,78 +52,7 @@ Using again the command `everest monitor config_file.yml`, will reattach to the Everest `export` ================ -.. argparse:: - :module: everest.bin.everexport_script - :func: _build_args_parser - :prog: everexport_entry - - -The everest export functionality is configured in the export section of the config file. -The following represents an export section a config file set with default values. - -.. code-block:: yaml - - export: - skip_export: False - keywords: - batches: - discard_gradient: True # Export only non-gradient simulations - discard_rejected: True # Export only increased merit simulations - csv_output_filepath: everest_output_folder/config_file.csv - -When the export command `everest export config_file.yml` is run with a config file that does not define an export section default values will be used, a `config_file.csv` file in the Everest output folder will be created. -By default Everest exports only non-gradient with increased merit simulations when no config section is defined in the config file. -The file will contain optimization data for all the optimization batches and the available eclipse keywords (if a data file is available) for only the non-gradient simulations and the simulations that increase merit. - -**Examples** - -* Export only non-gradient simulation using the following export section in the config file - -.. code-block:: yaml - - export: - discard_rejected: False - -* Export only increased merit simulation using the following export section in the config file - -.. code-block:: yaml - - export: - discard_gradient: False - - -* Export only a list of available batches even if they are gradient batches and if no export section is defined. - - everest export config_file.yml --batches 0 2 4 - -The command above is equivalent to having the following export section defined in the config file `config_file.yml`. - -.. code-block:: yaml - - export: - batches: [0, 2, 4] - -* Exporting just a specific list of eclipse keywords requires the following export section defined in the config file. - -.. code-block:: yaml - - export: - keywords: ['FOIP', 'FOPT'] - -* Skip export by adding the following section in the config file. - -.. code-block:: yaml - - export: - skip_export: True - -* Export will also be skipped if an empty list of batches is defined in the export section. - -.. code-block:: yaml - - export: - batches: [] - +The everest export has been removed. All data is now always exported to the optimization output directory. ============== Everest `lint` diff --git a/src/everest/__init__.py b/src/everest/__init__.py index b4c85c59128..f9646f32c68 100644 --- a/src/everest/__init__.py +++ b/src/everest/__init__.py @@ -21,18 +21,12 @@ __version__ = "0.0.0" from everest import detached, jobs, templates, util -from everest.bin.utils import export_to_csv, export_with_progress from everest.config_keys import ConfigKeys -from everest.export import MetaDataColumnNames, filter_data __author__ = "Equinor ASA and TNO" __all__ = [ "ConfigKeys", - "MetaDataColumnNames", "detached", - "export_to_csv", - "export_with_progress", - "filter_data", "jobs", "load", "templates", diff --git a/src/everest/api/everest_data_api.py b/src/everest/api/everest_data_api.py index e8e53eb3084..ff90bbdd4f8 100644 --- a/src/everest/api/everest_data_api.py +++ b/src/everest/api/everest_data_api.py @@ -5,8 +5,7 @@ from ropt.enums import ConstraintType from ert.storage import open_storage -from everest.config import EverestConfig, ServerConfig -from everest.detached import ServerStatus, everserver_status +from everest.config import EverestConfig from everest.everest_storage import EverestStorage @@ -239,12 +238,3 @@ def summary_values(self, batches=None, keys=None): @property def output_folder(self): return self._config.output_dir - - @property - def everest_csv(self): - status_path = ServerConfig.get_everserver_status_path(self._config.output_dir) - state = everserver_status(status_path) - if state["status"] == ServerStatus.completed: - return self._config.export_path - else: - return None diff --git a/src/everest/bin/everexport_script.py b/src/everest/bin/everexport_script.py index a7f0e15e60d..500e59f02b1 100755 --- a/src/everest/bin/everexport_script.py +++ b/src/everest/bin/everexport_script.py @@ -4,10 +4,7 @@ import logging from functools import partial -from everest import export_to_csv, export_with_progress from everest.config import EverestConfig -from everest.config.export_config import ExportConfig -from everest.export import check_for_errors from everest.strings import EVEREST @@ -22,27 +19,9 @@ def everexport_entry(args=None): config = options.config_file - # Turn into .export once - # explicit None is disallowed - if config.export is None: - config.export = ExportConfig() - - if options.batches is not None: - batch_list = [int(item) for item in options.batches] - config.export.batches = batch_list - - err_msgs, export_ecl = check_for_errors( - config=config.export, - optimization_output_path=config.optimization_output_dir, - storage_path=config.storage_dir, - data_file_path=config.model.data_file, - ) - for msg in err_msgs: - logger.warning(msg) - - export_to_csv( - data_frame=export_with_progress(config, export_ecl), - export_path=config.export_path, + logger.info("Everexport deprecation warning seen") + print( + f"Everexport is deprecated, optimization results already exist @ {config.optimization_output_dir}" ) diff --git a/src/everest/bin/utils.py b/src/everest/bin/utils.py index 19971b9adfe..ef73ce988ec 100644 --- a/src/everest/bin/utils.py +++ b/src/everest/bin/utils.py @@ -8,10 +8,8 @@ import colorama from colorama import Fore -from pandas import DataFrame from ert.resources import all_shell_script_fm_steps -from everest.config import EverestConfig from everest.detached import ( OPT_PROGRESS_ID, SIM_PROGRESS_ID, @@ -20,41 +18,9 @@ get_opt_status, start_monitor, ) -from everest.export import export_data from everest.simulator import JOB_FAILURE, JOB_RUNNING, JOB_SUCCESS from everest.strings import EVEREST -try: - from progressbar import AdaptiveETA, Bar, Percentage, ProgressBar, Timer -except ImportError: - ProgressBar = None # type: ignore - - -def export_with_progress(config: EverestConfig, export_ecl=True): - logging.getLogger(EVEREST).info("Exporting results to csv ...") - if ProgressBar is not None: - widgets = [Percentage(), " ", Bar(), " ", Timer(), " ", AdaptiveETA()] - with ProgressBar(max_value=1, widgets=widgets) as bar: - return export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - export_ecl=export_ecl, - progress_callback=bar.update, - ) - return export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - export_ecl=export_ecl, - ) - - -def export_to_csv(data_frame: DataFrame, export_path: str) -> None: - os.makedirs(os.path.dirname(export_path), exist_ok=True) - data_frame.to_csv(export_path, sep=";", index=False) - logging.getLogger(EVEREST).info(f"Data exported to {export_path}") - def handle_keyboard_interrupt(signal, frame, options): print("\n" + "=" * 80) @@ -335,6 +301,5 @@ def report_on_previous_run( f"Optimization completed.\n" "\nTo re-run the optimization use command:\n" f" `everest run --new-run {config_file}`\n" - "To export the results use command:\n" - f" `everest export {config_file}`" + f"Results are stored in {optimization_output_dir}" ) diff --git a/src/everest/config/__init__.py b/src/everest/config/__init__.py index 4ce9c72f7e5..4f722f14f7d 100644 --- a/src/everest/config/__init__.py +++ b/src/everest/config/__init__.py @@ -6,7 +6,6 @@ from .cvar_config import CVaRConfig from .environment_config import EnvironmentConfig from .everest_config import EverestConfig, EverestValidationError -from .export_config import ExportConfig from .input_constraint_config import InputConstraintConfig from .install_data_config import InstallDataConfig from .install_job_config import InstallJobConfig @@ -29,7 +28,6 @@ "EnvironmentConfig", "EverestConfig", "EverestValidationError", - "ExportConfig", "InputConstraintConfig", "InstallDataConfig", "InstallJobConfig", diff --git a/src/everest/config/everest_config.py b/src/everest/config/everest_config.py index f41f00fb736..69ece8aedec 100644 --- a/src/everest/config/everest_config.py +++ b/src/everest/config/everest_config.py @@ -722,28 +722,6 @@ def function_aliases(self) -> dict[str, str]: aliases[f"{constraint.name}:upper"] = constraint.name return aliases - @property - def export_path(self): - """Returns the export file path. If not file name is provide the default - export file name will have the same name as the config file, with the '.csv' - extension.""" - - export = self.export - - output_path = None - if export is not None: - output_path = export.csv_output_filepath - - if output_path is None: - output_path = "" - - full_file_path = os.path.join(self.output_dir, output_path) - if output_path: - return full_file_path - else: - default_export_file = f"{os.path.splitext(self.config_file)[0]}.csv" - return os.path.join(full_file_path, default_export_file) - def to_dict(self) -> dict: the_dict = self.model_dump(exclude_none=True) diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index a8a9402911a..2725350465a 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -34,10 +34,8 @@ from ert.config.parsing.queue_system import QueueSystem from ert.ensemble_evaluator import EvaluatorServerConfig from ert.run_models.everest_run_model import EverestExitCode, EverestRunModel -from everest import export_to_csv, export_with_progress from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, get_opt_status, update_everserver_status -from everest.export import check_for_errors from everest.plugins.everest_plugin_manager import EverestPluginManager from everest.simulator import JOB_FAILURE from everest.strings import ( @@ -333,34 +331,6 @@ def main(): ) return - try: - # Exporting data - update_everserver_status(status_path, ServerStatus.exporting_to_csv) - - if config.export is not None: - err_msgs, export_ecl = check_for_errors( - config=config.export, - optimization_output_path=config.optimization_output_dir, - storage_path=config.storage_dir, - data_file_path=config.model.data_file, - ) - for msg in err_msgs: - logging.getLogger(EVEREST).warning(msg) - else: - export_ecl = True - - export_to_csv( - data_frame=export_with_progress(config, export_ecl), - export_path=config.export_path, - ) - except: - update_everserver_status( - status_path, - ServerStatus.failed, - message=traceback.format_exc(), - ) - return - update_everserver_status(status_path, ServerStatus.completed, message=message) diff --git a/src/everest/export.py b/src/everest/export.py index 318a2cef54c..e69de29bb2d 100644 --- a/src/everest/export.py +++ b/src/everest/export.py @@ -1,365 +0,0 @@ -import os -import re -from enum import StrEnum -from typing import Any - -import pandas as pd -from pandas import DataFrame -from seba_sqlite.snapshot import SebaSnapshot - -from ert.storage import open_storage -from everest.config import ExportConfig -from everest.strings import STORAGE_DIR - - -class MetaDataColumnNames(StrEnum): - # NOTE: Always add a new column name to the list below! - BATCH = "batch" - REALIZATION = "realization" - REALIZATION_WEIGHT = "realization_weight" - SIMULATION = "simulation" - IS_GRADIENT = "is_gradient" - SUCCESS = "success" - START_TIME = "start_time" - END_TIME = "end_time" - SIM_AVERAGED_OBJECTIVE = "sim_avg_obj" - REAL_AVERAGED_OBJECTIVE = "real_avg_obj" - SIMULATED_DATE = "sim_date" - INCREASED_MERIT = "increased_merit" - - @classmethod - def get_all(cls): - return [ - cls.BATCH, - cls.REALIZATION, - cls.REALIZATION_WEIGHT, - cls.SIMULATION, - cls.IS_GRADIENT, - cls.SUCCESS, - cls.START_TIME, - cls.END_TIME, - cls.SIM_AVERAGED_OBJECTIVE, - cls.REAL_AVERAGED_OBJECTIVE, - cls.SIMULATED_DATE, - cls.INCREASED_MERIT, - ] - - -def filter_data(data: DataFrame, keyword_filters: set[str]): - filtered_columns = [] - - for col in data.columns: - for expr in keyword_filters: - expr = expr.replace("*", ".*") - if re.match(expr, col) is not None: - filtered_columns.append(col) - - return data[filtered_columns] - - -def available_batches(optimization_output_dir: str) -> set[int]: - snapshot = SebaSnapshot(optimization_output_dir).get_snapshot( - filter_out_gradient=False, batches=None - ) - return {data.batch for data in snapshot.simulation_data} - - -def export_metadata(config: ExportConfig | None, optimization_output_dir: str): - discard_gradient = True - discard_rejected = True - batches = None - - if config: - if config.discard_gradient is not None: - discard_gradient = config.discard_gradient - - if config.discard_rejected is not None: - discard_rejected = config.discard_rejected - - if config.batches: - # If user defined batches to export in the conf file, ignore previously - # discard gradient and discard rejected flags if defined and true - discard_rejected = False - discard_gradient = False - batches = config.batches - - snapshot = SebaSnapshot(optimization_output_dir).get_snapshot( - filter_out_gradient=discard_gradient, - batches=batches, - ) - - opt_data = snapshot.optimization_data_by_batch - metadata = [] - for data in snapshot.simulation_data: - # If export section not defined in the config file export only increased - # merit non-gradient simulation results - if ( - discard_rejected - and data.batch in opt_data - and opt_data[data.batch].merit_flag != 1 - ): - continue - - md_row: dict[str, Any] = { - MetaDataColumnNames.BATCH: data.batch, - MetaDataColumnNames.SIM_AVERAGED_OBJECTIVE: data.sim_avg_obj, - MetaDataColumnNames.IS_GRADIENT: data.is_gradient, - MetaDataColumnNames.REALIZATION: int(data.realization), - MetaDataColumnNames.START_TIME: data.start_time, - MetaDataColumnNames.END_TIME: data.end_time, - MetaDataColumnNames.SUCCESS: data.success, - MetaDataColumnNames.REALIZATION_WEIGHT: data.realization_weight, - MetaDataColumnNames.SIMULATION: int(data.simulation), - } - if data.objectives: - md_row.update(data.objectives) - if data.constraints: - md_row.update(data.constraints) - if data.controls: - md_row.update(data.controls) - - if not md_row[MetaDataColumnNames.IS_GRADIENT]: - if md_row[MetaDataColumnNames.BATCH] in opt_data: - opt = opt_data[md_row[MetaDataColumnNames.BATCH]] - md_row.update( - { - MetaDataColumnNames.REAL_AVERAGED_OBJECTIVE: opt.objective_value, - MetaDataColumnNames.INCREASED_MERIT: opt.merit_flag, - } - ) - for function, gradients in opt.gradient_info.items(): - for control, gradient_value in gradients.items(): - md_row.update( - {f"gradient-{function}-{control}": gradient_value} - ) - else: - print( - f"Batch {md_row[MetaDataColumnNames.BATCH]} has no available optimization data" - ) - metadata.append(md_row) - - return metadata - - -def get_internalized_keys( - config: ExportConfig, - storage_path: str, - optimization_output_path: str, - batch_ids: set[int] | None = None, -): - if batch_ids is None: - metadata = export_metadata(config, optimization_output_path) - batch_ids = {data[MetaDataColumnNames.BATCH] for data in metadata} - internal_keys: set = set() - with open_storage(storage_path, "r") as storage: - for batch_id in batch_ids: - experiments = [*storage.experiments] - assert len(experiments) == 1 - experiment = experiments[0] - - ensemble = experiment.get_ensemble_by_name(f"batch_{batch_id}") - if not internal_keys: - internal_keys = set( - ensemble.experiment.response_type_to_response_keys["summary"] - ) - else: - internal_keys = internal_keys.intersection( - set(ensemble.experiment.response_type_to_response_keys["summary"]) - ) - - return internal_keys - - -def check_for_errors( - config: ExportConfig, - optimization_output_path: str, - storage_path: str, - data_file_path: str | None, -): - """ - Checks for possible errors when attempting to export current optimization - case. - """ - export_ecl = True - export_errors: list[str] = [] - - if config.batches: - available_batches_ = available_batches(optimization_output_path) - for batch in set(config.batches).difference(available_batches_): - export_errors.append( - f"Batch {batch} not found in optimization " - "results. Skipping for current export." - ) - config.batches = list(set(config.batches).intersection(available_batches_)) - - if config.batches == []: - export_errors.append( - "No batches selected for export. Only optimization data will be exported." - ) - return export_errors, False - - if not data_file_path: - export_ecl = False - export_errors.append( - "No data file found in config.Only optimization data will be exported." - ) - - # If no user defined keywords are present it is no longer possible to check - # availability in internal storage - if config.keywords is None: - return export_errors, export_ecl - - if not config.keywords: - export_ecl = False - export_errors.append( - "No eclipse keywords selected for export. Only" - " optimization data will be exported." - ) - - internal_keys = get_internalized_keys( - config=config, - storage_path=storage_path, - optimization_output_path=optimization_output_path, - batch_ids=set(config.batches) if config.batches else None, - ) - - extra_keys = set(config.keywords).difference(set(internal_keys)) - if extra_keys: - export_ecl = False - export_errors.append( - f"Non-internalized ecl keys selected for export '{' '.join(extra_keys)}'." - " in order to internalize missing keywords " - f"run 'everest load '. " - "Only optimization data will be exported." - ) - - return export_errors, export_ecl - - -def export_data( - export_config: ExportConfig | None, - output_dir: str, - data_file: str | None, - export_ecl=True, - progress_callback=lambda _: None, -): - """Export everest data into a pandas dataframe. If the config specifies - a data_file and @export_ecl is True, simulation data is included. When - exporting simulation data, only keywords matching elements in @ecl_keywords - are exported. Note that wildcards are allowed. - - @progress_callback will be called with a number between 0 and 1 indicating - the fraction of batches that has been loaded. - """ - - ecl_keywords = None - # If user exports with a config file that has the SKIP_EXPORT - # set to true export nothing - if export_config is not None: - if export_config.skip_export or export_config.batches == []: - return pd.DataFrame([]) - - ecl_keywords = export_config.keywords - optimization_output_dir = os.path.join( - os.path.abspath(output_dir), "optimization_output" - ) - metadata = export_metadata(export_config, optimization_output_dir) - if data_file is None or not export_ecl: - return pd.DataFrame(metadata) - - data = load_simulation_data( - output_path=output_dir, - metadata=metadata, - progress_callback=progress_callback, - ) - - if ecl_keywords is not None: - keywords = tuple(ecl_keywords) - # NOTE: Some of these keywords are necessary to export successfully, - # we should not leave this to the user - keywords += tuple(pd.DataFrame(metadata).columns) - keywords += tuple(MetaDataColumnNames.get_all()) - keywords_set = set(keywords) - data = filter_data(data, keywords_set) - - return data - - -def load_simulation_data( - output_path: str, metadata: list[dict], progress_callback=lambda _: None -): - """Export simulations to a pandas DataFrame - @output_path optimization output folder path. - @metadata is a one ora a list of dictionaries. Keys from the dictionary become - columns in the resulting dataframe. The values from the dictionary are - assigned to those columns for the corresponding simulation. - If a column is defined for some simulations but not for others, the value - for that column is set to NaN for simulations without it - - For instance, assume we have 2 simulations and - tags = [ {'geoid': 0, 'sim': 'ro'}, - {'geoid': 2, 'sim': 'pi', 'best': True }, - ] - And assume exporting each of the two simulations produces 3 rows. - The resulting dataframe will be something like - geoid sim best data... - 0 0 ro sim_0_row_0... - 1 0 ro sim_0_row_1... - 2 0 ro sim_0_row_2... - 3 2 pi True sim_1_row_0... - 4 2 pi True sim_2_row_0... - 5 2 pi True sim_3_row_0... - """ - ens_path = os.path.join(output_path, STORAGE_DIR) - with open_storage(ens_path, "r") as storage: - # pylint: disable=unnecessary-lambda-assignment - def load_batch_by_id(): - experiments = [*storage.experiments] - - # Always assume 1 experiment per simulation/enspath, never multiple - assert len(experiments) == 1 - experiment = experiments[0] - - ensemble = experiment.get_ensemble_by_name(f"batch_{batch}") - realizations = ensemble.get_realization_list_with_responses() - try: - df_pl = ensemble.load_responses("summary", tuple(realizations)) - except (ValueError, KeyError): - return pd.DataFrame() - df_pl = df_pl.pivot( - on="response_key", index=["realization", "time"], sort_columns=True - ) - df_pl = df_pl.rename({"time": "Date", "realization": "Realization"}) - return ( - df_pl.to_pandas() - .set_index(["Realization", "Date"]) - .sort_values(by=["Date", "Realization"]) - ) - - batches = {elem[MetaDataColumnNames.BATCH] for elem in metadata} - batch_data = [] - for idx, batch in enumerate(batches): - progress_callback(float(idx) / len(batches)) - batch_data.append(load_batch_by_id()) - batch_data[-1][MetaDataColumnNames.BATCH] = batch - - for b in batch_data: - b.reset_index(inplace=True) - b.rename( - index=str, - inplace=True, - columns={ - "Realization": MetaDataColumnNames.SIMULATION, - "Date": MetaDataColumnNames.SIMULATED_DATE, - }, - ) - - data = pd.concat(batch_data, ignore_index=True, sort=False) - data = pd.merge( - left=data, - right=pd.DataFrame(metadata), - on=[MetaDataColumnNames.BATCH, MetaDataColumnNames.SIMULATION], - sort=False, - ) - - return data diff --git a/tests/everest/conftest.py b/tests/everest/conftest.py index 41ef76c3e75..d0ea5dd387d 100644 --- a/tests/everest/conftest.py +++ b/tests/everest/conftest.py @@ -235,5 +235,3 @@ def mock_server(monkeypatch): monkeypatch.setattr(everserver, "_find_open_port", lambda *args, **kwargs: 42) monkeypatch.setattr(everserver, "_write_hostfile", MagicMock()) monkeypatch.setattr(everserver, "_everserver_thread", MagicMock()) - monkeypatch.setattr(everserver, "export_to_csv", MagicMock()) - monkeypatch.setattr(everserver, "export_with_progress", MagicMock()) diff --git a/tests/everest/entry_points/test_everexport.py b/tests/everest/entry_points/test_everexport.py deleted file mode 100644 index 31be878aab4..00000000000 --- a/tests/everest/entry_points/test_everexport.py +++ /dev/null @@ -1,311 +0,0 @@ -import logging -import os -from pathlib import Path -from unittest.mock import patch - -import pandas as pd -import pytest - -from everest import ConfigKeys as CK -from everest import MetaDataColumnNames as MDCN -from everest.bin.everexport_script import everexport_entry -from everest.bin.utils import ProgressBar -from everest.config import EverestConfig, ExportConfig -from tests.everest.utils import ( - satisfy, - satisfy_callable, - satisfy_type, -) - -CONFIG_FILE_MINIMAL = "config_minimal.yml" - -CONFIG_FILE_MOCKED_TEST_CASE = "mocked_multi_batch.yml" - - -pytestmark = pytest.mark.xdist_group(name="starts_everest") - -TEST_DATA = pd.DataFrame( - columns=[ - MDCN.BATCH, - MDCN.SIMULATION, - MDCN.IS_GRADIENT, - MDCN.START_TIME, - ], - data=[ - [0, 0, False, 0.0], # First func evaluation on 2 realizations - [0, 1, False, 0.0], - [0, 2, True, 0.0], # First grad evaluation 2 perts per real - [0, 3, True, 0.1], - [0, 4, True, 0.1], - [0, 5, True, 0.1], - [1, 0, False, 0.3], - [1, 1, False, 0.32], - [2, 0, True, 0.5], - [2, 1, True, 0.5], - [2, 2, True, 0.5], - [2, 3, True, 0.6], - ], -) - - -def export_mock(config, export_ecl=True, progress_callback=lambda _: None): - progress_callback(1.0) - return TEST_DATA - - -def empty_mock(config, export_ecl=True, progress_callback=lambda _: None): - progress_callback(1.0) - return pd.DataFrame() - - -def validate_export_mock(**_): - return ([], True) - - -@patch("everest.bin.everexport_script.export_with_progress", side_effect=export_mock) -def test_everexport_entry_run(_, cached_example): - """Test running everexport with not flags""" - config_path, config_file, _ = cached_example("math_func/config_minimal.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - export_file_path = config.export_path - assert not os.path.isfile(export_file_path) - - everexport_entry([config_file]) - - assert os.path.isfile(export_file_path) - df = pd.read_csv(export_file_path, sep=";") - assert df.equals(TEST_DATA) - - -@patch("everest.bin.everexport_script.export_with_progress", side_effect=empty_mock) -def test_everexport_entry_empty(mocked_func, cached_example): - """Test running everexport with no data""" - # NOTE: When there is no data (ie, the optimization has not yet run) - # the current behavior is to create an empty .csv file. It is arguable - # whether that is really the desired behavior, but for now we assume - # it is and we test against that expected behavior. - config_path, config_file, _ = cached_example("math_func/config_minimal.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - export_file_path = config.export_path - assert not os.path.isfile(export_file_path) - - everexport_entry([CONFIG_FILE_MINIMAL]) - - assert os.path.isfile(export_file_path) - with open(export_file_path, encoding="utf-8") as f: - content = f.read() - assert not content.strip() - - -@patch( - "everest.bin.everexport_script.check_for_errors", - side_effect=validate_export_mock, -) -@patch("everest.bin.utils.export_data") -@pytest.mark.fails_on_macos_github_workflow -def test_everexport_entry_batches(mocked_func, validate_export_mock, cached_example): - """Test running everexport with the --batches flag""" - _, config_file, _ = cached_example("math_func/config_minimal.yml") - everexport_entry([config_file, "--batches", "0", "2"]) - - def check_export_batches(config): - batches = (config.batches if config is not None else None) or False - return set(batches) == {0, 2} - - if ProgressBar: # different calls if ProgressBar available or not - mocked_func.assert_called_once_with( - export_config=satisfy(check_export_batches), - output_dir=satisfy_type(str), - data_file=None, - export_ecl=True, - progress_callback=satisfy_callable(), - ) - else: - mocked_func.assert_called_once() - - -@patch("everest.bin.everexport_script.export_to_csv") -def test_everexport_entry_no_export(mocked_func, cached_example): - """Test running everexport on config file with skip_export flag - set to true""" - - config_path, config_file, _ = cached_example("math_func/config_minimal.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - config.export = ExportConfig(skip_export=True) - # Add export section to config file and set run_export flag to false - export_file_path = config.export_path - assert not os.path.isfile(export_file_path) - - everexport_entry([CONFIG_FILE_MINIMAL]) - # Check export to csv is called even if the skip_export entry is in the - # config file - mocked_func.assert_called_once() - - -@patch("everest.bin.everexport_script.export_to_csv") -def test_everexport_entry_empty_export(mocked_func, cached_example): - """Test running everexport on config file with empty export section""" - _, config_file, _ = cached_example("math_func/config_minimal.yml") - - # Add empty export section to config file - with open(config_file, "a", encoding="utf-8") as f: - f.write(f"{CK.EXPORT}:\n") - - everexport_entry([config_file]) - # Check export to csv is called even if export section is empty - mocked_func.assert_called_once() - - -@patch("everest.bin.utils.export_data") -@pytest.mark.fails_on_macos_github_workflow -def test_everexport_entry_no_usr_def_ecl_keys(mocked_func, cached_example): - """Test running everexport with config file containing only the - keywords label without any list of keys""" - - _, config_file, _ = cached_example( - "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" - ) - - # Add export section to config file and set run_export flag to false - with open(config_file, "a", encoding="utf-8") as f: - f.write(f"{CK.EXPORT}:\n {CK.KEYWORDS}:") - - everexport_entry([config_file]) - - def condition(config): - batches = config.batches if config is not None else None - keys = config.keywords if config is not None else None - - return batches is None and keys is None - - if ProgressBar: - mocked_func.assert_called_once_with( - export_config=satisfy(condition), - output_dir=satisfy_type(str), - data_file=satisfy_type(str), - export_ecl=True, - progress_callback=satisfy_callable(), - ) - else: - mocked_func.assert_called_once() - - -@patch("everest.bin.utils.export_data") -@pytest.mark.fails_on_macos_github_workflow -def test_everexport_entry_internalized_usr_def_ecl_keys(mocked_func, cached_example): - """Test running everexport with config file containing a key in the - list of user defined ecl keywords, that has been internalized on - a previous run""" - - _, config_file, _ = cached_example( - "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" - ) - user_def_keys = ["FOPT"] - - # Add export section to config file and set run_export flag to false - with open(config_file, "a", encoding="utf-8") as f: - f.write(f"{CK.EXPORT}:\n {CK.KEYWORDS}: {user_def_keys}") - - everexport_entry([config_file]) - - def condition(config): - batches = config.batches if config is not None else None - keys = config.keywords if config is not None else None - - return batches is None and keys == user_def_keys - - if ProgressBar: - mocked_func.assert_called_once_with( - export_config=satisfy(condition), - output_dir=satisfy_type(str), - data_file=satisfy_type(str), - export_ecl=True, - progress_callback=satisfy_callable(), - ) - else: - mocked_func.assert_called_once() - - -@patch("everest.bin.utils.export_data") -@pytest.mark.fails_on_macos_github_workflow -def test_everexport_entry_non_int_usr_def_ecl_keys(mocked_func, caplog, cached_example): - """Test running everexport when config file contains non internalized - ecl keys in the user defined keywords list""" - - _, config_file, _ = cached_example( - "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" - ) - - non_internalized_key = "KEY" - user_def_keys = ["FOPT", non_internalized_key] - - # Add export section to config file and set run_export flag to false - with open(config_file, "a", encoding="utf-8") as f: - f.write(f"{CK.EXPORT}:\n {CK.KEYWORDS}: {user_def_keys}") - - with caplog.at_level(logging.DEBUG): - everexport_entry([config_file]) - - assert ( - f"Non-internalized ecl keys selected for export '{non_internalized_key}'" - in "\n".join(caplog.messages) - ) - - def condition(config): - batches = config.batches if config is not None else None - keys = config.keywords if config is not None else None - - return batches is None and keys == user_def_keys - - if ProgressBar: - mocked_func.assert_called_once_with( - export_config=satisfy(condition), - output_dir=satisfy_type(str), - data_file=satisfy_type(str), - export_ecl=False, - progress_callback=satisfy_callable(), - ) - else: - mocked_func.assert_called_once() - - -@patch("everest.bin.utils.export_data") -@pytest.mark.fails_on_macos_github_workflow -def test_everexport_entry_not_available_batches(mocked_func, caplog, cached_example): - """Test running everexport when config file contains non existing - batch numbers in the list of user defined batches""" - - _, config_file, _ = cached_example( - "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" - ) - - na_batch = 42 - user_def_batches = [0, na_batch] - - # Add export section to config file and set run_export flag to false - with open(config_file, "a", encoding="utf-8") as f: - f.write(f"{CK.EXPORT}:\n {CK.BATCHES}: {user_def_batches}") - - with caplog.at_level(logging.DEBUG): - everexport_entry([config_file]) - - assert ( - f"Batch {na_batch} not found in optimization results." - f" Skipping for current export" in "\n".join(caplog.messages) - ) - - def condition(config): - batches = config.batches if config is not None else None - keys = config.keywords if config is not None else None - return batches == [0] and keys is None - - if ProgressBar: - mocked_func.assert_called_once_with( - export_config=satisfy(condition), - output_dir=satisfy_type(str), - data_file=satisfy_type(str), - export_ecl=True, - progress_callback=satisfy_callable(), - ) - else: - mocked_func.assert_called_once() diff --git a/tests/everest/functional/test_main_everest_entry.py b/tests/everest/functional/test_main_everest_entry.py index 028248e0d5a..723a7691e70 100644 --- a/tests/everest/functional/test_main_everest_entry.py +++ b/tests/everest/functional/test_main_everest_entry.py @@ -94,9 +94,10 @@ def test_everest_entry_monitor_no_run(cached_example): def test_everest_main_export_entry(cached_example): # Setup command line arguments _, config_file, _ = cached_example("math_func/config_minimal.yml") - with capture_streams(): - start_everest(["everest", "export", config_file]) - assert os.path.exists(os.path.join("everest_output", "config_minimal.csv")) + with capture_streams() as (out, _): + start_everest(["everest", "export", str(config_file)]) + + assert "Everexport is deprecated" in out.getvalue() @pytest.mark.integration_test diff --git a/tests/everest/test_everlint.py b/tests/everest/test_everlint.py index 4aa2bb4c729..2c79dba3915 100644 --- a/tests/everest/test_everlint.py +++ b/tests/everest/test_everlint.py @@ -202,23 +202,6 @@ def test_bool_validation(value, valid, min_config, tmp_path, monkeypatch): EverestConfig(**min_config) -@pytest.mark.parametrize( - "path_val, valid", [("my_file", True), ("my_folder/", False), ("my_folder", False)] -) -def test_export_filepath_validation(min_config, tmp_path, monkeypatch, path_val, valid): - monkeypatch.chdir(tmp_path) - Path("my_file").touch() - Path("my_folder").mkdir() - min_config["export"] = {"csv_output_filepath": path_val} - expectation = ( - does_not_raise() - if valid - else pytest.raises(ValidationError, match="Invalid type") - ) - with expectation: - EverestConfig(**min_config) - - def test_invalid_wells(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) Path("my_file").touch() diff --git a/tests/everest/test_export.py b/tests/everest/test_export.py deleted file mode 100644 index 4686748df4a..00000000000 --- a/tests/everest/test_export.py +++ /dev/null @@ -1,353 +0,0 @@ -import os -import shutil -from pathlib import Path - -import pandas as pd -import pytest - -from everest import filter_data -from everest.bin.utils import export_with_progress -from everest.config import EverestConfig -from everest.config.export_config import ExportConfig -from everest.export import check_for_errors, export_data - -CONFIG_FILE = "config_multiobj.yml" -DATA = pd.DataFrame( - { - "WOPT:WELL0": range(4), - "MONKEY": 4 * [0], - "WCON:WELL1": 4 * [14], - "GOPT:GROUP0": [5, 6, 2, 1], - "WOPT:WELL1": range(4), - } -) - -pytestmark = pytest.mark.xdist_group(name="starts_everest") - - -def assertEqualDataFrames(x, y): - assert set(x.columns) == set(y.columns) - for col in x.columns: - assert list(x[col]) == list(y[col]) - - -def test_filter_no_wildcard(): - keywords = ["MONKEY", "Dr. MONKEY", "WOPT:WELL1"] - assertEqualDataFrames(DATA[["MONKEY", "WOPT:WELL1"]], filter_data(DATA, keywords)) - - -def test_filter_leading_wildcard(): - keywords = ["*:WELL1"] - assertEqualDataFrames( - DATA[["WCON:WELL1", "WOPT:WELL1"]], filter_data(DATA, keywords) - ) - - -def test_filter_trailing_wildcard(): - keywords = ["WOPT:*", "MONKEY"] - assertEqualDataFrames( - DATA[["MONKEY", "WOPT:WELL0", "WOPT:WELL1"]], - filter_data(DATA, keywords), - ) - - -def test_filter_double_wildcard(): - keywords = ["*OPT:*0"] - assertEqualDataFrames( - DATA[["WOPT:WELL0", "GOPT:GROUP0"]], filter_data(DATA, keywords) - ) - - -def test_export_only_non_gradient_with_increased_merit(cached_example, snapshot): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - # Default export functionality when no export section is defined - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - - # Test that the default export functionality generated data frame - # contains only non gradient simulations - snapshot.assert_match( - df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), "export.csv" - ) - - -def test_export_only_non_gradient(cached_example, snapshot): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - - # Add export section to config - config.export = ExportConfig(discard_rejected=False) - - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - - snapshot.assert_match( - df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), "export.csv" - ) - - -def test_export_only_increased_merit(cached_example, snapshot): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - - # Add export section to config - config.export = ExportConfig(discard_gradient=False) - - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - - snapshot.assert_match( - df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), - "export.csv", - ) - - -def test_export_all_batches(cached_example, snapshot): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - - # Add export section to config - config.export = ExportConfig(discard_gradient=False, discard_rejected=False) - - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - - snapshot.assert_match( - df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), "export.csv" - ) - - -def test_export_only_give_batches(cached_example, snapshot): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - - # Add export section to config - config.export = ExportConfig(discard_gradient=True, batches=[2]) - - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - - snapshot.assert_match( - df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), "export.csv" - ) - - -def test_export_batches_progress(cached_example, snapshot): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - - # Add export section to config - config.export = ExportConfig(discard_gradient=True, batches=[2]) - - df = export_with_progress(config) - # Check only simulations from given batches are present in export - # drop non-deterministic columns - df = df.drop(["start_time", "end_time", "simulation"], axis=1) - df = df.sort_values(by=["realization", "batch", "sim_avg_obj"]) - - snapshot.assert_match(df.round(4).to_csv(index=False), "export.csv") - - -def test_export_nothing_for_empty_batch_list(cached_example): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - - # Add discard gradient flag to config file - config.export = ExportConfig( - discard_gradient=True, discard_rejected=True, batches=[] - ) - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - - # Check export returns empty data frame - assert df.empty - - -def test_export_nothing(cached_example): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - - # Add discard gradient flag to config file - config.export = ExportConfig( - skip_export=True, discard_gradient=True, discard_rejected=True, batches=[3] - ) - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - - # Check export returns empty data frame - assert df.empty - - -def test_get_export_path(cached_example): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - - # Test default export path when no csv_output_filepath is defined - expected_export_path = os.path.join( - config.output_dir, CONFIG_FILE.replace(".yml", ".csv") - ) - assert expected_export_path == config.export_path - - # Test export path when csv_output_filepath is an absolute path - new_export_folderpath = os.path.join(config.output_dir, "new/folder") - new_export_filepath = os.path.join( - new_export_folderpath, CONFIG_FILE.replace(".yml", ".csv") - ) - - config.export = ExportConfig(csv_output_filepath=new_export_filepath) - - expected_export_path = new_export_filepath - assert expected_export_path == config.export_path - - # Test export path when csv_output_filepath is a relative path - config.export.csv_output_filepath = os.path.join( - "new/folder", CONFIG_FILE.replace(".yml", ".csv") - ) - assert expected_export_path == config.export_path - - # Test export when file does not contain an extension. - config_file_no_extension = os.path.splitext(os.path.basename(CONFIG_FILE))[0] - shutil.copy(CONFIG_FILE, config_file_no_extension) - new_config = EverestConfig.load_file(config_file_no_extension) - expected_export_path = os.path.join( - new_config.output_dir, f"{config_file_no_extension}.csv" - ) - assert expected_export_path == new_config.export_path - - -def test_validate_export(cached_example): - config_path, config_file, _ = cached_example( - "../../tests/everest/test_data/mocked_test_case/mocked_multi_batch.yml" - ) - config = EverestConfig.load_file(Path(config_path) / config_file) - - def check_error(expected_error, reported_errors): - expected_error_msg, expected_export_ecl = expected_error - error_list, export_ecl = reported_errors - # If no error was message provided the list of errors - # should also be empty - if not expected_error_msg: - assert len(error_list) == 0 - assert expected_export_ecl == export_ecl - else: - found = False - for error in error_list: - if expected_error_msg in error: - found = True - break - assert found - assert expected_export_ecl == export_ecl - - # Test error when user defines an empty list for the eclipse keywords - config.export = ExportConfig() - config.export.keywords = [] - errors, export_ecl = check_for_errors( - config=config.export, - optimization_output_path=config.optimization_output_dir, - storage_path=config.storage_dir, - data_file_path=config.model.data_file, - ) - check_error( - expected_error=("No eclipse keywords selected for export", False), - reported_errors=(errors, export_ecl), - ) - - # Test error when user defines an empty list for the eclipse keywords - # and empty list of for batches to export - config.export.batches = [] - errors, export_ecl = check_for_errors( - config=config.export, - optimization_output_path=config.optimization_output_dir, - storage_path=config.storage_dir, - data_file_path=config.model.data_file, - ) - check_error( - expected_error=("No batches selected for export.", False), - reported_errors=(errors, export_ecl), - ) - - # Test export validator outputs no errors when the config file contains - # only keywords that represent a subset of already internalized keys - config.export.keywords = ["FOPT"] - config.export.batches = None - errors, export_ecl = check_for_errors( - config=config.export, - optimization_output_path=config.optimization_output_dir, - storage_path=config.storage_dir, - data_file_path=config.model.data_file, - ) - check_error(expected_error=("", True), reported_errors=(errors, export_ecl)) - - non_int_key = "STANGE_KEY" - config.export.keywords = [non_int_key, "FOPT"] - errors, export_ecl = check_for_errors( - config=config.export, - optimization_output_path=config.optimization_output_dir, - storage_path=config.storage_dir, - data_file_path=config.model.data_file, - ) - - check_error( - ( - f"Non-internalized ecl keys selected for export '{non_int_key}'.", - False, - ), - (errors, export_ecl), - ) - - # Test that validating the export spots non-valid batches and removes - # them from the list of batches selected for export. - non_valid_batch = 42 - config.export = ExportConfig(batches=[0, non_valid_batch]) - errors, export_ecl = check_for_errors( - config=config.export, - optimization_output_path=config.optimization_output_dir, - storage_path=config.storage_dir, - data_file_path=config.model.data_file, - ) - check_error( - ( - f"Batch {non_valid_batch} not found in optimization results. Skipping for" - " current export", - True, - ), - (errors, export_ecl), - ) - assert config.export.batches == [0] - - -def test_export_gradients(cached_example, snapshot): - config_path, config_file, _ = cached_example("math_func/config_multiobj.yml") - config = EverestConfig.load_file(Path(config_path) / config_file) - - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - - snapshot.assert_match( - df.drop(["start_time", "end_time"], axis=1).round(4).to_csv(), "export.csv" - ) diff --git a/tests/everest/test_math_func.py b/tests/everest/test_math_func.py index c5de0df03f9..ffde0b7d2e5 100644 --- a/tests/everest/test_math_func.py +++ b/tests/everest/test_math_func.py @@ -1,16 +1,10 @@ -import itertools import os -import numpy as np -import pandas as pd import pytest import yaml from ert.run_models.everest_run_model import EverestRunModel -from everest import ConfigKeys as CK from everest.config import EverestConfig, InputConstraintConfig -from everest.config.export_config import ExportConfig -from everest.export import export_data from everest.util import makedirs_if_needed CONFIG_FILE_MULTIOBJ = "config_multiobj.yml" @@ -38,68 +32,6 @@ def test_math_func_multiobj( (-0.5 * (2.0 / 3.0) * 1.5) + (-4.5 * (1.0 / 3.0) * 1.0), abs=0.01 ) - # Test conversion to pandas DataFrame - if config.export is None: - config.export = ExportConfig(discard_rejected=False) - - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - ok_evals = df[(df["is_gradient"] == 0) & (df["success"] == 1)] - - # Three points in this case are increasing the merit - assert len(ok_evals[ok_evals["increased_merit"] == 1]) == 2 - - first = ok_evals.iloc[0] - best = ok_evals.iloc[-1] - assert first["point_x"] == 0 - assert first["point_y"] == 0 - assert first["point_z"] == 0 - assert first["distance_p"] == -(0.5 * 0.5 * 3) - assert first["distance_q"] == -(1.5 * 1.5 * 2 + 0.5 * 0.5) - assert first["sim_avg_obj"] == (-0.75 * (2.0 / 3.0) * 1.5) + ( - -4.75 * (1.0 / 3.0) * 1.0 - ) - - assert best["point_x"] == pytest.approx(x) - assert best["point_y"] == pytest.approx(y) - assert best["point_z"] == pytest.approx(z) - assert best["sim_avg_obj"] == pytest.approx(run_model.result.total_objective) - - test_space = itertools.product( - (first, best), - ( - ("distance_p", 2.0 / 3, 1.5), - ("distance_q", 1.0 / 3, 1), - ), - ) - for row, (obj_name, weight, norm) in test_space: - assert row[obj_name] * norm == row[obj_name + "_norm"] - assert row[obj_name] * weight * norm == pytest.approx( - row[obj_name + "_weighted_norm"] - ) - - assert first["realization_weight"] == 1.0 - assert best["realization_weight"] == 1.0 - - # check exported sim_avg_obj against dakota_tabular - dt = pd.read_csv( - os.path.join(config.optimization_output_dir, "dakota", "dakota_tabular.dat"), - sep=" +", - engine="python", - ) - dt.sort_values(by=["%eval_id"], inplace=True) - ok_evals = ok_evals.sort_values(by=["batch"]) - for a, b in zip( - dt["obj_fn"], # pylint: disable=unsubscriptable-object - ok_evals["sim_avg_obj"], - strict=False, - ): - # Opposite, because ropt negates values before passing to dakota - assert -a == pytest.approx(b) - @pytest.mark.integration_test def test_math_func_advanced( @@ -131,55 +63,6 @@ def test_math_func_advanced( expected_opt = -(w[0] * (dist_0) + w[1] * (dist_1)) assert expected_opt == pytest.approx(run_model.result.total_objective, abs=0.001) - # Test conversion to pandas DataFrame - df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - ok_evals = df[(df["is_gradient"] == 0) & (df["success"] == 1)] - - ok_evals_0 = ok_evals[ok_evals["realization"] == 0] - best_0 = ok_evals_0.iloc[-1] - assert best_0[f"point_{point_names[0]}"] == pytest.approx(x0) - assert best_0[f"point_{point_names[1]}"] == pytest.approx(x1) - assert best_0[f"point_{point_names[2]}"] == pytest.approx(x2) - assert best_0["distance"] == pytest.approx(-dist_0, abs=0.001) - assert best_0["real_avg_obj"] == pytest.approx( - run_model.result.total_objective, abs=0.001 - ) - assert best_0["realization_weight"] == 0.25 - - ok_evals_1 = ok_evals[ok_evals["realization"] == 2] - best_1 = ok_evals_1.iloc[-1] - assert best_1[f"point_{point_names[0]}"] == pytest.approx(x0) - assert best_1[f"point_{point_names[1]}"] == pytest.approx(x1) - assert best_1[f"point_{point_names[2]}"] == pytest.approx(x2) - assert best_1["distance"] == pytest.approx(-dist_1, abs=0.001) - assert best_1["real_avg_obj"] == pytest.approx( - run_model.result.total_objective, abs=0.001 - ) - assert best_1["realization_weight"] == 0.75 - - # check functionality of export batch filtering - if CK.EXPORT not in config: - config.export = ExportConfig() - - exp_nunique = 2 - batches_list = [0, 2] - config.export.batches = batches_list - - batch_filtered_df = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - n_unique_batches = batch_filtered_df["batch"].nunique() - unique_batches = np.sort(batch_filtered_df["batch"].unique()).tolist() - - assert exp_nunique == n_unique_batches - assert batches_list == unique_batches - @pytest.mark.integration_test def test_remove_run_path( From 198760f50d1fb018d1164a5a9ff69b7c53b846b4 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 10 Jan 2025 13:58:49 +0100 Subject: [PATCH 04/13] Use new storage for egg test --- .../test_egg_snapshot/best_controls | 4 +++ .../best_objective_gradients_csv | 13 +++++++ .../test_egg_snapshot/best_objectives_csv | 4 +++ tests/everest/test_egg_simulation.py | 34 ++++++++++++------- 4 files changed, 42 insertions(+), 13 deletions(-) create mode 100644 tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_controls create mode 100644 tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objective_gradients_csv create mode 100644 tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objectives_csv diff --git a/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_controls b/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_controls new file mode 100644 index 00000000000..efa2c369beb --- /dev/null +++ b/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_controls @@ -0,0 +1,4 @@ +result_id,batch_id,realization,simulation_id,well_rate_PROD1-1,well_rate_PROD2-1,well_rate_PROD3-1,well_rate_PROD4-1,well_rate_INJECT1-1,well_rate_INJECT2-1,well_rate_INJECT3-1,well_rate_INJECT4-1,well_rate_INJECT5-1,well_rate_INJECT6-1,well_rate_INJECT7-1,well_rate_INJECT8-1 +2,1,0,0,0.1999058451019366,0.6000415374802492,0.19994822906594492,0.5999121404696927,0.5999333812347782,0.5998249609455343,0.5999478780872213,0.6000766587484829,0.5997483050379139,0.5997645538239165,0.5999464128453291,0.6001158013106983 +2,1,1,1,0.1999058451019366,0.6000415374802492,0.19994822906594492,0.5999121404696927,0.5999333812347782,0.5998249609455343,0.5999478780872213,0.6000766587484829,0.5997483050379139,0.5997645538239165,0.5999464128453291,0.6001158013106983 +2,1,2,2,0.1999058451019366,0.6000415374802492,0.19994822906594492,0.5999121404696927,0.5999333812347782,0.5998249609455343,0.5999478780872213,0.6000766587484829,0.5997483050379139,0.5997645538239165,0.5999464128453291,0.6001158013106983 diff --git a/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objective_gradients_csv b/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objective_gradients_csv new file mode 100644 index 00000000000..c3c9b602ce8 --- /dev/null +++ b/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objective_gradients_csv @@ -0,0 +1,13 @@ +result_id,batch_id,control_name,rf,rf.total +3,1,well_rate_PROD1-1,-0.0001652263441330399,-0.0001652263441330399 +3,1,well_rate_PROD2-1,-0.00017471993459638647,-0.00017471993459638647 +3,1,well_rate_PROD3-1,-0.0006246697489175857,-0.0006246697489175857 +3,1,well_rate_PROD4-1,-0.00041696791141070507,-0.00041696791141070507 +3,1,well_rate_INJECT1-1,0.00014572715946017988,0.00014572715946017988 +3,1,well_rate_INJECT2-1,0.000020160145178916144,0.000020160145178916144 +3,1,well_rate_INJECT3-1,0.00011211576845746024,0.00011211576845746024 +3,1,well_rate_INJECT4-1,-0.00039013326453636804,-0.00039013326453636804 +3,1,well_rate_INJECT5-1,0.000015273711834717528,0.000015273711834717528 +3,1,well_rate_INJECT6-1,0.00022688941880139667,0.00022688941880139667 +3,1,well_rate_INJECT7-1,-0.000017986172009426353,-0.000017986172009426353 +3,1,well_rate_INJECT8-1,-0.00007249824942858863,-0.00007249824942858863 diff --git a/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objectives_csv b/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objectives_csv new file mode 100644 index 00000000000..8be2ccb0adb --- /dev/null +++ b/tests/everest/snapshots/test_egg_simulation/test_egg_snapshot/best_objectives_csv @@ -0,0 +1,4 @@ +result_id,batch_id,realization,perturbation,rf,well_rate_PROD1-1,well_rate_PROD2-1,well_rate_PROD3-1,well_rate_PROD4-1,well_rate_INJECT1-1,well_rate_INJECT2-1,well_rate_INJECT3-1,well_rate_INJECT4-1,well_rate_INJECT5-1,well_rate_INJECT6-1,well_rate_INJECT7-1,well_rate_INJECT8-1 +3,1,0,0,0.17690399289131165,0.21284489964814637,0.6909564325624082,0.38431070691176333,0.8356643180772911,0.521629534106952,0.593979014614453,0.6040659148639875,0.7933463643524403,0.5993493412129899,0.5212494525431188,0.6070894345108306,0.6519904009594275 +3,1,1,0,0.175477996468544,0.298444266061598,0.7489971930595409,0.012799873895175784,0.6900597174834636,0.48925234184495137,0.542695688308217,0.6807768824317812,0.7317670260000283,0.6644455051643829,0.5000562156945711,0.6616537223014831,0.6921270651437204 +3,1,2,0,0.17582400143146515,0.43332587738243267,0.6304305725596401,0.254408502582991,0.4256930350912209,0.5920584096483588,0.5533444899078019,0.5039440678184369,0.5579961940071504,0.6274756974975367,0.49923076114351883,0.6427740436133934,0.5885310395649261 diff --git a/tests/everest/test_egg_simulation.py b/tests/everest/test_egg_simulation.py index ce55463f374..4e5c5672d59 100644 --- a/tests/everest/test_egg_simulation.py +++ b/tests/everest/test_egg_simulation.py @@ -1,7 +1,8 @@ +import io import json import os -import sys +import polars import pytest from ert.config import ErtConfig @@ -11,7 +12,6 @@ from everest.config import EverestConfig from everest.config.export_config import ExportConfig from everest.config_keys import ConfigKeys -from everest.export import export_data from everest.simulator.everest_to_ert import _everest_to_ert_config_dict from tests.everest.utils import ( everest_default_jobs, @@ -708,15 +708,23 @@ def sweetcallbackofmine(self, *args, **kwargs): run_model.run_experiment(evaluator_server_config) assert cbtracker.called + best_batch = [b for b in run_model.ever_storage.data.batches if b.is_improvement][ + -1 + ] - data = export_data( - export_config=config.export, - output_dir=config.output_dir, - data_file=config.model.data_file if config.model else None, - ) - snapshot.assert_match( - data.drop(columns=["TCPUDAY", "start_time", "end_time"], axis=1) - .round(6) - .to_csv(), - f"egg-py{sys.version_info.major}{sys.version_info.minor}.csv", - ) + def _df_to_string(df: polars.DataFrame): + strbuf = io.StringIO() + schema = df.schema + df.with_columns( + polars.col(c) for c in df.columns if schema[c] == polars.Float32 + ).write_csv(strbuf) + + return strbuf.getvalue() + + best_objectives_csv = _df_to_string(best_batch.perturbation_objectives) + best_objective_gradients_csv = _df_to_string(best_batch.batch_objective_gradient) + best_controls = _df_to_string(best_batch.realization_controls) + + snapshot.assert_match(best_controls, "best_controls") + snapshot.assert_match(best_objectives_csv, "best_objectives_csv") + snapshot.assert_match(best_objective_gradients_csv, "best_objective_gradients_csv") From 74bfb3dd261887247de00dbf106d90f1b409d472 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 10 Jan 2025 08:56:24 +0100 Subject: [PATCH 05/13] Add test locking in get_opt_status behavior --- tests/everest/test_detached.py | 78 ++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/tests/everest/test_detached.py b/tests/everest/test_detached.py index 9f828dd57da..f0351626d59 100644 --- a/tests/everest/test_detached.py +++ b/tests/everest/test_detached.py @@ -3,6 +3,7 @@ from pathlib import Path from unittest.mock import MagicMock, patch +import numpy as np import pytest import requests @@ -25,6 +26,7 @@ PROXY, ServerStatus, everserver_status, + get_opt_status, server_is_running, start_server, stop_server, @@ -337,3 +339,79 @@ async def server_running(): driver = await start_server(everest_config, debug=True) final_state = await server_running() assert final_state.returncode == 0 + + +def test_get_opt_status(cached_example): + _, config_file, _ = cached_example("math_func/config_multiobj.yml") + config = EverestConfig.load_file(config_file) + + opts = get_opt_status(config.optimization_output_dir) + + assert np.allclose( + opts["objective_history"], [-2.3333, -2.3335, -2.0000], atol=1e-4 + ) + + assert np.allclose( + opts["control_history"]["point_x"], + [0.0, -0.004202181916184627, -0.0021007888698514315], + atol=1e-4, + ) + assert np.allclose( + opts["control_history"]["point_y"], + [0.0, -0.011298196942730383, -0.0056482862617779715], + atol=1e-4, + ) + assert np.allclose( + opts["control_history"]["point_z"], [0.0, 1.0, 0.4999281115746754], atol=1e-4 + ) + + assert np.allclose( + opts["objectives_history"]["distance_p"], + [-0.75, -0.7656459808349609, -0.5077850222587585], + atol=1e-4, + ) + assert np.allclose( + opts["objectives_history"]["distance_q"], + [-4.75, -4.703639984130859, -4.476789951324463], + atol=1e-4, + ) + + assert opts["accepted_control_indices"] == [0, 2] + + cmond = opts["cli_monitor_data"] + + assert cmond["batches"] == [0, 1, 2] + assert cmond["controls"][0]["point_x"] == 0.0 + assert cmond["controls"][0]["point_y"] == 0.0 + assert cmond["controls"][0]["point_z"] == 0.0 + + assert np.allclose( + cmond["controls"][1]["point_x"], -0.004202181916184627, atol=1e-4 + ) + assert np.allclose( + cmond["controls"][1]["point_y"], -0.011298196942730383, atol=1e-4 + ) + assert np.allclose(cmond["controls"][1]["point_z"], 1.0, atol=1e-4) + assert np.allclose( + cmond["controls"][2]["point_x"], -0.0021007888698514315, atol=1e-4 + ) + assert np.allclose( + cmond["controls"][2]["point_y"], -0.0056482862617779715, atol=1e-4 + ) + assert np.allclose(cmond["controls"][2]["point_z"], 0.4999281115746754, atol=1e-4) + + assert np.allclose( + cmond["objective_value"], + [-2.333333333333333, -2.333525975545247, -2.000048339366913], + atol=1e-4, + ) + assert np.allclose( + cmond["expected_objectives"]["distance_p"], + [-0.75, -0.7656459808349609, -0.5077850222587585], + atol=1e-4, + ) + assert np.allclose( + cmond["expected_objectives"]["distance_q"], + [-4.75, -4.703639984130859, -4.476789951324463], + atol=1e-4, + ) From f07f6e0d2c2fe5dcfa680f5bbb9068827e0d18d5 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 10 Jan 2025 09:12:48 +0100 Subject: [PATCH 06/13] Use new storage for get_opt_status --- src/everest/bin/config_branch_script.py | 26 +++++---- src/everest/detached/__init__.py | 77 +++++++++++++++++-------- 2 files changed, 69 insertions(+), 34 deletions(-) diff --git a/src/everest/bin/config_branch_script.py b/src/everest/bin/config_branch_script.py index b53453eb859..ac42c88e717 100644 --- a/src/everest/bin/config_branch_script.py +++ b/src/everest/bin/config_branch_script.py @@ -1,16 +1,15 @@ import argparse from copy import deepcopy as copy from functools import partial -from os.path import exists, join +from pathlib import Path from typing import Any from ruamel.yaml import YAML -from seba_sqlite.database import Database as seba_db -from seba_sqlite.snapshot import SebaSnapshot from everest.config import EverestConfig from everest.config_file_loader import load_yaml from everest.config_keys import ConfigKeys as CK +from everest.everest_storage import EverestStorage def _yaml_config(file_path: str, parser) -> tuple[str, dict[str, Any] | None]: @@ -46,10 +45,19 @@ def _build_args_parser(): def opt_controls_by_batch(optimization_dir, batch): - snapshot = SebaSnapshot(optimization_dir) - for opt_data in snapshot.get_optimization_data(): - if opt_data.batch_id == batch: - return opt_data.controls + storage = EverestStorage(Path(optimization_dir)) + storage.read_from_output_dir() + + control_names = storage.data.controls["control_name"] + batch_data = next((b for b in storage.data.batches if b.batch_id == batch), None) + + if batch_data: + # All geo-realizations should have the same unperturbed control values per batch + # hence it does not matter which realization we select the controls for + return batch_data.realization_controls.select( + control_names.to_list() + ).to_dicts()[0] + return None @@ -92,10 +100,6 @@ def config_branch_entry(args=None): options = parser.parse_args(args) optimization_dir, yml_config = options.input_config - db_path = join(optimization_dir, seba_db.FILENAME) - if not exists(db_path): - parser.error(f"Optimization source {db_path} not found") - opt_controls = opt_controls_by_batch(optimization_dir, options.batch) if opt_controls is None: parser.error(f"Batch {options.batch} not present in optimization data") diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py index 60a3c1ab96f..ca3855de805 100644 --- a/src/everest/detached/__init__.py +++ b/src/everest/detached/__init__.py @@ -11,15 +11,15 @@ from pathlib import Path from typing import Literal +import polars import requests -from seba_sqlite.exceptions import ObjectNotFoundError -from seba_sqlite.snapshot import SebaSnapshot from ert.scheduler import create_driver from ert.scheduler.driver import Driver, FailedSubmit from ert.scheduler.event import StartedEvent from everest.config import EverestConfig, ServerConfig from everest.config_keys import ConfigKeys as CK +from everest.everest_storage import EverestStorage from everest.strings import ( EVEREST_SERVER_CONFIG, OPT_PROGRESS_ENDPOINT, @@ -124,32 +124,63 @@ def wait_for_server(output_dir: str, timeout: int) -> None: def get_opt_status(output_folder): - """Retrieve a seba database snapshot and return a dictionary with - optimization information.""" - if not os.path.exists(os.path.join(output_folder, "seba.db")): + """Return a dictionary with optimization information retrieved from storage""" + if not Path(output_folder).exists() or not os.listdir(output_folder): return {} + + storage = EverestStorage(Path(output_folder)) try: - seba_snapshot = SebaSnapshot(output_folder) - except ObjectNotFoundError: + storage.read_from_output_dir() + except FileNotFoundError: + # Optimization output dir exists and not empty, but still missing + # actual stored results return {} - snapshot = seba_snapshot.get_snapshot(filter_out_gradient=True) - - cli_monitor_data = {} - if snapshot.optimization_data: - cli_monitor_data = { - "batches": [item.batch_id for item in snapshot.optimization_data], - "controls": [item.controls for item in snapshot.optimization_data], - "objective_value": [ - item.objective_value for item in snapshot.optimization_data - ], - "expected_objectives": snapshot.expected_objectives, - } + + objective_names = storage.data.objective_functions["objective_name"].to_list() + control_names = storage.data.controls["control_name"].to_list() + + expected_objectives = polars.concat( + [ + b.batch_objectives.select(objective_names) + for b in storage.data.batches + if b.batch_objectives is not None + ] + ).to_dict(as_series=False) + + expected_total_objective = [ + b.batch_objectives["total_objective_value"].item() + for b in storage.data.batches + if b.batch_objectives is not None + ] + + improvement_batches = [b.batch_id for b in storage.data.batches if b.is_improvement] + + cli_monitor_data = { + "batches": [ + b.batch_id + for b in storage.data.batches + if b.realization_controls is not None and b.batch_objectives is not None + ], + "controls": [ + b.realization_controls.select(control_names).to_dicts()[0] + for b in storage.data.batches + if b.realization_controls is not None + ], + "objective_value": expected_total_objective, + "expected_objectives": expected_objectives, + } return { - "objective_history": snapshot.expected_single_objective, - "control_history": snapshot.optimization_controls, - "objectives_history": snapshot.expected_objectives, - "accepted_control_indices": snapshot.increased_merit_indices, + "objective_history": expected_total_objective, + "control_history": polars.concat( + [ + b.realization_controls.select(control_names) + for b in storage.data.batches + if b.realization_controls is not None + ] + ).to_dict(as_series=False), + "objectives_history": expected_objectives, + "accepted_control_indices": improvement_batches, "cli_monitor_data": cli_monitor_data, } From 5d2f6999d34c9d7f4ce28b57cc46f8cab8c4ebcd Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 10 Jan 2025 09:13:23 +0100 Subject: [PATCH 07/13] Use new storage for test_main_everest_entry.py --- .../functional/test_main_everest_entry.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/everest/functional/test_main_everest_entry.py b/tests/everest/functional/test_main_everest_entry.py index 723a7691e70..6ea31c965fc 100644 --- a/tests/everest/functional/test_main_everest_entry.py +++ b/tests/everest/functional/test_main_everest_entry.py @@ -1,9 +1,9 @@ import os +from pathlib import Path from textwrap import dedent import pytest from ruamel.yaml import YAML -from seba_sqlite.snapshot import SebaSnapshot from tests.everest.utils import ( capture_streams, skipif_no_everest_models, @@ -13,6 +13,7 @@ from everest.bin.main import start_everest from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, everserver_status +from everest.everest_storage import EverestStorage WELL_ORDER = "everest/model/config.yml" @@ -56,14 +57,15 @@ def test_everest_entry_run(cached_example): assert status["status"] == ServerStatus.completed - snapshot = SebaSnapshot(config.optimization_output_dir).get_snapshot() + storage = EverestStorage(Path(config.optimization_output_dir)) + storage.read_from_output_dir() + optimal = storage.get_optimal_result() - best_settings = snapshot.optimization_data[-1] - assert best_settings.controls["point_x"] == pytest.approx(0.5, abs=0.05) - assert best_settings.controls["point_y"] == pytest.approx(0.5, abs=0.05) - assert best_settings.controls["point_z"] == pytest.approx(0.5, abs=0.05) + assert optimal.controls["point_x"] == pytest.approx(0.5, abs=0.05) + assert optimal.controls["point_y"] == pytest.approx(0.5, abs=0.05) + assert optimal.controls["point_z"] == pytest.approx(0.5, abs=0.05) - assert best_settings.objective_value == pytest.approx(0.0, abs=0.0005) + assert optimal.total_objective == pytest.approx(0.0, abs=0.0005) with capture_streams(): start_everest(["everest", "monitor", config_file]) From e2d7de7d1a00ee17fcf6ce41812075cfa483a6c2 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 10 Jan 2025 10:20:12 +0100 Subject: [PATCH 08/13] Use new storage in test_everserver.py --- tests/everest/test_everserver.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py index 4075e3fbc41..07b5de64fb0 100644 --- a/tests/everest/test_everserver.py +++ b/tests/everest/test_everserver.py @@ -5,12 +5,11 @@ from pathlib import Path from unittest.mock import patch -from seba_sqlite.snapshot import SebaSnapshot - from ert.run_models.everest_run_model import EverestExitCode from everest.config import EverestConfig, OptimizationConfig, ServerConfig from everest.detached import ServerStatus, everserver_status from everest.detached.jobs import everserver +from everest.everest_storage import EverestStorage from everest.simulator import JOB_FAILURE, JOB_SUCCESS from everest.strings import OPT_FAILURE_REALIZATIONS, SIM_PROGRESS_ENDPOINT @@ -215,12 +214,11 @@ def test_everserver_status_max_batch_num( # The server should complete without error. assert status["status"] == ServerStatus.completed + storage = EverestStorage(Path(config.optimization_output_dir)) + storage.read_from_output_dir() # Check that there is only one batch. - snapshot = SebaSnapshot(config.optimization_output_dir).get_snapshot( - filter_out_gradient=False, batches=None - ) - assert {data.batch for data in snapshot.simulation_data} == {0} + assert {b.batch_id for b in storage.data.batches} == {0} @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"]) From 95c8adce2be8fb384ec29c1fe3b0cb0d2e600ec8 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 10 Jan 2025 10:42:47 +0100 Subject: [PATCH 09/13] Use new storage in test_config_branch_entry.py --- .../entry_points/test_config_branch_entry.py | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/tests/everest/entry_points/test_config_branch_entry.py b/tests/everest/entry_points/test_config_branch_entry.py index c388125cbbc..191ea487897 100644 --- a/tests/everest/entry_points/test_config_branch_entry.py +++ b/tests/everest/entry_points/test_config_branch_entry.py @@ -2,11 +2,10 @@ from os.path import exists from pathlib import Path -from seba_sqlite.snapshot import SebaSnapshot - from everest.bin.config_branch_script import config_branch_entry from everest.config_file_loader import load_yaml from everest.config_keys import ConfigKeys as CK +from everest.everest_storage import EverestStorage def test_config_branch_entry(cached_example): @@ -28,19 +27,21 @@ def test_config_branch_entry(cached_example): assert len(new_controls) == len(old_controls) assert len(new_controls[0][CK.VARIABLES]) == len(old_controls[0][CK.VARIABLES]) - opt_controls = {} - - snapshot = SebaSnapshot(Path(path) / "everest_output" / "optimization_output") - for opt_data in snapshot._optimization_data(): - if opt_data.batch_id == 1: - opt_controls = opt_data.controls + storage = EverestStorage(Path(path) / "everest_output" / "optimization_output") + storage.read_from_output_dir() new_controls_initial_guesses = { var[CK.INITIAL_GUESS] for var in new_controls[0][CK.VARIABLES] } - opt_control_val_for_batch_id = {v for k, v in opt_controls.items()} - assert new_controls_initial_guesses == opt_control_val_for_batch_id + control_names = storage.data.controls["control_name"] + batch_1_info = next(b for b in storage.data.batches if b.batch_id == 1) + realization_control_vals_mean = batch_1_info.realization_controls.select( + *control_names + ).to_dicts()[0] + control_values = set(realization_control_vals_mean.values()) + + assert new_controls_initial_guesses == control_values def test_config_branch_preserves_config_section_order(cached_example): @@ -50,15 +51,6 @@ def test_config_branch_preserves_config_section_order(cached_example): assert exists("new_restart_config.yml") - opt_controls = {} - - snapshot = SebaSnapshot(Path(path) / "everest_output" / "optimization_output") - for opt_data in snapshot._optimization_data(): - if opt_data.batch_id == 1: - opt_controls = opt_data.controls - - opt_control_val_for_batch_id = {v for k, v in opt_controls.items()} - diff_lines = [] with ( open("config_advanced.yml", encoding="utf-8") as initial_config, @@ -80,5 +72,15 @@ def test_config_branch_preserves_config_section_order(cached_example): assert len(diff_lines) == 4 assert "-initial_guess:0.25" in diff_lines - for control_val in opt_control_val_for_batch_id: + + storage = EverestStorage(Path(path) / "everest_output" / "optimization_output") + storage.read_from_output_dir() + control_names = storage.data.controls["control_name"] + batch_1_info = next(b for b in storage.data.batches if b.batch_id == 1) + realization_control_vals_mean = batch_1_info.realization_controls.select( + *control_names + ).to_dicts()[0] + control_values = set(realization_control_vals_mean.values()) + + for control_val in control_values: assert f"+initial_guess:{control_val}" in diff_lines From fd6764b7965ca62245811c1aaa59a02ba2f57474 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 10 Jan 2025 11:56:35 +0100 Subject: [PATCH 10/13] Remove SEBA reference in test_simulator_cache --- tests/everest/test_simulator_cache.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/everest/test_simulator_cache.py b/tests/everest/test_simulator_cache.py index 14765b49b14..816cd4d943a 100644 --- a/tests/everest/test_simulator_cache.py +++ b/tests/everest/test_simulator_cache.py @@ -1,5 +1,3 @@ -from pathlib import Path - import numpy as np from ert.config import QueueSystem @@ -41,9 +39,6 @@ def new_call(*args): # Now do another run, where the functions should come from the cache: n_evals = 0 - # If we want to do another run, the seba database must be made new: - Path("everest_output/optimization_output/seba.db").unlink() - # The batch_id was used as a stopping criterion, so it must be reset: run_model._batch_id = 0 From 2389cbdee9f836ac0d81e458454f02470e634207 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Mon, 13 Jan 2025 14:10:09 +0100 Subject: [PATCH 11/13] review: use int for realization map --- src/everest/api/everest_data_api.py | 2 +- src/everest/everest_storage.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/everest/api/everest_data_api.py b/src/everest/api/everest_data_api.py index ff90bbdd4f8..535e5262189 100644 --- a/src/everest/api/everest_data_api.py +++ b/src/everest/api/everest_data_api.py @@ -226,7 +226,7 @@ def summary_values(self, batches=None, keys=None): ) realizations = pl.Series( "realization", - [realization_map.get(str(sim)) for sim in summary["simulation"]], + [realization_map.get(int(sim)) for sim in summary["simulation"]], ) realizations = realizations.cast(pl.Int64, strict=False) summary = summary.with_columns(realizations) diff --git a/src/everest/everest_storage.py b/src/everest/everest_storage.py index 96c7685e81e..e6859e7aef6 100644 --- a/src/everest/everest_storage.py +++ b/src/everest/everest_storage.py @@ -73,7 +73,7 @@ class EverestStorageDataFrames: realization_weights: polars.DataFrame | None = None @property - def simulation_to_geo_realization_map(self) -> dict[str, str]: + def simulation_to_geo_realization_map(self) -> dict[int, int]: """ Mapping from simulation ID to geo-realization """ @@ -91,8 +91,7 @@ def simulation_to_geo_realization_map(self) -> dict[str, str]: mapping = {} for d in dummy_df.select("realization", "simulation_id").to_dicts(): - # Currently we work with str, but should maybe not be done in future - mapping[str(d["simulation_id"])] = str(d["realization"]) + mapping[int(d["simulation_id"])] = int(d["realization"]) return mapping From c3203fa4f390be42493ff98c38dad25db4f23160 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Mon, 13 Jan 2025 14:15:07 +0100 Subject: [PATCH 12/13] review: Add comment about weird function in everest api --- src/everest/api/everest_data_api.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/everest/api/everest_data_api.py b/src/everest/api/everest_data_api.py index 535e5262189..358d8694189 100644 --- a/src/everest/api/everest_data_api.py +++ b/src/everest/api/everest_data_api.py @@ -51,6 +51,8 @@ def output_constraint_names(self): ) def input_constraint(self, control): + # Note: This function is weird, its existence is probably not well-justified + # consider removing! initial_values = self._ever_storage.data.controls control_spec = initial_values.filter( pl.col("control_name") == control From 6bef204e484543b9b40da4423b78b1152ffdabdc Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Wed, 15 Jan 2025 14:12:04 +0100 Subject: [PATCH 13/13] review: Stop storing result_id --- src/everest/api/everest_data_api.py | 2 +- src/everest/everest_storage.py | 18 ++---------------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/src/everest/api/everest_data_api.py b/src/everest/api/everest_data_api.py index 358d8694189..85ee49e9fb4 100644 --- a/src/everest/api/everest_data_api.py +++ b/src/everest/api/everest_data_api.py @@ -173,7 +173,7 @@ def gradient_values(self): if not all_batch_data: return [] - all_info = polars.concat(all_batch_data).drop("result_id") + all_info = polars.concat(all_batch_data) objective_columns = [ c for c in all_info.drop(["batch_id", "control_name"]).columns diff --git a/src/everest/everest_storage.py b/src/everest/everest_storage.py index e6859e7aef6..60842b24c3a 100644 --- a/src/everest/everest_storage.py +++ b/src/everest/everest_storage.py @@ -303,7 +303,6 @@ def _rename_ropt_df_columns(df: polars.DataFrame) -> polars.DataFrame: def _enforce_dtypes(df: polars.DataFrame) -> polars.DataFrame: dtypes = { "batch_id": polars.UInt32, - "result_id": polars.UInt32, "perturbation": polars.UInt32, "realization": polars.UInt32, # -1 is used as a value in simulator cache. @@ -445,7 +444,6 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult select=["objectives", "evaluation_ids"], ).reset_index(), ).select( - "result_id", "batch_id", "realization", "objective", @@ -460,7 +458,6 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult select=["constraints", "evaluation_ids"], ).reset_index(), ).select( - "result_id", "batch_id", "realization", "evaluation_ids", @@ -474,9 +471,7 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult batch_constraints = polars.from_pandas( results.to_dataframe("nonlinear_constraints").reset_index() - ).select( - "result_id", "batch_id", "nonlinear_constraint", "values", "violations" - ) + ).select("batch_id", "nonlinear_constraint", "values", "violations") batch_constraints = batch_constraints.rename( { @@ -517,16 +512,13 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult "functions", select=["objectives", "weighted_objective"], ).reset_index() - ).select( - "result_id", "batch_id", "objective", "objectives", "weighted_objective" - ) + ).select("batch_id", "objective", "objectives", "weighted_objective") realization_controls = polars.from_pandas( results.to_dataframe( "evaluations", select=["variables", "evaluation_ids"] ).reset_index() ).select( - "result_id", "batch_id", "variable", "realization", @@ -558,7 +550,6 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult realization_objectives = realization_objectives.pivot( values="objective_value", index=[ - "result_id", "batch_id", "realization", "simulation_id", @@ -579,7 +570,6 @@ def _store_gradient_results(self, results: GradientResults) -> _GradientResults: results.to_dataframe("evaluations").reset_index() ).select( [ - "result_id", "batch_id", "variable", "realization", @@ -603,7 +593,6 @@ def _store_gradient_results(self, results: GradientResults) -> _GradientResults: results.to_dataframe("gradients").reset_index() ).select( [ - "result_id", "batch_id", "variable", "objective", @@ -626,7 +615,6 @@ def _store_gradient_results(self, results: GradientResults) -> _GradientResults: if results.evaluations.perturbed_constraints is not None: perturbation_constraints = ( perturbation_objectives[ - "result_id", "batch_id", "realization", "perturbation", @@ -644,7 +632,6 @@ def _store_gradient_results(self, results: GradientResults) -> _GradientResults: if batch_objective_gradient is not None: batch_constraint_gradient = batch_objective_gradient[ - "result_id", "batch_id", "control_name", *[ @@ -827,7 +814,6 @@ def find_best_batch( batch = matching_batches[0] controls_dict = batch.realization_controls.drop( [ - "result_id", "batch_id", "simulation_id", "realization",