From 91ea3633038762878bff47222162dc6b9aa6ceeb Mon Sep 17 00:00:00 2001 From: Frode Aarstad <frodeaarstad@gmail.com> Date: Tue, 16 Jan 2024 14:20:13 +0100 Subject: [PATCH] Improve plotting performance --- src/ert/dark_storage/common.py | 6 +- src/ert/dark_storage/endpoints/ensembles.py | 3 - src/ert/dark_storage/endpoints/experiments.py | 2 - src/ert/dark_storage/endpoints/records.py | 22 ++--- src/ert/dark_storage/json_schema/ensemble.py | 15 +--- src/ert/gui/tools/plot/plot_api.py | 32 +++---- src/ert/gui/tools/plot/plot_window.py | 7 +- src/ert/storage/local_ensemble.py | 87 +++++++++++++------ tests/performance_tests/performance_utils.py | 8 +- .../test_dark_storage_performance.py | 45 ++-------- .../dark_storage/test_http_endpoints.py | 22 ----- .../gui/tools/plot/test_plot_api.py | 11 ++- 12 files changed, 106 insertions(+), 154 deletions(-) diff --git a/src/ert/dark_storage/common.py b/src/ert/dark_storage/common.py index 0cf4bfee031..7060ee68d05 100644 --- a/src/ert/dark_storage/common.py +++ b/src/ert/dark_storage/common.py @@ -30,7 +30,6 @@ def get_response_names(ensemble: EnsembleReader) -> List[str]: def data_for_key( ensemble: EnsembleReader, key: str, - realization_index: Optional[int] = None, ) -> pd.DataFrame: """Returns a pandas DataFrame with the datapoints for a given key for a given case. The row index is the realization number, and the columns are an @@ -39,10 +38,10 @@ def data_for_key( if key.startswith("LOG10_"): key = key[6:] if key in ensemble.get_summary_keyset(): - data = ensemble.load_all_summary_data([key], realization_index) + data = ensemble.load_summary(key) data = data[key].unstack(level="Date") elif key in ensemble.get_gen_kw_keyset(): - data = ensemble.load_all_gen_kw_data(key.split(":")[0], realization_index) + data = ensemble.load_all_gen_kw_data(key.split(":")[0]) if data.empty: return pd.DataFrame() data = data[key].to_frame().dropna() @@ -56,7 +55,6 @@ def data_for_key( data = ensemble.load_gen_data( key, report_step, - realization_index, ).T except (ValueError, KeyError): return pd.DataFrame() diff --git a/src/ert/dark_storage/endpoints/ensembles.py b/src/ert/dark_storage/endpoints/ensembles.py index 8d0a53fc02c..4e782b0c30a 100644 --- a/src/ert/dark_storage/endpoints/ensembles.py +++ b/src/ert/dark_storage/endpoints/ensembles.py @@ -4,7 +4,6 @@ from fastapi import APIRouter, Body, Depends from ert.dark_storage import json_schema as js -from ert.dark_storage.common import ensemble_parameter_names, get_response_names from ert.dark_storage.enkf import get_storage from ert.storage import StorageAccessor @@ -36,8 +35,6 @@ def get_ensemble( experiment_id=ensemble.experiment_id, userdata={"name": ensemble.name}, size=ensemble.ensemble_size, - parameter_names=ensemble_parameter_names(storage, ensemble_id), - response_names=get_response_names(ensemble), child_ensemble_ids=[], ) diff --git a/src/ert/dark_storage/endpoints/experiments.py b/src/ert/dark_storage/endpoints/experiments.py index 96f68711b7f..ea039ead4d4 100644 --- a/src/ert/dark_storage/endpoints/experiments.py +++ b/src/ert/dark_storage/endpoints/experiments.py @@ -76,8 +76,6 @@ def get_experiment_ensembles( experiment_id=ens.experiment_id, userdata={"name": ens.name}, size=ens.ensemble_size, - parameter_names=[], - response_names=[], child_ensemble_ids=[], ) for ens in db.get_experiment(experiment_id).ensembles diff --git a/src/ert/dark_storage/endpoints/records.py b/src/ert/dark_storage/endpoints/records.py index 5de1cf58247..6ea7a84ce4b 100644 --- a/src/ert/dark_storage/endpoints/records.py 
+++ b/src/ert/dark_storage/endpoints/records.py @@ -3,7 +3,6 @@ from typing import Any, Dict, List, Mapping, Optional, Union from uuid import UUID, uuid4 -import pandas as pd from fastapi import APIRouter, Body, Depends, File, Header, Request, UploadFile, status from fastapi.responses import Response from typing_extensions import Annotated @@ -134,10 +133,7 @@ async def post_record_observations( async def get_record_observations( *, res: LibresFacade = DEFAULT_LIBRESFACADE, - db: StorageReader = DEFAULT_STORAGE, - ensemble_id: UUID, name: str, - realization_index: Optional[int] = None, ) -> List[js.ObservationOut]: obs_keys = res.observation_keys(name) obss = observations_for_obs_keys(res, obs_keys) @@ -174,15 +170,8 @@ async def get_ensemble_record( name: str, ensemble_id: UUID, accept: Annotated[Union[str, None], Header()] = None, - realization_index: Optional[int] = None, - label: Optional[str] = None, ) -> Any: - dataframe = data_for_key(db.get_ensemble(ensemble_id), name, realization_index) - if realization_index is not None: - # dataframe.loc returns a Series, and when we reconstruct a DataFrame - # from a Series, it defaults to be oriented the wrong way, so we must - # transpose it - dataframe = pd.DataFrame(dataframe.loc[realization_index]).T + dataframe = data_for_key(db.get_ensemble(ensemble_id), name) media_type = accept if accept is not None else "text/csv" if media_type == "application/x-parquet": @@ -249,15 +238,18 @@ def get_ensemble_responses( ensemble_id: UUID, ) -> Mapping[str, js.RecordOut]: response_map: Dict[str, js.RecordOut] = {} - ens = db.get_ensemble(ensemble_id) + name_dict = {} + + for obs in res.get_observations(): + name_dict[obs.observation_key] = obs.observation_type + for name in ens.get_summary_keyset(): - obs_keys = res.observation_keys(name) response_map[str(name)] = js.RecordOut( id=UUID(int=0), name=name, userdata={"data_origin": "Summary"}, - has_observations=len(obs_keys) != 0, + has_observations=name in name_dict, ) for name in res.get_gen_data_keys(): diff --git a/src/ert/dark_storage/json_schema/ensemble.py b/src/ert/dark_storage/json_schema/ensemble.py index 6b981241589..d7457ce2df6 100644 --- a/src/ert/dark_storage/json_schema/ensemble.py +++ b/src/ert/dark_storage/json_schema/ensemble.py @@ -1,14 +1,11 @@ from typing import Any, List, Mapping, Optional from uuid import UUID -from pydantic import BaseModel, Field, model_validator -from typing_extensions import Self +from pydantic import BaseModel, Field class _Ensemble(BaseModel): size: int - parameter_names: List[str] - response_names: List[str] active_realizations: List[int] = [] @@ -16,16 +13,6 @@ class EnsembleIn(_Ensemble): update_id: Optional[UUID] = None userdata: Mapping[str, Any] = {} - @model_validator(mode="after") - def check_names_no_overlap(self) -> Self: - """ - Verify that `parameter_names` and `response_names` don't overlap. Ie, no - record can be both a parameter and a response. - """ - if not set(self.parameter_names).isdisjoint(set(self.response_names)): - raise ValueError("parameters and responses cannot have a name in common") - return self - class EnsembleOut(_Ensemble): id: UUID diff --git a/src/ert/gui/tools/plot/plot_api.py b/src/ert/gui/tools/plot/plot_api.py index 20e9ddbabe2..ce9cf2195b6 100644 --- a/src/ert/gui/tools/plot/plot_api.py +++ b/src/ert/gui/tools/plot/plot_api.py @@ -68,23 +68,6 @@ def _check_response(response: requests.Response): f"{response.text} from url: {response.url}." 
) - def _get_experiments(self) -> dict: - with StorageService.session() as client: - response: requests.Response = client.get( - "/experiments", timeout=self._timeout - ) - self._check_response(response) - return response.json() - - def _get_ensembles(self, experiement_id) -> List: - with StorageService.session() as client: - response: requests.Response = client.get( - f"/experiments/{experiement_id}/ensembles", timeout=self._timeout - ) - self._check_response(response) - response_json = response.json() - return response_json - def all_data_type_keys(self) -> List: """Returns a list of all the keys except observation keys. @@ -94,9 +77,20 @@ def all_data_type_keys(self) -> List: the key""" all_keys = {} + with StorageService.session() as client: - for experiment in self._get_experiments(): - for ensemble in self._get_ensembles(experiment["id"]): + response: requests.Response = client.get( + "/experiments", timeout=self._timeout + ) + self._check_response(response) + + for experiment in response.json(): + response: requests.Response = client.get( + f"/experiments/{experiment['id']}/ensembles", timeout=self._timeout + ) + self._check_response(response) + + for ensemble in response.json(): response: requests.Response = client.get( f"/ensembles/{ensemble['id']}/responses", timeout=self._timeout ) diff --git a/src/ert/gui/tools/plot/plot_window.py b/src/ert/gui/tools/plot/plot_window.py index 1fcd777edb5..6cbd5de34d3 100644 --- a/src/ert/gui/tools/plot/plot_window.py +++ b/src/ert/gui/tools/plot/plot_window.py @@ -1,4 +1,5 @@ import logging +import time from typing import List from httpx import RequestError @@ -42,12 +43,10 @@ class PlotWindow(QMainWindow): def __init__(self, config_file, parent): QMainWindow.__init__(self, parent) - + t = time.perf_counter() logger.info("PlotWindow __init__") - self.setMinimumWidth(850) self.setMinimumHeight(650) - self.setWindowTitle(f"Plotting - {config_file}") self.activateWindow() @@ -109,6 +108,8 @@ def __init__(self, config_file, parent): self._data_type_keys_widget.selectDefault() self._updateCustomizer(current_plot_widget) + logger.info(f"PlotWindow __init__ done. 
time={time.perf_counter() -t}") + def currentPlotChanged(self): key_def = self.getSelectedKey() if key_def is None: diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index 5176b70f946..155e72226e6 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -103,12 +103,24 @@ def get_realization_mask_without_parent_failure(self) -> npt.NDArray[np.bool_]: ) def get_realization_mask_with_parameters(self) -> npt.NDArray[np.bool_]: - return np.array([self._get_parameter(i) for i in range(self.ensemble_size)]) + return np.array( + [ + self._all_parameters_exist_for_realization(i) + for i in range(self.ensemble_size) + ] + ) - def get_realization_mask_with_responses(self) -> npt.NDArray[np.bool_]: - return np.array([self._get_response(i) for i in range(self.ensemble_size)]) + def get_realization_mask_with_responses( + self, key: Optional[str] = None + ) -> npt.NDArray[np.bool_]: + return np.array( + [ + self._all_responses_exist_for_realization(i, key) + for i in range(self.ensemble_size) + ] + ) - def _get_parameter(self, realization: int) -> bool: + def _all_parameters_exist_for_realization(self, realization: int) -> bool: if not self.experiment.parameter_configuration: return False path = self.mount_point / f"realization-{realization}" @@ -117,10 +129,16 @@ def _get_parameter(self, realization: int) -> bool: for parameter in self.experiment.parameter_configuration ) - def _get_response(self, realization: int) -> bool: + def _all_responses_exist_for_realization( + self, realization: int, key: Optional[str] = None + ) -> bool: if not self.experiment.response_configuration: return False path = self.mount_point / f"realization-{realization}" + + if key: + return (path / f"{key}.nc").exists() + return all( (path / f"{response}.nc").exists() for response in self._filter_response_configuration() @@ -180,10 +198,11 @@ def realizations_initialized(self, realizations: List[int]) -> bool: return all((responses[real] or parameters[real]) for real in realizations) - def get_realization_list_with_responses(self) -> List[int]: - return [ - idx for idx, b in enumerate(self.get_realization_mask_with_responses()) if b - ] + def get_realization_list_with_responses( + self, key: Optional[str] = None + ) -> List[int]: + mask = self.get_realization_mask_with_responses(key) + return np.where(mask)[0].tolist() def set_failure( self, @@ -221,9 +240,9 @@ def _find_state(realization: int) -> RealizationStorageState: failure = self.get_failure(realization) assert failure return failure.type - if self._get_response(realization): + if self._all_responses_exist_for_realization(realization): return RealizationStorageState.HAS_DATA - if self._get_parameter(realization): + if self._all_parameters_exist_for_realization(realization): return RealizationStorageState.INITIALIZED else: return RealizationStorageState.UNDEFINED @@ -253,20 +272,14 @@ def _get_gen_data_config(self, key: str) -> GenDataConfig: @deprecated("Check the experiment for registered responses") def get_gen_data_keyset(self) -> List[str]: - keylist = [ - k - for k, v in self.experiment.response_info.items() - if "_ert_kind" in v and v["_ert_kind"] == "GenDataConfig" - ] - gen_data_list = [] - for key in keylist: - gen_data_config = self._get_gen_data_config(key) - if gen_data_config.report_steps is None: - gen_data_list.append(f"{key}@0") - else: - for report_step in gen_data_config.report_steps: - gen_data_list.append(f"{key}@{report_step}") + for k, v in 
self.experiment.response_configuration.items(): + if isinstance(v, GenDataConfig): + if v.report_steps is None: + gen_data_list.append(f"{k}@0") + else: + for report_step in v.report_steps: + gen_data_list.append(f"{k}@{report_step}") return sorted(gen_data_list, key=lambda k: k.lower()) @deprecated("Check the experiment for registered parameters") @@ -293,7 +306,7 @@ def load_gen_data( report_step: int, realization_index: Optional[int] = None, ) -> pd.DataFrame: - realizations = self.get_realization_list_with_responses() + realizations = self.get_realization_list_with_responses(key) if realization_index is not None: if realization_index not in realizations: raise IndexError(f"No such realization {realization_index}") @@ -368,6 +381,29 @@ def load_responses( assert isinstance(response, xr.Dataset) return response + def load_responses_summary(self, key: str) -> xr.Dataset: + loaded = [] + for realization in range(self.ensemble_size): + input_path = self.mount_point / f"realization-{realization}" / "summary.nc" + if input_path.exists(): + ds = xr.open_dataset(input_path, engine="scipy") + ds = ds.query(name=f'name=="{key}"') + loaded.append(ds) + return xr.combine_nested(loaded, concat_dim="realization") + + def load_summary(self, key: str) -> pd.DataFrame: + try: + df = self.load_responses_summary(key).to_dataframe() + except (ValueError, KeyError): + return pd.DataFrame() + + df = df.unstack(level="name") + df.columns = [col[1] for col in df.columns.values] + df.index = df.index.rename( + {"time": "Date", "realization": "Realization"} + ).reorder_levels(["Realization", "Date"]) + return df + @deprecated("Use load_responses") def load_all_summary_data( self, @@ -386,6 +422,7 @@ def load_all_summary_data( df = self.load_responses("summary", tuple(realizations)).to_dataframe() except (ValueError, KeyError): return pd.DataFrame() + df = df.unstack(level="name") df.columns = [col[1] for col in df.columns.values] df.index = df.index.rename( diff --git a/tests/performance_tests/performance_utils.py b/tests/performance_tests/performance_utils.py index 772a84c204a..b30b4968549 100644 --- a/tests/performance_tests/performance_utils.py +++ b/tests/performance_tests/performance_utils.py @@ -152,9 +152,9 @@ def dark_storage_app(monkeypatch): folder = py.path.local(tempfile.mkdtemp()) make_poly_example( folder, - "../../test-data/poly_template", - gen_data_count=34, - gen_data_entries=15, + "test-data/poly_template", + gen_data_count=3400, + gen_data_entries=150, summary_data_entries=100, reals=200, summary_data_count=4000, @@ -163,7 +163,7 @@ def dark_storage_app(monkeypatch): sum_obs_every=10, gen_obs_every=1, parameter_entries=10, - parameter_count=8, + parameter_count=10, update_steps=1, ) print(folder) diff --git a/tests/performance_tests/test_dark_storage_performance.py b/tests/performance_tests/test_dark_storage_performance.py index e8c0d72e580..3949dec8ffb 100644 --- a/tests/performance_tests/test_dark_storage_performance.py +++ b/tests/performance_tests/test_dark_storage_performance.py @@ -34,12 +34,8 @@ def get_single_record_csv(storage, ensemble_id1, keyword, poly_ran): assert len(record_df1_indexed.index) == 1 -def get_observations(ert, storage, ensemble_id1, keyword: str, poly_ran): - obs = run_in_loop( - records.get_record_observations( - res=ert, db=storage, ensemble_id=ensemble_id1, name=keyword - ) - ) +def get_observations(ert, keyword: str, poly_ran): + obs = run_in_loop(records.get_record_observations(res=ert, name=keyword)) if "PSUM" in keyword: n = int(keyword[4:]) @@ -69,21 +65,6 
@@ def get_observations(ert, storage, ensemble_id1, keyword: str, poly_ran): raise AssertionError(f"should never get here, keyword is {keyword}") -def get_single_record_parquet(storage, ensemble_id1, keyword, poly_ran): - parquet = run_in_loop( - records.get_ensemble_record( - db=storage, - name=keyword, - ensemble_id=ensemble_id1, - realization_index=poly_ran["reals"] - 1, - accept="application/x-parquet", - ) - ).body - record_df1_indexed = pd.read_parquet(io.BytesIO(parquet)) - assert len(record_df1_indexed.columns) == poly_ran["gen_data_entries"] - assert len(record_df1_indexed.index) == 1 - - def get_record_parquet(storage, ensemble_id1, keyword, poly_ran): parquet = run_in_loop( records.get_ensemble_record( @@ -136,7 +117,6 @@ def get_parameters(storage, ensemble_id1, keyword, poly_ran): get_result, get_record_parquet, get_record_csv, - get_single_record_parquet, get_parameters, ], ) @@ -160,16 +140,14 @@ def test_direct_dark_performance( enkf_facade = LibresFacade(ert) storage = open_storage(enkf_facade.enspath) experiment_json = experiments.get_experiments(res=enkf_facade, db=storage) - ensemble_json_default = None ensemble_id_default = None for ensemble_id in experiment_json[0].ensemble_ids: ensemble_json = ensembles.get_ensemble( storage=storage, ensemble_id=ensemble_id ) if ensemble_json.userdata["name"] == "default": - ensemble_json_default = ensemble_json ensemble_id_default = ensemble_id - assert key in ensemble_json_default.response_names + benchmark(function, storage, ensemble_id_default, key, template_config) @@ -197,18 +175,5 @@ def test_direct_dark_performance_with_libres_facade( config = ErtConfig.from_file("poly.ert") ert = EnKFMain(config) enkf_facade = LibresFacade(ert) - storage = open_storage(enkf_facade.enspath) - experiment_json = experiments.get_experiments(res=enkf_facade, db=storage) - ensemble_json_default = None - ensemble_id_default = None - for ensemble_id in experiment_json[0].ensemble_ids: - ensemble_json = ensembles.get_ensemble( - storage=storage, ensemble_id=ensemble_id - ) - if ensemble_json.userdata["name"] == "default": - ensemble_json_default = ensemble_json - ensemble_id_default = ensemble_id - assert key in ensemble_json_default.response_names - benchmark( - function, enkf_facade, storage, ensemble_id_default, key, template_config - ) + + benchmark(function, enkf_facade, key, template_config) diff --git a/tests/unit_tests/dark_storage/test_http_endpoints.py b/tests/unit_tests/dark_storage/test_http_endpoints.py index d005fed6f53..6d5329be3d0 100644 --- a/tests/unit_tests/dark_storage/test_http_endpoints.py +++ b/tests/unit_tests/dark_storage/test_http_endpoints.py @@ -83,7 +83,6 @@ def test_get_response(poly_example_tmp_dir, dark_storage_client): f"\nexperiment_json: {json.dumps(experiment_json, indent=1)} \n\n" f"ensemble_json: {json.dumps(ensemble_json, indent=1)}" ) - assert ensemble_json["response_names"][0] == "POLY_RES@0" resp: Response = dark_storage_client.get(f"/ensembles/{ensemble_id2}") ensemble_json2 = resp.json() @@ -92,7 +91,6 @@ def test_get_response(poly_example_tmp_dir, dark_storage_client): f"\nexperiment_json: {json.dumps(experiment_json, indent=1)} \n\n" f"ensemble_json2: {json.dumps(ensemble_json2, indent=1)}" ) - assert ensemble_json2["response_names"][0] == "POLY_RES@0" resp: Response = dark_storage_client.get( f"/ensembles/{ensemble_id1}/responses/POLY_RES@0/data" @@ -134,14 +132,6 @@ def test_get_response(poly_example_tmp_dir, dark_storage_client): assert len(record_df1.columns) == 10 assert len(record_df1.index) == 3 - 
resp: Response = dark_storage_client.get( - f"/ensembles/{ensemble_id1}/records/POLY_RES@0?realization_index=2" - ) - stream = io.BytesIO(resp.content) - record_df1_indexed = pd.read_csv(stream, index_col=0, float_precision="round_trip") - assert len(record_df1_indexed.columns) == 10 - assert len(record_df1_indexed.index) == 1 - def test_get_ensemble_parameters(poly_example_tmp_dir, dark_storage_client): resp: Response = dark_storage_client.get("/experiments") @@ -220,18 +210,6 @@ def test_misfit_endpoint(poly_example_tmp_dir, dark_storage_client): assert misfit.shape == (3, 5) -def test_get_record_labels(poly_example_tmp_dir, dark_storage_client): - resp: Response = dark_storage_client.get("/experiments") - answer_json = resp.json() - ensemble_id = answer_json[0]["ensemble_ids"][0] - resp: Response = dark_storage_client.get( - f"/ensembles/{ensemble_id}/records/POLY_RES@0/labels" - ) - labels = resp.json() - - assert labels == [] - - @pytest.mark.parametrize( "coeffs", [ diff --git a/tests/unit_tests/gui/tools/plot/test_plot_api.py b/tests/unit_tests/gui/tools/plot/test_plot_api.py index 0448d85a1e5..60163aa3ec0 100644 --- a/tests/unit_tests/gui/tools/plot/test_plot_api.py +++ b/tests/unit_tests/gui/tools/plot/test_plot_api.py @@ -3,6 +3,8 @@ import pytest from pandas.testing import assert_frame_equal +from tests.unit_tests.gui.tools.plot.conftest import MockResponse + def test_key_def_structure(api): key_defs = api.all_data_type_keys() @@ -102,15 +104,18 @@ def test_load_history_data(api): ) -def test_plot_api_request_errors(api, mocker): +def test_plot_api_request_errors_all_data_type_keys(api, mocker): # Mock the experiment name to be something unexpected mocker.patch( - "ert.gui.tools.plot.plot_api.PlotApi._get_experiments", - return_value=[{"id": "mocked"}], + "tests.unit_tests.gui.tools.plot.conftest.mocked_requests_get", + return_value=MockResponse(None, 404, text="error"), ) + with pytest.raises(httpx.RequestError): api.all_data_type_keys() + +def test_plot_api_request_errors(api): case_name = "default_0" with pytest.raises(httpx.RequestError): api.observations_for_key(case_name, "should_not_be_there")
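Note (not part of the diff): the main speedup above comes from reading a single summary vector per realization instead of materialising the whole summary dataset and filtering afterwards. Below is a minimal, standalone sketch of that access pattern. It mirrors the new load_responses_summary()/load_summary() methods in src/ert/storage/local_ensemble.py, but the ensemble path, the "realization-<N>/summary.nc" file layout with a "name" dimension, and the helper name load_summary_key are illustrative assumptions for the sketch, not part of the ERT API.

from pathlib import Path
from typing import List

import pandas as pd
import xarray as xr


def load_summary_key(mount_point: Path, key: str, ensemble_size: int) -> pd.DataFrame:
    """Collect one summary vector (e.g. "FOPR") across all realizations."""
    loaded: List[xr.Dataset] = []
    for realization in range(ensemble_size):
        input_path = mount_point / f"realization-{realization}" / "summary.nc"
        if not input_path.exists():
            continue
        ds = xr.open_dataset(input_path, engine="scipy")
        # Filter on the "name" dimension before concatenating, so only the
        # requested vector is ever held in memory.
        loaded.append(ds.query(name=f'name=="{key}"'))

    if not loaded:
        return pd.DataFrame()

    combined = xr.combine_nested(loaded, concat_dim="realization")
    df = combined.to_dataframe().unstack(level="name")
    df.columns = [col[1] for col in df.columns.values]
    # Same index layout as the existing load_all_summary_data(): one row per
    # (Realization, Date) pair.
    df.index = df.index.rename(
        {"time": "Date", "realization": "Realization"}
    ).reorder_levels(["Realization", "Date"])
    return df

A caller in the position of data_for_key() would then request exactly one key, for example df = load_summary_key(Path("storage/ens-0"), "FOPR", ensemble_size=200) (path and key are example values), and unstack the Date level for plotting, which is what the patched /records endpoint now does per request instead of loading every summary vector up front.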