Skip to content

Commit

Permalink
Started impl experiment with aiomultiprocess, not in use
Browse files Browse the repository at this point in the history
  • Loading branch information
sigurdp committed Oct 12, 2023
1 parent 941d117 commit 2d7f8c8
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 127 deletions.
130 changes: 130 additions & 0 deletions backend/src/backend/experiments/calc_surf_isec_aiomultiproc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import numpy as np
import logging
from typing import List
from aiomultiprocess import Pool

from dataclasses import dataclass

from src.services.sumo_access.surface_access import SurfaceAccess
from src.services.utils.authenticated_user import AuthenticatedUser
from src.backend.primary.routers.surface import schemas
from src.backend.utils.perf_metrics import PerfMetrics

LOGGER = logging.getLogger(__name__)


@dataclass
class SurfItem:
access_token: str
case_uuid: str
ensemble_name: str
name: str
attribute: str
real: int


@dataclass
class ResultItem:
perf_info: str
line: np.ndarray


global_access = None
global_fence_arr = None


def init_access_and_fence(access_token: str, case_uuid: str, ensemble_name: str, fence_arr: np.ndarray):
global global_access
global global_fence_arr
global_access = SurfaceAccess.from_case_uuid_sync(access_token, case_uuid, ensemble_name)
global_fence_arr = fence_arr


async def process_a_surf(item: SurfItem) -> ResultItem:
print(f">>>> process_a_surf {item.real=}", flush=True)
perf_metrics = PerfMetrics()

access = global_access

xtgeo_surf = await access.get_realization_surface_data_async(
real_num=item.real, name=item.name, attribute=item.attribute
)
if xtgeo_surf is None:
return None
perf_metrics.record_lap("fetch")

line = xtgeo_surf.get_randomline(global_fence_arr)
perf_metrics.record_lap("calc")

res_item = ResultItem(perf_info=perf_metrics.to_string(), line=line)

print(f">>>> process_a_surf {item.real=} done", flush=True)

return res_item


async def calc_surf_isec_aiomultiproc(
authenticated_user: AuthenticatedUser,
case_uuid: str,
ensemble_name: str,
name: str,
attribute: str,
num_reals: int,
cutting_plane: schemas.CuttingPlane,
) -> List[schemas.SurfaceIntersectionData]:
myprefix = ">>>>>>>>>>>>>>>>> calc_surf_isec_aiomultiproc():"
print(f"{myprefix} started", flush=True)

fence_arr = np.array(
[cutting_plane.x_arr, cutting_plane.y_arr, np.zeros(len(cutting_plane.y_arr)), cutting_plane.length_arr]
).T

access_token = authenticated_user.get_sumo_access_token()

item_list = []
for i in range(num_reals):
item_list.append(
SurfItem(
access_token=access_token,
case_uuid=case_uuid,
ensemble_name=ensemble_name,
name=name,
attribute=attribute,
real=i,
)
)

print(f"{myprefix} built item_list {len(item_list)=}")

# See
# https://aiomultiprocess.omnilib.dev/en/latest/guide.html

processes = 4
queuecount = None
childconcurrency = 4

async with Pool(
queuecount=queuecount,
processes=processes,
childconcurrency=childconcurrency,
initializer=init_access_and_fence,
initargs=[access_token, case_uuid, ensemble_name, fence_arr],
) as pool:
print(f"{myprefix} pool info {pool.process_count=}")
print(f"{myprefix} pool info {pool.queue_count=}")
print(f"{myprefix} pool info {pool.childconcurrency=}")

intersections = []
async for res_item in pool.map(process_a_surf, item_list):
if res_item is not None and res_item.line is not None:
isecdata = schemas.SurfaceIntersectionData(
name="someName", hlen_arr=res_item.line[:, 0].tolist(), z_arr=res_item.line[:, 1].tolist()
)
intersections.append(isecdata)
print(f"{myprefix} got isec {len(intersections)} perf_info={res_item.perf_info}", flush=True)
else:
print(f"{myprefix} res_item is None", flush=True)

print(f"{myprefix} finished")

return intersections
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import asyncio
import signal
import numpy as np
import logging
from typing import List, Union, Optional
from typing import List
import multiprocessing

