Skip to content

Commit

Permalink
ENH: Export inplace volumes table as parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
mferrera committed Dec 19, 2024
1 parent d52aef6 commit a1d8b4c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 19 deletions.
10 changes: 4 additions & 6 deletions src/fmu/dataio/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import xtgeo
import yaml

Expand Down Expand Up @@ -117,12 +119,8 @@ def export_file(
)
obj.to_csv(file, index=False)
elif file_suffix == ".parquet":
from pyarrow import Table

if isinstance(obj, Table):
from pyarrow import output_stream, parquet

parquet.write_table(obj, where=output_stream(file))
if isinstance(obj, pa.Table):
pq.write_table(obj, where=pa.output_stream(file))

elif file_suffix == ".json":
if isinstance(obj, FaultRoomSurface):
Expand Down
6 changes: 5 additions & 1 deletion src/fmu/dataio/export/rms/inplace_volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import numpy as np
import pandas as pd
import pyarrow as pa

import fmu.dataio as dio
from fmu.dataio._logging import null_logger
Expand Down Expand Up @@ -248,7 +249,10 @@ def _export_volume_table(self) -> ExportResult:
rep_include=False,
table_index=_enums.InplaceVolumes.index_columns(),
)
absolute_export_path = edata.export(self._dataframe)

volume_table = pa.Table.from_pandas(self._dataframe)
absolute_export_path = edata.export(volume_table)

_logger.debug("Volume result to: %s", absolute_export_path)
return ExportResult(
items=[
Expand Down
6 changes: 2 additions & 4 deletions src/fmu/dataio/providers/objectdata/_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
from typing import TYPE_CHECKING, Final

import pandas as pd
import pyarrow as pa
import xtgeo

from fmu.dataio._definitions import ExportFolder, ValidFormats
Expand Down Expand Up @@ -150,10 +151,7 @@ def objectdata_provider_factory(
return FaultRoomSurfaceProvider(obj=obj, dataio=dataio)
if isinstance(obj, dict):
return DictionaryDataProvider(obj=obj, dataio=dataio)

from pyarrow import Table

if isinstance(obj, Table):
if isinstance(obj, pa.Table):
return ArrowTableDataProvider(obj=obj, dataio=dataio)

raise NotImplementedError(f"This data type is not currently supported: {type(obj)}")
Expand Down
30 changes: 22 additions & 8 deletions tests/test_export_rms/test_export_rms_volumetrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import jsonschema
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import pytest

import fmu.dataio as dataio
Expand Down Expand Up @@ -139,7 +140,12 @@ def test_convert_table_from_legacy_to_standard_format(

# check that the exported table is equal to the expected
out = instance._export_volume_table()
exported_table = pd.read_csv(out.items[0].absolute_path)
# Note that using `read_parquet()` more than once in the same pytest module causes
# errors due to an issue in pandas registering a type extension globally on every
# invocation. This is probably a pandas bug.
# https://github.com/apache/arrow/issues/41857
exported_table = pd.read_parquet(out.items[0].absolute_path)

pd.testing.assert_frame_equal(voltable_standard, exported_table)

# check that the fluid column exists and contains oil and gas
Expand Down Expand Up @@ -310,12 +316,12 @@ def test_rms_volumetrics_export_function(
)
vol_table_file = result.items[0].absolute_path

absoulte_path = (
absolute_path = (
rmssetup_with_fmuconfig.parent.parent
/ "share/results/tables/volumes/geogrid.csv"
/ "share/results/tables/volumes/geogrid.parquet"
)

assert vol_table_file == absoulte_path
assert vol_table_file == absolute_path

assert Path(vol_table_file).is_file()
metadata = dataio.read_metadata(vol_table_file)
Expand All @@ -335,8 +341,12 @@ def test_inplace_volumes_payload_validates_against_model(
model."""

out = exportvolumetrics._export_volume_table()
with open(out.items[0].absolute_path) as f:
df = pd.read_csv(f).replace(np.nan, None).to_dict(orient="records")
df = (
pq.read_table(out.items[0].absolute_path)
.to_pandas()
.replace(np.nan, None)
.to_dict(orient="records")
)
InplaceVolumesResult.model_validate(df) # Throws if invalid


Expand All @@ -349,8 +359,12 @@ def test_inplace_volumes_payload_validates_against_schema(
schema."""

out = exportvolumetrics._export_volume_table()
with open(out.items[0].absolute_path) as f:
df = pd.read_csv(f).replace(np.nan, None).to_dict(orient="records")
df = (
pq.read_table(out.items[0].absolute_path)
.to_pandas()
.replace(np.nan, None)
.to_dict(orient="records")
)
jsonschema.validate(instance=df, schema=dump()) # Throws if invalid


Expand Down

0 comments on commit a1d8b4c

Please sign in to comment.