feat: updated mogpr process and notebooks
JanssenBrm committed Feb 29, 2024
1 parent d39f01e commit cd31704
Showing 5 changed files with 172 additions and 197 deletions.
161 changes: 79 additions & 82 deletions notebooks/OpenEO/FuseTS - MOGPR Multi Source Fusion.ipynb

Large diffs are not rendered by default.

186 changes: 82 additions & 104 deletions notebooks/OpenEO/FuseTS - MOGPR S1 and S2.ipynb

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions src/fusets/openeo/mogpr_udf.py
@@ -4,7 +4,7 @@
 from pathlib import Path
 from typing import Dict
 
-from openeo.metadata import Band, CollectionMetadata
+from openeo.metadata import CollectionMetadata
 from openeo.udf import XarrayDataCube, inspect

@@ -42,10 +42,10 @@ def write_gpy_cfg():
 
 
 def apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:
-    extra_bands = [Band(f"{x}_STD", None, None) for x in metadata.bands]
-    inspect(data=metadata, message="MOGPR metadata")
-    for band in extra_bands:
-        metadata = metadata.append_band(band)
+    # extra_bands = [Band(f"{x}_STD", None, None) for x in metadata.bands]
+    # inspect(data=metadata, message="MOGPR metadata")
+    # for band in extra_bands:
+    #     metadata = metadata.append_band(band)
     return metadata
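The removed lines above are what previously advertised the extra "<band>_STD" output bands in the cube metadata. For reference only (not part of this commit), a minimal sketch of how that logic could be gated on the same `include_uncertainties` flag that `apply_datacube` reads from its context:

```python
from openeo.metadata import Band, CollectionMetadata


def apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:
    # Sketch only: append "<band>_STD" metadata bands when uncertainties are
    # requested, mirroring the include_uncertainties flag used in apply_datacube.
    if context.get("include_uncertainties", False):
        for band in [Band(f"{x}_STD", None, None) for x in metadata.bands]:
            metadata = metadata.append_band(band)
    return metadata
```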


4 changes: 2 additions & 2 deletions src/fusets/openeo/services/mogpr.json
@@ -21,7 +21,7 @@
"from_parameter": "data"
},
"runtime": "Python",
"udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n"
"udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.metadata import CollectionMetadata\nfrom openeo.udf import XarrayDataCube, inspect\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:\n # extra_bands = [Band(f\"{x}_STD\", None, None) for x in metadata.bands]\n # inspect(data=metadata, message=\"MOGPR metadata\")\n # for band in extra_bands:\n # metadata = metadata.append_band(band)\n return metadata\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties,\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n inspect(data=result_dc, message=\"MOGPR result\")\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n"
},
"result": true
}
@@ -45,7 +45,7 @@
},
"id": "mogpr",
"summary": "Integrates timeseries in data cube using multi-output gaussian process regression.",
"description": "# Multi output gaussian process regression\n\n## Description\n\nCompute an integrated timeseries based on multiple inputs.\nFor instance, combine Sentinel-2 NDVI with Sentinel-1 RVI into one integrated NDVI.\n\n## Usage\n\nUsage examples for the MOGPR process.\n\n### Python\n\nThis code example highlights the usage of the MOGPR process in an OpenEO batch job.\nThe result of this batch job will consist of individual GeoTIFF files per date.\nGenerating multiple GeoTIFF files as output is only possible in a batch job.\n\n```python\nimport openeo\n\n## Setup of parameters\nminx, miny, maxx, maxy = (15.179421073198585, 45.80924633589998, 15.185336903822831, 45.81302555710934)\nspat_ext = dict(west=minx, east=maxx, north=maxy, south=miny, crs=4326)\ntemp_ext = [\"2021-01-01\", \"2021-12-31\"]\n\n## Setup connection to openEO\nconnection = openeo.connect(\"openeo.vito.be\").authenticate_oidc()\nservice = 'mogpr'\nnamespace = 'u:fusets'\n\n## Creation of the base NDVI data cube upon which the mogpr is executed\ns2 = connection.load_collection('SENTINEL2_L2A_SENTINELHUB',\n spatial_extent=spat_ext,\n temporal_extent=temp_ext,\n bands=[\"B04\", \"B08\", \"SCL\"])\ns2 = s2.process(\"mask_scl_dilation\", data=s2, scl_band_name=\"SCL\")\nbase_ndvi = s2.ndvi(red=\"B04\", nir=\"B08\", target_band='NDVI').band('NDVI')\n\n## Creation mogpr data cube\nmogpr = connection.datacube_from_process(service,\n namespace=f'https://openeo.vito.be/openeo/1.1/processes/{namespace}/{service}',\n data=base_ndvi)\n## Calculate the average time series value for the given area of interest\nmogpr = mogpr.aggregate_spatial(spat_ext, reducer='mean')\n\n## Execute the service through an openEO batch job\nmogpr_job = mogpr.execute_batch('./mogpr.json', out_format=\"json\",\n title=f'FuseTS - MOGPR', job_options={\n 'udf-dependency-archives': [\n 'https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip#tmp/venv',\n 'https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip#tmp/venv_static'\n ]\n })\n```\n\n## Limitations\n\nThe spatial extent is limited to a maximum size equal to a Sentinel-2 MGRS tile (100 km x 100 km).\n\n## Configuration & Resource Usage\n\nRun configurations for different ROI/TOI with memory requirements and estimated run durations.\n\n### Synchronous calls\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Run duration |\n|----------------|--------------|\n| 100 m x 100 m | 1 minute |\n| 500m x 500 m | 1 minute |\n| 1 km x 1 km | 1 minute |\n| 5 km x 5 km | 2 minutes |\n| 10 km x 10 km | 3 minutes |\n| 50 km x 50 km | 9 minutes |\n\nThe maximum duration of a synchronous run is 15 minutes.\nFor long running computations, you can use batch jobs.\n\n### Batch jobs\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Temporal extent | Executor memory | Run duration |\n|-----------------|-----------------|-----------------|--------------|\n| 100 m x 100 m | 1 month | default | 7 minutes |\n| 500 m x 100 m | 1 month | default | 7 minutes |\n| 1 km x 1 km | 1 month | default | 7 minutes |\n| 5 km x 5 km | 1 month | default | 10 minutes |\n| 10 km x 10 km | 1 month | default | 11 minutes |\n| 50 km x 50 km | 1 month | 6 GB | 20 minutes |\n| 100 km x 100 km | 1 month | 7 GB | 34 minutes |\n| 100m x 100 m | 7 months | default | 10 minutes |\n| 500 m x 500 m | 7 months | default | 10 minutes |\n| 1 km x 1 km | 7 months | default | 14 minutes |\n| 5 km x 5 km | 7 months | default | 14 minutes |\n| 10 km x 10 km | 7 
months | default | 19 minutes |\n| 50 km x 50 km | 7 months | 6 GB | 45 minutes |\n| 100 km x 100 km | 7 months | 8 GB | 65 minutes |\n\nThe executor memory defaults to 5 GB. You can increase the executor memory by specifying it as a job option, eg:\n\n```python\njob = cube.execute_batch(out_format=\"GTIFF\", job_options={\"executor-memory\": \"7g\"})\n```\n",
"description": "# Multi output gaussian process regression\n\n## Description\n\nCompute an integrated timeseries based on multiple inputs.\nFor instance, combine Sentinel-2 NDVI with Sentinel-1 RVI into one integrated NDVI.\n\n## Usage\n\nUsage examples for the MOGPR process.\n\n### Python\n\nThis code example highlights the usage of the MOGPR process in an OpenEO batch job.\nThe result of this batch job will consist of individual GeoTIFF files per date.\nGenerating multiple GeoTIFF files as output is only possible in a batch job.\n\n```python\nimport openeo\n\n## Setup of parameters\nminx, miny, maxx, maxy = (15.179421073198585, 45.80924633589998, 15.185336903822831, 45.81302555710934)\nspat_ext = dict(west=minx, east=maxx, north=maxy, south=miny, crs=4326)\ntemp_ext = [\"2021-01-01\", \"2021-12-31\"]\n\n## Setup connection to openEO\nconnection = openeo.connect(\"openeo.vito.be\").authenticate_oidc()\nservice = 'mogpr'\nnamespace = 'u:fusets'\n\n## Creation of the base NDVI data cube upon which the mogpr is executed\ns2 = connection.load_collection('SENTINEL2_L2A_SENTINELHUB',\n spatial_extent=spat_ext,\n temporal_extent=temp_ext,\n bands=[\"B04\", \"B08\", \"SCL\"])\ns2 = s2.process(\"mask_scl_dilation\", data=s2, scl_band_name=\"SCL\")\nbase_ndvi = s2.ndvi(red=\"B04\", nir=\"B08\", target_band='NDVI').band('NDVI')\n\n## Creation mogpr data cube\nmogpr = connection.datacube_from_process(service,\n namespace=f'https://openeo.vito.be/openeo/1.1/processes/{namespace}/{service}',\n data=base_ndvi, include_uncertainties=True)\n## Calculate the average time series value for the given area of interest\nmogpr = mogpr.aggregate_spatial(spat_ext, reducer='mean')\n\n## Execute the service through an openEO batch job\nmogpr_job = mogpr.execute_batch('./mogpr.json', out_format=\"json\",\n title=f'FuseTS - MOGPR', job_options={\n 'udf-dependency-archives': [\n 'https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip#tmp/venv',\n 'https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip#tmp/venv_static'\n ]\n })\n```\n\n## Limitations\n\nThe spatial extent is limited to a maximum size equal to a Sentinel-2 MGRS tile (100 km x 100 km).\n\n## Configuration & Resource Usage\n\nRun configurations for different ROI/TOI with memory requirements and estimated run durations.\n\n### Synchronous calls\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Run duration |\n|----------------|--------------|\n| 100 m x 100 m | 1 minute |\n| 500m x 500 m | 1 minute |\n| 1 km x 1 km | 1 minute |\n| 5 km x 5 km | 2 minutes |\n| 10 km x 10 km | 3 minutes |\n| 50 km x 50 km | 9 minutes |\n\nThe maximum duration of a synchronous run is 15 minutes.\nFor long running computations, you can use batch jobs.\n\n### Batch jobs\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Temporal extent | Executor memory | Run duration |\n|-----------------|-----------------|-----------------|--------------|\n| 100 m x 100 m | 1 month | default | 7 minutes |\n| 500 m x 100 m | 1 month | default | 7 minutes |\n| 1 km x 1 km | 1 month | default | 7 minutes |\n| 5 km x 5 km | 1 month | default | 10 minutes |\n| 10 km x 10 km | 1 month | default | 11 minutes |\n| 50 km x 50 km | 1 month | 6 GB | 20 minutes |\n| 100 km x 100 km | 1 month | 7 GB | 34 minutes |\n| 100m x 100 m | 7 months | default | 10 minutes |\n| 500 m x 500 m | 7 months | default | 10 minutes |\n| 1 km x 1 km | 7 months | default | 14 minutes |\n| 5 km x 5 km | 7 months | default | 14 minutes 
|\n| 10 km x 10 km | 7 months | default | 19 minutes |\n| 50 km x 50 km | 7 months | 6 GB | 45 minutes |\n| 100 km x 100 km | 7 months | 8 GB | 65 minutes |\n\nThe executor memory defaults to 5 GB. You can increase the executor memory by specifying it as a job option, eg:\n\n```python\njob = cube.execute_batch(out_format=\"GTIFF\", job_options={\"executor-memory\": \"7g\"})\n```\n",
"parameters": [
{
"name": "data",
8 changes: 4 additions & 4 deletions src/fusets/openeo/services/publish_mogpr.py
@@ -72,7 +72,7 @@ def generate_mogpr_cube(
         input_cube,
         lambda data: data.run_udf(
             udf=load_mogpr_udf(),
-            runtime="Python-Jep",
+            runtime="Python",
             context={"include_uncertainties": get_context_value(include_uncertainties)},
         ),
         size=[
@@ -92,7 +92,7 @@ def generate_mogpr_udp():
         "include_uncertainties", "Flag to include the uncertainties in the output results", False
     )
 
-    mogpr = generate_mogpr_cube()
+    mogpr = generate_mogpr_cube(input_cube=input_cube, include_uncertainties=include_uncertainties)
 
     return publish_service(
         id="mogpr",
@@ -104,5 +104,5 @@ def generate_mogpr_udp():
 
 
 if __name__ == "__main__":
-    execute_udf()
-    # generate_mogpr_udp()
+    # execute_udf()
+    generate_mogpr_udp()
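With `generate_mogpr_udp()` now the entry point, the republished UDP can be called as shown in the embedded service description; a condensed sketch under those same assumptions (backend URL, namespace, collection and extents taken from the documentation above):

```python
import openeo

connection = openeo.connect("openeo.vito.be").authenticate_oidc()

# Base NDVI cube, as in the documented usage example.
s2 = connection.load_collection(
    "SENTINEL2_L2A_SENTINELHUB",
    spatial_extent=dict(
        west=15.179421073198585,
        south=45.80924633589998,
        east=15.185336903822831,
        north=45.81302555710934,
        crs=4326,
    ),
    temporal_extent=["2021-01-01", "2021-12-31"],
    bands=["B04", "B08", "SCL"],
)
s2 = s2.process("mask_scl_dilation", data=s2, scl_band_name="SCL")
base_ndvi = s2.ndvi(red="B04", nir="B08", target_band="NDVI").band("NDVI")

# Call the published MOGPR UDP, requesting the per-band uncertainty outputs.
mogpr = connection.datacube_from_process(
    "mogpr",
    namespace="https://openeo.vito.be/openeo/1.1/processes/u:fusets/mogpr",
    data=base_ndvi,
    include_uncertainties=True,
)
```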
