Skip to content

Commit

Permalink
Try redis and spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
sigurdp committed Oct 22, 2023
1 parent 3018c32 commit 58e53fd
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions backend/src/backend/experiments/calc_surf_isec_inmem.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import List
import multiprocessing
import xtgeo
import io
import asyncio

# from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
Expand All @@ -14,6 +14,7 @@
from src.services.utils.authenticated_user import AuthenticatedUser
from src.backend.primary.routers.surface import schemas
from src.backend.utils.perf_metrics import PerfMetrics
from src.backend.experiments.caching import get_user_cache

LOGGER = logging.getLogger(__name__)

Expand All @@ -36,6 +37,29 @@ def get(self, case_uuid: str, ensemble_name: str, name: str, attribute: str, rea
return surf


class RedisSurfCache:
def __init__(self, authenticated_user: AuthenticatedUser):
self._user_cache = get_user_cache(authenticated_user)

async def set(self, case_uuid: str, ensemble_name: str, name: str, attribute: str, real: int, cache_entry: SurfCacheEntry):
cache_key = f"surface:{case_uuid}_{ensemble_name}_Real{real}_{name}_{attribute}"
xtgeo_surf = cache_entry.surf
if xtgeo_surf is None:
xtgeo_surf = xtgeo.RegularSurface(1, 1, 1, 1)
await self._user_cache.set_RegularSurface_HACK(cache_key, xtgeo_surf)

async def get(self, case_uuid: str, ensemble_name: str, name: str, attribute: str, real: int) -> SurfCacheEntry | None:
cache_key = f"surface:{case_uuid}_{ensemble_name}_Real{real}_{name}_{attribute}"
xtgeo_surf = await self._user_cache.get_RegularSurface_HACK(cache_key)
if xtgeo_surf is None:
return None

if xtgeo_surf.ncol == 1 and xtgeo_surf.nrow == 1:
return SurfCacheEntry(surf=None)

return SurfCacheEntry(surf=xtgeo_surf)


IN_MEM_SURF_CACHE = InMemSurfCache()


Expand Down Expand Up @@ -112,8 +136,19 @@ async def calc_surf_isec_inmem(
xtgeo_surf_arr = []
items_to_fetch_list = []

redis_surf_cache = RedisSurfCache(authenticated_user)

coro_arr = []
for real in reals:
coro_arr.append(redis_surf_cache.get(case_uuid, ensemble_name, name, attribute, real))
print("awaiting cache_entry_arr", flush=True)
cache_entry_arr: List[SurfCacheEntry | None] = await asyncio.gather(*coro_arr)
print("resolved cache_entry_arr", flush=True)

for real in reals:
cache_entry = IN_MEM_SURF_CACHE.get(case_uuid, ensemble_name, name, attribute, real)
#cache_entry = IN_MEM_SURF_CACHE.get(case_uuid, ensemble_name, name, attribute, real)
#cache_entry = await redis_surf_cache.get(case_uuid, ensemble_name, name, attribute, real)
cache_entry = cache_entry_arr[real]
if cache_entry is not None:
xtgeo_surf_arr.append(cache_entry.surf)
else:
Expand All @@ -130,9 +165,10 @@ async def calc_surf_isec_inmem(
print(f"{myprefix} {len(xtgeo_surf_arr)=}", flush=True)
print(f"{myprefix} {len(items_to_fetch_list)=}", flush=True)


if len(items_to_fetch_list) > 0:
with multiprocessing.Pool(initializer=init_access, initargs=(access_token, case_uuid, ensemble_name, name, attribute)) as pool:
context = multiprocessing.get_context("spawn")
with context.Pool(initializer=init_access, initargs=(access_token, case_uuid, ensemble_name, name, attribute)) as pool:
#with multiprocessing.Pool(initializer=init_access, initargs=(access_token, case_uuid, ensemble_name, name, attribute)) as pool:
print(f"{myprefix} just before map", flush=True)
res_item_arr = pool.map(fetch_a_surf, items_to_fetch_list)
print(f"{myprefix} back from map {len(res_item_arr)=}", flush=True)
Expand All @@ -146,7 +182,8 @@ async def calc_surf_isec_inmem(
# xtgeo_surf = xtgeo.surface_from_file(byte_stream)

xtgeo_surf_arr.append(xtgeo_surf)
IN_MEM_SURF_CACHE.set(case_uuid, ensemble_name, items_to_fetch_list[idx].name, items_to_fetch_list[idx].attribute, items_to_fetch_list[idx].real, cache_entry=SurfCacheEntry(xtgeo_surf))
#IN_MEM_SURF_CACHE.set(case_uuid, ensemble_name, items_to_fetch_list[idx].name, items_to_fetch_list[idx].attribute, items_to_fetch_list[idx].real, cache_entry=SurfCacheEntry(xtgeo_surf))
await redis_surf_cache.set(case_uuid, ensemble_name, items_to_fetch_list[idx].name, items_to_fetch_list[idx].attribute, items_to_fetch_list[idx].real, cache_entry=SurfCacheEntry(xtgeo_surf))

intersections = []

Expand Down

0 comments on commit 58e53fd

Please sign in to comment.