Account for ert and ert3 discrepancy when uploading parameter data to storage #192
```diff
@@ -1,5 +1,5 @@
 import json
-from typing import Any, Mapping, Optional, List, MutableMapping, Tuple
+from typing import Any, Mapping, Optional, List, MutableMapping, Tuple, Dict
 from collections import defaultdict
 from pprint import pformat
 import requests
@@ -113,7 +113,7 @@ class DataLoader:
     _instances: MutableMapping[ServerIdentifier, "DataLoader"] = {}

     baseurl: str
-    token: str
+    token: Optional[str]
     _graphql_cache: MutableMapping[str, MutableMapping[dict, Any]]

     def __new__(cls, baseurl: str, token: Optional[str] = None) -> "DataLoader":
```
```diff
@@ -195,45 +195,62 @@ def get_ensemble_userdata(self, ensemble_id: str) -> dict:
     def get_ensemble_parameters(self, ensemble_id: str) -> list:
         return self._get(url=f"ensembles/{ensemble_id}/parameters").json()

+    def get_record_labels(self, ensemble_id: str, name: str) -> list:
+        return self._get(url=f"ensembles/{ensemble_id}/records/{name}/labels").json()
+
     def get_experiment_priors(self, experiment_id: str) -> dict:
         return json.loads(
             self._query(GET_PRIORS, id=experiment_id)["experiment"]["priors"]
         )

     def get_ensemble_parameter_data(
-        self, ensemble_id: str, parameter_name: str
+        self,
+        ensemble_id: str,
+        parameter_name: str,
     ) -> pd.DataFrame:
-        resp = self._get(
-            url=f"ensembles/{ensemble_id}/records/{parameter_name}",
-            headers={"accept": "application/x-parquet"},
-        )
-        stream = io.BytesIO(resp.content)
-        df = pd.read_parquet(stream)
-        return df
+        try:
+            if "::" in parameter_name:
```
Reviewer: This is what I meant by future work. It might be that I'm misunderstanding something, but is this not somewhat of a hack to avoid having to make ERT2 upload parameters as it should?

Author: Sure, we can make ert2 upload parameters the way ert3 does it, but I thought the idea was that since the underlying data structure is the same for both ert3 and ert2, it should not matter how they were uploaded, ert2 or ert3 style, given that both ways are valid and supported by the storage API.

(A standalone sketch of the label-resolution logic under discussion follows the diff below.)
```diff
+                name, label = parameter_name.split("::", 1)
+                params = {"label": label}
+            else:
+                name = parameter_name
+                params = {}
+
+            resp = self._get(
+                url=f"ensembles/{ensemble_id}/records/{name}",
+                headers={"accept": "application/x-parquet"},
+                params=params,
+            )
+            stream = io.BytesIO(resp.content)
+            df = pd.read_parquet(stream).transpose()
+            return df
+        except DataLoaderException as e:
+            logger.error(e)
+            return pd.DataFrame()

     def get_ensemble_record_data(
-        self, ensemble_id: str, record_name: str, active_realizations: List[int]
+        self,
+        ensemble_id: str,
+        record_name: str,
     ) -> pd.DataFrame:
-        dfs = []
-        for rel_idx in active_realizations:
-            try:
-                resp = self._get(
-                    url=f"ensembles/{ensemble_id}/records/{record_name}",
-                    headers={"accept": "application/x-parquet"},
-                    params={"realization_index": rel_idx},
-                )
-                stream = io.BytesIO(resp.content)
-                df = pd.read_parquet(stream).transpose()
-                df.columns = [rel_idx]
-                dfs.append(df)
-
-            except DataLoaderException as e:
-                logger.error(e)
-
-        if dfs == []:
-            return pd.DataFrame()
-
-        return pd.concat(dfs, axis=1)
+        try:
+            resp = self._get(
+                url=f"ensembles/{ensemble_id}/records/{record_name}",
+                headers={"accept": "application/x-parquet"},
+            )
+            stream = io.BytesIO(resp.content)
+            df = pd.read_parquet(stream).transpose()
+
+        except DataLoaderException as e:
+            logger.error(e)
+            return pd.DataFrame()
+
+        try:
+            df.index = df.index.astype(int)
+        except TypeError:
+            pass
+        df = df.sort_index()
+        return df

     def get_ensemble_record_observations(
         self, ensemble_id: str, record_name: str
```
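For readers following the review thread above, here is a minimal standalone sketch of the label-resolution logic being debated, not code from the project: only the `"::"` convention and the `{"label": ...}` query parameter come from the diff, while the function name `split_parameter_name` and the example record names are hypothetical.

```python
from typing import Dict, Tuple


def split_parameter_name(parameter_name: str) -> Tuple[str, Dict[str, str]]:
    # ert3-style names address one label of a labeled record as
    # "<record>::<label>"; the label is sent to the storage API as a
    # query parameter rather than baked into the URL.
    if "::" in parameter_name:
        name, label = parameter_name.split("::", 1)
        return name, {"label": label}
    # ert2-style flat names need no label query parameter.
    return parameter_name, {}


# Hypothetical record names, for illustration only:
assert split_parameter_name("coefficients::a") == ("coefficients", {"label": "a"})
assert split_parameter_name("PORO") == ("PORO", {})
```

This is why the author argues the storage API makes the upload style irrelevant: both an ert2-style flat record and one label of an ert3-style record resolve to the same endpoint, differing only in the query parameters.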
Reviewer: Could you explain why this is optional now?

Author: Not really sure why pylint was not complaining about this before; it just started, so I guess pylint was updated recently. The DataLoader `__new__` definition has looked like this for quite some time: `def __new__(cls, baseurl: str, token: Optional[str] = None) -> "DataLoader":`. So token is expected to be `Optional[str]`.

Reviewer: Thanks :)
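To make the exchange above concrete, here is a rough sketch of why the annotation has to be `Optional[str]`: `__new__` defaults `token` to `None`, so a plain `str` attribute annotation would be wrong. The signature and class attributes are taken from the diff; the `ServerIdentifier` alias and the caching body are assumptions sketched from the `_instances` mapping, not the project's actual implementation.

```python
from typing import MutableMapping, Optional, Tuple

# Assumed alias -- the real ServerIdentifier definition may differ.
ServerIdentifier = Tuple[str, Optional[str]]


class DataLoader:
    _instances: MutableMapping[ServerIdentifier, "DataLoader"] = {}

    baseurl: str
    token: Optional[str]  # must be Optional[str]: __new__ defaults it to None

    def __new__(cls, baseurl: str, token: Optional[str] = None) -> "DataLoader":
        # Sketched caching behaviour, inferred from the _instances mapping:
        # one shared instance per (baseurl, token) pair.
        key = (baseurl, token)
        if key not in cls._instances:
            instance = super().__new__(cls)
            instance.baseurl = baseurl
            instance.token = token
            cls._instances[key] = instance
        return cls._instances[key]


# Both calls type-check only because token is Optional[str]:
anonymous = DataLoader("http://localhost:5000")
authed = DataLoader("http://localhost:5000", token="some-token")
assert anonymous is DataLoader("http://localhost:5000")
```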