diff --git a/backend_py/primary/primary/services/surface_query_service/surface_query_service.py b/backend_py/primary/primary/services/surface_query_service/surface_query_service.py index 179072b94..0f38c4f93 100644 --- a/backend_py/primary/primary/services/surface_query_service/surface_query_service.py +++ b/backend_py/primary/primary/services/surface_query_service/surface_query_service.py @@ -9,7 +9,9 @@ from sumo.wrapper import SumoClient from primary import config -from primary.services.sumo_access.sumo_blob_access import get_sas_token_and_blob_store_base_uri_for_case +from primary.services.sumo_access.sumo_blob_access import ( + get_sas_token_and_blob_store_base_uri_for_case, +) from webviz_pkg.core_utils.perf_metrics import PerfMetrics @@ -55,30 +57,60 @@ async def batch_sample_surface_in_points_async( x_coords: list[float], y_coords: list[float], ) -> List[RealizationSampleResult]: - + perf_metrics = PerfMetrics() + sumo_client = SumoClient( + env=config.SUMO_ENV, token=sumo_access_token, interactive=False + ) + perf_metrics.record_lap("create sumo-client") realization_object_ids = await _get_object_uuids_for_surface_realizations( - sumo_access_token=sumo_access_token, + sumo_client=sumo_client, + case_uuid=case_uuid, + iteration_name=iteration_name, + surface_name=surface_name, + surface_attribute=surface_attribute, + realizations=realizations, + ) + perf_metrics.record_lap("get-obj-ids-using-explorer") + + realization_object_ids_using_es = await get_surface_blob_uuids_es( + sumo_client=sumo_client, case_uuid=case_uuid, iteration_name=iteration_name, surface_name=surface_name, surface_attribute=surface_attribute, realizations=realizations, ) - - perf_metrics.record_lap("get-obj-ids") - - sas_token, blob_store_base_uri = get_sas_token_and_blob_store_base_uri_for_case(sumo_access_token, case_uuid) + perf_metrics.record_lap("get-obj-ids-using-es") + + ## Check that they are the same by sorting on realization and checking length and blob ids + realization_object_ids.sort(key=lambda x: x.realization) + realization_object_ids_using_es.sort(key=lambda x: x.realization) + + assert len(realization_object_ids) == len(realization_object_ids_using_es) + for i in range(len(realization_object_ids)): + assert ( + realization_object_ids[i].realization + == realization_object_ids_using_es[i].realization + ) + assert ( + realization_object_ids[i].objectUuid + == realization_object_ids_using_es[i].objectUuid + ) + + sas_token, blob_store_base_uri = get_sas_token_and_blob_store_base_uri_for_case( + sumo_access_token, case_uuid + ) - #LOGGER.debug(f"Token: {sas_token}") + # LOGGER.debug(f"Token: {sas_token}") perf_metrics.record_lap("sas-token") request_body = _PointSamplingRequestBody( sasToken=sas_token, blobStoreBaseUri=blob_store_base_uri, - objectIds=realization_object_ids, + objectIds=realization_object_ids_using_es, xCoords=x_coords, yCoords=y_coords, ) @@ -87,10 +119,12 @@ async def batch_sample_surface_in_points_async( async with httpx.AsyncClient(timeout=300) as client: LOGGER.info(f"Running async go point sampling for surface: {surface_name}") - + perf_metrics.record_lap("prepare_call") - - response: httpx.Response = await client.post(url=SERVICE_ENDPOINT, json=json_request_body) + + response: httpx.Response = await client.post( + url=SERVICE_ENDPOINT, json=json_request_body + ) perf_metrics.record_lap("main-call") @@ -99,26 +133,29 @@ async def batch_sample_surface_in_points_async( perf_metrics.set_metric("inner-go-call", response_body.calculationTime_ms) - # Replace values above the undefLimit with np.nan for res in response_body.sampleResultArr: values_np = np.asarray(res.sampledValues) - res.sampledValues = np.where((values_np < response_body.undefLimit), values_np, np.nan).tolist() + res.sampledValues = np.where( + (values_np < response_body.undefLimit), values_np, np.nan + ).tolist() - LOGGER.debug(f"------------------ batch_sample_surface_in_points_async() took: {perf_metrics.to_string_s()}") + LOGGER.debug( + f"------------------ batch_sample_surface_in_points_async() took: {perf_metrics.to_string_s()}" + ) return response_body.sampleResultArr async def _get_object_uuids_for_surface_realizations( - sumo_access_token: str, + sumo_client: SumoClient, case_uuid: str, iteration_name: str, surface_name: str, surface_attribute: str, realizations: Optional[List[int]], ) -> List[_RealizationObjectId]: - sumo_client = SumoClient(env=config.SUMO_ENV, token=sumo_access_token, interactive=False) + case_collection = CaseCollection(sumo_client).filter(uuid=case_uuid) case = await case_collection.getitem_async(0) @@ -141,8 +178,70 @@ async def _get_object_uuids_for_surface_realizations( for obj_meta in object_meta_list: ret_list.append( _RealizationObjectId( - realization=obj_meta["_source"]["fmu"]["realization"]["id"], objectUuid=obj_meta["_id"] + realization=obj_meta["_source"]["fmu"]["realization"]["id"], + objectUuid=obj_meta["_id"], ) ) return ret_list + + +async def get_surface_blob_uuids_es( + sumo_client: SumoClient, + case_uuid: str, + iteration_name: str, + surface_name: str, + surface_attribute: str, + realizations: Optional[List[int]], +) -> List[_RealizationObjectId]: + """Get the blob ids and realization ids for realization surfaces""" + + query: Dict = { + "bool": { + "should": [ + { + "bool": { + "must": [ + {"term": {"_sumo.parent_object.keyword": case_uuid}}, + {"term": {"class.keyword": "surface"}}, + {"term": {"fmu.iteration.name.keyword": iteration_name}}, + {"term": {"fmu.context.stage": "realization"}}, + {"term": {"data.name.keyword": surface_name}}, + {"term": {"data.tagname.keyword": surface_attribute}}, + {"terms": {"fmu.realization.id": realizations}}, + ] + } + }, + ], + "minimum_should_match": 1, + }, + } + aggs = { + "key_combinations": { + "composite": { + "size": 65535, + "sources": [ + {"k_reals": {"terms": {"field": "fmu.realization.id"}}}, + {"k_uuids": {"terms": {"field": "_sumo.blob_name.keyword"}}}, + ], + } + } + } + payload = { + "query": query, + "aggs": aggs, + "size": 0, + } + response = await sumo_client.post_async("/search", json=payload) + + result = response.json() + + aggs = result["aggregations"]["key_combinations"]["buckets"] + realization_object_ids: List[_RealizationObjectId] = [] + for agg in aggs: + realization = agg["key"]["k_reals"] + blob_uuid = agg["key"]["k_uuids"] + realization_object_ids.append( + _RealizationObjectId(realization=realization, objectUuid=blob_uuid) + ) + return realization_object_ids