Skip to content

Commit

Permalink
Try and avoid extra round-trip per surface
Browse files Browse the repository at this point in the history
  • Loading branch information
sigurdp committed Oct 16, 2023
1 parent 3e094ad commit 6c35dce
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 20 deletions.
42 changes: 22 additions & 20 deletions backend/src/backend/experiments/calc_surf_isec_inmem.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,18 @@ def get(self, case_uuid: str, ensemble_name: str, name: str, attribute: str, rea

@dataclass
class SurfItem:
# access_token: str
case_uuid: str
ensemble_name: str
name: str
attribute: str
real: int
# fence_arr: np.ndarray


@dataclass
class ResultItem:
perf_info: str
line: np.ndarray


global_access = None
global_many_surfs_getter = None


def init_access(access_token: str, case_uuid: str, ensemble_name: str):
def init_access(access_token: str, case_uuid: str, ensemble_name: str, name: str, attribute: str):
# !!!!!!!!!!!!!
# See: https://github.com/tiangolo/fastapi/issues/1487#issuecomment-1157066306
signal.set_wakeup_fd(-1)
Expand All @@ -69,23 +62,30 @@ def init_access(access_token: str, case_uuid: str, ensemble_name: str):
global global_access
global_access = SurfaceAccess.from_case_uuid_sync(access_token, case_uuid, ensemble_name)

global global_many_surfs_getter
global_many_surfs_getter = global_access.prepare_for_getting_many_realizations(name=name, attribute=attribute)


def fetch_a_surf(item: SurfItem) -> bytes:
print(f">>>> fetch_a_surf {item.real=}", flush=True)
perf_metrics = PerfMetrics()

access = global_access
# access = await SurfaceAccess.from_case_uuid(item.access_token, item.case_uuid, item.ensemble_name)
perf_metrics.record_lap("access")
# surf_bytes = global_access.get_realization_surface_bytes_sync(real_num=item.real, name=item.name, attribute=item.attribute)
# if surf_bytes is None:
# return None

# xtgeo_surf = global_access.get_realization_surface_data_sync(real_num=item.real, name=item.name, attribute=item.attribute)
# if xtgeo_surf is None:
# return None

surf_bytes = access.get_realization_surface_bytes_sync(real_num=item.real, name=item.name, attribute=item.attribute)
if surf_bytes is None:
xtgeo_surf = global_many_surfs_getter.get_real(real_num=item.real)
if xtgeo_surf is None:
return None
perf_metrics.record_lap("fetch")

print(f">>>> fetch_a_surf {item.real=} done", flush=True)

return surf_bytes
#return surf_bytes
return xtgeo_surf



async def calc_surf_isec_inmem(
Expand Down Expand Up @@ -132,16 +132,18 @@ async def calc_surf_isec_inmem(


if len(items_to_fetch_list) > 0:
with multiprocessing.Pool(initializer=init_access, initargs=(access_token, case_uuid, ensemble_name)) as pool:
with multiprocessing.Pool(initializer=init_access, initargs=(access_token, case_uuid, ensemble_name, name, attribute)) as pool:
print(f"{myprefix} just before map", flush=True)
res_item_arr = pool.map(fetch_a_surf, items_to_fetch_list)
print(f"{myprefix} back from map {len(res_item_arr)=}", flush=True)

for idx, res_item in enumerate(res_item_arr):
xtgeo_surf = None
if res_item is not None:
print(f"{myprefix} {type(res_item)=}", flush=True)
byte_stream = io.BytesIO(res_item)
xtgeo_surf = xtgeo.surface_from_file(byte_stream)
xtgeo_surf = res_item
# byte_stream = io.BytesIO(res_item)
# xtgeo_surf = xtgeo.surface_from_file(byte_stream)

xtgeo_surf_arr.append(xtgeo_surf)
IN_MEM_SURF_CACHE.set(case_uuid, ensemble_name, items_to_fetch_list[idx].name, items_to_fetch_list[idx].attribute, items_to_fetch_list[idx].real, cache_entry=SurfCacheEntry(xtgeo_surf))
Expand Down
69 changes: 69 additions & 0 deletions backend/src/services/sumo_access/surface_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@
LOGGER = logging.getLogger(__name__)


class ManyRealSurfsGetter:
def __init__(self, sumo_surf_arr: List[Surface]):
self._sumo_surf_arr = sumo_surf_arr

def get_real(self, real_num: int) -> Optional[xtgeo.RegularSurface]:
for sumo_surf in self._sumo_surf_arr:
if sumo_surf.realization == real_num:
byte_stream: BytesIO = sumo_surf.blob
xtgeo_surf = xtgeo.surface_from_file(byte_stream)
return xtgeo_surf

return None





class SurfaceAccess(SumoEnsemble):
async def get_surface_directory(self) -> List[SurfaceMeta]:
surface_collection: SurfaceCollection = self._case.surfaces.filter(
Expand Down Expand Up @@ -50,6 +67,58 @@ async def get_surface_directory(self) -> List[SurfaceMeta]:

return surfs

def prepare_for_getting_many_realizations(
self, name: str, attribute: str, time_or_interval_str: Optional[str] = None
) -> ManyRealSurfsGetter:
"""
Get surface data for a realization surface
"""
timer = PerfTimer()
addr_str = self._make_addr_str(-1, name, attribute, time_or_interval_str)

if time_or_interval_str is None:
time_filter = TimeFilter(TimeType.NONE)

else:
timestamp_arr = time_or_interval_str.split("/", 1)
if len(timestamp_arr) == 0 or len(timestamp_arr) > 2:
raise ValueError("time_or_interval_str must contain a single timestamp or interval")
if len(timestamp_arr) == 1:
time_filter = TimeFilter(
TimeType.TIMESTAMP,
start=timestamp_arr[0],
end=timestamp_arr[0],
exact=True,
)
else:
time_filter = TimeFilter(
TimeType.INTERVAL,
start=timestamp_arr[0],
end=timestamp_arr[1],
exact=True,
)

surface_collection: SurfaceCollection = self._case.surfaces.filter(
iteration=self._iteration_name,
aggregation=False,
realization=None,
name=name,
tagname=attribute,
time=time_filter,
)

surf_count = len(surface_collection)
if surf_count == 0:
LOGGER.warning(f"No realization surface found in Sumo for {addr_str}")
return None

sumo_surf_arr = []
for sumo_surf in surface_collection:
sumo_surf_arr.append(sumo_surf)

return ManyRealSurfsGetter(sumo_surf_arr)


def get_realization_surface_data_sync(
self, real_num: int, name: str, attribute: str, time_or_interval_str: Optional[str] = None
) -> Optional[xtgeo.RegularSurface]:
Expand Down

0 comments on commit 6c35dce

Please sign in to comment.