Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sigurdp committed Oct 21, 2023
1 parent 6c35dce commit 3018c32
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 3 deletions.
152 changes: 152 additions & 0 deletions backend/src/backend/experiments/calc_surf_isec_many_multiprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import os
import signal
import numpy as np
import logging
from typing import List
import multiprocessing

# from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass

from src.services.sumo_access.surface_access import SurfaceAccess
from src.services.utils.authenticated_user import AuthenticatedUser
from src.backend.primary.routers.surface import schemas
from src.backend.utils.perf_metrics import PerfMetrics

# Module-level logger named after this module. The functions visible in this
# file report progress with print() rather than through this logger.
LOGGER = logging.getLogger(__name__)


@dataclass
class SurfItem:
    """Work item describing one realization surface for a pool worker.

    One instance is created per realization and handed to ``process_a_surf``
    through ``Pool.map``. The worker passes ``real``, ``name`` and
    ``attribute`` on to the surface fetch; ``case_uuid`` and ``ensemble_name``
    are carried along but not read by the worker.
    """

    # NOTE: an earlier variant also carried ``access_token: str`` and
    # ``fence_arr: np.ndarray`` on every item. Those values are now handed to
    # each worker once, via the pool initializer, instead of once per item.
    case_uuid: str
    ensemble_name: str
    name: str
    attribute: str
    real: int


@dataclass
class ResultItem:
    """Result sent back from a pool worker for one realization surface."""

    # Timing summary produced by the worker's PerfMetrics.to_string().
    perf_info: str
    # Random-line result for the surface. The consumer reads column 0 as the
    # horizontal length values and column 1 as the z values.
    line: np.ndarray


# Per-process state for pool workers. Both names start out as None and are
# assigned by init_access_and_fence(); process_a_surf() reads them.
global_access = None
global_fence_arr = None


def init_access_and_fence(access_token: str, case_uuid: str, ensemble_name: str, fence_arr: np.ndarray):
    """Pool initializer: reset signal handling and set up the worker globals.

    Passed as ``initializer`` to ``multiprocessing.Pool``. It stores a
    ``SurfaceAccess`` and the fence array in the module globals
    ``global_access`` and ``global_fence_arr`` so that ``process_a_surf``
    can use them without receiving them with every work item.

    Args:
        access_token: Token forwarded to ``SurfaceAccess.from_case_uuid_sync``.
        case_uuid: Case identifier forwarded to the same call.
        ensemble_name: Ensemble name forwarded to the same call.
        fence_arr: Fence array that is stored as-is for later intersections.
    """
    global global_access, global_fence_arr

    # Clear the wakeup fd and restore the default SIGTERM/SIGINT handlers in
    # this worker process.
    # See: https://github.com/tiangolo/fastapi/issues/1487#issuecomment-1157066306
    signal.set_wakeup_fd(-1)
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal.SIG_DFL)

    global_access = SurfaceAccess.from_case_uuid_sync(access_token, case_uuid, ensemble_name)
    global_fence_arr = fence_arr


def process_a_surf(item: SurfItem) -> "ResultItem | None":
    """Fetch one realization surface and intersect it with the shared fence.

    Intended to run in a pool worker. It relies on the module globals
    ``global_access`` and ``global_fence_arr``, which
    ``init_access_and_fence`` assigns when the worker process starts.

    Args:
        item: Work item; ``real``, ``name`` and ``attribute`` select the
            surface to fetch.

    Returns:
        A ``ResultItem`` holding the timing string and the computed line, or
        ``None`` when the surface could not be fetched.

    Raises:
        RuntimeError: If the worker globals have not been initialized.
    """
    print(f">>>> process_a_surf {item.real=}", flush=True)
    perf_metrics = PerfMetrics()

    # Fail with a clear message instead of an AttributeError on None when the
    # pool initializer has not run in this process.
    if global_access is None or global_fence_arr is None:
        raise RuntimeError("process_a_surf() called before init_access_and_fence()")
    access = global_access
    perf_metrics.record_lap("access")

    xtgeo_surf = access.get_realization_surface_data_sync(real_num=item.real, name=item.name, attribute=item.attribute)
    if xtgeo_surf is None:
        return None
    perf_metrics.record_lap("fetch")

    # The same fence is intersected num_fences times and only the last result
    # is kept. NOTE(review): this looks like deliberate extra load for the
    # experiment -- confirm before reducing it. Earlier results are not
    # retained, so memory use no longer grows with num_fences.
    num_fences = 800
    line = None
    for _ in range(num_fences):
        line = xtgeo_surf.get_randomline(global_fence_arr)
    perf_metrics.record_lap(f"calc-{num_fences}")

    res_item = ResultItem(perf_info=perf_metrics.to_string(), line=line)

    print(f">>>> process_a_surf {num_fences=} {item.real=} done", flush=True)

    return res_item


