From 6c35dce45551f77dc098838eaaab4dce2c19fbae Mon Sep 17 00:00:00 2001 From: Sigurd Pettersen Date: Mon, 16 Oct 2023 16:38:53 +0200 Subject: [PATCH] Try and avoid extra round-trip per surface --- .../experiments/calc_surf_isec_inmem.py | 42 +++++------ .../services/sumo_access/surface_access.py | 69 +++++++++++++++++++ 2 files changed, 91 insertions(+), 20 deletions(-) diff --git a/backend/src/backend/experiments/calc_surf_isec_inmem.py b/backend/src/backend/experiments/calc_surf_isec_inmem.py index 4f020beab..e39410572 100644 --- a/backend/src/backend/experiments/calc_surf_isec_inmem.py +++ b/backend/src/backend/experiments/calc_surf_isec_inmem.py @@ -41,25 +41,18 @@ def get(self, case_uuid: str, ensemble_name: str, name: str, attribute: str, rea @dataclass class SurfItem: - # access_token: str case_uuid: str ensemble_name: str name: str attribute: str real: int - # fence_arr: np.ndarray - - -@dataclass -class ResultItem: - perf_info: str - line: np.ndarray global_access = None +global_many_surfs_getter = None -def init_access(access_token: str, case_uuid: str, ensemble_name: str): +def init_access(access_token: str, case_uuid: str, ensemble_name: str, name: str, attribute: str): # !!!!!!!!!!!!! # See: https://github.com/tiangolo/fastapi/issues/1487#issuecomment-1157066306 signal.set_wakeup_fd(-1) @@ -69,23 +62,30 @@ def init_access(access_token: str, case_uuid: str, ensemble_name: str): global global_access global_access = SurfaceAccess.from_case_uuid_sync(access_token, case_uuid, ensemble_name) + global global_many_surfs_getter + global_many_surfs_getter = global_access.prepare_for_getting_many_realizations(name=name, attribute=attribute) + def fetch_a_surf(item: SurfItem) -> bytes: print(f">>>> fetch_a_surf {item.real=}", flush=True) - perf_metrics = PerfMetrics() - access = global_access - # access = await SurfaceAccess.from_case_uuid(item.access_token, item.case_uuid, item.ensemble_name) - perf_metrics.record_lap("access") + # surf_bytes = global_access.get_realization_surface_bytes_sync(real_num=item.real, name=item.name, attribute=item.attribute) + # if surf_bytes is None: + # return None + + # xtgeo_surf = global_access.get_realization_surface_data_sync(real_num=item.real, name=item.name, attribute=item.attribute) + # if xtgeo_surf is None: + # return None - surf_bytes = access.get_realization_surface_bytes_sync(real_num=item.real, name=item.name, attribute=item.attribute) - if surf_bytes is None: + xtgeo_surf = global_many_surfs_getter.get_real(real_num=item.real) + if xtgeo_surf is None: return None - perf_metrics.record_lap("fetch") print(f">>>> fetch_a_surf {item.real=} done", flush=True) - return surf_bytes + #return surf_bytes + return xtgeo_surf + async def calc_surf_isec_inmem( @@ -132,7 +132,8 @@ async def calc_surf_isec_inmem( if len(items_to_fetch_list) > 0: - with multiprocessing.Pool(initializer=init_access, initargs=(access_token, case_uuid, ensemble_name)) as pool: + with multiprocessing.Pool(initializer=init_access, initargs=(access_token, case_uuid, ensemble_name, name, attribute)) as pool: + print(f"{myprefix} just before map", flush=True) res_item_arr = pool.map(fetch_a_surf, items_to_fetch_list) print(f"{myprefix} back from map {len(res_item_arr)=}", flush=True) @@ -140,8 +141,9 @@ async def calc_surf_isec_inmem( xtgeo_surf = None if res_item is not None: print(f"{myprefix} {type(res_item)=}", flush=True) - byte_stream = io.BytesIO(res_item) - xtgeo_surf = xtgeo.surface_from_file(byte_stream) + xtgeo_surf = res_item + # byte_stream = io.BytesIO(res_item) + # xtgeo_surf = xtgeo.surface_from_file(byte_stream) xtgeo_surf_arr.append(xtgeo_surf) IN_MEM_SURF_CACHE.set(case_uuid, ensemble_name, items_to_fetch_list[idx].name, items_to_fetch_list[idx].attribute, items_to_fetch_list[idx].real, cache_entry=SurfCacheEntry(xtgeo_surf)) diff --git a/backend/src/services/sumo_access/surface_access.py b/backend/src/services/sumo_access/surface_access.py index 6639ece64..34e7f8b61 100644 --- a/backend/src/services/sumo_access/surface_access.py +++ b/backend/src/services/sumo_access/surface_access.py @@ -16,6 +16,23 @@ LOGGER = logging.getLogger(__name__) +class ManyRealSurfsGetter: + def __init__(self, sumo_surf_arr: List[Surface]): + self._sumo_surf_arr = sumo_surf_arr + + def get_real(self, real_num: int) -> Optional[xtgeo.RegularSurface]: + for sumo_surf in self._sumo_surf_arr: + if sumo_surf.realization == real_num: + byte_stream: BytesIO = sumo_surf.blob + xtgeo_surf = xtgeo.surface_from_file(byte_stream) + return xtgeo_surf + + return None + + + + + class SurfaceAccess(SumoEnsemble): async def get_surface_directory(self) -> List[SurfaceMeta]: surface_collection: SurfaceCollection = self._case.surfaces.filter( @@ -50,6 +67,58 @@ async def get_surface_directory(self) -> List[SurfaceMeta]: return surfs + def prepare_for_getting_many_realizations( + self, name: str, attribute: str, time_or_interval_str: Optional[str] = None + ) -> ManyRealSurfsGetter: + """ + Get surface data for a realization surface + """ + timer = PerfTimer() + addr_str = self._make_addr_str(-1, name, attribute, time_or_interval_str) + + if time_or_interval_str is None: + time_filter = TimeFilter(TimeType.NONE) + + else: + timestamp_arr = time_or_interval_str.split("/", 1) + if len(timestamp_arr) == 0 or len(timestamp_arr) > 2: + raise ValueError("time_or_interval_str must contain a single timestamp or interval") + if len(timestamp_arr) == 1: + time_filter = TimeFilter( + TimeType.TIMESTAMP, + start=timestamp_arr[0], + end=timestamp_arr[0], + exact=True, + ) + else: + time_filter = TimeFilter( + TimeType.INTERVAL, + start=timestamp_arr[0], + end=timestamp_arr[1], + exact=True, + ) + + surface_collection: SurfaceCollection = self._case.surfaces.filter( + iteration=self._iteration_name, + aggregation=False, + realization=None, + name=name, + tagname=attribute, + time=time_filter, + ) + + surf_count = len(surface_collection) + if surf_count == 0: + LOGGER.warning(f"No realization surface found in Sumo for {addr_str}") + return None + + sumo_surf_arr = [] + for sumo_surf in surface_collection: + sumo_surf_arr.append(sumo_surf) + + return ManyRealSurfsGetter(sumo_surf_arr) + + def get_realization_surface_data_sync( self, real_num: int, name: str, attribute: str, time_or_interval_str: Optional[str] = None ) -> Optional[xtgeo.RegularSurface]: