Skip to content

Commit

Permalink
Generate case summaries based on aggregations. Done as part of the method.
Browse files Browse the repository at this point in the history
  • Loading branch information
rwiker committed Dec 4, 2023
1 parent d1d3d7a commit 0381cc1
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 5 deletions.
18 changes: 16 additions & 2 deletions src/fmu/sumo/explorer/objects/_document_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,16 @@ def _next_batch(self) -> List[Dict]:
if self._after is not None:
query["search_after"] = self._after

pit = None
if self._pit is not None:
query["pit"] = self._pit.get_pit_object(self._new_pit_id)
pit = self._pit.get_pit_object(self._new_pit_id)
query["pit"] = pit

res = self._sumo.post("/search", json=query).json()
hits = res["hits"]

self._postprocess_batch(hits["hits"], pit)

if self._pit is not None:
self._new_pit_id = res["pit_id"]

Expand Down Expand Up @@ -218,13 +222,17 @@ async def _next_batch_async(self) -> List[Dict]:
if self._after is not None:
query["search_after"] = self._after

pit = None
if self._pit is not None:
query["pit"] = self._pit.get_pit_object(self._new_pit_id)
pit = self._pit.get_pit_object(self._new_pit_id)
query["pit"] = pit

res = await self._sumo.post_async("/search", json=query)
data = res.json()
hits = data["hits"]

self._postprocess_batch_async(hits["hits"], pit)

if self._pit is not None:
self._new_pit_id = data["pit_id"]

Expand All @@ -237,6 +245,12 @@ async def _next_batch_async(self) -> List[Dict]:

return len(hits["hits"])

def _postprocess_batch(self, hits):
return

async def _postprocess_batch_async(self, hits):
return

def _init_query(self, doc_type: str, query: Dict = None) -> Dict:
"""Initialize base filter for document collection
Expand Down
9 changes: 8 additions & 1 deletion src/fmu/sumo/explorer/objects/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
class Case(Document):
"""Class for representing a case in Sumo"""

def __init__(self, sumo: SumoClient, metadata: Dict, summary: Dict,
             pit: Pit = None):
    """Initialize a Case object.

    Args:
        sumo: client used for further queries against Sumo.
        metadata: the case metadata document.
        summary: aggregate summary of the case contents, as built by
            CaseCollection from aggregation results.
        pit: optional point-in-time object for consistent pagination.
    """
    # The pre-change signature line (without `summary`) was left in place
    # by the diff; only the new four-argument form is kept here.
    super().__init__(metadata)
    self._summary = summary
    self._pit = pit
    self._sumo = sumo
    self._utils = Utils(sumo)
Expand All @@ -28,6 +30,11 @@ def name(self) -> str:
"""Case name"""
return self._get_property(["fmu", "case", "name"])

@property
def summary(self):
    """Summary of case contents.

    Aggregate information about this case's contents, supplied by the
    collection that constructed this object; presumably keyed on
    iteration/data-type breakdowns -- verify against the producing
    CaseCollection code.
    """
    return self._summary

@property
def status(self) -> str:
"""Case status"""
Expand Down
126 changes: 124 additions & 2 deletions src/fmu/sumo/explorer/objects/case_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,76 @@
"exclude": []
}

def _make_summary_query(ids, pit):
query = {
"query": {
"terms": {
"fmu.case.uuid.keyword": ids
}
},
"aggs": {
"cases": {
"terms": {
"field": "fmu.case.uuid.keyword",
"size": 1000
},
"aggs": {
"iteration_uuids": {
"terms": {
"field": "fmu.iteration.uuid.keyword",
"size": 100
}
},
"iteration_names": {
"terms": {
"field": "fmu.iteration.name.keyword",
"size": 100
}
},
"data_types": {
"terms": {
"field": "class.keyword",
"size": 100
}
},
"iterations": {
"terms": {
"field": "fmu.iteration.uuid.keyword",
"size": 100
},
"aggs": {
"iteration_name": {
"terms": {
"field": "fmu.iteration.name.keyword",
"size": 100
}
},
"numreal": {
"cardinality": {
"field": "fmu.realization.id"
}
},
"maxreal": {
"max": {
"field": "fmu.realization.id"
}
},
"minreal": {
"min": {
"field": "fmu.realization.id"
}
}
}
}
}
}
},
"size": 0
}
if pit:
query["pit"] = pit
return query


class CaseCollection(DocumentCollection):
"""A class for representing a collection of cases in Sumo"""
Expand All @@ -22,6 +92,7 @@ def __init__(self, sumo: SumoClient, query: Dict = None, pit: Pit = None):
pit (Pit): point in time
"""
super().__init__("case", sumo, query, _CASE_FIELDS, pit)
self._summaries = {}

