Rft (#445)
HansKallekleiv authored Oct 31, 2023
1 parent 2e19f7a commit a18e3c4
Showing 17 changed files with 715 additions and 0 deletions.
2 changes: 2 additions & 0 deletions backend/src/backend/primary/main.py
@@ -24,6 +24,7 @@
from .routers.surface_polygons.router import router as surface_polygons_router
from .routers.graph.router import router as graph_router
from .routers.observations.router import router as observations_router
from .routers.rft.router import router as rft_router

logging.basicConfig(
level=logging.WARNING,
@@ -64,6 +65,7 @@ def custom_generate_unique_id(route: APIRoute) -> str:
app.include_router(surface_polygons_router, prefix="/surface_polygons", tags=["surface_polygons"])
app.include_router(graph_router, prefix="/graph", tags=["graph"])
app.include_router(observations_router, prefix="/observations", tags=["observations"])
app.include_router(rft_router, prefix="/rft", tags=["rft"])

authHelper = AuthHelper()
app.include_router(authHelper.router)
52 changes: 52 additions & 0 deletions backend/src/backend/primary/routers/rft/router.py
@@ -0,0 +1,52 @@
import logging
from typing import Annotated

from fastapi import APIRouter, Depends, Query

from src.backend.auth.auth_helper import AuthHelper
from src.services.sumo_access.rft_access import RftAccess
from src.services.utils.authenticated_user import AuthenticatedUser

from . import schemas

LOGGER = logging.getLogger(__name__)

router = APIRouter()


@router.get("/rft_info")
async def get_rft_info(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
case_uuid: Annotated[str, Query(description="Sumo case uuid")],
ensemble_name: Annotated[str, Query(description="Ensemble name")],
) -> list[schemas.RftInfo]:
access = await RftAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name)
rft_well_list = await access.get_rft_info()

return rft_well_list


@router.get("/realization_data")
async def get_realization_data(
authenticated_user: Annotated[AuthenticatedUser, Depends(AuthHelper.get_authenticated_user)],
case_uuid: Annotated[str, Query(description="Sumo case uuid")],
ensemble_name: Annotated[str, Query(description="Ensemble name")],
well_name: Annotated[str, Query(description="Well name")],
response_name: Annotated[str, Query(description="Response name")],
timestamps_utc_ms: Annotated[list[int] | None, Query(description="Timestamps utc ms")] = None,
realizations: Annotated[list[int] | None, Query(description="Realizations")] = None,
) -> list[schemas.RftRealizationData]:
access = await RftAccess.from_case_uuid(authenticated_user.get_sumo_access_token(), case_uuid, ensemble_name)
data = await access.get_rft_well_realization_data(
well_name=well_name,
response_name=response_name,
timestamps_utc_ms=timestamps_utc_ms,
realizations=realizations,
)

return data
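
For context, a minimal sketch of how a client could exercise the two new endpoints from a script. The base URL, token handling, and parameter values are assumptions for illustration (the app itself authenticates through AuthHelper); only the routes and query parameters come from the router above.

# Sketch: calling the new /rft endpoints with httpx.
# BASE_URL, TOKEN, and the case/ensemble values are invented for illustration.
import httpx

BASE_URL = "http://localhost:5000"  # assumed dev-server address
TOKEN = "..."  # assumed bearer token; the real app uses AuthHelper sessions

params = {"case_uuid": "some-case-uuid", "ensemble_name": "iter-0"}
headers = {"Authorization": f"Bearer {TOKEN}"}

with httpx.Client(base_url=BASE_URL, headers=headers) as client:
    # List the wells that have RFT data, with their available timestamps
    rft_infos = client.get("/rft/rft_info", params=params).json()
    first_well = rft_infos[0]["well_name"]

    # Fetch per-realization pressure traces for the first well
    data = client.get(
        "/rft/realization_data",
        params={**params, "well_name": first_well, "response_name": "PRESSURE"},
    ).json()
    print(f"{first_well}: {len(data)} (realization, timestamp) traces")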
14 changes: 14 additions & 0 deletions backend/src/backend/primary/routers/rft/schemas.py
@@ -0,0 +1,14 @@
from pydantic import BaseModel


class RftInfo(BaseModel):
well_name: str
timestamps_utc_ms: list[int]


class RftRealizationData(BaseModel):
well_name: str
realization: int
timestamp_utc_ms: int
depth_arr: list[float]
value_arr: list[float]
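
As a quick illustration of the payload shape these schemas describe, a small sketch constructing the models with invented values; model_dump_json assumes Pydantic v2.

# Sketch: the JSON these response models serialize to. Sample values are
# invented; assumes Pydantic v2 and the import path used by the router above.
from src.backend.primary.routers.rft.schemas import RftInfo, RftRealizationData

info = RftInfo(well_name="R-2", timestamps_utc_ms=[1640995200000, 1672531200000])
point = RftRealizationData(
    well_name="R-2",
    realization=7,
    timestamp_utc_ms=1640995200000,
    depth_arr=[1620.5, 1625.0, 1630.5],
    value_arr=[310.2, 311.0, 311.9],
)
print(info.model_dump_json())
# {"well_name":"R-2","timestamps_utc_ms":[1640995200000,1672531200000]}
print(point.model_dump_json(indent=2))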
176 changes: 176 additions & 0 deletions backend/src/services/sumo_access/rft_access.py
@@ -0,0 +1,176 @@
import logging
from typing import List, Optional, Sequence
from io import BytesIO

import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from fmu.sumo.explorer.objects import Case, TableCollection

from ._helpers import SumoEnsemble
from ..utils.perf_timer import PerfTimer
from .rft_types import RftInfo, RftRealizationData

LOGGER = logging.getLogger(__name__)


class RftAccess(SumoEnsemble):
async def get_rft_info(self) -> list[RftInfo]:
table = await get_concatenated_rft_table(self._case, self._iteration_name, column_names=["PRESSURE"])
rft_well_infos: list[RftInfo] = []
well_names = table["WELL"].unique().tolist()

for well_name in well_names:
well_table = table.filter(pc.equal(table["WELL"], well_name))
timestamps_utc_ms = sorted(list(set(well_table["DATE"].to_numpy().astype(int).tolist())))

rft_well_infos.append(RftInfo(well_name=well_name, timestamps_utc_ms=timestamps_utc_ms))

return rft_well_infos

async def get_rft_well_realization_data(
self,
well_name: str,
response_name: str,
        timestamps_utc_ms: Optional[Sequence[int]],
realizations: Optional[Sequence[int]],
) -> List[RftRealizationData]:
column_names = [response_name, "DEPTH"]
table = await self.get_rft_table(
well_names=[well_name],
column_names=column_names,
timestamps_utc_ms=timestamps_utc_ms,
realizations=realizations,
)
pandas_table = table.to_pandas(types_mapper=pd.ArrowDtype)

ret_arr: List[RftRealizationData] = []

        for real, real_df in pandas_table.groupby("REAL"):
            for timestamp, date_df in real_df.groupby("DATE"):
                ret_arr.append(
                    RftRealizationData(
                        well_name=well_name,
                        realization=real,
                        timestamp_utc_ms=int(timestamp.timestamp() * 1000),
                        depth_arr=date_df["DEPTH"].to_list(),
                        value_arr=date_df[response_name].to_list(),
                    )
                )

return ret_arr

async def get_rft_table(
self,
well_names: List[str],
column_names: List[str],
        timestamps_utc_ms: Optional[Sequence[int]],
realizations: Optional[Sequence[int]],
    ) -> pa.Table:
table = await get_concatenated_rft_table(self._case, self._iteration_name, column_names)

if realizations is not None:
mask = pc.is_in(table["REAL"], value_set=pa.array(realizations))
table = table.filter(mask)
mask = pc.is_in(table["WELL"], value_set=pa.array(well_names))
table = table.filter(mask)
if timestamps_utc_ms is not None:
mask = pc.is_in(table["DATE"], value_set=pa.array(timestamps_utc_ms))
table = table.filter(mask)

return table


async def get_concatenated_rft_table(case: Case, iteration_name: str, column_names: List[str]) -> pa.Table:
    concatenated_table = None
    for column_name in column_names:
        table = await _load_arrow_table_from_sumo(case, iteration_name, column_name=column_name)
        if table is None:
            raise ValueError(f"No rft table found for {column_name=}")

        if concatenated_table is None:
            concatenated_table = table
        else:
            concatenated_table = concatenated_table.append_column(column_name, table[column_name])

    return concatenated_table


async def _load_arrow_table_from_sumo(case: Case, iteration_name: str, column_name: str) -> Optional[pa.Table]:
timer = PerfTimer()

    rft_table_collection = await get_rft_table_collection(case, iteration_name, column_name=column_name)
    table_count = await rft_table_collection.length_async()
    if table_count == 0:
        return None
    if table_count > 1:
        raise ValueError(f"Multiple tables found for vector {column_name=}")

    sumo_table = await rft_table_collection.getitem_async(0)
    et_locate_sumo_table_ms = timer.lap_ms()

    # Now, read as an arrow table.
    # Note: the tables we have seen so far have their format set to 'arrow',
    # but the actual blob data is parquet. This looks like a bug or a
    # misunderstanding upstream; for now, read the parquet data into an arrow table.
byte_stream: BytesIO = await sumo_table.blob_async
table = pq.read_table(byte_stream)
et_download_arrow_table_ms = timer.lap_ms()

    # Verify that we got the expected columns and nothing else
    if "DATE" not in table.column_names:
        raise ValueError("Table does not contain a DATE column")
    if "REAL" not in table.column_names:
        raise ValueError("Table does not contain a REAL column")
    if column_name not in table.column_names:
        raise ValueError(f"Table does not contain a {column_name} column")
    if table.num_columns != 4:
        raise ValueError("Table should contain exactly 4 columns")
    if sorted(table.column_names) != sorted(["DATE", "REAL", "WELL", column_name]):
        raise ValueError(f"Unexpected columns in table {table.column_names=}")

# Verify that the column datatypes are as we expect
schema = table.schema
if schema.field("DATE").type != pa.timestamp("ms"):
raise ValueError(f"Unexpected type for DATE column {schema.field('DATE').type=}")
if schema.field("REAL").type != pa.int16():
raise ValueError(f"Unexpected type for REAL column {schema.field('REAL').type=}")
if schema.field(column_name).type != pa.float32():
raise ValueError(f"Unexpected type for {column_name} column {schema.field(column_name).type=}")

LOGGER.debug(
f"Loaded arrow table from Sumo in: {timer.elapsed_ms()}ms ("
f"locate_sumo_table={et_locate_sumo_table_ms}ms, "
f"download_arrow_table={et_download_arrow_table_ms}ms) "
f"{column_name=} {table.shape=}"
)

return table


async def get_rft_table_collection(
case: Case, iteration_name: str, column_name: Optional[str] = None
) -> TableCollection:
"""Get a collection of rft tables for a case and iteration"""
    rft_table_collection = case.tables.filter(
        aggregation="collection",
        tagname="rft",
        iteration=iteration_name,
        column=column_name,
    )
    table_names = await rft_table_collection.names_async
if len(table_names) == 0:
raise ValueError("No rft table collections found")
if len(table_names) == 1:
return rft_table_collection

raise ValueError(f"Multiple rft table collections found: {table_names}. Expected only one.")
27 changes: 27 additions & 0 deletions backend/src/services/sumo_access/rft_types.py
@@ -0,0 +1,27 @@
from enum import Enum

from pydantic import BaseModel


class RftColumnNames(str, Enum):
WELL = "WELL"
DEPTH = "DEPTH"
PRESSURE = "PRESSURE"


class RftSumoTableSchema(BaseModel):
tagname: str
column_names: list[str]


class RftInfo(BaseModel):
well_name: str
timestamps_utc_ms: list[int]


class RftRealizationData(BaseModel):
well_name: str
realization: int
timestamp_utc_ms: int
depth_arr: list[float]
value_arr: list[float]
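
Since RftColumnNames subclasses str, its members compare equal to plain strings and can be passed directly where a column name is expected. A small sketch (a standalone copy of the enum, for illustration only):

# Sketch: str-backed enum members behave like the strings they wrap.
from enum import Enum


class RftColumnNames(str, Enum):
    WELL = "WELL"
    DEPTH = "DEPTH"
    PRESSURE = "PRESSURE"


assert RftColumnNames.PRESSURE == "PRESSURE"  # plain str comparison works
column_names = [RftColumnNames.PRESSURE.value, "DEPTH"]  # mixes with plain strings
print(column_names)  # ['PRESSURE', 'DEPTH']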
3 changes: 3 additions & 0 deletions frontend/src/api/ApiService.ts
@@ -13,6 +13,7 @@ import { InplaceVolumetricsService } from './services/InplaceVolumetricsService'
import { ObservationsService } from './services/ObservationsService';
import { ParametersService } from './services/ParametersService';
import { PvtService } from './services/PvtService';
import { RftService } from './services/RftService';
import { SeismicService } from './services/SeismicService';
import { SurfaceService } from './services/SurfaceService';
import { SurfacePolygonsService } from './services/SurfacePolygonsService';
@@ -32,6 +33,7 @@ export class ApiService {
public readonly observations: ObservationsService;
public readonly parameters: ParametersService;
public readonly pvt: PvtService;
public readonly rft: RftService;
public readonly seismic: SeismicService;
public readonly surface: SurfaceService;
public readonly surfacePolygons: SurfacePolygonsService;
@@ -62,6 +64,7 @@ export class ApiService {
this.observations = new ObservationsService(this.request);
this.parameters = new ParametersService(this.request);
this.pvt = new PvtService(this.request);
this.rft = new RftService(this.request);
this.seismic = new SeismicService(this.request);
this.surface = new SurfaceService(this.request);
this.surfacePolygons = new SurfacePolygonsService(this.request);
3 changes: 3 additions & 0 deletions frontend/src/api/index.ts
@@ -32,8 +32,10 @@ export type { InplaceVolumetricsTableMetaData as InplaceVolumetricsTableMetaData
export type { Observations as Observations_api } from './models/Observations';
export type { PolygonData as PolygonData_api } from './models/PolygonData';
export type { PvtData as PvtData_api } from './models/PvtData';
export type { RftInfo as RftInfo_api } from './models/RftInfo';
export type { RftObservation as RftObservation_api } from './models/RftObservation';
export type { RftObservations as RftObservations_api } from './models/RftObservations';
export type { RftRealizationData as RftRealizationData_api } from './models/RftRealizationData';
export type { SeismicCubeMeta as SeismicCubeMeta_api } from './models/SeismicCubeMeta';
export { SensitivityType as SensitivityType_api } from './models/SensitivityType';
export { StatisticFunction as StatisticFunction_api } from './models/StatisticFunction';
@@ -69,6 +71,7 @@ export { InplaceVolumetricsService } from './services/InplaceVolumetricsService'
export { ObservationsService } from './services/ObservationsService';
export { ParametersService } from './services/ParametersService';
export { PvtService } from './services/PvtService';
export { RftService } from './services/RftService';
export { SeismicService } from './services/SeismicService';
export { SurfaceService } from './services/SurfaceService';
export { SurfacePolygonsService } from './services/SurfacePolygonsService';
9 changes: 9 additions & 0 deletions frontend/src/api/models/RftInfo.ts
@@ -0,0 +1,9 @@
/* istanbul ignore file */
/* tslint:disable */
/* eslint-disable */

export type RftInfo = {
well_name: string;
timestamps_utc_ms: Array<number>;
};

12 changes: 12 additions & 0 deletions frontend/src/api/models/RftRealizationData.ts
@@ -0,0 +1,12 @@
/* istanbul ignore file */
/* tslint:disable */
/* eslint-disable */

export type RftRealizationData = {
well_name: string;
realization: number;
timestamp_utc_ms: number;
depth_arr: Array<number>;
value_arr: Array<number>;
};
