diff --git a/backend_py/primary/primary/routers/surface/router.py b/backend_py/primary/primary/routers/surface/router.py index 755eec1f6..291e8999f 100644 --- a/backend_py/primary/primary/routers/surface/router.py +++ b/backend_py/primary/primary/routers/surface/router.py @@ -2,27 +2,53 @@ import logging from typing import Annotated, List, Optional, Literal -from fastapi import APIRouter, Depends, HTTPException, Query, Response, Request, Body, status +from fastapi import ( + APIRouter, + Depends, + HTTPException, + Query, + Response, + Request, + Body, + status, +) from webviz_pkg.core_utils.perf_metrics import PerfMetrics from primary.services.sumo_access.case_inspector import CaseInspector from primary.services.sumo_access.surface_access import SurfaceAccess -from primary.services.smda_access.stratigraphy_access import StratigraphyAccess, StratigraphicUnit -from primary.services.smda_access.stratigraphy_utils import sort_stratigraphic_names_by_hierarchy -from primary.services.smda_access.mocked_drogon_smda_access import _mocked_stratigraphy_access +from primary.services.smda_access.stratigraphy_access import ( + StratigraphyAccess, + StratigraphicUnit, +) +from primary.services.smda_access.stratigraphy_utils import ( + sort_stratigraphic_names_by_hierarchy, +) +from primary.services.smda_access.mocked_drogon_smda_access import ( + _mocked_stratigraphy_access, +) from primary.services.utils.statistic_function import StatisticFunction -from primary.services.utils.surface_intersect_with_polyline import intersect_surface_with_polyline +from primary.services.utils.surface_intersect_with_polyline import ( + intersect_surface_with_polyline, +) from primary.services.utils.authenticated_user import AuthenticatedUser from primary.auth.auth_helper import AuthHelper -from primary.services.surface_query_service.surface_query_service import batch_sample_surface_in_points_async -from primary.services.surface_query_service.surface_query_service import RealizationSampleResult +from primary.services.surface_query_service.surface_query_service import ( + batch_sample_surface_in_points_async, +) +from primary.services.surface_query_service.surface_query_service import ( + RealizationSampleResult, +) from primary.utils.response_perf_metrics import ResponsePerfMetrics from . import converters from . import schemas from . import dependencies -from .surface_address import RealizationSurfaceAddress, ObservedSurfaceAddress, StatisticalSurfaceAddress +from .surface_address import ( + RealizationSurfaceAddress, + ObservedSurfaceAddress, + StatisticalSurfaceAddress, +) from .surface_address import decode_surf_addr_str @@ -69,19 +95,31 @@ async def get_realization_surfaces_metadata( perf_metrics = ResponsePerfMetrics(response) async with asyncio.TaskGroup() as tg: - access = SurfaceAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name) - surf_meta_task = tg.create_task(access.get_realization_surfaces_metadata_async()) - surf_meta_task.add_done_callback(lambda _: perf_metrics.record_lap_no_reset("get-meta")) + access = SurfaceAccess.from_case_uuid( + authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name + ) + surf_meta_task = tg.create_task( + access.get_realization_surfaces_metadata_async() + ) + surf_meta_task.add_done_callback( + lambda _: perf_metrics.record_lap_no_reset("get-meta") + ) - strat_units_task = tg.create_task(_get_stratigraphic_units_for_case_async(authenticated_user, case_uuid)) - strat_units_task.add_done_callback(lambda _: perf_metrics.record_lap_no_reset("get-strat")) + strat_units_task = tg.create_task( + _get_stratigraphic_units_for_case_async(authenticated_user, case_uuid) + ) + strat_units_task.add_done_callback( + lambda _: perf_metrics.record_lap_no_reset("get-strat") + ) perf_metrics.reset_lap_timer() sumo_surf_meta_set = surf_meta_task.result() strat_units = strat_units_task.result() sorted_stratigraphic_surfaces = sort_stratigraphic_names_by_hierarchy(strat_units) - api_surf_meta_set = converters.to_api_surface_meta_set(sumo_surf_meta_set, sorted_stratigraphic_surfaces) + api_surf_meta_set = converters.to_api_surface_meta_set( + sumo_surf_meta_set, sorted_stratigraphic_surfaces + ) perf_metrics.record_lap("compose") LOGGER.info(f"Got metadata for realization surfaces in: {perf_metrics.to_string()}") @@ -101,19 +139,29 @@ async def get_observed_surfaces_metadata( perf_metrics = ResponsePerfMetrics(response) async with asyncio.TaskGroup() as tg: - access = SurfaceAccess.from_case_uuid_no_iteration(authenticated_user.get_sumo_access_token(), case_uuid) + access = SurfaceAccess.from_case_uuid_no_iteration( + authenticated_user.get_sumo_access_token(), case_uuid + ) surf_meta_task = tg.create_task(access.get_observed_surfaces_metadata_async()) - surf_meta_task.add_done_callback(lambda _: perf_metrics.record_lap_no_reset("get-meta")) + surf_meta_task.add_done_callback( + lambda _: perf_metrics.record_lap_no_reset("get-meta") + ) - strat_units_task = tg.create_task(_get_stratigraphic_units_for_case_async(authenticated_user, case_uuid)) - strat_units_task.add_done_callback(lambda _: perf_metrics.record_lap_no_reset("get-strat")) + strat_units_task = tg.create_task( + _get_stratigraphic_units_for_case_async(authenticated_user, case_uuid) + ) + strat_units_task.add_done_callback( + lambda _: perf_metrics.record_lap_no_reset("get-strat") + ) perf_metrics.reset_lap_timer() sumo_surf_meta_set = surf_meta_task.result() strat_units = strat_units_task.result() sorted_stratigraphic_surfaces = sort_stratigraphic_names_by_hierarchy(strat_units) - api_surf_meta_set = converters.to_api_surface_meta_set(sumo_surf_meta_set, sorted_stratigraphic_surfaces) + api_surf_meta_set = converters.to_api_surface_meta_set( + sumo_surf_meta_set, sorted_stratigraphic_surfaces + ) perf_metrics.record_lap("compose") LOGGER.info(f"Got metadata for observed surfaces in: {perf_metrics.to_string()}") @@ -121,7 +169,11 @@ async def get_observed_surfaces_metadata( return api_surf_meta_set -@router.get("/surface_data", description="Get surface data for the specified surface." + GENERAL_SURF_ADDR_DOC_STR) +@router.get( + "/surface_data", + description="Get surface data for the specified surface." + + GENERAL_SURF_ADDR_DOC_STR, +) async def get_surface_data( # fmt:off response: Response, @@ -136,11 +188,19 @@ async def get_surface_data( access_token = authenticated_user.get_sumo_access_token() addr = decode_surf_addr_str(surf_addr_str) - if not isinstance(addr, RealizationSurfaceAddress | ObservedSurfaceAddress | StatisticalSurfaceAddress): - raise HTTPException(status_code=404, detail="Endpoint only supports address types REAL, OBS and STAT") + if not isinstance( + addr, + RealizationSurfaceAddress | ObservedSurfaceAddress | StatisticalSurfaceAddress, + ): + raise HTTPException( + status_code=404, + detail="Endpoint only supports address types REAL, OBS and STAT", + ) if addr.address_type == "REAL": - access = SurfaceAccess.from_case_uuid(access_token, addr.case_uuid, addr.ensemble_name) + access = SurfaceAccess.from_case_uuid( + access_token, addr.case_uuid, addr.ensemble_name + ) xtgeo_surf = await access.get_realization_surface_data_async( real_num=addr.realization, name=addr.name, @@ -149,14 +209,20 @@ async def get_surface_data( ) perf_metrics.record_lap("get-surf") if not xtgeo_surf: - raise HTTPException(status_code=404, detail="Could not get realization surface") + raise HTTPException( + status_code=404, detail="Could not get realization surface" + ) elif addr.address_type == "STAT": - service_stat_func_to_compute = StatisticFunction.from_string_value(addr.stat_function) + service_stat_func_to_compute = StatisticFunction.from_string_value( + addr.stat_function + ) if service_stat_func_to_compute is None: raise HTTPException(status_code=404, detail="Invalid statistic requested") - access = SurfaceAccess.from_case_uuid(access_token, addr.case_uuid, addr.ensemble_name) + access = SurfaceAccess.from_case_uuid( + access_token, addr.case_uuid, addr.ensemble_name + ) xtgeo_surf = await access.get_statistical_surface_data_async( statistic_function=service_stat_func_to_compute, name=addr.name, @@ -166,16 +232,22 @@ async def get_surface_data( ) perf_metrics.record_lap("sumo-calc") if not xtgeo_surf: - raise HTTPException(status_code=404, detail="Could not get or compute statistical surface") + raise HTTPException( + status_code=404, detail="Could not get or compute statistical surface" + ) elif addr.address_type == "OBS": access = SurfaceAccess.from_case_uuid_no_iteration(access_token, addr.case_uuid) xtgeo_surf = await access.get_observed_surface_data_async( - name=addr.name, attribute=addr.attribute, time_or_interval_str=addr.iso_time_or_interval + name=addr.name, + attribute=addr.attribute, + time_or_interval_str=addr.iso_time_or_interval, ) perf_metrics.record_lap("get-surf") if not xtgeo_surf: - raise HTTPException(status_code=404, detail="Could not get observed surface") + raise HTTPException( + status_code=404, detail="Could not get observed surface" + ) if resample_to is not None: xtgeo_surf = converters.resample_to_surface_def(xtgeo_surf, resample_to) @@ -202,18 +274,27 @@ async def post_get_surface_intersection( realization_num: int = Query(description="Realization number"), name: str = Query(description="Surface name"), attribute: str = Query(description="Surface attribute"), - time_or_interval_str: Optional[str] = Query(None, description="Time point or time interval string"), - cumulative_length_polyline: schemas.SurfaceIntersectionCumulativeLengthPolyline = Body(embed=True), + time_or_interval_str: Optional[str] = Query( + None, description="Time point or time interval string" + ), + cumulative_length_polyline: schemas.SurfaceIntersectionCumulativeLengthPolyline = Body( + embed=True + ), ) -> schemas.SurfaceIntersectionData: """Get surface intersection data for requested surface name. The surface intersection data for surface name contains: An array of z-points, i.e. one z-value/depth per (x, y)-point in polyline, and cumulative lengths, the accumulated length at each z-point in the array. """ - access = SurfaceAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name) + access = SurfaceAccess.from_case_uuid( + authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name + ) surface = await access.get_realization_surface_data_async( - real_num=realization_num, name=name, attribute=attribute, time_or_interval_str=time_or_interval_str + real_num=realization_num, + name=name, + attribute=attribute, + time_or_interval_str=time_or_interval_str, ) if surface is None: raise HTTPException(status_code=404, detail="Surface '{name}' not found") @@ -221,10 +302,18 @@ async def post_get_surface_intersection( # Ensure name is applied surface.name = name - intersection_polyline = converters.from_api_cumulative_length_polyline_to_xtgeo_polyline(cumulative_length_polyline) - surface_intersection = intersect_surface_with_polyline(surface, intersection_polyline) + intersection_polyline = ( + converters.from_api_cumulative_length_polyline_to_xtgeo_polyline( + cumulative_length_polyline + ) + ) + surface_intersection = intersect_surface_with_polyline( + surface, intersection_polyline + ) - surface_intersection_response = converters.to_api_surface_intersection(surface_intersection) + surface_intersection_response = converters.to_api_surface_intersection( + surface_intersection + ) return surface_intersection_response @@ -240,12 +329,15 @@ async def post_sample_surface_in_points( sample_points: schemas.PointSetXY = Body(embed=True), authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user), ) -> List[schemas.SurfaceRealizationSampleValues]: - + perf_metrics = PerfMetrics() sumo_access_token = authenticated_user.get_sumo_access_token() - + perf_metrics.record_lap("get-access-token") async_client = request.app.state.requests_client + perf_metrics.record_lap("get-async-client") - result_arr: List[RealizationSampleResult] = await batch_sample_surface_in_points_async( + result_arr: List[ + RealizationSampleResult + ] = await batch_sample_surface_in_points_async( async_client=async_client, sumo_access_token=sumo_access_token, case_uuid=case_uuid, @@ -256,6 +348,7 @@ async def post_sample_surface_in_points( x_coords=sample_points.x_points, y_coords=sample_points.y_points, ) + perf_metrics.record_lap("sample-surface") intersections: List[schemas.SurfaceRealizationSampleValues] = [] for res in result_arr: @@ -265,7 +358,9 @@ async def post_sample_surface_in_points( sampled_values=res.sampledValues, ) ) + perf_metrics.record_lap("convert to api response") + LOGGER.info(f"Sampled surface in points in: {perf_metrics.to_string()}") return intersections @@ -304,13 +399,19 @@ async def _get_stratigraphic_units_for_case_async( ) -> list[StratigraphicUnit]: perf_metrics = PerfMetrics() - case_inspector = CaseInspector.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid) - strat_column_identifier = await case_inspector.get_stratigraphic_column_identifier_async() + case_inspector = CaseInspector.from_case_uuid( + authenticated_user.get_sumo_access_token(), case_uuid + ) + strat_column_identifier = ( + await case_inspector.get_stratigraphic_column_identifier_async() + ) perf_metrics.record_lap("get-strat-ident") strat_access: StratigraphyAccess | _mocked_stratigraphy_access.StratigraphyAccess if strat_column_identifier == "DROGON_HAS_NO_STRATCOLUMN": - strat_access = _mocked_stratigraphy_access.StratigraphyAccess(authenticated_user.get_smda_access_token()) + strat_access = _mocked_stratigraphy_access.StratigraphyAccess( + authenticated_user.get_smda_access_token() + ) else: strat_access = StratigraphyAccess(authenticated_user.get_smda_access_token()) diff --git a/backend_py/primary/primary/services/surface_query_service/surface_query_service.py b/backend_py/primary/primary/services/surface_query_service/surface_query_service.py index 9c8893c76..65622403c 100644 --- a/backend_py/primary/primary/services/surface_query_service/surface_query_service.py +++ b/backend_py/primary/primary/services/surface_query_service/surface_query_service.py @@ -121,23 +121,25 @@ async def batch_sample_surface_in_points_async( LOGGER.info(f"Running async go point sampling for surface: {surface_name}") perf_metrics.record_lap("prepare_call") - - response: httpx.Response = await async_client.post(url=SERVICE_ENDPOINT, json=json_request_body) - + response: httpx.Response = await async_client.post( + url=SERVICE_ENDPOINT, json=json_request_body + ) # async with httpx.AsyncClient(timeout=300) as client: # LOGGER.info(f"Running async go point sampling for surface: {surface_name}") - + # perf_metrics.record_lap("prepare_call") - + # response: httpx.Response = await client.post(url=SERVICE_ENDPOINT, json=json_request_body) perf_metrics.record_lap("main-call") json_data: bytes = response.content - response_body = _PointSamplingResponseBody.model_validate_json(json_data) + response_body = _PointSamplingResponseBody.model_validate_json(json_data) perf_metrics.set_metric("inner-go-call", response_body.calculationTime_ms) + + perf_metrics.record_lap("validate-response") # Replace values above the undefLimit with np.nan for res in response_body.sampleResultArr: @@ -145,7 +147,7 @@ async def batch_sample_surface_in_points_async( res.sampledValues = np.where( (values_np < response_body.undefLimit), values_np, np.nan ).tolist() - + perf_metrics.record_lap("post-process") LOGGER.debug( f"------------------ batch_sample_surface_in_points_async() took: {perf_metrics.to_string_s()}" )