@property
def names(self) -> List[str]:
Expand Down Expand Up @@ -79,11 +150,62 @@ async def fields_async(self) -> List[str]:

def __getitem__(self, index: int) -> Case:
    """Return the case at `index` with its cached content summary.

    The summary for the case uuid must already have been cached by
    _postprocess_batch; otherwise this raises KeyError.
    """
    # The pre-change `return Case(self._sumo, doc, self._pit)` line was
    # left in by the diff, making the new lookup unreachable; removed.
    doc = super().__getitem__(index)
    uuid = doc["_id"]
    summary = self._summaries[uuid]
    return Case(self._sumo, doc, summary, self._pit)

async def getitem_async(self, index: int) -> Case:
    """Async variant of __getitem__: case at `index` plus cached summary.

    The summary for the case uuid must already have been cached by
    _postprocess_batch_async; otherwise this raises KeyError.
    """
    # The pre-change `return Case(self._sumo, doc)` line was left in by
    # the diff, making the new lookup unreachable; removed. The new form
    # also forwards self._pit, matching the sync __getitem__.
    doc = await super().getitem_async(index)
    uuid = doc["_id"]
    summary = self._summaries[uuid]
    return Case(self._sumo, doc, summary, self._pit)

def _postprocess_batch(self, hits, pit):
    """Fetch and cache content summaries for a batch of case hits.

    Issues one aggregation query over the case uuids in `hits` and
    stores the resulting per-case summaries in self._summaries.
    """
    case_uuids = [hit["_id"] for hit in hits]
    response = self._sumo.post(
        "/search", json=_make_summary_query(case_uuids, pit)
    )
    self._insert_summaries(response.json()["aggregations"])

async def _postprocess_batch_async(self, hits, pit):
    """Async variant: fetch and cache summaries for a batch of case hits.

    Issues one aggregation query over the case uuids in `hits` and
    stores the resulting per-case summaries in self._summaries.
    """
    case_uuids = [hit["_id"] for hit in hits]
    response = await self._sumo.post_async(
        "/search", json=_make_summary_query(case_uuids, pit)
    )
    self._insert_summaries(response.json()["aggregations"])

def _insert_summaries(self, aggs):
def extract_bucket_keys(bucket, name):
return [b["key"] for b in bucket[name]["buckets"]]
for bucket in aggs["cases"]["buckets"]:
caseid = bucket["key"]
iteration_names = extract_bucket_keys(bucket, "iteration_names")
iteration_uuids = extract_bucket_keys(bucket, "iteration_uuids")
data_types = extract_bucket_keys(bucket, "data_types")
iterations = {}
for ibucket in bucket["iterations"]["buckets"]:
iterid = ibucket["key"]
itername = extract_bucket_keys(ibucket, "iteration_name")
minreal = ibucket["minreal"]["value"]
maxreal = ibucket["maxreal"]["value"]
numreal = ibucket["numreal"]["value"]
iterations[iterid] = {
"name": itername,
"minreal": minreal,
"maxreal": maxreal,
"numreal": numreal
}
self._summaries[caseid] = {
"iteration_names": iteration_names,
"iteration_uuids": iteration_uuids,
"data_types": data_types,
"iterations": iterations
}
return

def filter(
self,
Expand Down

0 comments on commit 0381cc1

Please sign in to comment.