Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sigurdp committed Oct 25, 2023
1 parent 491ac16 commit 869e602
Showing 1 changed file with 72 additions and 4 deletions.
76 changes: 72 additions & 4 deletions backend/src/backend/experiments/calc_surf_isec_async_via_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import aiofiles
import xtgeo
import struct
import multiprocessing

# from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
Expand Down Expand Up @@ -79,6 +80,38 @@ async def load_quick_surf(quick_file_name) -> xtgeo.RegularSurface:
return xtgeo_surf


@dataclass
class QueueItem:
irap_file_name: str
quick_file_name: str


def convert_irap_to_quick_format_process_worker(workerName, queue: multiprocessing.Queue):
while True:
print(f"---- Worker {workerName} waiting for work...", flush=True)
item: QueueItem = queue.get()
if item is None:
print(f"---- Worker {workerName} exiting", flush=True)
return

print(f"---- Worker {workerName} Doing work...", flush=True)

if item.irap_file_name is None:
return False

perf_metrics = PerfMetrics()

xtgeo_surf = xtgeo.surface_from_file(item.irap_file_name)
perf_metrics.record_lap("xtgeo-read")

my_bytes = xtgeo_surf_to_bytes(xtgeo_surf)
with open(item.quick_file_name, mode='wb') as f:
f.write(my_bytes)
perf_metrics.record_lap("write-quick")

print(f"---- Worker converted {item.quick_file_name=} in {perf_metrics.to_string()}", flush=True)


def xtgeo_surf_to_bytes(surf: xtgeo.RegularSurface) -> bytes:
header_bytes = struct.pack("@iiddddid", surf.ncol, surf.nrow, surf.xinc, surf.yinc, surf.xori, surf.yori, surf.yflip, surf.rotation)

Expand Down Expand Up @@ -139,6 +172,17 @@ async def calc_surf_isec_async_via_file(
reals = range(0, num_reals)


multi_queue = multiprocessing.Queue()
num_procs = 2
proc_arr = []
for proc_num in range(num_procs):
p = multiprocessing.Process(target=convert_irap_to_quick_format_process_worker, args=(f"worker_{proc_num}", multi_queue))
p.start()
proc_arr.append(p)

perf_metrics.record_lap("start-procs")


no_concurrent = num_workers
dltasks = set()
done_arr = []
Expand All @@ -159,7 +203,8 @@ async def calc_surf_isec_async_via_file(

print(f"{myprefix} {len(done_arr)=}", flush=True)

loadTimer = PerfTimer()
"""
myTimer = PerfTimer()
quick_file_names_arr = []
for donetask in done_arr:
surf_item: DownloadedSurfItem = donetask.result()
Expand All @@ -168,17 +213,40 @@ async def calc_surf_isec_async_via_file(
await convert_irap_to_quick_format(surf_item.file_name, quick_file_name)
quick_file_names_arr.append(quick_file_name)
print(f"{myprefix} average convert time: {loadTimer.elapsed_ms()/len(quick_file_names_arr):.2f}ms", flush=True)
print(f"{myprefix} average convert time: {myTimer.elapsed_ms()/len(quick_file_names_arr):.2f}ms", flush=True)
perf_metrics.record_lap("conv-xtgeo-to-quick")
"""


myTimer = PerfTimer()

quick_file_names_arr = []
for donetask in done_arr:
surf_item: DownloadedSurfItem = donetask.result()
if surf_item.file_name is not None:
quick_file_name = surf_item.file_name.replace(".bin", ".quick")
multi_queue.put(QueueItem(irap_file_name=surf_item.file_name, quick_file_name=quick_file_name))
quick_file_names_arr.append(quick_file_name)

for p in proc_arr:
multi_queue.put(None)

multi_queue.close()
multi_queue.join_thread()
for p in proc_arr:
p.join()

print(f"{myprefix} average convert time: {myTimer.elapsed_ms()/len(quick_file_names_arr):.2f}ms", flush=True)
perf_metrics.record_lap("conv-xtgeo-to-quick")


loadTimer = PerfTimer()
myTimer = PerfTimer()
xtgeo_surf_arr = []
for quick_file_name in quick_file_names_arr:
xtgeo_surf = await load_quick_surf(quick_file_name)
xtgeo_surf_arr.append(xtgeo_surf)

print(f"{myprefix} average quick surf load time: {loadTimer.elapsed_ms()/len(xtgeo_surf_arr):.2f}ms", flush=True)
print(f"{myprefix} average quick surf load time: {myTimer.elapsed_ms()/len(xtgeo_surf_arr):.2f}ms", flush=True)
perf_metrics.record_lap("load-quick")


Expand Down

0 comments on commit 869e602

Please sign in to comment.