Skip to content

Commit

Permalink
metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
HansKallekleiv committed Nov 22, 2024
1 parent c487fce commit e209c3e
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 49 deletions.
185 changes: 143 additions & 42 deletions backend_py/primary/primary/routers/surface/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,53 @@
import logging
from typing import Annotated, List, Optional, Literal

from fastapi import APIRouter, Depends, HTTPException, Query, Response, Request, Body, status
from fastapi import (
APIRouter,
Depends,
HTTPException,
Query,
Response,
Request,
Body,
status,
)
from webviz_pkg.core_utils.perf_metrics import PerfMetrics

from primary.services.sumo_access.case_inspector import CaseInspector
from primary.services.sumo_access.surface_access import SurfaceAccess
from primary.services.smda_access.stratigraphy_access import StratigraphyAccess, StratigraphicUnit
from primary.services.smda_access.stratigraphy_utils import sort_stratigraphic_names_by_hierarchy
from primary.services.smda_access.mocked_drogon_smda_access import _mocked_stratigraphy_access
from primary.services.smda_access.stratigraphy_access import (
StratigraphyAccess,
StratigraphicUnit,
)
from primary.services.smda_access.stratigraphy_utils import (
sort_stratigraphic_names_by_hierarchy,
)
from primary.services.smda_access.mocked_drogon_smda_access import (
_mocked_stratigraphy_access,
)
from primary.services.utils.statistic_function import StatisticFunction
from primary.services.utils.surface_intersect_with_polyline import intersect_surface_with_polyline
from primary.services.utils.surface_intersect_with_polyline import (
intersect_surface_with_polyline,
)
from primary.services.utils.authenticated_user import AuthenticatedUser
from primary.auth.auth_helper import AuthHelper
from primary.services.surface_query_service.surface_query_service import batch_sample_surface_in_points_async
from primary.services.surface_query_service.surface_query_service import RealizationSampleResult
from primary.services.surface_query_service.surface_query_service import (
batch_sample_surface_in_points_async,
)
from primary.services.surface_query_service.surface_query_service import (
RealizationSampleResult,
)
from primary.utils.response_perf_metrics import ResponsePerfMetrics

from . import converters
from . import schemas
from . import dependencies

from .surface_address import RealizationSurfaceAddress, ObservedSurfaceAddress, StatisticalSurfaceAddress
from .surface_address import (
RealizationSurfaceAddress,
ObservedSurfaceAddress,
StatisticalSurfaceAddress,
)
from .surface_address import decode_surf_addr_str


Expand Down Expand Up @@ -69,19 +95,31 @@ async def get_realization_surfaces_metadata(
perf_metrics = ResponsePerfMetrics(response)

async with asyncio.TaskGroup() as tg:
access = SurfaceAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name)
surf_meta_task = tg.create_task(access.get_realization_surfaces_metadata_async())
surf_meta_task.add_done_callback(lambda _: perf_metrics.record_lap_no_reset("get-meta"))
access = SurfaceAccess.from_case_uuid(
authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name
)
surf_meta_task = tg.create_task(
access.get_realization_surfaces_metadata_async()
)
surf_meta_task.add_done_callback(
lambda _: perf_metrics.record_lap_no_reset("get-meta")
)

strat_units_task = tg.create_task(_get_stratigraphic_units_for_case_async(authenticated_user, case_uuid))
strat_units_task.add_done_callback(lambda _: perf_metrics.record_lap_no_reset("get-strat"))
strat_units_task = tg.create_task(
_get_stratigraphic_units_for_case_async(authenticated_user, case_uuid)
)
strat_units_task.add_done_callback(
lambda _: perf_metrics.record_lap_no_reset("get-strat")
)

perf_metrics.reset_lap_timer()
sumo_surf_meta_set = surf_meta_task.result()
strat_units = strat_units_task.result()

