diff --git a/src/fmu/dataio/_utils.py b/src/fmu/dataio/_utils.py
index fde3bdc03..8554938fb 100644
--- a/src/fmu/dataio/_utils.py
+++ b/src/fmu/dataio/_utils.py
@@ -18,6 +18,7 @@
 import pandas as pd
 import xtgeo
 import yaml
+from pyarrow import Table, output_stream, parquet
 
 from fmu.config import utilities as ut
 
@@ -117,11 +118,7 @@ def export_file(
         )
         obj.to_csv(file, index=False)
     elif file_suffix == ".parquet":
-        from pyarrow import Table
-
         if isinstance(obj, Table):
-            from pyarrow import output_stream, parquet
-
             parquet.write_table(obj, where=output_stream(file))
 
     elif file_suffix == ".json":
diff --git a/src/fmu/dataio/export/rms/inplace_volumes.py b/src/fmu/dataio/export/rms/inplace_volumes.py
index cae505f35..77bd63d02 100644
--- a/src/fmu/dataio/export/rms/inplace_volumes.py
+++ b/src/fmu/dataio/export/rms/inplace_volumes.py
@@ -7,6 +7,7 @@
 
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 
 import fmu.dataio as dio
 from fmu.dataio._logging import null_logger
@@ -248,7 +249,10 @@ def _export_volume_table(self) -> ExportResult:
             rep_include=False,
             table_index=_enums.InplaceVolumes.index_columns(),
         )
-        absolute_export_path = edata.export(self._dataframe)
+
+        volume_table = pa.Table.from_pandas(self._dataframe)
+        absolute_export_path = edata.export(volume_table)
+
         _logger.debug("Volume result to: %s", absolute_export_path)
         return ExportResult(
             items=[
diff --git a/src/fmu/dataio/providers/objectdata/_provider.py b/src/fmu/dataio/providers/objectdata/_provider.py
index 804aeefc7..a04949791 100644
--- a/src/fmu/dataio/providers/objectdata/_provider.py
+++ b/src/fmu/dataio/providers/objectdata/_provider.py
@@ -90,6 +90,7 @@
 from typing import TYPE_CHECKING, Final
 
 import pandas as pd
+import pyarrow as pa
 import xtgeo
 
 from fmu.dataio._definitions import ExportFolder, ValidFormats
@@ -150,10 +151,7 @@ def objectdata_provider_factory(
         return FaultRoomSurfaceProvider(obj=obj, dataio=dataio)
     if isinstance(obj, dict):
         return DictionaryDataProvider(obj=obj, dataio=dataio)
-
-    from pyarrow import Table
-
-    if isinstance(obj, Table):
+    if isinstance(obj, pa.Table):
         return ArrowTableDataProvider(obj=obj, dataio=dataio)
 
     raise NotImplementedError(f"This data type is not currently supported: {type(obj)}")
diff --git a/tests/test_export_rms/test_export_rms_volumetrics.py b/tests/test_export_rms/test_export_rms_volumetrics.py
index 6a7cd4223..4a396499f 100644
--- a/tests/test_export_rms/test_export_rms_volumetrics.py
+++ b/tests/test_export_rms/test_export_rms_volumetrics.py
@@ -6,6 +6,7 @@
 import jsonschema
 import numpy as np
 import pandas as pd
+import pyarrow.parquet as pq
 import pytest
 
 import fmu.dataio as dataio
@@ -139,7 +140,12 @@ def test_convert_table_from_legacy_to_standard_format(
 
     # check that the exported table is equal to the expected
     out = instance._export_volume_table()
-    exported_table = pd.read_csv(out.items[0].absolute_path)
+    # Note that using `read_parquet()` more than once in the same pytest module causes
+    # errors due to an issue in pandas registering a type extension globally on every
+    # invocation. This is probably a pandas bug.
+    # https://github.com/apache/arrow/issues/41857
+    exported_table = pd.read_parquet(out.items[0].absolute_path)
+
     pd.testing.assert_frame_equal(voltable_standard, exported_table)
 
     # check that the fluid column exists and contains oil and gas
@@ -310,12 +316,12 @@ def test_rms_volumetrics_export_function(
     )
 
     vol_table_file = result.items[0].absolute_path
-    absoulte_path = (
+    absolute_path = (
         rmssetup_with_fmuconfig.parent.parent
-        / "share/results/tables/volumes/geogrid.csv"
+        / "share/results/tables/volumes/geogrid.parquet"
     )
 
-    assert vol_table_file == absoulte_path
+    assert vol_table_file == absolute_path
     assert Path(vol_table_file).is_file()
 
     metadata = dataio.read_metadata(vol_table_file)
@@ -335,8 +341,12 @@ def test_inplace_volumes_payload_validates_against_model(
    model."""
 
     out = exportvolumetrics._export_volume_table()
-    with open(out.items[0].absolute_path) as f:
-        df = pd.read_csv(f).replace(np.nan, None).to_dict(orient="records")
+    df = (
+        pq.read_table(out.items[0].absolute_path)
+        .to_pandas()
+        .replace(np.nan, None)
+        .to_dict(orient="records")
+    )
 
     InplaceVolumesResult.model_validate(df)  # Throws if invalid
 
@@ -349,8 +359,12 @@ def test_inplace_volumes_payload_validates_against_schema(
    schema."""
 
     out = exportvolumetrics._export_volume_table()
-    with open(out.items[0].absolute_path) as f:
-        df = pd.read_csv(f).replace(np.nan, None).to_dict(orient="records")
+    df = (
+        pq.read_table(out.items[0].absolute_path)
+        .to_pandas()
+        .replace(np.nan, None)
+        .to_dict(orient="records")
+    )
 
     jsonschema.validate(instance=df, schema=dump())  # Throws if invalid
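
A minimal sketch of the parquet round-trip these changes introduce, assuming only
pandas and pyarrow are installed; the file name and example frame are hypothetical,
not taken from the diff.

    import pandas as pd
    import pyarrow as pa
    from pyarrow import output_stream, parquet

    # Build an Arrow table from a pandas frame, as _export_volume_table() now does.
    df = pd.DataFrame({"ZONE": ["A", "B"], "BULK": [1.0, 2.0]})
    table = pa.Table.from_pandas(df)

    # Write it the way export_file() handles the ".parquet" suffix; the context
    # manager guarantees the stream is flushed and closed before reading back.
    with output_stream("example.parquet") as sink:
        parquet.write_table(table, where=sink)

    # Read back through pyarrow rather than pd.read_parquet(), matching the tests'
    # workaround for the repeated type-extension registration issue linked above.
    roundtrip = parquet.read_table("example.parquet").to_pandas()
    pd.testing.assert_frame_equal(df, roundtrip)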