Skip to content

Commit

Permalink
Make functions in SurfaceAccess truly async (#475)
Browse files Browse the repository at this point in the history
  • Loading branch information
sigurdp authored Nov 13, 2023
1 parent c8dfb3e commit b440ac9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 32 deletions.
16 changes: 7 additions & 9 deletions backend/src/backend/primary/routers/surface/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
from src.backend.utils.perf_metrics import PerfMetrics
from src.services.sumo_access._helpers import SumoCase


from . import converters
from . import schemas


LOGGER = logging.getLogger(__name__)

router = APIRouter()
Expand All @@ -36,7 +34,7 @@ async def get_surface_directory(
surface_access = await SurfaceAccess.from_case_uuid(
authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name
)
sumo_surf_dir = await surface_access.get_surface_directory()
sumo_surf_dir = await surface_access.get_surface_directory_async()

case_inspector = await SumoCase.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid)
strat_column_identifier = await case_inspector.get_stratigraphic_column_identifier()
Expand Down Expand Up @@ -66,7 +64,7 @@ async def get_realization_surface_data(
perf_metrics = PerfMetrics(response)

access = await SurfaceAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name)
xtgeo_surf = access.get_realization_surface_data(
xtgeo_surf = await access.get_realization_surface_data_async(
real_num=realization_num, name=name, attribute=attribute, time_or_interval_str=time_or_interval
)
perf_metrics.record_lap("get-surf")
Expand Down Expand Up @@ -101,7 +99,7 @@ async def get_statistical_surface_data(
if service_stat_func_to_compute is None:
raise HTTPException(status_code=404, detail="Invalid statistic requested")

xtgeo_surf = access.get_statistical_surface_data(
xtgeo_surf = await access.get_statistical_surface_data_async(
statistic_function=service_stat_func_to_compute,
name=name,
attribute=attribute,
Expand Down Expand Up @@ -138,12 +136,12 @@ async def get_property_surface_resampled_to_static_surface(
perf_metrics = PerfMetrics(response)

access = await SurfaceAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name)
xtgeo_surf_mesh = access.get_realization_surface_data(
xtgeo_surf_mesh = await access.get_realization_surface_data_async(
real_num=realization_num_mesh, name=name_mesh, attribute=attribute_mesh
)
perf_metrics.record_lap("mesh-surf")

xtgeo_surf_property = access.get_realization_surface_data(
xtgeo_surf_property = await access.get_realization_surface_data_async(
real_num=realization_num_property,
name=name_property,
attribute=attribute_property,
Expand Down Expand Up @@ -183,12 +181,12 @@ async def get_property_surface_resampled_to_statistical_static_surface(
access = await SurfaceAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name)
service_stat_func_to_compute = StatisticFunction.from_string_value(statistic_function)
if service_stat_func_to_compute is not None:
xtgeo_surf_mesh = access.get_statistical_surface_data(
xtgeo_surf_mesh = await access.get_statistical_surface_data_async(
statistic_function=service_stat_func_to_compute,
name=name_mesh,
attribute=attribute_mesh,
)
xtgeo_surf_property = access.get_statistical_surface_data(
xtgeo_surf_property = await access.get_statistical_surface_data_async(
statistic_function=service_stat_func_to_compute,
name=name_property,
attribute=attribute_property,
Expand Down
61 changes: 38 additions & 23 deletions backend/src/services/sumo_access/surface_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import xtgeo
from fmu.sumo.explorer import TimeFilter, TimeType
from fmu.sumo.explorer.objects import SurfaceCollection
from fmu.sumo.explorer.objects import SurfaceCollection, Surface

from src.services.utils.perf_timer import PerfTimer
from src.services.utils.statistic_function import StatisticFunction
Expand All @@ -17,7 +17,7 @@


class SurfaceAccess(SumoEnsemble):
async def get_surface_directory(self) -> List[SurfaceMeta]:
async def get_surface_directory_async(self) -> List[SurfaceMeta]:
surface_collection: SurfaceCollection = self._case.surfaces.filter(
iteration=self._iteration_name,
aggregation=False,
Expand Down Expand Up @@ -68,7 +68,7 @@ async def get_surface_directory(self) -> List[SurfaceMeta]:

return surfs

def get_realization_surface_data(
async def get_realization_surface_data_async(
self, real_num: int, name: str, attribute: str, time_or_interval_str: Optional[str] = None
) -> Optional[xtgeo.RegularSurface]:
"""
Expand Down Expand Up @@ -110,22 +110,35 @@ def get_realization_surface_data(
time=time_filter,
)

surf_count = len(surface_collection)
surf_count = await surface_collection.length_async()
if surf_count == 0:
LOGGER.warning(f"No realization surface found in Sumo for {addr_str}")
return None
if surf_count > 1:
LOGGER.warning(f"Multiple ({surf_count}) surfaces found in Sumo for: {addr_str}. Returning first surface.")

sumo_surf = surface_collection[0]
byte_stream: BytesIO = sumo_surf.blob
sumo_surf: Surface = await surface_collection.getitem_async(0)
et_locate_ms = timer.lap_ms()

byte_stream: BytesIO = await sumo_surf.blob_async
et_download_ms = timer.lap_ms()

xtgeo_surf = xtgeo.surface_from_file(byte_stream)
et_xtgeo_read_ms = timer.lap_ms()

LOGGER.debug(f"Got realization surface from Sumo in: {timer.elapsed_ms()}ms ({addr_str})")
size_mb = byte_stream.getbuffer().nbytes / (1024 * 1024)
LOGGER.debug(
f"Got realization surface from Sumo in: {timer.elapsed_ms()}ms ("
f"locate={et_locate_ms}ms, "
f"download={et_download_ms}ms, "
f"xtgeo_read={et_xtgeo_read_ms}ms) "
f"[{xtgeo_surf.ncol}x{xtgeo_surf.nrow}, {size_mb:.2f}MB] "
f"({addr_str})"
)

return xtgeo_surf

def get_statistical_surface_data(
async def get_statistical_surface_data_async(
self,
statistic_function: StatisticFunction,
name: str,
Expand Down Expand Up @@ -159,7 +172,6 @@ def get_statistical_surface_data(
end=timestamp_arr[1],
exact=True,
)
et_get_case_ms = timer.lap_ms()

surface_collection = self._case.surfaces.filter(
iteration=self._iteration_name,
Expand All @@ -168,16 +180,17 @@ def get_statistical_surface_data(
tagname=attribute,
time=time_filter,
)
et_collect_surfaces_ms = timer.lap_ms()

surf_count = len(surface_collection)
surf_count = await surface_collection.length_async()
if surf_count == 0:
LOGGER.warning(f"No statistical surfaces found in Sumo for {addr_str}")
return None
et_locate_ms = timer.lap_ms()

realizations = surface_collection.realizations
realizations = await surface_collection.realizations_async
et_collect_reals_ms = timer.lap_ms()

xtgeo_surf = _compute_statistical_surface(statistic_function, surface_collection)
xtgeo_surf = await _compute_statistical_surface_async(statistic_function, surface_collection)
et_calc_stat_ms = timer.lap_ms()

if not xtgeo_surf:
Expand All @@ -186,8 +199,8 @@ def get_statistical_surface_data(

LOGGER.debug(
f"Calculated statistical surface using Sumo in: {timer.elapsed_ms()}ms ("
f"get_case={et_get_case_ms}ms, "
f"collect_surfaces={et_collect_surfaces_ms}ms, "
f"locate={et_locate_ms}ms, "
f"collect_reals={et_collect_reals_ms}ms, "
f"calc_stat={et_calc_stat_ms}ms) "
f"({addr_str} {len(realizations)=} )"
)
Expand All @@ -199,22 +212,24 @@ def _make_addr_str(self, real_num: int, name: str, attribute: str, date_str: Opt
return addr_str


def _compute_statistical_surface(statistic: StatisticFunction, surface_coll: SurfaceCollection) -> xtgeo.RegularSurface:
async def _compute_statistical_surface_async(
statistic: StatisticFunction, surface_coll: SurfaceCollection
) -> xtgeo.RegularSurface:
xtgeo_surf: xtgeo.RegularSurface = None
if statistic == StatisticFunction.MIN:
xtgeo_surf = surface_coll.min()
xtgeo_surf = await surface_coll.min_async()
elif statistic == StatisticFunction.MAX:
xtgeo_surf = surface_coll.max()
xtgeo_surf = await surface_coll.max_async()
elif statistic == StatisticFunction.MEAN:
xtgeo_surf = surface_coll.mean()
xtgeo_surf = await surface_coll.mean_async()
elif statistic == StatisticFunction.P10:
xtgeo_surf = surface_coll.p10()
xtgeo_surf = await surface_coll.p10_async()
elif statistic == StatisticFunction.P90:
xtgeo_surf = surface_coll.p90()
xtgeo_surf = await surface_coll.p90_async()
elif statistic == StatisticFunction.P50:
xtgeo_surf = surface_coll.p50()
xtgeo_surf = await surface_coll.p50_async()
elif statistic == StatisticFunction.STD:
xtgeo_surf = surface_coll.std()
xtgeo_surf = await surface_coll.std_async()
else:
raise ValueError("Unhandled statistic function")

Expand Down

0 comments on commit b440ac9

Please sign in to comment.