From 6a3be9842cfcfe680fed4f426785b01cdb7ac281 Mon Sep 17 00:00:00 2001 From: Sigurd Pettersen Date: Fri, 13 Oct 2023 17:53:08 +0200 Subject: [PATCH] Add loading in worker procs --- .../experiments/calc_surf_isec_custom.py | 40 +++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/backend/src/backend/experiments/calc_surf_isec_custom.py b/backend/src/backend/experiments/calc_surf_isec_custom.py index 7966a0dc3..3d0cd9d36 100644 --- a/backend/src/backend/experiments/calc_surf_isec_custom.py +++ b/backend/src/backend/experiments/calc_surf_isec_custom.py @@ -3,6 +3,8 @@ import logging from typing import List import multiprocessing +import xtgeo +from io import BytesIO from src.services.sumo_access.surface_access import SurfaceAccess from src.services.utils.authenticated_user import AuthenticatedUser @@ -47,7 +49,7 @@ async def calc_surf_isec_custom( ) -> List[schemas.SurfaceIntersectionData]: myprefix = ">>>>>>>>>>>>>>>>> calc_surf_isec_custom():" - print(f"{myprefix} started with {num_reals=}") + print(f"{myprefix} started with {num_reals=}", flush=True) perf_metrics.reset_lap_timer() @@ -61,6 +63,13 @@ async def calc_surf_isec_custom( multi_queue = multiprocessing.Queue() async_queue = AsyncQueue(multi_queue) + num_procs=4 + proc_arr = [] + for proc_num in range(num_procs): + p = multiprocessing.Process(target=process_surf_worker, args=(f"sigworker{proc_num}", multi_queue, fence_arr)) + p.start() + proc_arr.append(p) + load_tasks = [] for real in reals: task = asyncio.create_task(load_surf_bytes_task(async_queue, access, real, name, attribute)) @@ -70,9 +79,16 @@ async def calc_surf_isec_custom( await asyncio.gather(*load_tasks) - perf_metrics.record_lap("execute-tasks") + for p in proc_arr: + multi_queue.put(None) + + multi_queue.close() + multi_queue.join_thread() - print(f"{myprefix} finished") + for p in proc_arr: + p.join() + + print(f"{myprefix} finished", flush=True) return [] @@ -82,3 +98,21 @@ async def load_surf_bytes_task(queue, access: SurfaceAccess, real_num: int, name await queue.put(surf_bytes) +def process_surf_worker(workerName, queue, fence_arr): + + while True: + print(f"Worker {workerName} waiting for work...", flush=True) + theBytes = queue.get() + if theBytes is None: + print(f"Worker {workerName} exiting", flush=True) + return + + print(f"Worker {workerName} Doing work...", flush=True) + + xtgeo_surf = xtgeo.surface_from_file(BytesIO(theBytes)) + + size_mb = len(theBytes)/(1024*1024) + nx = xtgeo_surf.ncol + ny = xtgeo_surf.nrow + + print(f"Worker {workerName} read surf [{nx}x{ny}, {size_mb:.2f}MB]", flush=True)