Skip to content

Commit

Permalink
Add loading in worker procs
Browse files Browse the repository at this point in the history
  • Loading branch information
sigurdp committed Oct 13, 2023
1 parent 5f67e9e commit 6a3be98
Showing 1 changed file with 37 additions and 3 deletions.
40 changes: 37 additions & 3 deletions backend/src/backend/experiments/calc_surf_isec_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
from typing import List
import multiprocessing
import xtgeo
from io import BytesIO

from src.services.sumo_access.surface_access import SurfaceAccess
from src.services.utils.authenticated_user import AuthenticatedUser
Expand Down Expand Up @@ -47,7 +49,7 @@ async def calc_surf_isec_custom(
) -> List[schemas.SurfaceIntersectionData]:

myprefix = ">>>>>>>>>>>>>>>>> calc_surf_isec_custom():"
print(f"{myprefix} started with {num_reals=}")
print(f"{myprefix} started with {num_reals=}", flush=True)

perf_metrics.reset_lap_timer()

Expand All @@ -61,6 +63,13 @@ async def calc_surf_isec_custom(
multi_queue = multiprocessing.Queue()
async_queue = AsyncQueue(multi_queue)

num_procs=4
proc_arr = []
for proc_num in range(num_procs):
p = multiprocessing.Process(target=process_surf_worker, args=(f"sigworker{proc_num}", multi_queue, fence_arr))
p.start()
proc_arr.append(p)

load_tasks = []
for real in reals:
task = asyncio.create_task(load_surf_bytes_task(async_queue, access, real, name, attribute))
Expand All @@ -70,9 +79,16 @@ async def calc_surf_isec_custom(

await asyncio.gather(*load_tasks)

perf_metrics.record_lap("execute-tasks")
for p in proc_arr:
multi_queue.put(None)

multi_queue.close()
multi_queue.join_thread()

print(f"{myprefix} finished")
for p in proc_arr:
p.join()

print(f"{myprefix} finished", flush=True)

return []

Expand All @@ -82,3 +98,21 @@ async def load_surf_bytes_task(queue, access: SurfaceAccess, real_num: int, name
await queue.put(surf_bytes)


def process_surf_worker(workerName, queue, fence_arr):

while True:
print(f"Worker {workerName} waiting for work...", flush=True)
theBytes = queue.get()
if theBytes is None:
print(f"Worker {workerName} exiting", flush=True)
return

print(f"Worker {workerName} Doing work...", flush=True)

xtgeo_surf = xtgeo.surface_from_file(BytesIO(theBytes))

size_mb = len(theBytes)/(1024*1024)
nx = xtgeo_surf.ncol
ny = xtgeo_surf.nrow

print(f"Worker {workerName} read surf [{nx}x{ny}, {size_mb:.2f}MB]", flush=True)

0 comments on commit 6a3be98

Please sign in to comment.