diff --git a/src/fmu/sumo/explorer/objects/_document_collection.py b/src/fmu/sumo/explorer/objects/_document_collection.py index 447edf72..6d92c99d 100644 --- a/src/fmu/sumo/explorer/objects/_document_collection.py +++ b/src/fmu/sumo/explorer/objects/_document_collection.py @@ -179,12 +179,16 @@ def _next_batch(self) -> List[Dict]: if self._after is not None: query["search_after"] = self._after + pit = None if self._pit is not None: - query["pit"] = self._pit.get_pit_object(self._new_pit_id) + pit = self._pit.get_pit_object(self._new_pit_id) + query["pit"] = pit res = self._sumo.post("/search", json=query).json() hits = res["hits"] + self._postprocess_batch(hits["hits"], pit) + if self._pit is not None: self._new_pit_id = res["pit_id"] @@ -218,13 +222,17 @@ async def _next_batch_async(self) -> List[Dict]: if self._after is not None: query["search_after"] = self._after + pit = None if self._pit is not None: - query["pit"] = self._pit.get_pit_object(self._new_pit_id) + pit = self._pit.get_pit_object(self._new_pit_id) + query["pit"] = pit res = await self._sumo.post_async("/search", json=query) data = res.json() hits = data["hits"] + self._postprocess_batch_async(hits["hits"], pit) + if self._pit is not None: self._new_pit_id = data["pit_id"] @@ -237,6 +245,12 @@ async def _next_batch_async(self) -> List[Dict]: return len(hits["hits"]) + def _postprocess_batch(self, hits): + return + + async def _postprocess_batch_async(self, hits): + return + def _init_query(self, doc_type: str, query: Dict = None) -> Dict: """Initialize base filter for document collection diff --git a/src/fmu/sumo/explorer/objects/case.py b/src/fmu/sumo/explorer/objects/case.py index 3a21e17e..f0299a82 100644 --- a/src/fmu/sumo/explorer/objects/case.py +++ b/src/fmu/sumo/explorer/objects/case.py @@ -16,8 +16,10 @@ class Case(Document): """Class for representing a case in Sumo""" - def __init__(self, sumo: SumoClient, metadata: Dict, pit: Pit = None): + def __init__(self, sumo: SumoClient, metadata: Dict, summary: Dict, + pit: Pit = None): super().__init__(metadata) + self._summary = summary self._pit = pit self._sumo = sumo self._utils = Utils(sumo) @@ -28,6 +30,11 @@ def name(self) -> str: """Case name""" return self._get_property(["fmu", "case", "name"]) + @property + def summary(self): + """Summary of case contents.""" + return self._summary + @property def status(self) -> str: """Case status""" diff --git a/src/fmu/sumo/explorer/objects/case_collection.py b/src/fmu/sumo/explorer/objects/case_collection.py index c2a5f93c..98def3a5 100644 --- a/src/fmu/sumo/explorer/objects/case_collection.py +++ b/src/fmu/sumo/explorer/objects/case_collection.py @@ -10,6 +10,76 @@ "exclude": [] } +def _make_summary_query(ids, pit): + query = { + "query": { + "terms": { + "fmu.case.uuid.keyword": ids + } + }, + "aggs": { + "cases": { + "terms": { + "field": "fmu.case.uuid.keyword", + "size": 1000 + }, + "aggs": { + "iteration_uuids": { + "terms": { + "field": "fmu.iteration.uuid.keyword", + "size": 100 + } + }, + "iteration_names": { + "terms": { + "field": "fmu.iteration.name.keyword", + "size": 100 + } + }, + "data_types": { + "terms": { + "field": "class.keyword", + "size": 100 + } + }, + "iterations": { + "terms": { + "field": "fmu.iteration.uuid.keyword", + "size": 100 + }, + "aggs": { + "iteration_name": { + "terms": { + "field": "fmu.iteration.name.keyword", + "size": 100 + } + }, + "numreal": { + "cardinality": { + "field": "fmu.realization.id" + } + }, + "maxreal": { + "max": { + "field": "fmu.realization.id" + } + }, + "minreal": { + "min": { + "field": "fmu.realization.id" + } + } + } + } + } + } + }, + "size": 0 + } + if pit: + query["pit"] = pit + return query + class CaseCollection(DocumentCollection): """A class for representing a collection of cases in Sumo""" @@ -22,6 +92,7 @@ def __init__(self, sumo: SumoClient, query: Dict = None, pit: Pit = None): pit (Pit): point in time """ super().__init__("case", sumo, query, _CASE_FIELDS, pit) + self._summaries = {} @property def names(self) -> List[str]: @@ -79,11 +150,62 @@ async def fields_async(self) -> List[str]: def __getitem__(self, index: int) -> Case: doc = super().__getitem__(index) - return Case(self._sumo, doc, self._pit) + uuid = doc["_id"] + summary = self._summaries[uuid] + return Case(self._sumo, doc, summary, self._pit) async def getitem_async(self, index: int) -> Case: doc = await super().getitem_async(index) - return Case(self._sumo, doc) + uuid = doc["_id"] + summary = self._summaries[uuid] + return Case(self._sumo, doc, summary, self._pit) + + def _postprocess_batch(self, hits, pit): + ids = [hit["_id"] for hit in hits] + query = _make_summary_query(ids, pit) + res = self._sumo.post("/search", json=query) + data = res.json() + aggs = data["aggregations"] + self._insert_summaries(aggs) + return + + async def _postprocess_batch_async(self, hits, pit): + ids = [hit["_id"] for hit in hits] + query = _make_summary_query(ids, pit) + res = await self._sumo.post_async("/search", json=query) + data = res.json() + aggs = data["aggregations"] + self._insert_summaries(aggs) + return + + def _insert_summaries(self, aggs): + def extract_bucket_keys(bucket, name): + return [b["key"] for b in bucket[name]["buckets"]] + for bucket in aggs["cases"]["buckets"]: + caseid = bucket["key"] + iteration_names = extract_bucket_keys(bucket, "iteration_names") + iteration_uuids = extract_bucket_keys(bucket, "iteration_uuids") + data_types = extract_bucket_keys(bucket, "data_types") + iterations = {} + for ibucket in bucket["iterations"]["buckets"]: + iterid = ibucket["key"] + itername = extract_bucket_keys(ibucket, "iteration_name") + minreal = ibucket["minreal"]["value"] + maxreal = ibucket["maxreal"]["value"] + numreal = ibucket["numreal"]["value"] + iterations[iterid] = { + "name": itername, + "minreal": minreal, + "maxreal": maxreal, + "numreal": numreal + } + self._summaries[caseid] = { + "iteration_names": iteration_names, + "iteration_uuids": iteration_uuids, + "data_types": data_types, + "iterations": iterations + } + return def filter( self,