From 5b54ee79011fd47edd996fe4d8c5bb364630ec1f Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Thu, 7 Nov 2024 10:56:11 +0100 Subject: [PATCH] Add Everest storage (will be split into nicer commits) --- src/ert/run_models/everest_run_model.py | 32 +- src/ert/storage/local_ensemble.py | 5 + src/everest/everest_storage.py | 1117 ++++++++++++++++------- 3 files changed, 797 insertions(+), 357 deletions(-) diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index 2ea3d16e28c..f3882fc9d55 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -12,7 +12,6 @@ import shutil import threading import time -from dataclasses import dataclass from pathlib import Path from types import TracebackType from typing import ( @@ -28,7 +27,6 @@ TypedDict, ) -import seba_sqlite.sqlite_storage from ropt.enums import EventType, OptimizerExitCode from ropt.plan import BasicOptimizer, Event from seba_sqlite import SqliteStorage @@ -37,6 +35,7 @@ from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import open_storage from everest.config import EverestConfig +from everest.everest_storage import EverestStorage, OptimalResult from everest.optimizer.everest2ropt import everest2ropt from everest.simulator import Simulator from everest.simulator.everest_to_ert import everest_to_ert_config @@ -291,24 +290,6 @@ class OptimizerCallback(Protocol): def __call__(self) -> str | None: ... -@dataclass -class OptimalResult: - batch: int - controls: List[Any] - total_objective: float - - @staticmethod - def from_seba_optimal_result( - o: Optional[seba_sqlite.sqlite_storage.OptimalResult] = None, - ) -> "OptimalResult" | None: - if o is None: - return None - - return OptimalResult( - batch=o.batch, controls=o.controls, total_objective=o.total_objective - ) - - class EverestRunModel(BaseRunModel): def __init__( self, @@ -420,6 +401,15 @@ def run_experiment( # Initialize the ropt optimizer: optimizer = self._create_optimizer(simulator) + self.ever_storage = EverestStorage( + simulator, + optimizer, + Path(self.everest_config.optimization_output_dir) + / "dakota" + / "OPT_DEFAULT.out", + output_dir=Path(self.everest_config.optimization_output_dir), + ) + # The SqliteStorage object is used to store optimization results from # Seba in an sqlite database. It reacts directly to events emitted by # Seba and is not called by Everest directly. The stored results are @@ -437,6 +427,8 @@ def run_experiment( self._result = OptimalResult.from_seba_optimal_result( seba_storage.get_optimal_result() # type: ignore ) + optimal_result_from_everstorage = self.ever_storage.get_optimal_result() + assert self._result == optimal_result_from_everstorage if self._monitor_thread is not None: self._monitor_thread.stop() diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index f54464a97ee..ea7833e4095 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -25,6 +25,7 @@ from ert.storage.local_experiment import LocalExperiment from ert.storage.local_storage import LocalStorage + from everest.everest_storage import JoinedBatchDataFrames logger = logging.getLogger(__name__) @@ -512,6 +513,10 @@ def _find_state(realization: int) -> RealizationStorageState: return [_find_state(i) for i in range(self.ensemble_size)] + def save_optimization_results(self, data: JoinedBatchDataFrames): + os.mkdir(self.mount_point / "optimization_results") + data.write_to_folder(self.mount_point / "optimization_results") + def _load_single_dataset( self, group: str, diff --git a/src/everest/everest_storage.py b/src/everest/everest_storage.py index 7b4e0585133..7d90a6f65f0 100644 --- a/src/everest/everest_storage.py +++ b/src/everest/everest_storage.py @@ -1,54 +1,291 @@ -import copy +from __future__ import annotations + +import datetime +import json import logging import os -import sqlite3 -import time -from collections import namedtuple -from itertools import count +from dataclasses import dataclass, field from pathlib import Path +from typing import ( + TYPE_CHECKING, + Any, + Dict, + List, + Optional, + Tuple, +) import numpy as np - -from ropt.enums import ConstraintType, EventType +import polars +from ropt.config.enopt import EnOptConfig +from ropt.enums import EventType +from ropt.plan import BasicOptimizer, Event from ropt.results import FunctionResults, GradientResults, convert_to_maximize -from .database import Database -from .snapshot import SebaSnapshot +from seba_sqlite import sqlite_storage -OptimalResult = namedtuple( - "OptimalResult", "batch, controls, total_objective, expected_objectives" -) +from everest.simulator import Simulator logger = logging.getLogger(__name__) +if TYPE_CHECKING: + pass -def _convert_names(control_names): - converted_names = [] - for name in control_names: - converted = f"{name[0]}_{name[1]}" - if len(name) > 2: - converted += f"-{name[2]}" - converted_names.append(converted) - return converted_names +@dataclass +class OptimalResult: + batch: int + controls: List[Any] + total_objective: float -class EverestStorage: - # This implementation builds as much as possible on the older database and - # snapshot code, since it is meant for backwards compatibility, and should - # not be extended with new functionality. + @staticmethod + def from_seba_optimal_result( + o: Optional[sqlite_storage.OptimalResult] = None, + ) -> "OptimalResult" | None: + if o is None: + return None + + return OptimalResult( + batch=o.batch, controls=o.controls, total_objective=o.total_objective + ) + + +@dataclass +class BatchDataFrames: + batch_id: int + objective_for_batch: Optional[polars.DataFrame] + objectives_per_real: Optional[polars.DataFrame] + constraints_for_batch: Optional[polars.DataFrame] + constraints_per_real: Optional[polars.DataFrame] + obj_gradient_for_batch: Optional[polars.DataFrame] + obj_gradient_per_real: Optional[polars.DataFrame] + constraint_gradient_for_batch: Optional[polars.DataFrame] + constraint_gradient_per_real: Optional[polars.DataFrame] + + @property + def existing_dataframes(self) -> Dict[str, polars.DataFrame]: + dataframes = {} + + if self.objective_for_batch is not None: + dataframes["objective_for_batch"] = self.objective_for_batch + + if self.objectives_per_real is not None: + dataframes["objectives_per_real"] = self.objectives_per_real + + if self.constraints_for_batch is not None: + dataframes["constraints_for_batch"] = self.constraints_for_batch + + if self.constraints_per_real is not None: + dataframes["constraints_per_real"] = self.constraints_per_real + + if self.obj_gradient_for_batch is not None: + dataframes["obj_gradient_for_batch"] = self.obj_gradient_for_batch + + if self.obj_gradient_per_real is not None: + dataframes["obj_gradient_per_real"] = self.obj_gradient_per_real + + if self.constraint_gradient_for_batch is not None: + dataframes["constraint_gradient_for_batch"] = ( + self.constraint_gradient_for_batch + ) + + if self.constraint_gradient_per_real is not None: + dataframes["constraint_gradient_per_real"] = ( + self.constraint_gradient_per_real + ) + + return dataframes + + +@dataclass +class EverestStorageDataFrames: + batches: List[BatchDataFrames] = field(default_factory=list) + time_ended: Optional[datetime.date] = None + initial_values: Optional[polars.DataFrame] = None + objective_functions: Optional[polars.DataFrame] = None + nonlinear_constraints: Optional[polars.DataFrame] = None + realization_weights: Optional[polars.DataFrame] = None + + def write_to_experiment(self, experiment: _OptimizerOnlyExperiment): + # Stored in ensembles instead + # self.objective_for_batch.write_parquet(path / "objective_data.parquet") + # self.gradient_evaluation.write_parquet(path / "gradient_evaluation.parquet") + # self.gradient.write_parquet(path / "gradient.parquet") + + # The stuff under experiment should maybe be JSON? + # 2DO PICK ONE, for now doing both for proof of concept + + with open( + experiment.optimizer_mount_point / "initial_values.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.initial_values.to_dicts())) + + self.initial_values.write_parquet( + experiment.optimizer_mount_point / "initial_values.parquet" + ) + + with open( + experiment.optimizer_mount_point / "objective_functions.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.objective_functions.to_dicts())) + + self.objective_functions.write_parquet( + experiment.optimizer_mount_point / "objective_functions.parquet" + ) + + if self.nonlinear_constraints is not None: + with open( + experiment.optimizer_mount_point / "nonlinear_constraints.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.nonlinear_constraints.to_dicts())) + self.nonlinear_constraints.write_parquet( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ) + + if self.realization_weights is not None: + with open( + experiment.optimizer_mount_point / "realization_weights.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.realization_weights.to_dicts())) + + self.realization_weights.write_parquet( + experiment.optimizer_mount_point / "realization_weights.parquet" + ) + + for batch_data in self.batches: + for df_key, df in batch_data.existing_dataframes.items(): + df.write_parquet(experiment.optimizer_mount_point / f"{df_key}.parquet") + + def read_from_experiment( + self, experiment: _OptimizerOnlyExperiment + ) -> EverestStorageDataFrames: + self.initial_values = polars.read_parquet( + experiment.optimizer_mount_point / "initial_values.parquet" + ) + self.objective_functions = polars.read_parquet( + experiment.optimizer_mount_point / "objective_functions.parquet" + ) - def __init__(self, optimizer, output_dir): - # Internal variables. + if ( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ).exists(): + self.nonlinear_constraints = polars.read_parquet( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ) + + if (experiment.optimizer_mount_point / "realization_weights.parquet").exists(): + self.realization_weights = polars.read_parquet( + experiment.optimizer_mount_point / "realization_weights.parquet" + ) + + for ens in experiment.ensembles: + batch_id = int(ens.split("_")[1]) + objective = ( + polars.read_parquet(ens.optimizer_mount_point / "objective.parquet") + if (ens.optimizer_mount_point / "objective.parquet").exists() + else None + ) + gradient = ( + polars.read_parquet(ens.optimizer_mount_point / "gradient.parquet") + if (ens.optimizer_mount_point / "gradient.parquet").exists() + else None + ) + gradient_evaluation = ( + polars.read_parquet( + ens.optimizer_mount_point / "gradient_evaluation.parquet" + ) + if (ens.optimizer_mount_point / "gradient_evaluation.parquet").exists() + else None + ) + self.batches.append( + BatchDataFrames(batch_id, objective, gradient, gradient_evaluation) + ) + + self.batches.sort(key=lambda b: b.batch_id) + + +class _OptimizerOnlyEnsemble: + def __init__(self, output_dir: Path): + self._output_dir = output_dir + + @property + def optimizer_mount_point(self) -> Path: + if not (self._output_dir / "optimizer").exists(): + Path.mkdir(self._output_dir / "optimizer", parents=True) + + return self._output_dir / "optimizer" + + +class _OptimizerOnlyExperiment: + def __init__(self, output_dir: Path): self._output_dir = output_dir - self._database = Database(output_dir) + self._ensembles = {} + + @property + def optimizer_mount_point(self) -> Path: + if not (self._output_dir / "optimizer").exists(): + Path.mkdir(self._output_dir / "optimizer", parents=True) + + return self._output_dir / "optimizer" + + def get_ensemble_by_name(self, name: str) -> _OptimizerOnlyEnsemble: + if name not in self._ensembles: + self._ensembles[name] = _OptimizerOnlyEnsemble(self._output_dir / name) + + return self._ensembles[name] + + +class EverestStorage: + def __init__( + self, + simulator: Simulator, + optimizer: BasicOptimizer, + merit_file: Path, + output_dir: Path, + ) -> None: + self._simulator = simulator + self._optimizer = optimizer + self._initialized = False self._control_ensemble_id = 0 self._gradient_ensemble_id = 0 self._simulator_results = None - self._merit_file = Path(output_dir) / "dakota" / "OPT_DEFAULT.out" - # Connect event handlers. - self._set_event_handlers(optimizer) + self._merit_file = merit_file - self._initialized = False + optimizer.add_observer(EventType.START_OPTIMIZER_STEP, self._initialize) + optimizer.add_observer( + EventType.FINISHED_EVALUATION, self._handle_finished_batch_event + ) + optimizer.add_observer( + EventType.FINISHED_OPTIMIZER_STEP, + self._handle_finished_event, + ) + + self._dataframes = EverestStorageDataFrames() + self._experiment = _OptimizerOnlyExperiment(output_dir) + + @property + def experiment(self) -> _OptimizerOnlyExperiment: + # Should be replaced with ERT experiment + # in the long run + return self._experiment + + @staticmethod + def _convert_names(control_names): + converted_names = [] + for name in control_names: + converted = f"{name[0]}_{name[1]}" + if len(name) > 2: + converted += f"-{name[2]}" + converted_names.append(converted) + return converted_names @property def file(self): @@ -59,232 +296,359 @@ def _initialize(self, event): return self._initialized = True - self._database.add_experiment( - name="optimization_experiment", start_time_stamp=time.time() + config = event.config + self._dataframes.initial_values = polars.DataFrame( + { + "control_name": polars.Series( + self._convert_names(config.variables.names), dtype=polars.String + ), + "initial_value": polars.Series( + config.variables.initial_values, dtype=polars.Float32 + ), + "lower_bounds": polars.Series( + config.variables.lower_bounds, dtype=polars.Float32 + ), + "upper_bounds": polars.Series( + config.variables.upper_bounds, dtype=polars.Float32 + ), + } ) - # Add configuration values. - config = event.config - for control_name, initial_value, lower_bound, upper_bound in zip( - _convert_names(config.variables.names), - config.variables.initial_values, - config.variables.lower_bounds, - config.variables.upper_bounds, - ): - self._database.add_control_definition( - control_name, initial_value, lower_bound, upper_bound + self._dataframes.objective_functions = polars.DataFrame( + { + "name": config.objective_functions.names, + "weight": polars.Series( + config.objective_functions.weights, dtype=polars.Float32 + ), + "normalization": polars.Series( + [1.0 / s for s in config.objective_functions.scales], + dtype=polars.Float32, + ), + } + ) + + if config.nonlinear_constraints is not None: + self._dataframes.nonlinear_constraints = polars.DataFrame( + { + "name": config.nonlinear_constraints.names, + "normalization": config.nonlinear_constraints.scales, + "constraint_rhs_value": config.nonlinear_constraints.rhs_values, + "constraint_type": config.nonlinear_constraints.types, + } ) - for name, weight, scale in zip( - config.objective_functions.names, - config.objective_functions.weights, - config.objective_functions.scales, - ): - self._database.add_function( - name=name, - function_type="OBJECTIVE", - weight=weight, - normalization=1.0 / scale, + self._dataframes.realization_weights = polars.DataFrame( + { + "realization": polars.Series( + config.realizations.names, dtype=polars.UInt16 + ), + "weight": polars.Series( + config.realizations.weights, dtype=polars.Float32 + ), + } + ) + + def _store_function_results( + self, config: EnOptConfig, results: FunctionResults + ) -> polars.DataFrame: + objectives_per_real = polars.from_pandas( + results.to_dataframe(config, "evaluations").reset_index() + ) + objectives_for_batch = polars.from_pandas( + results.to_dataframe(config, "functions").reset_index() + ) + + try: + constraints_for_batch = polars.from_pandas( + results.to_dataframe(config, "nonlinear_constraints").reset_index() ) + except AttributeError: + constraints_for_batch = None - if config.nonlinear_constraints is not None: - for name, scale, rhs_value, constraint_type in zip( - config.nonlinear_constraints.names, - config.nonlinear_constraints.scales, - config.nonlinear_constraints.rhs_values, - config.nonlinear_constraints.types, - ): - self._database.add_function( - name=name, - function_type="CONSTRAINT", - normalization=scale, - rhs_value=rhs_value, - constraint_type=ConstraintType(constraint_type).name.lower(), - ) + constraints_per_real = None + + objectives_for_batch = self._rename_columns(objectives_for_batch) + objectives_per_real = self._rename_columns(objectives_per_real) + objective_names = objectives_per_real["objective_name"].unique().to_list() - for name, weight in zip( - config.realizations.names, - config.realizations.weights, - ): - self._database.add_realization(str(name), weight) - - def _add_batch(self, config, controls, perturbed_controls): - self._gradient_ensemble_id += 1 - self._control_ensemble_id = self._gradient_ensemble_id - control_names = _convert_names(config.variables.names) - for control_name, value in zip(control_names, controls): - self._database.add_control_value( - set_id=self._control_ensemble_id, - control_name=control_name, - value=value, + has_scaled_objectives = "scaled_objective_value" in objectives_per_real.columns + + objectives_for_batch = objectives_for_batch.pivot( + on="objective_name", + values=["objective_value", "scaled_objective_value"] + if has_scaled_objectives + else ["objective_value"], + separator=":", + ) + + if has_scaled_objectives: + objectives_for_batch = objectives_for_batch.rename( + { + **{f"objective_value:{name}": name for name in objective_names}, + **( + { + f"scaled_objective_value:{name}": f"{name}.scaled" + for name in objective_names + } + if has_scaled_objectives + else {} + ), + } ) - if perturbed_controls is not None: - perturbed_controls = perturbed_controls.reshape( - perturbed_controls.shape[0], -1 + + if constraints_for_batch is not None: + constraints_for_batch = constraints_for_batch.rename( + { + "nonlinear_constraint": "constraint_name", + "values": "constraint_value", + "violations": "constraint_violation", + "scaled_values": "scaled_constraint_value", + "scaled_violations": "scaled_constraint_violation", + } + ) + + constraint_names = ( + constraints_for_batch["constraint_name"].unique().to_list() ) - self._gradient_ensemble_id = self._control_ensemble_id - for g_idx in range(perturbed_controls.shape[1]): - self._gradient_ensemble_id += 1 - for c_idx, c_name in enumerate(control_names): - self._database.add_control_value( - set_id=self._gradient_ensemble_id, - control_name=c_name, - value=perturbed_controls[c_idx, g_idx], - ) - - def _add_simulations(self, config, result): - self._gradient_ensemble_id = self._control_ensemble_id - simulation_index = count() - if isinstance(result, FunctionResults): - for realization_name in config.realizations.names: - self._database.add_simulation( - realization_name=str(realization_name), - set_id=self._control_ensemble_id, - sim_name=f"{result.batch_id}_{next(simulation_index)}", - is_gradient=False, + + constraints_for_batch = constraints_for_batch.pivot( + on="constraint_name", + values=[ + "constraint_value", + "scaled_constraint_value", + "constraint_violation", + "scaled_constraint_violation", + ], + separator=";", + ).rename( + { + **{f"constraint_value;{name}": name for name in constraint_names}, + **{ + f"scaled_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + **{ + f"constraint_violation;{name}": f"{name}.violation" + for name in constraint_names + }, + **{ + f"scaled_constraint_violation;{name}": f"{name}.scaled_violation" + for name in constraint_names + }, + } + ) + + # remove from main table, and create separate constraints table + constraints_per_real = objectives_per_real[ + "batch_id", + "realization", + "constraint_name", + "constraint_value", + "scaled_constraint_value", + "control_name", + "control_value", + ] + constraints_per_real = ( + constraints_per_real.pivot( + values=["constraint_value", "scaled_constraint_value"], + on="constraint_name", + separator=";", ) - if isinstance(result, GradientResults): - for realization_name in config.realizations.names: - for _ in range(config.gradient.number_of_perturbations): - self._gradient_ensemble_id += 1 - self._database.add_simulation( - realization_name=str(realization_name), - set_id=self._gradient_ensemble_id, - sim_name=f"{result.batch_id}_{next(simulation_index)}", - is_gradient=True, - ) - - def _add_simulator_results( - self, config, batch, objective_results, constraint_results - ): - if constraint_results is None: - results = objective_results - else: - results = np.vstack((objective_results, constraint_results)) - statuses = np.logical_and.reduce(np.isfinite(results), axis=0) - names = config.objective_functions.names - if config.nonlinear_constraints is not None: - names += config.nonlinear_constraints.names - - for sim_idx, status in enumerate(statuses): - sim_name = f"{batch}_{sim_idx}" - for function_idx, name in enumerate(names): - if status: - self._database.add_simulation_result( - sim_name, results[function_idx, sim_idx], name, 0 - ) - self._database.set_simulation_ended(sim_name, status) - - def _add_constraint_values(self, config, batch, constraint_values): - statuses = np.logical_and.reduce(np.isfinite(constraint_values), axis=0) - for sim_id, status in enumerate(statuses): - if status: - for idx, constraint_name in enumerate( - config.nonlinear_constraints.names - ): - # Note the time_index=0, the database supports storing - # multipel time-points, but we do not support that, so we - # use times_index=0. - self._database.update_simulation_result( - simulation_name=f"{batch}_{sim_id}", - function_name=constraint_name, - times_index=0, - value=constraint_values[idx, sim_id], - ) - - def _add_gradients(self, config, objective_gradients): - for grad_index, gradient in enumerate(objective_gradients): - for control_index, control_name in enumerate( - _convert_names(config.variables.names) - ): - self._database.add_gradient_result( - gradient[control_index], - config.objective_functions.names[grad_index], - 1, - control_name, + .rename( + { + **{ + f"constraint_value;{name}": name + for name in constraint_names + }, + **{ + f"scaled_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + } ) + .pivot(on="control_name", values="control_value") + ) - def _add_total_objective(self, total_objective): - self._database.add_calculation_result( - set_id=self._control_ensemble_id, - object_function_value=total_objective, + objectives_per_real = objectives_per_real.drop( + [c for c in objectives_per_real.columns if "constraint" in c.lower()] + ).unique(subset=["batch_id", "realization", "control_name"]) + objectives_for_batch = objectives_for_batch.drop( + [c for c in objectives_for_batch.columns if "constraint" in c.lower()] + ).unique(subset=["batch_id"]) + + objectives_per_real = objectives_per_real.pivot( + values="objective_value", + index=["batch_id", "realization", "control_name", "control_value"], + columns="objective_name", + ).pivot( + values="control_value", + index=["batch_id", "realization", *objective_names], + columns="control_name", ) - def _convert_constraints(self, config, constraint_results): - constraint_results = copy.deepcopy(constraint_results) - rhs_values = config.nonlinear_constraints.rhs_values - for idx, constraint_type in enumerate(config.nonlinear_constraints.types): - constraint_results[idx] -= rhs_values[idx] - if constraint_type == ConstraintType.LE: - constraint_results[idx] *= -1.0 - return constraint_results - - def _store_results(self, config, results): - if isinstance(results, FunctionResults): - objective_results = results.evaluations.objectives - objective_results = np.moveaxis(objective_results, -1, 0) - constraint_results = results.evaluations.constraints - if constraint_results is not None: - constraint_results = np.moveaxis(constraint_results, -1, 0) - else: - objective_results = None - constraint_results = None - - if isinstance(results, GradientResults): - perturbed_variables = results.evaluations.perturbed_variables - perturbed_variables = np.moveaxis(perturbed_variables, -1, 0) - perturbed_objectives = results.evaluations.perturbed_objectives - perturbed_objectives = np.moveaxis(perturbed_objectives, -1, 0) - perturbed_constraints = results.evaluations.perturbed_constraints - if perturbed_constraints is not None: - perturbed_constraints = np.moveaxis(perturbed_constraints, -1, 0) - else: - perturbed_variables = None - perturbed_objectives = None - perturbed_constraints = None + return ( + objectives_for_batch, + objectives_per_real, + constraints_for_batch, + constraints_per_real, + ) - self._add_batch(config, results.evaluations.variables, perturbed_variables) - self._add_simulations(config, results) + @staticmethod + def _rename_columns(df: polars.DataFrame): + _renames = { + "objective": "objective_name", + "weighted_objective": "total_objective_value", + "variable": "control_name", + "variables": "control_value", + "objectives": "objective_value", + "constraints": "constraint_value", + "nonlinear_constraint": "constraint_name", + "scaled_constraints": "scaled_constraint_value", + "scaled_objectives": "scaled_objective_value", + "perturbed_variables": "perturbed_control_value", + "perturbed_objectives": "perturbed_objective_value", + "perturbed_constraints": "perturbed_constraint_value", + "scaled_perturbed_objectives": "scaled_perturbed_objective_value", + "scaled_perturbed_constraints": "scaled_perturbed_constraint_value", + } + return df.rename({k: v for k, v in _renames.items() if k in df.columns}) + + def _store_gradient_results( + self, config: EnOptConfig, results: FunctionResults + ) -> Tuple[polars.DataFrame, polars.DataFrame]: + obj_gradient_per_real = polars.from_pandas( + results.to_dataframe(config, "evaluations").reset_index() + ) + obj_gradient_for_batch = polars.from_pandas( + results.to_dataframe(config, "gradients").reset_index() + ) - # Convert back the simulation results to the legacy format: - if perturbed_objectives is not None: - perturbed_objectives = perturbed_objectives.reshape( - perturbed_objectives.shape[0], -1 - ) - if objective_results is None: - objective_results = perturbed_objectives - else: - objective_results = np.hstack((objective_results, perturbed_objectives)) + # ROPT_NOTE: Why does ROPT have the control variable name as a col here??? + obj_gradient_for_batch = obj_gradient_for_batch.drop("variable").unique() - if config.nonlinear_constraints is not None: - if perturbed_constraints is not None: - perturbed_constraints = perturbed_constraints.reshape( - perturbed_constraints.shape[0], -1 + obj_gradient_per_real = self._rename_columns(obj_gradient_per_real) + obj_gradient_for_batch = self._rename_columns(obj_gradient_for_batch) + + if "constraint_name" in obj_gradient_for_batch: + constraint_names = ( + obj_gradient_per_real["constraint_name"].unique().to_list() + ) + constraint_gradient_perturbations = ( + obj_gradient_per_real[ + "batch_id", + "realization", + "perturbation", + "control_name", + "perturbed_control_value", + *[ + c + for c in obj_gradient_per_real.columns + if "constraint" in c.lower() + ], + ] + .pivot( + on="constraint_name", + values=[ + "perturbed_constraint_value", + "scaled_perturbed_constraint_value", + ], + separator=";", ) - if constraint_results is None: - constraint_results = perturbed_constraints - else: - constraint_results = np.hstack( - (constraint_results, perturbed_constraints) - ) - # The legacy code converts all constraints to the form f(x) >=0: - constraint_results = self._convert_constraints(config, constraint_results) - - self._add_simulator_results( - config, results.batch_id, objective_results, constraint_results + .rename( + { + **{ + f"perturbed_constraint_value;{name}": name + for name in constraint_names + }, + **{ + f"scaled_perturbed_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + } + ) + .pivot(on="control_name", values="perturbed_control_value") + ) + + constraint_gradient_for_batch = obj_gradient_for_batch[ + "batch_id", + *[ + c + for c in obj_gradient_for_batch.columns + if "constraint" in c.lower() + ], + ] + + obj_gradient_for_batch = obj_gradient_for_batch.drop( + [c for c in obj_gradient_for_batch.columns if "constraint" in c.lower()] + ).unique() + + obj_gradient_per_real = obj_gradient_per_real.drop( + [c for c in obj_gradient_per_real.columns if "constraint" in c.lower()] + ).unique() + else: + constraint_gradient_for_batch = None + constraint_gradient_perturbations = None + + obj_gradient_perturbations = obj_gradient_per_real.drop( + "perturbed_evaluation_ids", "control_value" ) - if config.nonlinear_constraints: - self._add_constraint_values(config, results.batch_id, constraint_results) - if isinstance(results, FunctionResults) and results.functions is not None: - self._add_total_objective(results.functions.weighted_objective) - if isinstance(results, GradientResults) and results.gradients is not None: - self._add_gradients(config, results.gradients.objectives) - def _handle_finished_batch_event(self, event): - logger.debug("Storing batch results in the sqlite database") + obj_gradient_perturbations = obj_gradient_perturbations.pivot( + on="objective_name", values="perturbed_objective_value" + ) + + obj_gradient_perturbations = obj_gradient_perturbations.pivot( + on="control_name", values="perturbed_control_value" + ) + + # All that remains in obj_gradient_per_real is + # control values per realization, which is redundant to store here. + + objective_names = obj_gradient_for_batch["objective_name"].unique().to_list() + obj_gradient_for_batch = obj_gradient_for_batch.pivot( + on="objective_name", + values=["objective_value", "total_objective_value"], + separator=";", + ).rename( + { + **{f"objective_value;{name}": name for name in objective_names}, + **{ + f"total_objective_value;{name}": f"{name}.total" + for name in objective_names + }, + } + ) + + constraint_names = ( + constraint_gradient_for_batch["constraint_name"].unique().to_list() + ) + constraint_gradient_for_batch = constraint_gradient_for_batch.pivot( + on="constraint_name", + values=["constraint_value", "scaled_constraint_value"], + separator=";", + ).rename( + { + **{f"constraint_value;{name}": name for name in constraint_names}, + **{ + f"scaled_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + } + ) + + return ( + obj_gradient_for_batch, + obj_gradient_perturbations, + constraint_gradient_for_batch, + constraint_gradient_perturbations, + ) + + def _handle_finished_batch_event(self, event: Event): + logger.debug("Storing batch results dataframes") converted_results = tuple( - convert_to_maximize(result) for result in event.results) + convert_to_maximize(result) for result in event.results + ) results = [] best_value = -np.inf best_results = None @@ -292,123 +656,202 @@ def _handle_finished_batch_event(self, event): if isinstance(item, GradientResults): results.append(item) if ( - isinstance(item, FunctionResults) - and item.functions is not None - and item.functions.weighted_objective > best_value + isinstance(item, FunctionResults) + and item.functions is not None + and item.functions.weighted_objective > best_value ): best_value = item.functions.weighted_objective best_results = item + if best_results is not None: - results = [best_results] + results + results = [best_results, *results] last_batch = -1 + + _batches = {} for item in results: - if item.batch_id != last_batch: - self._database.add_batch() - self._store_results(event.config, item) - if item.batch_id != last_batch: - self._database.set_batch_ended - last_batch = item.batch_id + if item.batch_id not in _batches: + _batches[item.batch_id] = {} + + if isinstance(item, FunctionResults): + ( + objectives_for_batch, + objectives_per_real, + constraints_for_batch, + constraints_per_real, + ) = self._store_function_results(event.config, item) + + _batches[item.batch_id]["objective_for_batch"] = objectives_for_batch + _batches[item.batch_id]["objectives_per_real"] = objectives_per_real + _batches[item.batch_id]["constraints_for_batch"] = constraints_for_batch + _batches[item.batch_id]["constraints_per_real"] = constraints_per_real - self._database.set_batch_ended(time.time(), True) + if isinstance(item, GradientResults): + ( + obj_gradient_for_batch, + obj_gradient_per_real, + constraint_gradient_for_batch, + constraint_gradient_per_real, + ) = self._store_gradient_results(event.config, item) + + _batches[item.batch_id]["obj_gradient_for_batch"] = ( + obj_gradient_for_batch + ) + _batches[item.batch_id]["obj_gradient_per_real"] = obj_gradient_per_real + _batches[item.batch_id]["constraint_gradient_for_batch"] = ( + constraint_gradient_for_batch + ) + _batches[item.batch_id]["constraint_gradient_per_real"] = ( + constraint_gradient_per_real + ) - # Merit values are dakota specific, load them if the output file exists: - self._database.update_calculation_result(_get_merit_values(self._merit_file)) + if item.batch_id != last_batch: + pass + # Q: Could apply timestamps here but, necessary?.. + # self._database.set_batch_ended + last_batch = item.batch_id - backup_data(self._database.location) + for batch_id, info in _batches.items(): + self._dataframes.batches.append( + BatchDataFrames( + batch_id=batch_id, + objective_for_batch=info.get("objective_for_batch"), + objectives_per_real=info.get("objectives_per_real"), + constraints_for_batch=info.get("constraints_for_batch"), + constraints_per_real=info.get("constraints_per_real"), + obj_gradient_for_batch=info.get("obj_gradient_for_batch"), + obj_gradient_per_real=info.get("obj_gradient_per_real"), + constraint_gradient_for_batch=info.get( + "constraint_gradient_for_batch" + ), + constraint_gradient_per_real=info.get( + "constraint_gradient_per_real" + ), + ) + ) def _handle_finished_event(self, event): logger.debug("Storing final results in the sqlite database") - self._database.update_calculation_result(_get_merit_values(self._merit_file)) - self._database.set_experiment_ended(time.time()) - def _set_event_handlers(self, optimizer): - optimizer.add_observer( - EventType.START_OPTIMIZER_STEP, self._initialize - ) - optimizer.add_observer( - EventType.FINISHED_EVALUATION, self._handle_finished_batch_event - ) - optimizer.add_observer( - EventType.FINISHED_OPTIMIZER_STEP, - self._handle_finished_event, - ) + merit_values = self._get_merit_values() + improvement_batches = [ + b for b in self._dataframes.batches if b.objective_for_batch is not None + ][1:] + for i, b in enumerate(improvement_batches): + merit_value = next( + (m["value"] for m in merit_values if (m["iter"] - 1) == i), None + ) + if merit_value is None: + continue + + b.objective_for_batch = b.objective_for_batch.with_columns( + polars.lit(merit_value).alias("merit_value") + ) + + self._dataframes.write_to_experiment(self.experiment) def get_optimal_result(self): - snapshot = SebaSnapshot(self._output_dir) - optimum = next( - ( - data - for data in reversed(snapshot.get_optimization_data()) - if data.merit_flag - ), - None, - ) - if optimum is None: - return None - objectives = snapshot.get_snapshot(batches=[optimum.batch_id]) - return OptimalResult( - batch=optimum.batch_id, - controls=optimum.controls, - total_objective=optimum.objective_value, - expected_objectives={ - name:value[0] for name, value in objectives.expected_objectives.items() - }, + # Only used in tests, not super important + has_merit = any( + "merit_value" in b.objective_for_batch.columns + for b in self._dataframes.batches + if b.objective_for_batch is not None ) + if has_merit: + # Minimize merit + sorted_batches = [ + b + for b in self._dataframes.batches + if b.objective_for_batch is not None + and "merit_value" in b.objective_for_batch.columns + ] + sorted_batches.sort( + key=lambda b: b.objective_for_batch.select( + polars.col("merit_value").min() + ).item() + ) + batch = sorted_batches[0] + + return OptimalResult( + batch=batch.batch_id, + controls={ + k: batch.objectives_per_real[k].sample(1).item() + for k in self._dataframes.initial_values["control_name"] + }, + total_objective=batch.objective_for_batch.select( + polars.col("total_objective_value").sample(n=1) + ).item(), + ) + else: + # Maximize objective + sorted_batches = [ + b + for b in self._dataframes.batches + if b.objective_for_batch is not None + and not b.objective_for_batch.is_empty() + ] + sorted_batches.sort( + key=lambda b: -b.objective_for_batch.select( + polars.col("total_objective_value").sample(n=1) + ).item() + ) + batch = sorted_batches[0] + + return OptimalResult( + batch=batch.batch_id, + controls={ + k: batch.objectives_per_real[k].sample(1).item() + for k in self._dataframes.initial_values["control_name"] + }, + total_objective=batch.objective_for_batch.select( + polars.col("total_objective_value") + ).item(), + ) -def backup_data(database_location) -> None: - src = sqlite3.connect(database_location) - dst = sqlite3.connect(database_location + ".backup") - with dst: - src.backup(dst) - src.close() - dst.close() - - -def _get_merit_fn_lines(merit_path): - if os.path.isfile(merit_path): - with open(merit_path, "r", errors="replace", encoding="utf-8") as reader: - lines = reader.readlines() - start_line_idx = -1 - for inx, line in enumerate(lines): - if "Merit" in line and "feval" in line: - start_line_idx = inx + 1 - if start_line_idx > -1 and line.startswith("="): - return lines[start_line_idx:inx] - if start_line_idx > -1: - return lines[start_line_idx:] - return [] - - -def _parse_merit_line(merit_values_string): - values = [] - for merit_elem in merit_values_string.split(): - try: - values.append(float(merit_elem)) - except ValueError: - for elem in merit_elem.split("0x")[1:]: - values.append(float.fromhex("0x" + elem)) - if len(values) == 8: - # Dakota starts counting at one, correct to be zero-based. - return int(values[5]) - 1, values[4] - return None - - -def _get_merit_values(merit_file): - # Read the file containing merit information. - # The file should contain the following table header - # Iter F(x) mu alpha Merit feval btracks Penalty - # :return: merit values indexed by the function evaluation number - # example: - # 0: merit_value_0 - # 1: merit_value_1 - # 2 merit_value_2 - # ... - # ] - merit_values = [] - if merit_file.exists(): - for line in _get_merit_fn_lines(merit_file): - value = _parse_merit_line(line) - if value is not None: - merit_values.append({"iter":value[0], "value":value[1]}) - return merit_values + @staticmethod + def _get_merit_fn_lines(merit_path): + if os.path.isfile(merit_path): + with open(merit_path, "r", errors="replace", encoding="utf-8") as reader: + lines = reader.readlines() + start_line_idx = -1 + for inx, line in enumerate(lines): + if "Merit" in line and "feval" in line: + start_line_idx = inx + 1 + if start_line_idx > -1 and line.startswith("="): + return lines[start_line_idx:inx] + if start_line_idx > -1: + return lines[start_line_idx:] + return [] + + @staticmethod + def _parse_merit_line(merit_values_string): + values = [] + for merit_elem in merit_values_string.split(): + try: + values.append(float(merit_elem)) + except ValueError: + for elem in merit_elem.split("0x")[1:]: + values.append(float.fromhex("0x" + elem)) + if len(values) == 8: + # Dakota starts counting at one, correct to be zero-based. + return int(values[5]) - 1, values[4] + return None + + def _get_merit_values(self): + # Read the file containing merit information. + # The file should contain the following table header + # Iter F(x) mu alpha Merit feval btracks Penalty + # :return: merit values indexed by the function evaluation number + # example: + # 0: merit_value_0 + # 1: merit_value_1 + # 2 merit_value_2 + # ... + # ] + merit_values = [] + if self._merit_file.exists(): + for line in EverestStorage._get_merit_fn_lines(self._merit_file): + value = EverestStorage._parse_merit_line(line) + if value is not None: + merit_values.append({"iter": value[0], "value": value[1]}) + return merit_values