diff --git a/backend/poetry.lock b/backend/poetry.lock
index 289dcd49b..7338adc5f 100644
--- a/backend/poetry.lock
+++ b/backend/poetry.lock
@@ -1,5 +1,16 @@
 # This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
 
+[[package]]
+name = "aiomultiprocess"
+version = "0.9.0"
+description = "AsyncIO version of the standard multiprocessing module"
+optional = false
+python-versions = ">=3.6"
+files = [
+    {file = "aiomultiprocess-0.9.0-py3-none-any.whl", hash = "sha256:3036c4c881cfbc63674686e036097f22309017c6bf96b04722a542ac9cac7423"},
+    {file = "aiomultiprocess-0.9.0.tar.gz", hash = "sha256:07e7d5657697678d9d2825d4732dfd7655139762dee665167380797c02c68848"},
+]
+
 [[package]]
 name = "annotated-types"
 version = "0.5.0"
@@ -3047,4 +3058,4 @@ tests = ["hypothesis", "pytest", "pytest-benchmark", "pytest-mock", "pytest-snap
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.11"
-content-hash = "3a5231dade6fe172008bdfab1842eaa86f7b3d71249b06af756fa979948033ee"
+content-hash = "2142dadea78c2cf2df7b386cb95fe6dcb0affad517252e2eb57e70c1900e271e"
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 0690fc6c2..bb84aa7be 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -25,6 +25,7 @@ psutil = "^5.9.5"
 vtk = "^9.2.6"
 fmu-sumo = "^0.5.4"
 sumo-wrapper-python = "^0.4.1"
+aiomultiprocess = "^0.9.0"
 
 
 [tool.poetry.group.dev.dependencies]
diff --git a/backend/src/backend/primary/routers/surface/router.py b/backend/src/backend/primary/routers/surface/router.py
index 05c6e597f..2e4c417ea 100644
--- a/backend/src/backend/primary/routers/surface/router.py
+++ b/backend/src/backend/primary/routers/surface/router.py
@@ -1,16 +1,24 @@
 import logging
 from typing import List, Union, Optional
+from dataclasses import dataclass
 
 import asyncio
+import asyncio
+from aiomultiprocess import Pool
 import httpx
 import numpy as np
 import xtgeo
 import json
+from pydantic import BaseModel
 from fastapi import APIRouter, Depends, HTTPException, Query, Body, Response, Request
 
 from src.backend.primary.user_session_proxy import proxy_to_user_session
 from src.backend.primary.user_session_proxy import get_user_session_base_url
 
+from src.backend.user_session.routers.grid.router import get_grid_geometry
+from src.backend.user_session.routers.grid.router import get_grid_parameter
+
+
 from src.services.sumo_access.surface_access import SurfaceAccess
 from src.services.smda_access.stratigraphy_access import StratigraphyAccess
 from src.services.smda_access.stratigraphy_utils import sort_stratigraphic_names_by_hierarchy
@@ -215,6 +223,46 @@ async def get_property_surface_resampled_to_statistical_static_surface(
     return surf_data_response
 
 
+@router.get("/large_download")
+async def get_large_download(
+    request: Request,
+    response: Response,
+    authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
+    real: int = Query(),
+    dummy: int = Query(1)
+) -> str:
+
+    timer = PerfTimer()
+
+    print(f"{real=}")
+    print(f"{dummy=}")
+
+    gridgeo = await get_grid_geometry(
+        authenticated_user=authenticated_user,
+        case_uuid="c619f32d-3ada-4e5e-8d3c-330f940e88f8",
+        grid_name="Geogrid",
+        ensemble_name="iter-0",
+        realization=real)
+
+    print(f"{type(gridgeo)=}")
+
+    prop = await get_grid_parameter(
+        authenticated_user=authenticated_user,
+        case_uuid="c619f32d-3ada-4e5e-8d3c-330f940e88f8",
+        grid_name="Geogrid",
+        ensemble_name="iter-0",
+        realization=real,
+        parameter_name="FACIES")
+
+    print(f"{type(prop)=}")
+
+    print(f"Download took: {timer.elapsed_s():.2f}s")
+
return "large_download" + + + + @router.post("/surface_intersections/") async def get_surface_intersections( request: Request, @@ -229,7 +277,7 @@ async def get_surface_intersections( cutting_plane: schemas.CuttingPlane = Body(embed=True), ) -> List[schemas.SurfaceIntersectionData]: - + perf_metrics = PerfMetrics(response) # intersections = await execute_usersession_job_calc_surf_intersections_fetch_first( # request, @@ -245,8 +293,21 @@ async def get_surface_intersections( # return intersections - intersections = await execute_usersession_job_calc_surf_intersections_queue( - request, + # intersections = await execute_usersession_job_calc_surf_intersections_queue( + # request, + # authenticated_user, + # case_uuid, + # ensemble_name, + # name, + # attribute, + # num_reals, + # num_workers, + # cutting_plane, + # ) + + # return intersections + + intersections = await _calc_surf_intersections_aiomulti( authenticated_user, case_uuid, ensemble_name, @@ -257,6 +318,112 @@ async def get_surface_intersections( cutting_plane, ) + LOGGER.debug(f"Intersected {len(intersections)} surfaces in: {perf_metrics.to_string()}") + + return intersections + + +# -------------------------------------------------------------------------------------- + +@dataclass +class SurfItem: + access_token: str + case_uuid: str + ensemble_name: str + name: str + attribute: str + real: int + #fence_arr: np.ndarray + +@dataclass +class ResultItem: + perf_info: str + line: np.ndarray + + +global_access = None +global_fence_arr = None + + +def init_access_and_fence(access_token: str, case_uuid: str, ensemble_name: str, fence_arr: np.ndarray): + global global_access + global global_fence_arr + global_access = asyncio.run(SurfaceAccess.from_case_uuid(access_token, case_uuid, ensemble_name)) + global_fence_arr = fence_arr + + +async def process_a_surf(item: SurfItem) -> ResultItem: + print(f"fetch_a_surf {item.real=}") + perf_metrics = PerfMetrics() + + access = global_access + + xtgeo_surf = await access.get_realization_surface_data(real_num=item.real, name=item.name, attribute=item.attribute) + if xtgeo_surf is None: + return None + perf_metrics.record_lap("fetch") + + line = xtgeo_surf.get_randomline(global_fence_arr) + perf_metrics.record_lap("calc") + + res_item = ResultItem(perf_info=perf_metrics.to_string(), line=line) + return res_item + + +async def _calc_surf_intersections_aiomulti( + authenticated_user: AuthenticatedUser, + case_uuid: str, + ensemble_name: str, + name: str, + attribute: str, + num_reals: int, + num_workers: int, + cutting_plane: schemas.CuttingPlane, +) -> List[schemas.SurfaceIntersectionData]: + + myprefix = ">>>>>>>>>>>>>>>>> _calc_surf_intersections_aiomulti():" + print(f"{myprefix} started") + + fence_arr = np.array( + [cutting_plane.x_arr, cutting_plane.y_arr, np.zeros(len(cutting_plane.y_arr)), cutting_plane.length_arr] + ).T + + access_token=authenticated_user.get_sumo_access_token() + + item_list = [] + for i in range(num_reals): + item_list.append(SurfItem( + access_token=access_token, + case_uuid=case_uuid, + ensemble_name=ensemble_name, + name=name, + attribute=attribute, + real=i, + )) + + print(f"{myprefix} built item_list {len(item_list)=}") + + # See + # https://aiomultiprocess.omnilib.dev/en/latest/guide.html + + processes = 4 + queuecount = None + childconcurrency = 4 + + async with Pool(queuecount=queuecount, processes=processes, childconcurrency=childconcurrency, initializer=init_access_and_fence, initargs=[access_token, case_uuid, ensemble_name, fence_arr]) as pool: + 
print(f"{myprefix} pool info {pool.process_count=}") + print(f"{myprefix} pool info {pool.queue_count=}") + print(f"{myprefix} pool info {pool.childconcurrency=}") + + intersections = [] + async for res_item in pool.map(process_a_surf, item_list): + if res_item is not None: + isecdata = schemas.SurfaceIntersectionData(name="someName", hlen_arr=res_item.line[:, 0].tolist(), z_arr=res_item.line[:, 1].tolist()) + intersections.append(isecdata) + print(f"{myprefix} got isec {len(intersections)} perf_info={res_item.perf_info}") + + print(f"{myprefix} finished") + return intersections @@ -270,7 +437,6 @@ async def execute_usersession_job_calc_surf_intersections_queue( attribute: str, num_reals: int, num_workers: int, - cutting_plane: schemas.CuttingPlane, ) -> List[schemas.SurfaceIntersectionData]: diff --git a/backend/src/backend/user_session/routers/surface/router.py b/backend/src/backend/user_session/routers/surface/router.py index 9f352eae5..432f2b5be 100644 --- a/backend/src/backend/user_session/routers/surface/router.py +++ b/backend/src/backend/user_session/routers/surface/router.py @@ -125,7 +125,9 @@ async def post_calc_surf_intersections_queue( [cutting_plane.x_arr, cutting_plane.y_arr, np.zeros(len(cutting_plane.y_arr)), cutting_plane.length_arr] ).T + LOGGER.debug("====================================") LOGGER.debug(f"{num_reals=} {num_workers=}") + LOGGER.debug("====================================") reals = range(0, num_reals) diff --git a/backend/src/services/sumo_access/grid_access.py b/backend/src/services/sumo_access/grid_access.py index 3585122b2..a89f69d93 100644 --- a/backend/src/services/sumo_access/grid_access.py +++ b/backend/src/services/sumo_access/grid_access.py @@ -31,6 +31,7 @@ async def static_parameter_names(self, grid_name: str) -> List[str]: async def get_grid_geometry(self, grid_name: str, realization: int) -> xtgeo.Grid: timer = PerfTimer() + geometry_blob_id = await get_grid_geometry_blob_id( self._sumo_client, self._case_uuid, @@ -38,9 +39,18 @@ async def get_grid_geometry(self, grid_name: str, realization: int) -> xtgeo.Gri realization, grid_name, ) + et_blob_id_ms = timer.lap_ms() + stream = self._sumo_client.get(f"/objects('{geometry_blob_id}')/blob") + size_mb = len(stream)/(1024*1024) + et_fetch_ms = timer.lap_ms() + print(f"{grid_name} {realization} {geometry_blob_id} in {round(timer.lap_s(),2)}s") + grid_geom = xtgeo.grid_from_file(BytesIO(stream)) + et_load_ms = timer.lap_ms() + + print(f"!!!!!!!! get_grid_geometry() got {size_mb:.2f}MB {grid_name=} {realization=} in {round(timer.elapsed_s(),2)}s (blob_id={et_blob_id_ms}ms, fetch={et_fetch_ms}ms, load={et_load_ms}ms)") return grid_geom @@ -48,6 +58,7 @@ async def get_grid_parameter( self, grid_name: str, grid_parameter_name: str, realization: int ) -> xtgeo.GridProperty: timer = PerfTimer() + parameter_blob_id = await get_grid_parameter_blob_id( self._sumo_client, self._case_uuid, @@ -56,9 +67,19 @@ async def get_grid_parameter( grid_name, grid_parameter_name, ) + et_blob_id_ms = timer.lap_ms() + stream = self._sumo_client.get(f"/objects('{parameter_blob_id}')/blob") + size_mb = len(stream)/(1024*1024) + et_fetch_ms = timer.lap_ms() + print(f"{grid_name} {grid_parameter_name} {realization} {parameter_blob_id} in {round(timer.lap_s(),2)}s") + grid_param = xtgeo.gridproperty_from_file(BytesIO(stream)) + et_load_ms = timer.lap_ms() + + print(f"!!!!!!!! 
+        print(f"!!!!!!!! get_grid_parameter() got {size_mb:.2f}MB {grid_name=} {realization=} {grid_parameter_name=} in {round(timer.elapsed_s(),2)}s (blob_id={et_blob_id_ms}ms, fetch={et_fetch_ms}ms, load={et_load_ms}ms)")
+
         return grid_param
 
     async def grids_have_equal_nxnynz(self, grid_name: str) -> bool:
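
For context, below is a minimal, self-contained sketch of the aiomultiprocess fan-out pattern the new _calc_surf_intersections_aiomulti() helper in the patch relies on: a Pool with a per-process initializer, consumed with "async for" over pool.map(). It is not part of the patch; the names init_worker and handle_item and the integer payloads are illustrative stand-ins for the SurfaceAccess client, fence array, and SurfItem work items used above.

# Illustrative sketch only, not part of the patch.
import asyncio

from aiomultiprocess import Pool

# Module-level state set by the initializer; each child process gets its own copy
# (the patch stores a SurfaceAccess client and the fence array the same way).
_worker_state = {}


def init_worker(offset: int) -> None:
    # Runs once in every child process before it starts taking tasks.
    _worker_state["offset"] = offset


async def handle_item(item: int) -> int:
    # Async work executed inside a child process; up to `childconcurrency`
    # of these run concurrently per process.
    await asyncio.sleep(0.01)
    return item + _worker_state["offset"]


async def main() -> None:
    async with Pool(processes=4, childconcurrency=4, initializer=init_worker, initargs=[100]) as pool:
        # pool.map() yields results as they complete across the worker processes.
        async for result in pool.map(handle_item, range(8)):
            print(result)


if __name__ == "__main__":
    asyncio.run(main())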