diff --git a/setup.py b/setup.py index 8a7c433b..8689d143 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ def parse_requirements(fname): from sphinx.setup_command import BuildDoc CMDCLASS.update({"build_sphinx": BuildDoc}) -except ImportError as e: +except ImportError: # sphinx not installed - do not provide build_sphinx cmd pass diff --git a/src/fmu/sumo/explorer/_utils.py b/src/fmu/sumo/explorer/_utils.py index eab123d0..81093839 100644 --- a/src/fmu/sumo/explorer/_utils.py +++ b/src/fmu/sumo/explorer/_utils.py @@ -40,6 +40,36 @@ def get_buckets( return buckets + async def get_buckets_async( + self, + field: str, + query: Dict, + sort: List = None, + ) -> List[Dict]: + """Get a List of buckets + + Arguments: + - field (str): a field in the metadata + - query (List[Dict] or None): filter options + - sort (List or None): sorting options + + Returns: + A List of unique values for a given field + """ + query = { + "size": 0, + "aggs": {f"{field}": {"terms": {"field": field, "size": 2000}}}, + "query": query, + } + + if sort is not None: + query["sort"] = sort + + res = await self._sumo.post_async("/search", json=query) + buckets = res.json()["aggregations"][field]["buckets"] + + return buckets + def get_objects( self, size: int, @@ -65,6 +95,31 @@ def get_objects( return res.json()["hits"]["hits"] + async def get_objects_async( + self, + size: int, + query: Dict, + select: List[str] = None, + ) -> List[Dict]: + """Get objects + + Args: + size (int): number of objects to return + query (List[Dict] or None): filter options + select (List[str] or None): list of metadata fields to return + + Returns: + List[Dict]: A List of metadata + """ + query = {"size": size, "query": query} + + if select is not None: + query["_source"] = select + + res = await self._sumo.post_async("/search", json=query) + + return res.json()["hits"]["hits"] + def get_object(self, uuid: str, select: List[str] = None) -> Dict: """Get metadata object by uuid @@ -92,6 +147,33 @@ def 
get_object(self, uuid: str, select: List[str] = None) -> Dict: return hits[0] + async def get_object_async(self, uuid: str, select: List[str] = None) -> Dict: + """Get metadata object by uuid + + Args: + uuid (str): uuid of metadata object + select (List[str]): list of metadata fields to return + + Returns: + Dict: a metadata object + """ + + query = { + "query": {"term": {"_id": uuid}}, + "size": 1, + } + + if select is not None: + query["_source"] = select + + res = await self._sumo.post_async("/search", json=query) + hits = res.json()["hits"]["hits"] + + if len(hits) == 0: + raise Exception(f"Document not found: {uuid}") + + return hits[0] + def extend_query_object(self, old: Dict, new: Dict) -> Dict: """Extend query object diff --git a/src/fmu/sumo/explorer/explorer.py b/src/fmu/sumo/explorer/explorer.py index 8bef0e15..a41fae67 100644 --- a/src/fmu/sumo/explorer/explorer.py +++ b/src/fmu/sumo/explorer/explorer.py @@ -87,6 +87,23 @@ def get_permissions(self, asset: str = None): return res + async def get_permissions_async(self, asset: str = None): + """Get permissions + + Args: + asset (str): asset in Sumo + + Returns: + dict: Dictionary of user permissions + """ + res = await self._sumo.get_async("/userpermissions") + + if asset is not None: + if asset not in res: + raise PermissionError(f"No permissions for asset: {asset}") + + return res + def get_case_by_uuid(self, uuid: str) -> Case: """Get case object by uuid @@ -99,6 +116,18 @@ def get_case_by_uuid(self, uuid: str) -> Case: metadata = self._utils.get_object(uuid, _CASE_FIELDS) return Case(self._sumo, metadata) + async def get_case_by_uuid_async(self, uuid: str) -> Case: + """Get case object by uuid + + Args: + uuid (str): case uuid + + Returns: + Case: case object + """ + metadata = await self._utils.get_object_async(uuid, _CASE_FIELDS) + return Case(self._sumo, metadata) + def get_surface_by_uuid(self, uuid: str) -> Surface: """Get surface object by uuid @@ -111,6 +140,18 @@ def 
get_surface_by_uuid(self, uuid: str) -> Surface: metadata = self._utils.get_object(uuid, _CHILD_FIELDS) return Surface(self._sumo, metadata) + async def get_surface_by_uuid_async(self, uuid: str) -> Surface: + """Get surface object by uuid + + Args: + uuid (str): surface uuid + + Returns: + Surface: surface object + """ + metadata = await self._utils.get_object_async(uuid, _CHILD_FIELDS) + return Surface(self._sumo, metadata) + def get_polygons_by_uuid(self, uuid: str) -> Polygons: """Get polygons object by uuid @@ -123,6 +164,18 @@ def get_polygons_by_uuid(self, uuid: str) -> Polygons: metadata = self._utils.get_object(uuid, _CHILD_FIELDS) return Polygons(self._sumo, metadata) + async def get_polygons_by_uuid_async(self, uuid: str) -> Polygons: + """Get polygons object by uuid + + Args: + uuid (str): polygons uuid + + Returns: + Polygons: polygons object + """ + metadata = await self._utils.get_object_async(uuid, _CHILD_FIELDS) + return Polygons(self._sumo, metadata) + def get_table_by_uuid(self, uuid: str) -> Table: """Get table object by uuid @@ -134,3 +187,15 @@ def get_table_by_uuid(self, uuid: str) -> Table: """ metadata = self._utils.get_object(uuid, _CHILD_FIELDS) return Table(self._sumo, metadata) + + async def get_table_by_uuid_async(self, uuid: str) -> Table: + """Get table object by uuid + + Args: + uuid (str): table uuid + + Returns: + Table: table object + """ + metadata = await self._utils.get_object_async(uuid, _CHILD_FIELDS) + return Table(self._sumo, metadata) diff --git a/src/fmu/sumo/explorer/objects/_child.py b/src/fmu/sumo/explorer/objects/_child.py index 1645558b..dfe6e3b3 100644 --- a/src/fmu/sumo/explorer/objects/_child.py +++ b/src/fmu/sumo/explorer/objects/_child.py @@ -81,3 +81,12 @@ def blob(self) -> BytesIO: self._blob = BytesIO(res) return self._blob + + @property + async def blob_async(self) -> BytesIO: + """Object blob""" + if self._blob is None: + res = await self._sumo.get_async(f"/objects('{self.uuid}')/blob") + self._blob = 
BytesIO(res) + + return self._blob diff --git a/src/fmu/sumo/explorer/objects/_child_collection.py b/src/fmu/sumo/explorer/objects/_child_collection.py index d929e919..16459102 100644 --- a/src/fmu/sumo/explorer/objects/_child_collection.py +++ b/src/fmu/sumo/explorer/objects/_child_collection.py @@ -51,41 +51,81 @@ def names(self) -> List[str]: """List of unique object names""" return self._get_field_values("data.name.keyword") + @property + async def names_async(self) -> List[str]: + """List of unique object names""" + return await self._get_field_values_async("data.name.keyword") + @property def tagnames(self) -> List[str]: """List of unqiue object tagnames""" return self._get_field_values("data.tagname.keyword") + @property + async def tagnames_async(self) -> List[str]: + """List of unqiue object tagnames""" + return await self._get_field_values_async("data.tagname.keyword") + @property def iterations(self) -> List[int]: """List of unique object iteration names""" return self._get_field_values("fmu.iteration.name.keyword") + @property + async def iterations_async(self) -> List[int]: + """List of unique object iteration names""" + return await self._get_field_values_async("fmu.iteration.name.keyword") + @property def realizations(self) -> List[int]: """List of unique object realization ids""" return self._get_field_values("fmu.realization.id") + @property + async def realizations_async(self) -> List[int]: + """List of unique object realization ids""" + return await self._get_field_values_async("fmu.realization.id") + @property def aggregations(self) -> List[str]: """List of unique object aggregation operations""" return self._get_field_values("fmu.aggregation.operation.keyword") + @property + async def aggregations_async(self) -> List[str]: + """List of unique object aggregation operations""" + return await self._get_field_values_async("fmu.aggregation.operation.keyword") + @property def stages(self) -> List[str]: """List of unique stages""" return 
self._get_field_values("fmu.context.stage.keyword") + @property + async def stages_async(self) -> List[str]: + """List of unique stages""" + return await self._get_field_values_async("fmu.context.stage.keyword") + @property def stratigraphic(self) -> List[str]: """List of unqiue object stratigraphic""" return self._get_field_values("data.stratigraphic") + @property + async def stratigraphic_async(self) -> List[str]: + """List of unqiue object stratigraphic""" + return await self._get_field_values_async("data.stratigraphic") + @property def vertical_domain(self) -> List[str]: """List of unqiue object vertical domain""" return self._get_field_values("data.vertical_domain") + @property + async def vertical_domain_async(self) -> List[str]: + """List of unqiue object vertical domain""" + return await self._get_field_values_async("data.vertical_domain") + def _init_query(self, doc_type: str, query: Dict = None) -> Dict: new_query = super()._init_query(doc_type, query) case_filter = { diff --git a/src/fmu/sumo/explorer/objects/_document_collection.py b/src/fmu/sumo/explorer/objects/_document_collection.py index 9739eebb..a1eab8df 100644 --- a/src/fmu/sumo/explorer/objects/_document_collection.py +++ b/src/fmu/sumo/explorer/objects/_document_collection.py @@ -41,6 +41,17 @@ def __len__(self) -> int: return self._len + async def length_async(self) -> int: + """Get size of document collection. Async equivalent to 'len(collection)' + + Returns: + Document collection size + """ + if self._len is None: + await self._next_batch_async() + + return self._len + def __getitem__(self, index: int) -> Dict: """Get document @@ -64,6 +75,29 @@ def __getitem__(self, index: int) -> Dict: return self._items[index] + async def getitem_async(self, index: int) -> Dict: + """Get document. 
Async equivalent to 'collection[index]' + + Arguments: + - index (int): index + + Returns: + A document at a given index + """ + if index >= await self.length_async(): + raise IndexError + + if len(self._items) <= index: + while len(self._items) <= index: + prev_len = len(self._items) + await self._next_batch_async() + curr_len = len(self._items) + + if prev_len == curr_len: + raise IndexError + + return self._items[index] + def _get_field_values( self, field: str, query: Dict = None, key_as_string: bool = False ) -> List: @@ -85,6 +119,27 @@ def _get_field_values( return self._field_values[field] + async def _get_field_values_async( + self, field: str, query: Dict = None, key_as_string: bool = False + ) -> List: + """Get List of unique values for a given field + + Arguments: + - field (str): a metadata field + + Returns: + A List of unique values for the given field + """ + if field not in self._field_values: + bucket_query = self._utils.extend_query_object(self._query, query) + key = "key_as_string" if key_as_string is True else "key" + buckets = await self._utils.get_buckets_async(field, bucket_query) + self._field_values[field] = list( + map(lambda bucket: bucket[key], buckets) + ) + + return self._field_values[field] + def _next_batch(self) -> List[Dict]: """Get next batch of documents @@ -119,6 +174,40 @@ def _next_batch(self) -> List[Dict]: self._after = hits["hits"][-1]["sort"] self._items.extend(hits["hits"]) + async def _next_batch_async(self) -> List[Dict]: + """Get next batch of documents + + Returns: + The next batch of documents + """ + query = { + "query": self._query, + "sort": [{"_doc": {"order": "desc"}}], + "size": 500, + } + + if self._select: + query["_source"] = self._select + + if self._len is None: + query["track_total_hits"] = True + + if self._after is not None: + query["search_after"] = self._after + + if self._pit is not None: + query["pit"] = self._pit.get_pit_object() + + res = (await self._sumo.post_async("/search", json=query)).json() 
+ hits = res["hits"] + + if self._len is None: + self._len = hits["total"]["value"] + + if len(hits["hits"]) > 0: + self._after = hits["hits"][-1]["sort"] + self._items.extend(hits["hits"]) + def _init_query(self, doc_type: str, query: Dict = None) -> Dict: """Initialize base filter for document collection diff --git a/src/fmu/sumo/explorer/objects/case.py b/src/fmu/sumo/explorer/objects/case.py index 3e48ea4b..527ab958 100644 --- a/src/fmu/sumo/explorer/objects/case.py +++ b/src/fmu/sumo/explorer/objects/case.py @@ -91,6 +91,51 @@ def iterations(self) -> List[Dict]: return self._iterations + @property + async def iterations_async(self) -> List[Dict]: + """List of case iterations""" + if self._iterations is None: + query = { + "query": {"term": {"_sumo.parent_object.keyword": self.uuid}}, + "aggs": { + "id": { + "terms": {"field": "fmu.iteration.id", "size": 50}, + "aggs": { + "name": { + "terms": { + "field": "fmu.iteration.name.keyword", + "size": 1, + } + }, + "realizations": { + "terms": { + "field": "fmu.realization.id", + "size": 1000, + } + }, + }, + }, + }, + "size": 0, + } + + res = await self._sumo.post_async("/search", json=query) + buckets = res.json()["aggregations"]["id"]["buckets"] + iterations = [] + + for bucket in buckets: + iterations.append( + { + "id": bucket["key"], + "name": bucket["name"]["buckets"][0]["key"], + "realizations": len(bucket["realizations"]["buckets"]), + } + ) + + self._iterations = iterations + + return self._iterations + def get_realizations(self, iteration: str = None) -> List[int]: """Get a list of realization ids @@ -118,6 +163,33 @@ def get_realizations(self, iteration: str = None) -> List[int]: return list(map(lambda b: b["key"], buckets)) + async def get_realizations_async(self, iteration: str = None) -> List[int]: + """Get a list of realization ids + + Calling this method without the iteration argument will + return a list of unique realization ids across iterations. 
+ It is not guaranteed that all realizations in this list exists + in all case iterations. + + Args: + iteration (str): iteration name + + Returns: + List[int]: realization ids + """ + must = [{"term": {"_sumo.parent_object.keyword": self.uuid}}] + + if iteration: + must.append({"term": {"fmu.iteration.name.keyword": iteration}}) + + buckets = await self._utils.get_buckets_async( + "fmu.realization.id", + query={"bool": {"must": must}}, + sort=["fmu.realization.id"], + ) + + return list(map(lambda b: b["key"], buckets)) + @property def surfaces(self) -> SurfaceCollection: """List of case surfaces""" diff --git a/src/fmu/sumo/explorer/objects/case_collection.py b/src/fmu/sumo/explorer/objects/case_collection.py index 8063ba4f..08c4238e 100644 --- a/src/fmu/sumo/explorer/objects/case_collection.py +++ b/src/fmu/sumo/explorer/objects/case_collection.py @@ -32,21 +32,41 @@ def names(self) -> List[str]: """List of unique case names""" return self._get_field_values("fmu.case.name.keyword") + @property + async def names_async(self) -> List[str]: + """List of unique case names""" + return await self._get_field_values_async("fmu.case.name.keyword") + @property def statuses(self) -> List[str]: """List of unique statuses""" return self._get_field_values("_sumo.status.keyword") + @property + async def statuses_async(self) -> List[str]: + """List of unique statuses""" + return await self._get_field_values_async("_sumo.status.keyword") + @property def users(self) -> List[str]: """List of unique user names""" return self._get_field_values("fmu.case.user.id.keyword") + @property + async def users_async(self) -> List[str]: + """List of unique user names""" + return await self._get_field_values_async("fmu.case.user.id.keyword") + @property def assets(self) -> List[str]: """List of unique asset names""" return self._get_field_values("access.asset.name.keyword") + @property + async def assets_async(self) -> List[str]: + """List of unique asset names""" + return await 
self._get_field_values_async("access.asset.name.keyword") + @property def fields(self) -> List[str]: """List of unique field names""" @@ -54,6 +74,13 @@ def fields(self) -> List[str]: "masterdata.smda.field.identifier.keyword" ) + @property + async def fields_async(self) -> List[str]: + """List of unique field names""" + return await self._get_field_values_async( + "masterdata.smda.field.identifier.keyword" + ) + def __getitem__(self, index: int) -> Case: doc = super().__getitem__(index) return Case(self._sumo, doc, self._pit) diff --git a/src/fmu/sumo/explorer/objects/cube.py b/src/fmu/sumo/explorer/objects/cube.py index fa1b20ee..fd473d06 100644 --- a/src/fmu/sumo/explorer/objects/cube.py +++ b/src/fmu/sumo/explorer/objects/cube.py @@ -25,7 +25,16 @@ def _populate_url(self): res = json.loads(res.decode("UTF-8")) self._url = res.get("baseuri") + self.uuid self._sas = res.get("auth") - except: + except Exception: + self._url = res.decode("UTF-8") + + async def _populate_url_async(self): + res = await self._sumo.get_async(f"/objects('{self.uuid}')/blob/authuri") + try: + res = json.loads(res.decode("UTF-8")) + self._url = res.get("baseuri") + self.uuid + self._sas = res.get("auth") + except Exception: self._url = res.decode("UTF-8") @property @@ -33,8 +42,17 @@ def url(self) -> str: if self._url is None: self._populate_url() if self._sas is None: - return self._url - else: + return self._url + else: + return self._url.split("?")[0] + "/" + + @property + async def url_async(self) -> str: + if self._url is None: + await self._populate_url_async() + if self._sas is None: + return self._url + else: return self._url.split("?")[0] + "/" @property @@ -46,14 +64,35 @@ def sas(self) -> str: else: return self._sas + @property + async def sas_async(self) -> str: + if self._url is None: + await self._populate_url_async() + if self._sas is None: + return self._url.split("?")[1] + else: + return self._sas + @property def openvds_handle(self) -> openvds.core.VDS: if self._url is 
None: self._populate_url() - + + if self._sas is None: + return openvds.open(self._url) + else: + url = "azureSAS" + self._url[5:] + "/" + sas = "Suffix=?" + self._sas + return openvds.open(url, sas) + + @property + async def openvds_handle_async(self) -> openvds.core.VDS: + if self._url is None: + await self._populate_url_async() + if self._sas is None: return openvds.open(self._url) - else: + else: url = "azureSAS" + self._url[5:] + "/" sas = "Suffix=?" + self._sas return openvds.open(url, sas) diff --git a/src/fmu/sumo/explorer/objects/cube_collection.py b/src/fmu/sumo/explorer/objects/cube_collection.py index c5eabf14..e4795fd0 100644 --- a/src/fmu/sumo/explorer/objects/cube_collection.py +++ b/src/fmu/sumo/explorer/objects/cube_collection.py @@ -44,6 +44,13 @@ def timestamps(self) -> List[str]: "data.time.t0.value", TIMESTAMP_QUERY, True ) + @property + async def timestamps_async(self) -> List[str]: + """List of unique timestamps in CubeCollection""" + return await self._get_field_values_async( + "data.time.t0.value", TIMESTAMP_QUERY, True + ) + @property def intervals(self) -> List[Tuple]: """List of unique intervals in CubeCollection""" @@ -78,6 +85,40 @@ def intervals(self) -> List[Tuple]: return intervals + @property + async def intervals_async(self) -> List[Tuple]: + """List of unique intervals in CubeCollection""" + res = await self._sumo.post_async( + "/search", + json={ + "query": self._query, + "aggs": { + "t0": { + "terms": {"field": "data.time.t0.value", "size": 50}, + "aggs": { + "t1": { + "terms": { + "field": "data.time.t1.value", + "size": 50, + } + } + }, + } + }, + }, + ) + + buckets = res.json()["aggregations"]["t0"]["buckets"] + intervals = [] + + for bucket in buckets: + t0 = bucket["key_as_string"] + + for t1 in bucket["t1"]["buckets"]: + intervals.append((t0, t1["key_as_string"])) + + return intervals + def filter( self, name: Union[str, List[str], bool] = None, diff --git a/src/fmu/sumo/explorer/objects/polygons.py 
b/src/fmu/sumo/explorer/objects/polygons.py index 56ec6597..c7b086c1 100644 --- a/src/fmu/sumo/explorer/objects/polygons.py +++ b/src/fmu/sumo/explorer/objects/polygons.py @@ -29,7 +29,7 @@ def to_dataframe(self) -> pd.DataFrame: stacklevel=2, ) - return self.to_pandas + return self.to_pandas() def to_pandas(self) -> pd.DataFrame: """Get polygons object as a DataFrame @@ -42,3 +42,15 @@ def to_pandas(self) -> pd.DataFrame: return pd.read_csv(self.blob) except TypeError as type_err: raise TypeError(f"Unknown format: {self.format}") from type_err + + async def to_pandas_async(self) -> pd.DataFrame: + """Get polygons object as a DataFrame + + Returns: + DataFrame: A DataFrame object + """ + + try: + return pd.read_csv(await self.blob_async) + except TypeError as type_err: + raise TypeError(f"Unknown format: {self.format}") from type_err diff --git a/src/fmu/sumo/explorer/objects/surface.py b/src/fmu/sumo/explorer/objects/surface.py index 86a078b9..04e32835 100644 --- a/src/fmu/sumo/explorer/objects/surface.py +++ b/src/fmu/sumo/explorer/objects/surface.py @@ -49,3 +49,14 @@ def to_regular_surface(self) -> RegularSurface: return surface_from_file(self.blob) except TypeError as type_err: raise TypeError(f"Unknown format: {self.format}") from type_err + + async def to_regular_surface_async(self) -> RegularSurface: + """Get surface object as a RegularSurface + + Returns: + RegularSurface: A RegularSurface object + """ + try: + return surface_from_file(await self.blob_async) + except TypeError as type_err: + raise TypeError(f"Unknown format: {self.format}") from type_err diff --git a/src/fmu/sumo/explorer/objects/surface_collection.py b/src/fmu/sumo/explorer/objects/surface_collection.py index eafc8cb7..6b9b2335 100644 --- a/src/fmu/sumo/explorer/objects/surface_collection.py +++ b/src/fmu/sumo/explorer/objects/surface_collection.py @@ -48,6 +48,13 @@ def timestamps(self) -> List[str]: "data.time.t0.value", TIMESTAMP_QUERY, True ) + @property + async def 
timestamps_async(self) -> List[str]: + """List of unique timestamps in CubeCollection""" + return await self._get_field_values_async( + "data.time.t0.value", TIMESTAMP_QUERY, True + ) + @property def intervals(self) -> List[Tuple]: """List of unique intervals in SurfaceCollection""" @@ -82,6 +89,40 @@ def intervals(self) -> List[Tuple]: return intervals + @property + async def intervals_async(self) -> List[Tuple]: + """List of unique intervals in SurfaceCollection""" + res = await self._sumo.post_async( + "/search", + json={ + "query": self._query, + "aggs": { + "t0": { + "terms": {"field": "data.time.t0.value", "size": 50}, + "aggs": { + "t1": { + "terms": { + "field": "data.time.t1.value", + "size": 50, + } + } + }, + } + }, + }, + ) + + buckets = res.json()["aggregations"]["t0"]["buckets"] + intervals = [] + + for bucket in buckets: + t0 = bucket["key_as_string"] + + for t1 in bucket["t1"]["buckets"]: + intervals.append((t0, t1["key_as_string"])) + + return intervals + def _aggregate(self, operation: str) -> xtgeo.RegularSurface: if operation not in self._aggregation_cache: objects = self._utils.get_objects(500, self._query, ["_id"]) @@ -98,6 +139,22 @@ def _aggregate(self, operation: str) -> xtgeo.RegularSurface: return self._aggregation_cache[operation] + async def _aggregate_async(self, operation: str) -> xtgeo.RegularSurface: + if operation not in self._aggregation_cache: + objects = await self._utils.get_objects_async(500, self._query, ["_id"]) + object_ids = list(map(lambda obj: obj["_id"], objects)) + + res = await self._sumo.post_async( + "/aggregate", + json={"operation": [operation], "object_ids": object_ids}, + ) + + self._aggregation_cache[operation] = xtgeo.surface_from_file( + BytesIO(res.content) + ) + + return self._aggregation_cache[operation] + def filter( self, name: Union[str, List[str], bool] = None, @@ -184,26 +241,54 @@ def mean(self) -> xtgeo.RegularSurface: """Perform a mean aggregation""" return self._aggregate("mean") + async def 
mean_async(self) -> xtgeo.RegularSurface: + """Perform a mean aggregation""" + return await self._aggregate_async("mean") + def min(self) -> xtgeo.RegularSurface: """Perform a minimum aggregation""" return self._aggregate("min") + async def min_async(self) -> xtgeo.RegularSurface: + """Perform a minimum aggregation""" + return await self._aggregate_async("min") + def max(self) -> xtgeo.RegularSurface: """Perform a maximum aggregation""" return self._aggregate("max") + async def max_async(self) -> xtgeo.RegularSurface: + """Perform a maximum aggregation""" + return await self._aggregate_async("max") + def std(self) -> xtgeo.RegularSurface: """Perform a standard deviation aggregation""" return self._aggregate("std") + async def std_async(self) -> xtgeo.RegularSurface: + """Perform a standard deviation aggregation""" + return await self._aggregate_async("std") + def p10(self) -> xtgeo.RegularSurface: """Perform a percentile aggregation""" return self._aggregate("p10") + async def p10_async(self) -> xtgeo.RegularSurface: + """Perform a percentile aggregation""" + return await self._aggregate_async("p10") + def p50(self) -> xtgeo.RegularSurface: """Perform a percentile aggregation""" return self._aggregate("p50") + async def p50_async(self) -> xtgeo.RegularSurface: + """Perform a percentile aggregation""" + return await self._aggregate_async("p50") + def p90(self) -> xtgeo.RegularSurface: """Perform a percentile aggregation""" return self._aggregate("p90") + + async def p90_async(self) -> xtgeo.RegularSurface: + """Perform a percentile aggregation""" + return await self._aggregate_async("p90") \ No newline at end of file diff --git a/src/fmu/sumo/explorer/objects/table.py b/src/fmu/sumo/explorer/objects/table.py index f30f99c2..71d02ef4 100644 --- a/src/fmu/sumo/explorer/objects/table.py +++ b/src/fmu/sumo/explorer/objects/table.py @@ -72,6 +72,40 @@ def to_pandas(self) -> pd.DataFrame: self._logger.debug("Read blob as %s to return pandas", worked) return 
self._dataframe + async def to_pandas_async(self) -> pd.DataFrame: + """Return object as a pandas DataFrame + + Returns: + DataFrame: A DataFrame object + """ + + if self._dataframe is None: + if self["data"]["format"] == "csv": + worked = "csv" + self._logger.debug("Treating blob as csv") + try: + self._dataframe = pd.read_csv(await self.blob_async) + worked = "csv" + + except UnicodeDecodeError as ud_e: + raise UnicodeDecodeError("Maybe not csv?") from ud_e + else: + try: + worked = "feather" + self._dataframe = pf.read_feather(await self.blob_async) + except pa.lib.ArrowInvalid: + try: + worked = "parquet" + self._dataframe = pd.read_parquet(await self.blob_async) + + except UnicodeDecodeError as ud_error: + raise TypeError( + "Come on, no way this is converting to pandas!!" + ) from ud_error + + self._logger.debug("Read blob as %s to return pandas", worked) + return self._dataframe + @to_pandas.setter def to_pandas(self, frame: pd.DataFrame): self._dataframe = frame @@ -89,9 +123,8 @@ def arrowtable(self) -> pa.Table: stacklevel=2, ) - return self.to_arrow + return self.to_arrow() - @property def to_arrow(self) -> pa.Table: """Return object as an arrow Table @@ -122,3 +155,34 @@ def to_arrow(self) -> pa.Table: self._logger.debug("Read blob as %s to return arrow", worked) return self._arrowtable + + async def to_arrow_async(self) -> pa.Table: + """Return object as an arrow Table + + Returns: + pa.Table: _description_ + """ + if self._arrowtable is None: + if self["data"]["format"] == "arrow": + try: + worked = "feather" + self._arrowtable = pf.read_table(await self.blob_async) + except pa.lib.ArrowInvalid: + worked = "parquet" + self._arrowtable = pq.read_table(await self.blob_async) + else: + warn( + "Reading csv format into arrow, you will not get the full benefit of native arrow" + ) + worked = "csv" + try: + self._arrowtable = pa.Table.from_pandas( + pd.read_csv(await self.blob_async) + ) + + except TypeError as type_err: + raise OSError("Cannot read this 
into arrow") from type_err + + self._logger.debug("Read blob as %s to return arrow", worked) + + return self._arrowtable diff --git a/src/fmu/sumo/explorer/objects/table_aggregated.py b/src/fmu/sumo/explorer/objects/table_aggregated.py index abc69939..acb99964 100644 --- a/src/fmu/sumo/explorer/objects/table_aggregated.py +++ b/src/fmu/sumo/explorer/objects/table_aggregated.py @@ -79,6 +79,37 @@ def parameters(self): return self._parameters + @property + async def parameters_async(self): + """Return parameter set for iteration + + Returns: + dict: parameters connected to iteration + """ + if not self._parameters: + must = self._utils.build_terms( + { + "class.keyword": "table", + "_sumo.parent_object.keyword": self._case.uuid, + "data.name.keyword": self._name, + "data.tagname.keyword": self._tag, + "fmu.iteration.name.keyword": self._iteration, + "fmu.aggregation.operation.keyword": "collection", + } + ) + + query = { + "size": 1, + "_source": ["fmu.iteration.parameters"], + "query": {"bool": {"must": must}}, + } + + res = await self._sumo.post_async("/search", json=query) + doc = res.json()["hits"]["hits"][0] + self._parameters = doc["_source"]["fmu"]["iteration"]["parameters"] + + return self._parameters + def __len__(self): return len(self._collection) diff --git a/src/fmu/sumo/explorer/objects/table_collection.py b/src/fmu/sumo/explorer/objects/table_collection.py index 71d25a4f..a072c518 100644 --- a/src/fmu/sumo/explorer/objects/table_collection.py +++ b/src/fmu/sumo/explorer/objects/table_collection.py @@ -34,6 +34,11 @@ def columns(self) -> List[str]: """List of unique column names""" return self._get_field_values("data.spec.columns.keyword") + @property + async def columns_async(self) -> List[str]: + """List of unique column names""" + return await self._get_field_values_async("data.spec.columns.keyword") + def filter( self, name: Union[str, List[str], bool] = None, diff --git a/tests/test_aggregated_table.py b/tests/test_aggregated_table.py index 
76b9b3a6..0f5d955e 100644 --- a/tests/test_aggregated_table.py +++ b/tests/test_aggregated_table.py @@ -29,7 +29,7 @@ def test_aggregated_summary_arrow(explorer: Explorer): assert len(table.columns) == 972 + 2 column = table["FOPT"] - assert isinstance(column.to_arrow, pa.Table) + assert isinstance(column.to_arrow(), pa.Table) with pytest.raises(IndexError) as e_info: table = table["banana"] assert ( @@ -51,7 +51,7 @@ def test_aggregated_summary_arrow_with_deprecated_function_name( with pytest.warns( DeprecationWarning, - match=".arrowtable is deprecated, renamed to .to_arrow", + match=".arrowtable is deprecated, renamed to .to_arrow()", ): assert isinstance(column.arrowtable, pa.Table) with pytest.raises(IndexError) as e_info: @@ -74,7 +74,7 @@ def test_aggregated_summary_pandas_with_deprecated_function_name( """Test usage of Aggregated class with item_type=pandas with deprecated function name""" case = explorer.cases.filter(name="drogon_ahm-2023-02-22")[0] table = AggregatedTable(case, "summary", "eclipse", "iter-0") - with pytest.warns(match=".dataframe is deprecated, renamed to .to_pandas"): + with pytest.warns(match=".dataframe is deprecated, renamed to .to_pandas()"): mydata = table["FOPT"].dataframe assert isinstance(mydata, pd.DataFrame)