sorted_stratigraphic_surfaces = sort_stratigraphic_names_by_hierarchy(strat_units)
api_surf_meta_set = converters.to_api_surface_meta_set(sumo_surf_meta_set, sorted_stratigraphic_surfaces)
api_surf_meta_set = converters.to_api_surface_meta_set(
sumo_surf_meta_set, sorted_stratigraphic_surfaces
)
perf_metrics.record_lap("compose")

LOGGER.info(f"Got metadata for realization surfaces in: {perf_metrics.to_string()}")
Expand All @@ -101,27 +139,41 @@ async def get_observed_surfaces_metadata(
perf_metrics = ResponsePerfMetrics(response)

async with asyncio.TaskGroup() as tg:
access = SurfaceAccess.from_case_uuid_no_iteration(authenticated_user.get_sumo_access_token(), case_uuid)
access = SurfaceAccess.from_case_uuid_no_iteration(
authenticated_user.get_sumo_access_token(), case_uuid
)
surf_meta_task = tg.create_task(access.get_observed_surfaces_metadata_async())
surf_meta_task.add_done_callback(lambda _: perf_metrics.record_lap_no_reset("get-meta"))
surf_meta_task.add_done_callback(
lambda _: perf_metrics.record_lap_no_reset("get-meta")
)

strat_units_task = tg.create_task(_get_stratigraphic_units_for_case_async(authenticated_user, case_uuid))
strat_units_task.add_done_callback(lambda _: perf_metrics.record_lap_no_reset("get-strat"))
strat_units_task = tg.create_task(
_get_stratigraphic_units_for_case_async(authenticated_user, case_uuid)
)
strat_units_task.add_done_callback(
lambda _: perf_metrics.record_lap_no_reset("get-strat")
)

perf_metrics.reset_lap_timer()
sumo_surf_meta_set = surf_meta_task.result()
strat_units = strat_units_task.result()

sorted_stratigraphic_surfaces = sort_stratigraphic_names_by_hierarchy(strat_units)
api_surf_meta_set = converters.to_api_surface_meta_set(sumo_surf_meta_set, sorted_stratigraphic_surfaces)
api_surf_meta_set = converters.to_api_surface_meta_set(
sumo_surf_meta_set, sorted_stratigraphic_surfaces
)
perf_metrics.record_lap("compose")

LOGGER.info(f"Got metadata for observed surfaces in: {perf_metrics.to_string()}")

return api_surf_meta_set


@router.get("/surface_data", description="Get surface data for the specified surface." + GENERAL_SURF_ADDR_DOC_STR)
@router.get(
"/surface_data",
description="Get surface data for the specified surface."
+ GENERAL_SURF_ADDR_DOC_STR,
)
async def get_surface_data(
# fmt:off
response: Response,
Expand All @@ -136,11 +188,19 @@ async def get_surface_data(
access_token = authenticated_user.get_sumo_access_token()

addr = decode_surf_addr_str(surf_addr_str)
if not isinstance(addr, RealizationSurfaceAddress | ObservedSurfaceAddress | StatisticalSurfaceAddress):
raise HTTPException(status_code=404, detail="Endpoint only supports address types REAL, OBS and STAT")
if not isinstance(
addr,
RealizationSurfaceAddress | ObservedSurfaceAddress | StatisticalSurfaceAddress,
):
raise HTTPException(
status_code=404,
detail="Endpoint only supports address types REAL, OBS and STAT",
)

if addr.address_type == "REAL":
access = SurfaceAccess.from_case_uuid(access_token, addr.case_uuid, addr.ensemble_name)
access = SurfaceAccess.from_case_uuid(
access_token, addr.case_uuid, addr.ensemble_name
)
xtgeo_surf = await access.get_realization_surface_data_async(
real_num=addr.realization,
name=addr.name,
Expand All @@ -149,14 +209,20 @@ async def get_surface_data(
)
perf_metrics.record_lap("get-surf")
if not xtgeo_surf:
raise HTTPException(status_code=404, detail="Could not get realization surface")
raise HTTPException(
status_code=404, detail="Could not get realization surface"
)

elif addr.address_type == "STAT":
service_stat_func_to_compute = StatisticFunction.from_string_value(addr.stat_function)
service_stat_func_to_compute = StatisticFunction.from_string_value(
addr.stat_function
)
if service_stat_func_to_compute is None:
raise HTTPException(status_code=404, detail="Invalid statistic requested")

access = SurfaceAccess.from_case_uuid(access_token, addr.case_uuid, addr.ensemble_name)
access = SurfaceAccess.from_case_uuid(
access_token, addr.case_uuid, addr.ensemble_name
)
xtgeo_surf = await access.get_statistical_surface_data_async(
statistic_function=service_stat_func_to_compute,
name=addr.name,
Expand All @@ -166,16 +232,22 @@ async def get_surface_data(
)
perf_metrics.record_lap("sumo-calc")
if not xtgeo_surf:
raise HTTPException(status_code=404, detail="Could not get or compute statistical surface")
raise HTTPException(
status_code=404, detail="Could not get or compute statistical surface"
)

elif addr.address_type == "OBS":
access = SurfaceAccess.from_case_uuid_no_iteration(access_token, addr.case_uuid)
xtgeo_surf = await access.get_observed_surface_data_async(
name=addr.name, attribute=addr.attribute, time_or_interval_str=addr.iso_time_or_interval
name=addr.name,
attribute=addr.attribute,
time_or_interval_str=addr.iso_time_or_interval,
)
perf_metrics.record_lap("get-surf")
if not xtgeo_surf:
raise HTTPException(status_code=404, detail="Could not get observed surface")
raise HTTPException(
status_code=404, detail="Could not get observed surface"
)

if resample_to is not None:
xtgeo_surf = converters.resample_to_surface_def(xtgeo_surf, resample_to)
Expand All @@ -202,29 +274,46 @@ async def post_get_surface_intersection(
realization_num: int = Query(description="Realization number"),
name: str = Query(description="Surface name"),
attribute: str = Query(description="Surface attribute"),
time_or_interval_str: Optional[str] = Query(None, description="Time point or time interval string"),
cumulative_length_polyline: schemas.SurfaceIntersectionCumulativeLengthPolyline = Body(embed=True),
time_or_interval_str: Optional[str] = Query(
None, description="Time point or time interval string"
),
cumulative_length_polyline: schemas.SurfaceIntersectionCumulativeLengthPolyline = Body(
embed=True
),
) -> schemas.SurfaceIntersectionData:
"""Get surface intersection data for requested surface name.
The surface intersection data for surface name contains: An array of z-points, i.e. one z-value/depth per (x, y)-point in polyline,
and cumulative lengths, the accumulated length at each z-point in the array.
"""
access = SurfaceAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name)
access = SurfaceAccess.from_case_uuid(
authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name
)

surface = await access.get_realization_surface_data_async(
real_num=realization_num, name=name, attribute=attribute, time_or_interval_str=time_or_interval_str
real_num=realization_num,
name=name,
attribute=attribute,
time_or_interval_str=time_or_interval_str,
)
if surface is None:
raise HTTPException(status_code=404, detail="Surface '{name}' not found")

# Ensure name is applied
surface.name = name

intersection_polyline = converters.from_api_cumulative_length_polyline_to_xtgeo_polyline(cumulative_length_polyline)
surface_intersection = intersect_surface_with_polyline(surface, intersection_polyline)
intersection_polyline = (
converters.from_api_cumulative_length_polyline_to_xtgeo_polyline(
cumulative_length_polyline
)
)
surface_intersection = intersect_surface_with_polyline(
surface, intersection_polyline
)

surface_intersection_response = converters.to_api_surface_intersection(surface_intersection)
surface_intersection_response = converters.to_api_surface_intersection(
surface_intersection
)

return surface_intersection_response

Expand All @@ -240,12 +329,15 @@ async def post_sample_surface_in_points(
sample_points: schemas.PointSetXY = Body(embed=True),
authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
) -> List[schemas.SurfaceRealizationSampleValues]:

perf_metrics = PerfMetrics()
sumo_access_token = authenticated_user.get_sumo_access_token()

perf_metrics.record_lap("get-access-token")
async_client = request.app.state.requests_client
perf_metrics.record_lap("get-async-client")

result_arr: List[RealizationSampleResult] = await batch_sample_surface_in_points_async(
result_arr: List[
RealizationSampleResult
] = await batch_sample_surface_in_points_async(
async_client=async_client,
sumo_access_token=sumo_access_token,
case_uuid=case_uuid,
Expand All @@ -256,6 +348,7 @@ async def post_sample_surface_in_points(
x_coords=sample_points.x_points,
y_coords=sample_points.y_points,
)
perf_metrics.record_lap("sample-surface")

intersections: List[schemas.SurfaceRealizationSampleValues] = []
for res in result_arr:
Expand All @@ -265,7 +358,9 @@ async def post_sample_surface_in_points(
sampled_values=res.sampledValues,
)
)
perf_metrics.record_lap("convert to api response")

LOGGER.info(f"Sampled surface in points in: {perf_metrics.to_string()}")
return intersections


Expand Down Expand Up @@ -304,13 +399,19 @@ async def _get_stratigraphic_units_for_case_async(
) -> list[StratigraphicUnit]:
perf_metrics = PerfMetrics()

case_inspector = CaseInspector.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid)
strat_column_identifier = await case_inspector.get_stratigraphic_column_identifier_async()
case_inspector = CaseInspector.from_case_uuid(
authenticated_user.get_sumo_access_token(), case_uuid
)
strat_column_identifier = (
await case_inspector.get_stratigraphic_column_identifier_async()
)
perf_metrics.record_lap("get-strat-ident")

strat_access: StratigraphyAccess | _mocked_stratigraphy_access.StratigraphyAccess
if strat_column_identifier == "DROGON_HAS_NO_STRATCOLUMN":
strat_access = _mocked_stratigraphy_access.StratigraphyAccess(authenticated_user.get_smda_access_token())
strat_access = _mocked_stratigraphy_access.StratigraphyAccess(
authenticated_user.get_smda_access_token()
)
else:
strat_access = StratigraphyAccess(authenticated_user.get_smda_access_token())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,31 +121,33 @@ async def batch_sample_surface_in_points_async(
LOGGER.info(f"Running async go point sampling for surface: {surface_name}")
perf_metrics.record_lap("prepare_call")


response: httpx.Response = await async_client.post(url=SERVICE_ENDPOINT, json=json_request_body)

response: httpx.Response = await async_client.post(
url=SERVICE_ENDPOINT, json=json_request_body
)

# async with httpx.AsyncClient(timeout=300) as client:
# LOGGER.info(f"Running async go point sampling for surface: {surface_name}")

# perf_metrics.record_lap("prepare_call")

# response: httpx.Response = await client.post(url=SERVICE_ENDPOINT, json=json_request_body)

perf_metrics.record_lap("main-call")

json_data: bytes = response.content
response_body = _PointSamplingResponseBody.model_validate_json(json_data)

response_body = _PointSamplingResponseBody.model_validate_json(json_data)
perf_metrics.set_metric("inner-go-call", response_body.calculationTime_ms)

perf_metrics.record_lap("validate-response")

# Replace values above the undefLimit with np.nan
for res in response_body.sampleResultArr:
values_np = np.asarray(res.sampledValues)
res.sampledValues = np.where(
(values_np < response_body.undefLimit), values_np, np.nan
).tolist()

perf_metrics.record_lap("post-process")
LOGGER.debug(
f"------------------ batch_sample_surface_in_points_async() took: {perf_metrics.to_string_s()}"
)
Expand Down

0 comments on commit e209c3e

Please sign in to comment.