diff --git a/src/fmu/sumo/explorer/explorer.py b/src/fmu/sumo/explorer/explorer.py index 22fe6e3f..99d2a4dc 100644 --- a/src/fmu/sumo/explorer/explorer.py +++ b/src/fmu/sumo/explorer/explorer.py @@ -1,4 +1,5 @@ from sumo.wrapper import SumoClient +from fmu.sumo.explorer.pit import Pit from fmu.sumo.explorer.objects.case_collection import ( CaseCollection, _CASE_FIELDS, @@ -30,22 +31,43 @@ def __init__( env: str = "prod", token: str = None, interactive: bool = True, + keep_alive: str = None, ): """Initialize the Explorer class + When iterating over large datasets, use the `keep_alive` argument + to create a snapshot of the data to ensure consistency. The + argument specifies the lifespan of the snapshot and every + request to the Sumo API will extend the lifetime of the snapshot + with the specified `keep_alive` value. The argument uses a format + of a number followed by a unit indicator. Supported indicators are: + - d (day) + - h (hour) + - m (minute) + - s (second) + - ms (millisecond) + - micros (microsecond) + - nanos (nanosecond) + + Examples: 1d, 2h, 15m, 30s + + Every request to Sumo will extend the lifespan of the snapshot + by the time specified in `keep_alive`. 
+ Args: env (str): Sumo environment token (str): authenticate with existing token interactive (bool): authenticate using interactive flow (browser) + keep_alive (str): point in time lifespan """ self._sumo = SumoClient(env, token=token, interactive=interactive) - self._cases = CaseCollection(self._sumo) + self._pit = Pit(self._sumo, keep_alive) if keep_alive else None self._utils = Utils(self._sumo) @property - def cases(self) -> CaseCollection: - """A collection of accessible cases in Sumo""" - return self._cases + def cases(self): + """Cases in Sumo""" + return CaseCollection(sumo=self._sumo, pit=self._pit) def get_permissions(self, asset: str = None): """Get permissions diff --git a/src/fmu/sumo/explorer/objects/_child_collection.py b/src/fmu/sumo/explorer/objects/_child_collection.py index 61eefeef..8235d100 100644 --- a/src/fmu/sumo/explorer/objects/_child_collection.py +++ b/src/fmu/sumo/explorer/objects/_child_collection.py @@ -2,6 +2,7 @@ from typing import List, Dict, Union from sumo.wrapper import SumoClient from fmu.sumo.explorer.timefilter import TimeFilter +from fmu.sumo.explorer.pit import Pit _CHILD_FIELDS = [ "_id", @@ -27,10 +28,15 @@ class ChildCollection(DocumentCollection): """Class for representing a collection of child objects in Sumo""" def __init__( - self, type: str, sumo: SumoClient, case_uuid: str, query: Dict = None + self, + type: str, + sumo: SumoClient, + case_uuid: str, + query: Dict = None, + pit: Pit = None, ): self._case_uuid = case_uuid - super().__init__(type, sumo, query, _CHILD_FIELDS) + super().__init__(type, sumo, query, _CHILD_FIELDS, pit) @property def names(self) -> List[str]: diff --git a/src/fmu/sumo/explorer/objects/_document_collection.py b/src/fmu/sumo/explorer/objects/_document_collection.py index 86aabd66..226dda01 100644 --- a/src/fmu/sumo/explorer/objects/_document_collection.py +++ b/src/fmu/sumo/explorer/objects/_document_collection.py @@ -1,6 +1,7 @@ from sumo.wrapper import SumoClient from 
fmu.sumo.explorer._utils import Utils from typing import List, Dict +from fmu.sumo.explorer.pit import Pit class DocumentCollection: @@ -12,10 +13,14 @@ def __init__( sumo: SumoClient, query: Dict = None, select: List[str] = None, + pit: Pit = None, ): self._utils = Utils(sumo) self._type = type self._sumo = sumo + self._query = self._init_query(type, query) + self._pit = pit + self._after = None self._curr_index = 0 self._len = None @@ -100,6 +105,9 @@ def _next_batch(self) -> List[Dict]: if self._after is not None: query["search_after"] = self._after + if self._pit is not None: + query["pit"] = self._pit.get_pit_object() + res = self._sumo.post("/search", json=query).json() hits = res["hits"] diff --git a/src/fmu/sumo/explorer/objects/case.py b/src/fmu/sumo/explorer/objects/case.py index 9f09e761..a4c37abb 100644 --- a/src/fmu/sumo/explorer/objects/case.py +++ b/src/fmu/sumo/explorer/objects/case.py @@ -4,14 +4,16 @@ from fmu.sumo.explorer.objects.polygons_collection import PolygonsCollection from fmu.sumo.explorer.objects.table_collection import TableCollection from fmu.sumo.explorer._utils import Utils +from fmu.sumo.explorer.pit import Pit from typing import Dict, List class Case(Document): """Class for representing a case in Sumo""" - def __init__(self, sumo: SumoClient, metadata: Dict): + def __init__(self, sumo: SumoClient, metadata: Dict, pit: Pit = None): super().__init__(metadata) + self._pit = pit self._sumo = sumo self._utils = Utils(sumo) self._iterations = None @@ -117,14 +119,14 @@ def get_realizations(self, iteration: str = None) -> List[int]: @property def surfaces(self) -> SurfaceCollection: """List of case surfaces""" - return SurfaceCollection(self._sumo, self._uuid) + return SurfaceCollection(self._sumo, self._uuid, pit=self._pit) @property def polygons(self) -> PolygonsCollection: """List of case polygons""" - return PolygonsCollection(self._sumo, self._uuid) + return PolygonsCollection(self._sumo, self._uuid, pit=self._pit) @property def 
tables(self) -> TableCollection: """List of case tables""" - return TableCollection(self._sumo, self._uuid) + return TableCollection(self._sumo, self._uuid, pit=self._pit) diff --git a/src/fmu/sumo/explorer/objects/case_collection.py b/src/fmu/sumo/explorer/objects/case_collection.py index 23b11947..4396440b 100644 --- a/src/fmu/sumo/explorer/objects/case_collection.py +++ b/src/fmu/sumo/explorer/objects/case_collection.py @@ -2,6 +2,7 @@ from sumo.wrapper import SumoClient from fmu.sumo.explorer.objects.case import Case from typing import Union, List, Dict +from fmu.sumo.explorer.pit import Pit _CASE_FIELDS = [ "_id", @@ -16,8 +17,14 @@ class CaseCollection(DocumentCollection): """A class for representing a collection of cases in Sumo""" - def __init__(self, sumo: SumoClient, query: Dict = None): - super().__init__("case", sumo, query, _CASE_FIELDS) + def __init__(self, sumo: SumoClient, query: Dict = None, pit: Pit = None): + """ + Args: + sumo (SumoClient): connection to Sumo + query (dict): elastic query object + pit (Pit): point in time + """ + super().__init__("case", sumo, query, _CASE_FIELDS, pit) @property def names(self) -> List[str]: @@ -48,7 +55,7 @@ def fields(self) -> List[str]: def __getitem__(self, index: int) -> Case: doc = super().__getitem__(index) - return Case(self._sumo, doc) + return Case(self._sumo, doc, self._pit) def filter( self, @@ -84,4 +91,4 @@ def filter( ) query = super()._add_filter({"bool": {"must": must}}) - return CaseCollection(self._sumo, query) + return CaseCollection(self._sumo, query, self._pit) diff --git a/src/fmu/sumo/explorer/objects/polygons_collection.py b/src/fmu/sumo/explorer/objects/polygons_collection.py index 350806cd..6babab9f 100644 --- a/src/fmu/sumo/explorer/objects/polygons_collection.py +++ b/src/fmu/sumo/explorer/objects/polygons_collection.py @@ -1,14 +1,28 @@ from sumo.wrapper import SumoClient from fmu.sumo.explorer.objects._child_collection import ChildCollection from fmu.sumo.explorer.objects.polygons 
import Polygons +from fmu.sumo.explorer.pit import Pit from typing import Union, List, Dict class PolygonsCollection(ChildCollection): """Class for representing a collection of polygons objects in Sumo""" - def __init__(self, sumo: SumoClient, case_uuid: str, query: Dict = None): - super().__init__("polygons", sumo, case_uuid, query) + def __init__( + self, + sumo: SumoClient, + case_uuid: str, + query: Dict = None, + pit: Pit = None, + ): + """ + Args: + sumo (SumoClient): connection to Sumo + case_uuid (str): parent case uuid + query (dict): elastic query object + pit (Pit): point in time + """ + super().__init__("polygons", sumo, case_uuid, query, pit) def __getitem__(self, index) -> Polygons: doc = super().__getitem__(index) @@ -41,4 +55,7 @@ def filter( realization=realization, uuid=uuid, ) - return PolygonsCollection(self._sumo, self._case_uuid, query) + + return PolygonsCollection( + self._sumo, self._case_uuid, query, self._pit + ) diff --git a/src/fmu/sumo/explorer/objects/surface_collection.py b/src/fmu/sumo/explorer/objects/surface_collection.py index 85ab3875..fa2de823 100644 --- a/src/fmu/sumo/explorer/objects/surface_collection.py +++ b/src/fmu/sumo/explorer/objects/surface_collection.py @@ -2,6 +2,7 @@ from fmu.sumo.explorer.objects._child_collection import ChildCollection from fmu.sumo.explorer.objects.surface import Surface from fmu.sumo.explorer.timefilter import TimeFilter +from fmu.sumo.explorer.pit import Pit import xtgeo from io import BytesIO from typing import Union, List, Dict, Tuple @@ -17,14 +18,22 @@ class SurfaceCollection(ChildCollection): """Class representing a collection of surface objects in Sumo""" - def __init__(self, sumo: SumoClient, case_uuid: str, query: Dict = None): + def __init__( + self, + sumo: SumoClient, + case_uuid: str, + query: Dict = None, + pit: Pit = None, + ): """ Args: sumo (SumoClient): connection to Sumo case_uuid (str): parent case uuid query (dict): elastic query object + pit (Pit): point in time """ - 
super().__init__("surface", sumo, case_uuid, query) + super().__init__("surface", sumo, case_uuid, query, pit) + self._aggregation_cache = {} def __getitem__(self, index) -> Surface: @@ -162,7 +171,7 @@ def filter( uuid=uuid, ) - return SurfaceCollection(self._sumo, self._case_uuid, query) + return SurfaceCollection(self._sumo, self._case_uuid, query, self._pit) def mean(self) -> xtgeo.RegularSurface: """Perform a mean aggregation""" diff --git a/src/fmu/sumo/explorer/objects/table_collection.py b/src/fmu/sumo/explorer/objects/table_collection.py index 64b8f47c..67fcc351 100644 --- a/src/fmu/sumo/explorer/objects/table_collection.py +++ b/src/fmu/sumo/explorer/objects/table_collection.py @@ -1,14 +1,28 @@ from sumo.wrapper import SumoClient from fmu.sumo.explorer.objects._child_collection import ChildCollection from fmu.sumo.explorer.objects.table import Table +from fmu.sumo.explorer.pit import Pit from typing import Union, List, Dict class TableCollection(ChildCollection): """Class for representing a collection of table objects in Sumo""" - def __init__(self, sumo: SumoClient, case_uuid: str, query: Dict = None): - super().__init__("table", sumo, case_uuid, query) + def __init__( + self, + sumo: SumoClient, + case_uuid: str, + query: Dict = None, + pit: Pit = None, + ): + """ + Args: + sumo (SumoClient): connection to Sumo + case_uuid (str): parent case uuid + query (dict): elastic query object + pit (Pit): point in time + """ + super().__init__("table", sumo, case_uuid, query, pit) def __getitem__(self, index) -> Table: doc = super().__getitem__(index) @@ -55,4 +69,4 @@ def filter( column=column, uuid=uuid, ) - return TableCollection(self._sumo, self._case_uuid, query) + return TableCollection(self._sumo, self._case_uuid, query, self._pit) diff --git a/src/fmu/sumo/explorer/pit.py b/src/fmu/sumo/explorer/pit.py new file mode 100644 index 00000000..c944a49b --- /dev/null +++ b/src/fmu/sumo/explorer/pit.py @@ -0,0 +1,16 @@ +from sumo.wrapper import SumoClient +from 
typing import Dict + + +class Pit: + def __init__(self, sumo: SumoClient, keep_alive: str) -> None: + self._sumo = sumo + self._keep_alive = keep_alive + self._pit_id = self.__get_pit_id(keep_alive) + + def __get_pit_id(self, keep_alive) -> str: + res = self._sumo.post("/pit", params={"keep-alive": keep_alive}) + return res.json()["id"] + + def get_pit_object(self) -> Dict: + return {"id": self._pit_id, "keep_alive": self._keep_alive}