Commit
Account for ert and ert3 discrepancy when uploading parameter data to storage.
DanSava committed Dec 16, 2021
1 parent 27495b9 commit 00a3826
Showing 5 changed files with 74 additions and 37 deletions.
12 changes: 12 additions & 0 deletions tests/data/snake_oil_data.py
@@ -156,6 +156,8 @@ def content(self):
"id": "FOPR",
},
},
"http://127.0.0.1:5000/ensembles/1/records/OP1_DIVERGENCE_SCALE/labels": [],
"http://127.0.0.1:5000/ensembles/1/records/BPR_138_PERSISTENCE/labels": [],
"http://127.0.0.1:5000/ensembles/1/records/SNAKE_OIL_GPR_DIFF/observations?realization_index=0": [],
"http://127.0.0.1:5000/ensembles/3/records/SNAKE_OIL_GPR_DIFF?realization_index=0": pd.DataFrame(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
@@ -235,6 +237,16 @@ def to_parquet_helper(dataframe: pd.DataFrame) -> bytes:
).transpose()
)

ensembles_response[
"http://127.0.0.1:5000/ensembles/1/records/SNAKE_OIL_GPR_DIFF"
] = to_parquet_helper(
pd.DataFrame(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
columns=["0"],
index=["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
).transpose()
)

ensembles_response[
"http://127.0.0.1:5000/ensembles/1/records/OP1_DIVERGENCE_SCALE"
] = to_parquet_helper(
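The mocked endpoints above serialize their payloads through the module's to_parquet_helper fixture, whose signature is visible in the hunk header. A minimal sketch of what such a helper could look like, assuming it simply round-trips a DataFrame through an in-memory Parquet buffer (the real body is not shown in this diff):

import io

import pandas as pd


def to_parquet_helper(dataframe: pd.DataFrame) -> bytes:
    # Serialize the DataFrame to Parquet bytes, mimicking the
    # application/x-parquet responses served by the storage API.
    stream = io.BytesIO()
    dataframe.to_parquet(stream)
    return stream.getvalue()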
75 changes: 46 additions & 29 deletions webviz_ert/data_loader/__init__.py
@@ -1,5 +1,5 @@
import json
from typing import Any, Mapping, Optional, List, MutableMapping, Tuple
from typing import Any, Mapping, Optional, List, MutableMapping, Tuple, Dict
from collections import defaultdict
from pprint import pformat
import requests
@@ -113,7 +113,7 @@ class DataLoader:
_instances: MutableMapping[ServerIdentifier, "DataLoader"] = {}

baseurl: str
token: str
token: Optional[str]
_graphql_cache: MutableMapping[str, MutableMapping[dict, Any]]

def __new__(cls, baseurl: str, token: Optional[str] = None) -> "DataLoader":
@@ -195,45 +195,62 @@ def get_ensemble_userdata(self, ensemble_id: str) -> dict:
def get_ensemble_parameters(self, ensemble_id: str) -> list:
return self._get(url=f"ensembles/{ensemble_id}/parameters").json()

def get_record_labels(self, ensemble_id: str, name: str) -> list:
return self._get(url=f"ensembles/{ensemble_id}/records/{name}/labels").json()

def get_experiment_priors(self, experiment_id: str) -> dict:
return json.loads(
self._query(GET_PRIORS, id=experiment_id)["experiment"]["priors"]
)

def get_ensemble_parameter_data(
self, ensemble_id: str, parameter_name: str
self,
ensemble_id: str,
parameter_name: str,
) -> pd.DataFrame:
resp = self._get(
url=f"ensembles/{ensemble_id}/records/{parameter_name}",
headers={"accept": "application/x-parquet"},
)
stream = io.BytesIO(resp.content)
df = pd.read_parquet(stream)
return df
try:
if "::" in parameter_name:
name, label = parameter_name.split("::", 1)
params = {"label": label}
else:
name = parameter_name
params = {}

resp = self._get(
url=f"ensembles/{ensemble_id}/records/{name}",
headers={"accept": "application/x-parquet"},
params=params,
)
stream = io.BytesIO(resp.content)
df = pd.read_parquet(stream).transpose()
return df
except DataLoaderException as e:
logger.error(e)
return pd.DataFrame()

def get_ensemble_record_data(
self, ensemble_id: str, record_name: str, active_realizations: List[int]
self,
ensemble_id: str,
record_name: str,
) -> pd.DataFrame:
dfs = []
for rel_idx in active_realizations:
try:
resp = self._get(
url=f"ensembles/{ensemble_id}/records/{record_name}",
headers={"accept": "application/x-parquet"},
params={"realization_index": rel_idx},
)
stream = io.BytesIO(resp.content)
df = pd.read_parquet(stream).transpose()
df.columns = [rel_idx]
dfs.append(df)

except DataLoaderException as e:
logger.error(e)

if dfs == []:
try:
resp = self._get(
url=f"ensembles/{ensemble_id}/records/{record_name}",
headers={"accept": "application/x-parquet"},
)
stream = io.BytesIO(resp.content)
df = pd.read_parquet(stream).transpose()

except DataLoaderException as e:
logger.error(e)
return pd.DataFrame()

return pd.concat(dfs, axis=1)
try:
df.index = df.index.astype(int)
except TypeError:
pass
df = df.sort_index()
return df

def get_ensemble_record_observations(
self, ensemble_id: str, record_name: str
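The new get_ensemble_parameter_data accepts keys of the form NAME::LABEL: the key is split on the first "::", the name selects the record, and the label is forwarded as a query parameter; plain keys are requested unchanged. A hedged usage sketch (the base URL, ensemble id, and the "OP1" label are illustrative placeholders, not values taken from this commit):

from webviz_ert.data_loader import DataLoader

loader = DataLoader("http://127.0.0.1:5000")

# Label-less parameter: stored as its own record and fetched by plain name.
scale_df = loader.get_ensemble_parameter_data("1", "OP1_DIVERGENCE_SCALE")

# Labelled parameter: the key is split into name and label, and the label is
# sent as a query parameter on the records endpoint.
persistence_df = loader.get_ensemble_parameter_data("1", "BPR_138_PERSISTENCE::OP1")

In both cases the Parquet payload is read into a DataFrame and transposed before being returned, and a DataLoaderException results in an empty DataFrame instead of an error propagating to the caller.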
18 changes: 13 additions & 5 deletions webviz_ert/models/ensemble_model.py
@@ -1,15 +1,16 @@
import json
import pandas as pd
from typing import Mapping, List, Dict, Union, Any, Optional
from webviz_ert.data_loader import (
get_data_loader,
)
from webviz_ert.data_loader import get_data_loader, DataLoaderException

from webviz_ert.models import Response, PriorModel, ParametersModel


def _create_parameter_models(
parameters_names: list, priors: dict, ensemble_id: str, project_id: str
parameters_names: list,
priors: dict,
ensemble_id: str,
project_id: str,
) -> Optional[Mapping[str, ParametersModel]]:
parameters = {}
for param in parameters_names:
@@ -98,7 +99,14 @@ def parameters(
self,
) -> Optional[Mapping[str, ParametersModel]]:
if not self._parameters:
parameter_names = self._data_loader.get_ensemble_parameters(self._id)
parameter_names = []
for param_name in self._data_loader.get_ensemble_parameters(self._id):
labels = self._data_loader.get_record_labels(self._id, param_name)
if len(labels) > 0:
for label in labels:
parameter_names.append(f"{param_name}::{label}")
else:
parameter_names.append(param_name)
parameter_priors = (
self._data_loader.get_experiment_priors(self._experiment_id)
if not self._parent
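For illustration, the fan-out that EnsembleModel.parameters now performs can be read as a standalone helper (a hypothetical rewrite; expand_parameter_names is not a function in the codebase):

from typing import List


def expand_parameter_names(data_loader, ensemble_id: str, names: List[str]) -> List[str]:
    # Parameters whose record exposes labels become one "name::label" entry
    # per label; label-less parameters are kept unchanged.
    expanded: List[str] = []
    for name in names:
        labels = data_loader.get_record_labels(ensemble_id, name)
        if labels:
            expanded.extend(f"{name}::{label}" for label in labels)
        else:
            expanded.append(name)
    return expanded

A parameter with labels ["A", "B"] thus yields "NAME::A" and "NAME::B", each of which later maps back to a labelled request in DataLoader.get_ensemble_parameter_data.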
4 changes: 2 additions & 2 deletions webviz_ert/models/parameter_model.py
@@ -30,10 +30,10 @@ def __init__(self, **kwargs: Any):
def data_df(self) -> pd.DataFrame:
if self._data_df.empty:
_data_df = self._data_loader.get_ensemble_parameter_data(
ensemble_id=self._ensemble_id, parameter_name=self.key
ensemble_id=self._ensemble_id,
parameter_name=self.key,
)
if _data_df is not None:
_data_df = _data_df.transpose()
_data_df.index.name = self.key
self._data_df = _data_df
return self._data_df
2 changes: 1 addition & 1 deletion webviz_ert/models/response.py
@@ -40,7 +40,7 @@ def axis(self) -> Optional[List[Union[int, str, datetime.datetime]]]:
def data(self) -> pd.DataFrame:
if self._data is None:
self._data = self._data_loader.get_ensemble_record_data(
self._ensemble_id, self.name, self._active_realizations
self._ensemble_id, self.name
)
return self._data

