From 00a38261211ceeaa4b2297f6466ec451d3b7d697 Mon Sep 17 00:00:00 2001 From: DanSava Date: Tue, 23 Nov 2021 14:28:45 +0200 Subject: [PATCH] Account for ert and ert3 discrepancy when uploading parameter data to storage. --- tests/data/snake_oil_data.py | 12 +++++ webviz_ert/data_loader/__init__.py | 75 +++++++++++++++++----------- webviz_ert/models/ensemble_model.py | 18 +++++-- webviz_ert/models/parameter_model.py | 4 +- webviz_ert/models/response.py | 2 +- 5 files changed, 74 insertions(+), 37 deletions(-) diff --git a/tests/data/snake_oil_data.py b/tests/data/snake_oil_data.py index cde60e9b..1dab4ca3 100644 --- a/tests/data/snake_oil_data.py +++ b/tests/data/snake_oil_data.py @@ -156,6 +156,8 @@ def content(self): "id": "FOPR", }, }, + "http://127.0.0.1:5000/ensembles/1/records/OP1_DIVERGENCE_SCALE/labels": [], + "http://127.0.0.1:5000/ensembles/1/records/BPR_138_PERSISTENCE/labels": [], "http://127.0.0.1:5000/ensembles/1/records/SNAKE_OIL_GPR_DIFF/observations?realization_index=0": [], "http://127.0.0.1:5000/ensembles/3/records/SNAKE_OIL_GPR_DIFF?realization_index=0": pd.DataFrame( [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], @@ -235,6 +237,16 @@ def to_parquet_helper(dataframe: pd.DataFrame) -> bytes: ).transpose() ) +ensembles_response[ + "http://127.0.0.1:5000/ensembles/1/records/SNAKE_OIL_GPR_DIFF" +] = to_parquet_helper( + pd.DataFrame( + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + columns=["0"], + index=["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"], + ).transpose() +) + ensembles_response[ "http://127.0.0.1:5000/ensembles/1/records/OP1_DIVERGENCE_SCALE" ] = to_parquet_helper( diff --git a/webviz_ert/data_loader/__init__.py b/webviz_ert/data_loader/__init__.py index 5b03b75e..a3d9c30e 100644 --- a/webviz_ert/data_loader/__init__.py +++ b/webviz_ert/data_loader/__init__.py @@ -1,5 +1,5 @@ import json -from typing import Any, Mapping, Optional, List, MutableMapping, Tuple +from typing import Any, Mapping, Optional, List, MutableMapping, Tuple, Dict from collections import defaultdict from pprint import pformat import requests @@ -113,7 +113,7 @@ class DataLoader: _instances: MutableMapping[ServerIdentifier, "DataLoader"] = {} baseurl: str - token: str + token: Optional[str] _graphql_cache: MutableMapping[str, MutableMapping[dict, Any]] def __new__(cls, baseurl: str, token: Optional[str] = None) -> "DataLoader": @@ -195,45 +195,62 @@ def get_ensemble_userdata(self, ensemble_id: str) -> dict: def get_ensemble_parameters(self, ensemble_id: str) -> list: return self._get(url=f"ensembles/{ensemble_id}/parameters").json() + def get_record_labels(self, ensemble_id: str, name: str) -> list: + return self._get(url=f"ensembles/{ensemble_id}/records/{name}/labels").json() + def get_experiment_priors(self, experiment_id: str) -> dict: return json.loads( self._query(GET_PRIORS, id=experiment_id)["experiment"]["priors"] ) def get_ensemble_parameter_data( - self, ensemble_id: str, parameter_name: str + self, + ensemble_id: str, + parameter_name: str, ) -> pd.DataFrame: - resp = self._get( - url=f"ensembles/{ensemble_id}/records/{parameter_name}", - headers={"accept": "application/x-parquet"}, - ) - stream = io.BytesIO(resp.content) - df = pd.read_parquet(stream) - return df + try: + if "::" in parameter_name: + name, label = parameter_name.split("::", 1) + params = {"label": label} + else: + name = parameter_name + params = {} + + resp = self._get( + url=f"ensembles/{ensemble_id}/records/{name}", + headers={"accept": "application/x-parquet"}, + params=params, + ) + stream = io.BytesIO(resp.content) + df = pd.read_parquet(stream).transpose() + return df + except DataLoaderException as e: + logger.error(e) + return pd.DataFrame() def get_ensemble_record_data( - self, ensemble_id: str, record_name: str, active_realizations: List[int] + self, + ensemble_id: str, + record_name: str, ) -> pd.DataFrame: - dfs = [] - for rel_idx in active_realizations: - try: - resp = self._get( - url=f"ensembles/{ensemble_id}/records/{record_name}", - headers={"accept": "application/x-parquet"}, - params={"realization_index": rel_idx}, - ) - stream = io.BytesIO(resp.content) - df = pd.read_parquet(stream).transpose() - df.columns = [rel_idx] - dfs.append(df) - - except DataLoaderException as e: - logger.error(e) - - if dfs == []: + try: + resp = self._get( + url=f"ensembles/{ensemble_id}/records/{record_name}", + headers={"accept": "application/x-parquet"}, + ) + stream = io.BytesIO(resp.content) + df = pd.read_parquet(stream).transpose() + + except DataLoaderException as e: + logger.error(e) return pd.DataFrame() - return pd.concat(dfs, axis=1) + try: + df.index = df.index.astype(int) + except TypeError: + pass + df = df.sort_index() + return df def get_ensemble_record_observations( self, ensemble_id: str, record_name: str diff --git a/webviz_ert/models/ensemble_model.py b/webviz_ert/models/ensemble_model.py index 3e850bcd..82d40876 100644 --- a/webviz_ert/models/ensemble_model.py +++ b/webviz_ert/models/ensemble_model.py @@ -1,15 +1,16 @@ import json import pandas as pd from typing import Mapping, List, Dict, Union, Any, Optional -from webviz_ert.data_loader import ( - get_data_loader, -) +from webviz_ert.data_loader import get_data_loader, DataLoaderException from webviz_ert.models import Response, PriorModel, ParametersModel def _create_parameter_models( - parameters_names: list, priors: dict, ensemble_id: str, project_id: str + parameters_names: list, + priors: dict, + ensemble_id: str, + project_id: str, ) -> Optional[Mapping[str, ParametersModel]]: parameters = {} for param in parameters_names: @@ -98,7 +99,14 @@ def parameters( self, ) -> Optional[Mapping[str, ParametersModel]]: if not self._parameters: - parameter_names = self._data_loader.get_ensemble_parameters(self._id) + parameter_names = [] + for param_name in self._data_loader.get_ensemble_parameters(self._id): + labels = self._data_loader.get_record_labels(self._id, param_name) + if len(labels) > 0: + for label in labels: + parameter_names.append(f"{param_name}::{label}") + else: + parameter_names.append(param_name) parameter_priors = ( self._data_loader.get_experiment_priors(self._experiment_id) if not self._parent diff --git a/webviz_ert/models/parameter_model.py b/webviz_ert/models/parameter_model.py index a7d7c8b1..24545410 100644 --- a/webviz_ert/models/parameter_model.py +++ b/webviz_ert/models/parameter_model.py @@ -30,10 +30,10 @@ def __init__(self, **kwargs: Any): def data_df(self) -> pd.DataFrame: if self._data_df.empty: _data_df = self._data_loader.get_ensemble_parameter_data( - ensemble_id=self._ensemble_id, parameter_name=self.key + ensemble_id=self._ensemble_id, + parameter_name=self.key, ) if _data_df is not None: - _data_df = _data_df.transpose() _data_df.index.name = self.key self._data_df = _data_df return self._data_df diff --git a/webviz_ert/models/response.py b/webviz_ert/models/response.py index 85ff2886..82505325 100644 --- a/webviz_ert/models/response.py +++ b/webviz_ert/models/response.py @@ -40,7 +40,7 @@ def axis(self) -> Optional[List[Union[int, str, datetime.datetime]]]: def data(self) -> pd.DataFrame: if self._data is None: self._data = self._data_loader.get_ensemble_record_data( - self._ensemble_id, self.name, self._active_realizations + self._ensemble_id, self.name ) return self._data