# from concurrent.futures import ProcessPoolExecutor
Expand Down Expand Up @@ -116,7 +115,8 @@ async def calc_surf_isec_multiprocess(
# See: https://superfastpython.com/multiprocessing-pool-context/
# Use None to get the default context
#context = multiprocessing.get_context("spawn")
context = multiprocessing.get_context("forkserver")
#context = multiprocessing.get_context("forkserver")
context = multiprocessing.get_context(None)

with context.Pool(
initializer=init_access_and_fence,
Expand Down
124 changes: 0 additions & 124 deletions backend/src/backend/primary/routers/surface/router.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
import logging
from typing import List, Union, Optional
from dataclasses import dataclass

import asyncio
import asyncio
from aiomultiprocess import Pool
import httpx
import numpy as np
import xtgeo
import json
from pydantic import BaseModel

from fastapi import APIRouter, Depends, HTTPException, Query, Body, Response, Request
from src.backend.primary.user_session_proxy import proxy_to_user_session
from src.backend.primary.user_session_proxy import get_user_session_base_url

from src.backend.user_session.routers.grid.router import get_grid_geometry
from src.backend.user_session.routers.grid.router import get_grid_parameter


from src.services.sumo_access.surface_access import SurfaceAccess
from src.services.smda_access.stratigraphy_access import StratigraphyAccess
from src.services.smda_access.stratigraphy_utils import sort_stratigraphic_names_by_hierarchy
Expand Down Expand Up @@ -320,125 +312,9 @@ async def get_surface_intersections(

return intersections

# intersections = await _calc_surf_intersections_aiomulti(
# authenticated_user,
# case_uuid,
# ensemble_name,
# name,
# attribute,
# num_reals,
# num_workers,
# cutting_plane,
# )


return intersections


# --------------------------------------------------------------------------------------

@dataclass
class SurfItem:
access_token: str
case_uuid: str
ensemble_name: str
name: str
attribute: str
real: int
#fence_arr: np.ndarray

@dataclass
class ResultItem:
perf_info: str
line: np.ndarray


global_access = None
global_fence_arr = None


def init_access_and_fence(access_token: str, case_uuid: str, ensemble_name: str, fence_arr: np.ndarray):
global global_access
global global_fence_arr
global_access = asyncio.run(SurfaceAccess.from_case_uuid(access_token, case_uuid, ensemble_name))
global_fence_arr = fence_arr


async def process_a_surf(item: SurfItem) -> ResultItem:
print(f"fetch_a_surf {item.real=}")
perf_metrics = PerfMetrics()

access = global_access

xtgeo_surf = await access.get_realization_surface_data_async(real_num=item.real, name=item.name, attribute=item.attribute)
if xtgeo_surf is None:
return None
perf_metrics.record_lap("fetch")

line = xtgeo_surf.get_randomline(global_fence_arr)
perf_metrics.record_lap("calc")

res_item = ResultItem(perf_info=perf_metrics.to_string(), line=line)
return res_item


async def _calc_surf_intersections_aiomulti(
authenticated_user: AuthenticatedUser,
case_uuid: str,
ensemble_name: str,
name: str,
attribute: str,
num_reals: int,
num_workers: int,
cutting_plane: schemas.CuttingPlane,
) -> List[schemas.SurfaceIntersectionData]:

myprefix = ">>>>>>>>>>>>>>>>> _calc_surf_intersections_aiomulti():"
print(f"{myprefix} started")

fence_arr = np.array(
[cutting_plane.x_arr, cutting_plane.y_arr, np.zeros(len(cutting_plane.y_arr)), cutting_plane.length_arr]
).T

access_token=authenticated_user.get_sumo_access_token()

item_list = []
for i in range(num_reals):
item_list.append(SurfItem(
access_token=access_token,
case_uuid=case_uuid,
ensemble_name=ensemble_name,
name=name,
attribute=attribute,
real=i,
))

print(f"{myprefix} built item_list {len(item_list)=}")

# See
# https://aiomultiprocess.omnilib.dev/en/latest/guide.html

processes = 4
queuecount = None
childconcurrency = 4

async with Pool(queuecount=queuecount, processes=processes, childconcurrency=childconcurrency, initializer=init_access_and_fence, initargs=[access_token, case_uuid, ensemble_name, fence_arr]) as pool:
print(f"{myprefix} pool info {pool.process_count=}")
print(f"{myprefix} pool info {pool.queue_count=}")
print(f"{myprefix} pool info {pool.childconcurrency=}")

intersections = []
async for res_item in pool.map(process_a_surf, item_list):
if res_item is not None:
isecdata = schemas.SurfaceIntersectionData(name="someName", hlen_arr=res_item.line[:, 0].tolist(), z_arr=res_item.line[:, 1].tolist())
intersections.append(isecdata)
print(f"{myprefix} got isec {len(intersections)} perf_info={res_item.perf_info}")

print(f"{myprefix} finished")

return intersections


# --------------------------------------------------------------------------------------
async def execute_usersession_job_calc_surf_isec_experiments(
fastApiRequest: Request,
Expand Down
2 changes: 2 additions & 0 deletions backend/src/backend/user_session/routers/surface/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from src.backend.experiments.calc_surf_isec_fetch_first import calc_surf_isec_fetch_first
from src.backend.experiments.calc_surf_isec_queue import calc_surf_isec_queue
from src.backend.experiments.calc_surf_isec_multiprocess import calc_surf_isec_multiprocess
from src.backend.experiments.calc_surf_isec_aiomultiproc import calc_surf_isec_aiomultiproc


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -59,6 +60,7 @@ async def post_calc_surf_isec_experiments(
#intersections = await calc_surf_isec_fetch_first(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, cutting_plane)
#intersections = await calc_surf_isec_queue(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, num_workers, cutting_plane)
intersections = await calc_surf_isec_multiprocess(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, cutting_plane)
#intersections = await calc_surf_isec_aiomultiproc(authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, cutting_plane)

LOGGER.debug(f"route calc_surf_isec_experiments - intersected {len(intersections)} surfaces in: {perf_metrics.to_string()}")

Expand Down

0 comments on commit 2d7f8c8

Please sign in to comment.