async def calc_surf_isec_many_multiprocess(
    perf_metrics: PerfMetrics,
    authenticated_user: AuthenticatedUser,
    case_uuid: str,
    ensemble_name: str,
    name: str,
    attribute: str,
    num_reals: int,
    cutting_plane: schemas.CuttingPlane,
) -> List[schemas.SurfaceIntersectionData]:
    """Intersect many realization surfaces with a fence using a process pool.

    Builds one ``SurfItem`` per realization in ``range(num_reals)``, maps
    ``process_a_surf`` over them in a ``multiprocessing.Pool`` and converts
    every successful result into a ``SurfaceIntersectionData``.

    The blocking ``pool.map`` call runs in the event loop's default executor
    so that this coroutine does not stall the event loop while the workers
    compute. The pool itself is still created on the calling thread.

    Args:
        perf_metrics: Accepted for signature parity with the caller; not
            used by this implementation.
        authenticated_user: Provides the Sumo access token for the workers.
        case_uuid: Case identifier forwarded to the workers.
        ensemble_name: Ensemble name forwarded to the workers.
        name: Surface name to fetch for every realization.
        attribute: Surface attribute to fetch for every realization.
        num_reals: Number of realizations; realizations 0..num_reals-1 are
            processed.
        cutting_plane: Supplies ``x_arr``, ``y_arr`` and ``length_arr`` that
            make up the fence.

    Returns:
        One ``SurfaceIntersectionData`` per realization that produced a
        line; realizations whose worker returned ``None`` are skipped.
    """
    # Imported locally so this function does not depend on a change to the
    # module's import list.
    import asyncio

    myprefix = ">>>>>>>>>>>>>>>>> calc_surf_isec_many_multiprocess():"
    print(f"{myprefix} started", flush=True)

    # Fence columns: x, y, a zero-filled column, length. Transposed so that
    # each row is one fence point.
    fence_arr = np.array(
        [cutting_plane.x_arr, cutting_plane.y_arr, np.zeros(len(cutting_plane.y_arr)), cutting_plane.length_arr]
    ).T

    access_token = authenticated_user.get_sumo_access_token()

    # The access token and fence are not part of each item; they are handed
    # to every worker once through the pool initializer below.
    item_list = [
        SurfItem(
            case_uuid=case_uuid,
            ensemble_name=ensemble_name,
            name=name,
            attribute=attribute,
            real=real,
        )
        for real in range(num_reals)
    ]

    print(f"{myprefix} built item_list {len(item_list)=}", flush=True)

    intersections = []

    # The default start method is used here. For multiprocess the default is
    # fork, which is faster but has issues with shutting down uvicorn. To
    # experiment with another start method, create the pool from
    # multiprocessing.get_context("spawn") or get_context("forkserver").
    # See: https://superfastpython.com/multiprocessing-pool-context/

    # processes=None means the pool uses os.cpu_count() workers. Using
    # 2 * os.cpu_count() has been considered; it is unclear whether that has
    # a positive effect.
    processes = None
    print(f"{myprefix} trying to use {processes=} ({os.cpu_count()=})", flush=True)

    with multiprocessing.Pool(
        processes=processes,
        initializer=init_access_and_fence,
        initargs=(access_token, case_uuid, ensemble_name, fence_arr),
    ) as pool:
        # pool.map blocks until every item is done, so wait for it on an
        # executor thread and keep the event loop responsive.
        loop = asyncio.get_running_loop()
        res_item_arr = await loop.run_in_executor(None, pool.map, process_a_surf, item_list)
        print(f"{myprefix} back from map {len(res_item_arr)=}", flush=True)

    for res_item in res_item_arr:
        if res_item is not None and res_item.line is not None:
            isecdata = schemas.SurfaceIntersectionData(
                name="someName", hlen_arr=res_item.line[:, 0].tolist(), z_arr=res_item.line[:, 1].tolist()
            )
            intersections.append(isecdata)
            print(f"{myprefix} got isec {len(intersections)} perf_info={res_item.perf_info}", flush=True)
        else:
            print(f"{myprefix} res_item is None", flush=True)

    print(f"{myprefix} finished", flush=True)

    return intersections
6 changes: 4 additions & 2 deletions backend/src/backend/primary/routers/surface/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ async def execute_usersession_job_calc_surf_isec_experiments(
cutting_plane: schemas.CuttingPlane,
) -> List[schemas.SurfaceIntersectionData]:

print(">>>>>>>>>>>>>>>>> execute_usersession_job_calc_surf_isec_experiments() started")
LOGGER.debug(">>>>>>>>>>>>>>>>> execute_usersession_job_calc_surf_isec_experiments() started")

query_params = {
"case_uuid": case_uuid,
Expand All @@ -403,9 +403,11 @@ async def execute_usersession_job_calc_surf_isec_experiments(
timeout=600,
)

LOGGER.debug(">>>>>>>>>>>>>>>>> execute_usersession_job_calc_surf_isec_experiments() issued request, waiting...")

job_resp = await client.send(job_req)

print(">>>>>>>>>>>>>>>>> execute_usersession_job_calc_surf_isec_experiments() finished")
LOGGER.debug(">>>>>>>>>>>>>>>>> execute_usersession_job_calc_surf_isec_experiments() finished")

return job_resp.json()

Expand Down
5 changes: 4 additions & 1 deletion backend/src/backend/user_session/routers/surface/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from src.backend.experiments.calc_surf_isec_custom import calc_surf_isec_custom
from src.backend.experiments.calc_surf_isec_inmem import calc_surf_isec_inmem

from src.backend.experiments.calc_surf_isec_many_multiprocess import calc_surf_isec_many_multiprocess

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -66,7 +67,9 @@ async def post_calc_surf_isec_experiments(
#intersections = await calc_surf_isec_custom(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, num_workers, cutting_plane)
intersections = await calc_surf_isec_inmem(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, cutting_plane)

LOGGER.debug(f"route calc_surf_isec_experiments - intersected {len(intersections)} surfaces in: {perf_metrics.to_string()}")
#intersections = await calc_surf_isec_many_multiprocess(perf_metrics, authenticated_user, case_uuid, ensemble_name, name, attribute, num_reals, cutting_plane)

LOGGER.debug(f"route calc_surf_isec_experiments - exiting - intersected {len(intersections)} surfaces in: {perf_metrics.to_string()}")

return intersections

0 comments on commit 3018c32

Please sign in to comment.