Skip to content

Commit

Permalink
Started custom experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
sigurdp committed Oct 13, 2023
1 parent c01bca4 commit 5f67e9e
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 1 deletion.
84 changes: 84 additions & 0 deletions backend/src/backend/experiments/calc_surf_isec_custom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import asyncio
import numpy as np
import logging
from typing import List
import multiprocessing

from src.services.sumo_access.surface_access import SurfaceAccess
from src.services.utils.authenticated_user import AuthenticatedUser
from src.backend.primary.routers.surface import schemas
from src.backend.utils.perf_metrics import PerfMetrics

LOGGER = logging.getLogger(__name__)


class AsyncQueue:
SLEEP: float = 0.01

def __init__(self, queue: multiprocessing.Queue):
self._queue = queue

async def get(self):
while True:
try:
return self._queue.get_nowait()
except multiprocessing.Empty:
await asyncio.sleep(self.SLEEP)

async def put(self, item):
while True:
try:
self._queue.put_nowait(item)
return None
except multiprocessing.Full:
await asyncio.sleep(self.SLEEP)



async def calc_surf_isec_custom(
perf_metrics: PerfMetrics,
authenticated_user: AuthenticatedUser,
case_uuid: str,
ensemble_name: str,
name: str,
attribute: str,
num_reals: int,
cutting_plane: schemas.CuttingPlane,
) -> List[schemas.SurfaceIntersectionData]:

myprefix = ">>>>>>>>>>>>>>>>> calc_surf_isec_custom():"
print(f"{myprefix} started with {num_reals=}")

perf_metrics.reset_lap_timer()

access = await SurfaceAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name)
perf_metrics.record_lap("access")

fence_arr = np.array([cutting_plane.x_arr, cutting_plane.y_arr, np.zeros(len(cutting_plane.y_arr)), cutting_plane.length_arr]).T

reals = range(0, num_reals)

multi_queue = multiprocessing.Queue()
async_queue = AsyncQueue(multi_queue)

load_tasks = []
for real in reals:
task = asyncio.create_task(load_surf_bytes_task(async_queue, access, real, name, attribute))
load_tasks.append(task)

perf_metrics.record_lap("issue-tasks")

await asyncio.gather(*load_tasks)

perf_metrics.record_lap("execute-tasks")

print(f"{myprefix} finished")

return []


async def load_surf_bytes_task(queue, access: SurfaceAccess, real_num: int, name: str, attribute: str):
surf_bytes = await access.get_realization_surface_bytes_async(real_num=real_num, name=name, attribute=attribute)
await queue.put(surf_bytes)


3 changes: 2 additions & 1 deletion backend/src/backend/user_session/routers/surface/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from src.backend.experiments.calc_surf_isec_queue import calc_surf_isec_queue
from src.backend.experiments.calc_surf_isec_multiprocess import calc_surf_isec_multiprocess
from src.backend.experiments.calc_surf_isec_aiomultiproc import calc_surf_isec_aiomultiproc
from src.backend.experiments.calc_surf_isec_custom import calc_surf_isec_custom


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,7 +61,7 @@ async def post_calc_surf_isec_experiments(
#intersections = await calc_surf_isec_fetch_first(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, cutting_plane)
#intersections = await calc_surf_isec_queue(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, num_workers, cutting_plane)
#intersections = await calc_surf_isec_multiprocess(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, cutting_plane)
intersections = await calc_surf_isec_aiomultiproc(authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, cutting_plane)
intersections = await calc_surf_isec_custom(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, cutting_plane)

LOGGER.debug(f"route calc_surf_isec_experiments - intersected {len(intersections)} surfaces in: {perf_metrics.to_string()}")

Expand Down
65 changes: 65 additions & 0 deletions backend/src/services/sumo_access/surface_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,71 @@ async def get_realization_surface_data_async(

return xtgeo_surf

async def get_realization_surface_bytes_async(
self, real_num: int, name: str, attribute: str, time_or_interval_str: Optional[str] = None
) -> Optional[bytes]:
"""
Get surface data for a realization surface
"""
timer = PerfTimer()
addr_str = self._make_addr_str(real_num, name, attribute, time_or_interval_str)

if time_or_interval_str is None:
time_filter = TimeFilter(TimeType.NONE)

else:
timestamp_arr = time_or_interval_str.split("/", 1)
if len(timestamp_arr) == 0 or len(timestamp_arr) > 2:
raise ValueError("time_or_interval_str must contain a single timestamp or interval")
if len(timestamp_arr) == 1:
time_filter = TimeFilter(
TimeType.TIMESTAMP,
start=timestamp_arr[0],
end=timestamp_arr[0],
exact=True,
)
else:
time_filter = TimeFilter(
TimeType.INTERVAL,
start=timestamp_arr[0],
end=timestamp_arr[1],
exact=True,
)

surface_collection: SurfaceCollection = self._case.surfaces.filter(
iteration=self._iteration_name,
aggregation=False,
realization=real_num,
name=name,
tagname=attribute,
time=time_filter,
)

surf_count = await surface_collection.length_async()
if surf_count == 0:
LOGGER.warning(f"No realization surface found in Sumo for {addr_str}")
return None
if surf_count > 1:
LOGGER.warning(f"Multiple ({surf_count}) surfaces found in Sumo for: {addr_str}. Returning first surface.")

sumo_surf: Surface = await surface_collection.getitem_async(0)
et_locate_ms = timer.lap_ms()

surf_bytes: bytes = await self._sumo_client.get_async(f"/objects('{sumo_surf.uuid}')/blob")
et_download_ms = timer.lap_ms()

size_mb = len(surf_bytes)/(1024*1024)

LOGGER.debug(
f"Got realization surface bytes from Sumo in: {timer.elapsed_ms()}ms ("
f"locate={et_locate_ms}ms, "
f"download={et_download_ms}ms, "
f"[{size_mb:.2f}MB] "
f"({addr_str})"
)

return surf_bytes

def get_statistical_surface_data(
self,
statistic_function: StatisticFunction,
Expand Down

0 comments on commit 5f67e9e

Please sign in to comment.