Skip to content

Commit

Permalink
First experiment with aiomultiprocess
Browse files Browse the repository at this point in the history
  • Loading branch information
sigurdp committed Oct 11, 2023
1 parent c37c022 commit d98f940
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 5 deletions.
13 changes: 12 additions & 1 deletion backend/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ psutil = "^5.9.5"
vtk = "^9.2.6"
fmu-sumo = "^0.5.4"
sumo-wrapper-python = "^0.4.1"
aiomultiprocess = "^0.9.0"


[tool.poetry.group.dev.dependencies]
Expand Down
174 changes: 170 additions & 4 deletions backend/src/backend/primary/routers/surface/router.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import logging
from typing import List, Union, Optional
from dataclasses import dataclass

import asyncio
import asyncio
from aiomultiprocess import Pool
import httpx
import numpy as np
import xtgeo
import json
from pydantic import BaseModel

from fastapi import APIRouter, Depends, HTTPException, Query, Body, Response, Request
from src.backend.primary.user_session_proxy import proxy_to_user_session
from src.backend.primary.user_session_proxy import get_user_session_base_url

from src.backend.user_session.routers.grid.router import get_grid_geometry
from src.backend.user_session.routers.grid.router import get_grid_parameter


from src.services.sumo_access.surface_access import SurfaceAccess
from src.services.smda_access.stratigraphy_access import StratigraphyAccess
from src.services.smda_access.stratigraphy_utils import sort_stratigraphic_names_by_hierarchy
Expand Down Expand Up @@ -215,6 +223,46 @@ async def get_property_surface_resampled_to_statistical_static_surface(
return surf_data_response


@router.get("/large_download")
async def get_large_download(
    request: Request,
    response: Response,
    authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
    real: int = Query(),
    dummy: int = Query(1),
) -> str:
    """Experimental endpoint that times the download of a grid geometry and a grid property.

    Fetches the geometry of one hard-coded grid and its "FACIES" parameter for
    realization `real`, prints the types of the returned objects and the total
    elapsed time, and returns the constant string "large_download".

    `request` and `response` are not used by the body, and `dummy` is only
    echoed to stdout.
    """
    # Hard-coded test data for this experiment; both fetches target the same grid.
    case_uuid = "c619f32d-3ada-4e5e-8d3c-330f940e88f8"
    grid_name = "Geogrid"
    ensemble_name = "iter-0"

    timer = PerfTimer()

    print(f"{real=}")
    print(f"{dummy=}")

    gridgeo = await get_grid_geometry(
        authenticated_user=authenticated_user,
        case_uuid=case_uuid,
        grid_name=grid_name,
        ensemble_name=ensemble_name,
        realization=real,
    )

    print(f"{type(gridgeo)=}")

    prop = await get_grid_parameter(
        authenticated_user=authenticated_user,
        case_uuid=case_uuid,
        grid_name=grid_name,
        ensemble_name=ensemble_name,
        realization=real,
        parameter_name="FACIES",
    )

    print(f"{type(prop)=}")

    # ".2f" gives two decimals; the previous "2f" was a minimum field width of 2
    # with the default six decimals.
    print(f"Download took: {timer.elapsed_s():.2f}s")

    return "large_download"




@router.post("/surface_intersections/")
async def get_surface_intersections(
request: Request,
Expand All @@ -229,7 +277,7 @@ async def get_surface_intersections(
cutting_plane: schemas.CuttingPlane = Body(embed=True),
) -> List[schemas.SurfaceIntersectionData]:


perf_metrics = PerfMetrics(response)

# intersections = await execute_usersession_job_calc_surf_intersections_fetch_first(
# request,
Expand All @@ -245,8 +293,21 @@ async def get_surface_intersections(
# return intersections


intersections = await execute_usersession_job_calc_surf_intersections_queue(
request,
# intersections = await execute_usersession_job_calc_surf_intersections_queue(
# request,
# authenticated_user,
# case_uuid,
# ensemble_name,
# name,
# attribute,
# num_reals,
# num_workers,
# cutting_plane,
# )

# return intersections

intersections = await _calc_surf_intersections_aiomulti(
authenticated_user,
case_uuid,
ensemble_name,
Expand All @@ -257,6 +318,112 @@ async def get_surface_intersections(
cutting_plane,
)

LOGGER.debug(f"Intersected {len(intersections)} surfaces in: {perf_metrics.to_string()}")

return intersections


# --------------------------------------------------------------------------------------

@dataclass
class SurfItem:
    """Work item handed to a pool worker: identifies one realization surface to process."""

    # Credentials and location of the ensemble the surface belongs to
    access_token: str
    case_uuid: str
    ensemble_name: str

    # Surface identification
    name: str
    attribute: str

    # Realization number
    real: int

    # The fence array is deliberately not carried per item:
    # fence_arr: np.ndarray

@dataclass
class ResultItem:
    """Outcome of processing one surface: a timing summary plus the sampled line."""

    # Human-readable performance summary for this item
    perf_info: str

    # 2D array; consumers read column 0 as horizontal length and column 1 as z
    line: np.ndarray


# Module-level state that is filled in by init_access_and_fence() and read by
# process_a_surf(). Both stay None until the initializer has run.
global_access = None
global_fence_arr = None


def init_access_and_fence(access_token: str, case_uuid: str, ensemble_name: str, fence_arr: np.ndarray) -> None:
    """Populate the module globals with a SurfaceAccess instance and the fence array.

    Intended to be used as the `initializer` of the aiomultiprocess Pool, with
    these four parameters supplied through `initargs`.

    NOTE(review): the access object is created inside its own asyncio.run()
    call, i.e. on a separate event loop from the one that later awaits on it -
    confirm SurfaceAccess does not hold loop-bound resources.
    """
    global global_access, global_fence_arr

    create_access_coro = SurfaceAccess.from_case_uuid(access_token, case_uuid, ensemble_name)
    global_access = asyncio.run(create_access_coro)
    global_fence_arr = fence_arr


async def process_a_surf(item: SurfItem) -> Optional[ResultItem]:
    """Fetch one realization surface and sample it along the shared fence.

    Uses the module globals set up by init_access_and_fence(): `global_access`
    for fetching the surface and `global_fence_arr` as the fence passed to
    `get_randomline()`.

    Returns:
        A ResultItem holding the sampled line and a timing summary, or None
        when no surface data was returned for the requested realization.

    Raises:
        RuntimeError: if `global_access` has not been initialized.
    """
    print(f"fetch_a_surf {item.real=}")
    perf_metrics = PerfMetrics()

    access = global_access
    if access is None:
        # Fail with a clear message instead of an AttributeError on None below.
        raise RuntimeError("process_a_surf() called before init_access_and_fence() has initialized global_access")

    xtgeo_surf = await access.get_realization_surface_data(
        real_num=item.real,
        name=item.name,
        attribute=item.attribute,
    )
    if xtgeo_surf is None:
        return None
    perf_metrics.record_lap("fetch")

    line = xtgeo_surf.get_randomline(global_fence_arr)
    perf_metrics.record_lap("calc")

    return ResultItem(perf_info=perf_metrics.to_string(), line=line)


async def _calc_surf_intersections_aiomulti(
    authenticated_user: AuthenticatedUser,
    case_uuid: str,
    ensemble_name: str,
    name: str,
    attribute: str,
    num_reals: int,
    num_workers: int,
    cutting_plane: schemas.CuttingPlane,
) -> List[schemas.SurfaceIntersectionData]:
    """Intersect realization surfaces with a cutting plane using an aiomultiprocess Pool.

    Builds one SurfItem per realization in range(num_reals), maps
    process_a_surf() over them in the pool and converts every non-None result
    into a SurfaceIntersectionData. Progress is reported with print().

    NOTE(review): `num_workers` is accepted but not used; the pool is created
    with a fixed 4 processes and a child concurrency of 4. Every returned
    intersection is named "someName" rather than after the surface.

    See https://aiomultiprocess.omnilib.dev/en/latest/guide.html
    """
    log_tag = ">>>>>>>>>>>>>>>>> _calc_surf_intersections_aiomulti():"
    print(f"{log_tag} started")

    # Fence rows are (x, y, z, length) per cutting-plane point, with z always 0
    fence_columns = [
        cutting_plane.x_arr,
        cutting_plane.y_arr,
        np.zeros(len(cutting_plane.y_arr)),
        cutting_plane.length_arr,
    ]
    fence_arr = np.array(fence_columns).T

    access_token = authenticated_user.get_sumo_access_token()

    item_list = [
        SurfItem(
            access_token=access_token,
            case_uuid=case_uuid,
            ensemble_name=ensemble_name,
            name=name,
            attribute=attribute,
            real=real_num,
        )
        for real_num in range(num_reals)
    ]

    print(f"{log_tag} built item_list {len(item_list)=}")

    intersections = []

    async with Pool(
        queuecount=None,
        processes=4,
        childconcurrency=4,
        initializer=init_access_and_fence,
        initargs=[access_token, case_uuid, ensemble_name, fence_arr],
    ) as pool:
        print(f"{log_tag} pool info {pool.process_count=}")
        print(f"{log_tag} pool info {pool.queue_count=}")
        print(f"{log_tag} pool info {pool.childconcurrency=}")

        async for res_item in pool.map(process_a_surf, item_list):
            # Workers return None when no surface data was found
            if res_item is None:
                continue

            hlen_values = res_item.line[:, 0].tolist()
            z_values = res_item.line[:, 1].tolist()
            intersections.append(
                schemas.SurfaceIntersectionData(name="someName", hlen_arr=hlen_values, z_arr=z_values)
            )
            print(f"{log_tag} got isec {len(intersections)} perf_info={res_item.perf_info}")

    print(f"{log_tag} finished")

    return intersections


Expand All @@ -270,7 +437,6 @@ async def execute_usersession_job_calc_surf_intersections_queue(
attribute: str,
num_reals: int,
num_workers: int,

cutting_plane: schemas.CuttingPlane,
) -> List[schemas.SurfaceIntersectionData]:

Expand Down
2 changes: 2 additions & 0 deletions backend/src/backend/user_session/routers/surface/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ async def post_calc_surf_intersections_queue(
[cutting_plane.x_arr, cutting_plane.y_arr, np.zeros(len(cutting_plane.y_arr)), cutting_plane.length_arr]
).T

LOGGER.debug("====================================")
LOGGER.debug(f"{num_reals=} {num_workers=}")
LOGGER.debug("====================================")

reals = range(0, num_reals)

Expand Down
21 changes: 21 additions & 0 deletions backend/src/services/sumo_access/grid_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,34 @@ async def static_parameter_names(self, grid_name: str) -> List[str]:

async def get_grid_geometry(self, grid_name: str, realization: int) -> xtgeo.Grid:
    """Download the geometry blob for a grid realization and load it as an xtgeo grid.

    Looks up the geometry blob id, fetches the blob through the Sumo client,
    parses it with xtgeo and prints size and timing information along the way.
    """
    timer = PerfTimer()

    blob_id = await get_grid_geometry_blob_id(
        self._sumo_client,
        self._case_uuid,
        self._iteration_name,
        realization,
        grid_name,
    )
    blob_id_ms = timer.lap_ms()

    blob_bytes = self._sumo_client.get(f"/objects('{blob_id}')/blob")
    size_mb = len(blob_bytes) / (1024 * 1024)
    fetch_ms = timer.lap_ms()

    # NOTE(review): this lap is taken right after the fetch lap above, so the
    # reported time presumably no longer covers the download - confirm against PerfTimer.
    print(f"{grid_name} {realization} {blob_id} in {round(timer.lap_s(),2)}s")

    grid_geom = xtgeo.grid_from_file(BytesIO(blob_bytes))
    load_ms = timer.lap_ms()

    print(f"!!!!!!!! get_grid_geometry() got {size_mb:.2f}MB {grid_name=} {realization=} in {round(timer.elapsed_s(),2)}s (blob_id={blob_id_ms}ms, fetch={fetch_ms}ms, load={load_ms}ms)")

    return grid_geom

async def get_grid_parameter(
self, grid_name: str, grid_parameter_name: str, realization: int
) -> xtgeo.GridProperty:
timer = PerfTimer()

parameter_blob_id = await get_grid_parameter_blob_id(
self._sumo_client,
self._case_uuid,
Expand All @@ -56,9 +67,19 @@ async def get_grid_parameter(
grid_name,
grid_parameter_name,
)
et_blob_id_ms = timer.lap_ms()

stream = self._sumo_client.get(f"/objects('{parameter_blob_id}')/blob")
size_mb = len(stream)/(1024*1024)
et_fetch_ms = timer.lap_ms()

print(f"{grid_name} {grid_parameter_name} {realization} {parameter_blob_id} in {round(timer.lap_s(),2)}s")

grid_param = xtgeo.gridproperty_from_file(BytesIO(stream))
et_load_ms = timer.lap_ms()

print(f"!!!!!!!! get_grid_parameter() got {size_mb:.2f}MB {grid_name=} {realization=} {grid_parameter_name=} in {round(timer.elapsed_s(),2)}s (blob_id={et_blob_id_ms}ms, fetch={et_fetch_ms}ms, load={et_load_ms}ms)")

return grid_param

async def grids_have_equal_nxnynz(self, grid_name: str) -> bool:
Expand Down

0 comments on commit d98f940

Please sign in to comment.