Skip to content

Commit

Permalink
Account for ert and ert3 discrepancy when uploading parameter data to…
Browse files Browse the repository at this point in the history
… storage.
  • Loading branch information
DanSava committed Dec 1, 2021
1 parent 906aaa0 commit 37e9f3a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 17 deletions.
52 changes: 41 additions & 11 deletions webviz_ert/data_loader/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import json
from typing import Any, Union, Mapping, Optional, List, MutableMapping, Tuple
from typing import Any, Union, Mapping, Optional, List, MutableMapping, Tuple, Dict
from collections import defaultdict
from pprint import pformat
import requests
Expand Down Expand Up @@ -191,32 +191,62 @@ def get_ensemble_userdata(self, ensemble_id: str) -> dict:
def get_ensemble_parameters(self, ensemble_id: str) -> list:
return self._get(url=f"ensembles/{ensemble_id}/parameters").json()

def get_record_labels(self, ensemble_id: str, name: str) -> list:
return self._get(url=f"ensembles/{ensemble_id}/records/{name}/labels").json()

def get_experiment_priors(self, experiment_id: str) -> dict:
return json.loads(
self._query(GET_PRIORS, id=experiment_id)["experiment"]["priors"]
)

def get_ensemble_parameter_data(
self, ensemble_id: str, parameter_name: str
self,
ensemble_id: str,
parameter_name: str,
realizations: list,
) -> pd.DataFrame:
resp = self._get(
url=f"ensembles/{ensemble_id}/records/{parameter_name}",
headers={"accept": "application/x-parquet"},
)
stream = io.BytesIO(resp.content)
df = pd.read_parquet(stream)
return df
try:
if "::" in parameter_name:
name, label = parameter_name.split("::", 1)
resp = self._get(
url=f"ensembles/{ensemble_id}/records/{name}",
headers={"accept": "application/x-parquet"},
params={"label": label},
)
else:
resp = self._get(
url=f"ensembles/{ensemble_id}/records/{parameter_name}",
headers={"accept": "application/x-parquet"},
)
stream = io.BytesIO(resp.content)
df = pd.read_parquet(stream)
return df
except DataLoaderException as e:
logger.error(e)
return self.get_ensemble_record_data(
ensemble_id=ensemble_id,
record_name=name,
active_realizations=realizations,
label=label,
)

def get_ensemble_record_data(
self, ensemble_id: str, record_name: str, active_realizations: List[int]
self,
ensemble_id: str,
record_name: str,
active_realizations: List[int],
label: Optional[str] = None,
) -> pd.DataFrame:
dfs = []
for rel_idx in active_realizations:
try:
params: Dict[str, Any] = {"realization_index": rel_idx}
if label:
params["label"] = label
resp = self._get(
url=f"ensembles/{ensemble_id}/records/{record_name}",
headers={"accept": "application/x-parquet"},
params={"realization_index": rel_idx},
params=params,
)
stream = io.BytesIO(resp.content)
df = pd.read_parquet(stream).transpose().sort_index()
Expand Down
25 changes: 20 additions & 5 deletions webviz_ert/models/ensemble_model.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import json
import pandas as pd
from typing import Mapping, List, Dict, Union, Any, Optional
from webviz_ert.data_loader import (
get_data_loader,
)
from webviz_ert.data_loader import get_data_loader, DataLoaderException

from webviz_ert.models import Response, PriorModel, ParametersModel


def _create_parameter_models(
parameters_names: list, priors: dict, ensemble_id: str, project_id: str
parameters_names: list,
priors: dict,
ensemble_id: str,
project_id: str,
realizations: list,
) -> Optional[Mapping[str, ParametersModel]]:
parameters = {}
for param in parameters_names:
Expand All @@ -30,6 +32,7 @@ def _create_parameter_models(
param_id="", # TODO?
project_id=project_id,
ensemble_id=ensemble_id,
realizations=realizations,
)
return parameters

Expand Down Expand Up @@ -98,7 +101,18 @@ def parameters(
self,
) -> Optional[Mapping[str, ParametersModel]]:
if not self._parameters:
parameter_names = self._data_loader.get_ensemble_parameters(self._id)
parameter_names = []
# TODO Remove except code path after PR https://github.com/equinor/ert-storage/pull/180 is merged
try:
for param_name in self._data_loader.get_ensemble_parameters(self._id):
labels = self._data_loader.get_record_labels(self._id, param_name)
if len(labels) > 0:
for label in labels:
parameter_names.append(f"{param_name}::{label}")
else:
parameter_names.append(param_name)
except DataLoaderException as e:
parameter_names = self._data_loader.get_ensemble_parameters(self._id)
parameter_priors = (
self._data_loader.get_experiment_priors(self._experiment_id)
if not self._parent
Expand All @@ -109,6 +123,7 @@ def parameters(
parameter_priors,
ensemble_id=self._id,
project_id=self._project_id,
realizations=self._active_realizations,
)
return self._parameters

Expand Down
5 changes: 4 additions & 1 deletion webviz_ert/models/parameter_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ def __init__(self, **kwargs: Any):
self._data_loader = get_data_loader(self._project_id)

def data_df(self) -> pd.DataFrame:
realization = [] if self._realizations is None else self._realizations
if self._data_df.empty:
_data_df = self._data_loader.get_ensemble_parameter_data(
ensemble_id=self._ensemble_id, parameter_name=self.key
ensemble_id=self._ensemble_id,
parameter_name=self.key,
realizations=realization,
)
if _data_df is not None:
_data_df = _data_df.transpose()
Expand Down

0 comments on commit 37e9f3a

Please sign in to comment.