Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prototype of udp based job manager #1

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
146 changes: 146 additions & 0 deletions src/esa_apex_toolbox/upscaling/udp_job_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import ast
import multiprocessing
from pathlib import Path
from typing import Optional

import pandas as pd
import requests
import shapely

import openeo
from openeo.extra.job_management import MultiBackendJobManager


class UDPJobManager(MultiBackendJobManager):
    """
    Large area processing for UDP's (user-defined processes).

    This job manager can run complex workflows without requiring project specific dependencies.
    """

    def __init__(self, udp_id: str, udp_namespace: str, fixed_parameters: dict, job_options: dict = None):
        """
        :param udp_id: identifier of the user-defined process to invoke for each job
        :param udp_namespace: URL of the UDP metadata document; fetched at construction time
        :param fixed_parameters: parameter values shared by all jobs; a list/tuple value is
            broadcast to every row of the jobs dataframe
        :param job_options: optional backend-specific job options
        """
        super().__init__()
        # Background process running the jobs, created by start_job_thread().
        self.largescale_process = None
        self._job_options = job_options
        self.fixed_parameters = fixed_parameters
        self.udp_namespace = udp_namespace
        self.udp_id = udp_id
        self.dataframe: Optional[pd.DataFrame] = None

        self._parse_udp()

    def _parse_udp(self):
        """Fetch and cache the UDP metadata document from the namespace URL."""
        # Timeout avoids hanging forever on an unresponsive server; raise_for_status
        # turns an HTTP error into a clear exception instead of a confusing JSON decode error.
        response = requests.get(self.udp_namespace, timeout=60)
        response.raise_for_status()
        self.udp_metadata = response.json()

    @property
    def job_options(self):
        """Backend-specific options applied to each created job."""
        return self._job_options

    @job_options.setter
    def job_options(self, value):
        self._job_options = value

    def udp_parameters(self) -> list[dict]:
        """Return the parameter definitions declared by the UDP metadata."""
        return self.udp_metadata["parameters"]

    def udp_parameter_schema(self, name: str) -> Optional[dict]:
        """Return the schema dict of the UDP parameter with the given name, or None if unknown."""
        return {p["name"]: p.get("schema", None) for p in self.udp_parameters()}.get(name, None)

    def add_jobs(self, jobs_dataframe):
        """
        Add jobs to the job manager.

        Column names of the dataframe have to match with UDP parameters.

        Extra columns names:

        - `title` : Title of the job
        - `description` : Description of the job

        :param jobs_dataframe: (geo)dataframe with one row per job
        :raises ValueError: when jobs were already added; only one dataframe is supported
        """
        if self.dataframe is None:
            self.dataframe = jobs_dataframe
        else:
            raise ValueError("Jobs already added to the job manager.")

    def start_job_thread(self, output_file: Path = Path("jobs.csv")):
        """
        Start running the jobs in a separate process, returns afterwards.

        :param output_file: CSV file used to persist and track job status
            (default preserves the previously hardcoded "jobs.csv")
        """
        udp_parameter_names = [p["name"] for p in self.udp_parameters()]

        # Parameters whose schema marks them as geojson: their values are taken from
        # the dataframe geometry column.
        geojson_params = [
            p["name"]
            for p in self.udp_parameters()
            if p.get("schema", {}).get("subtype", "") == "geojson"
        ]

        if self.dataframe is not None:
            df = self._normalize_df(self.dataframe)

            def normalize_fixed_param_value(name, value):
                # A list/tuple is a single parameter value shared by all jobs: repeat it
                # per row so DataFrame.assign treats it as a column of identical values.
                if isinstance(value, (list, tuple)):
                    return len(df) * [value]
                else:
                    return value

            new_columns = {
                col: normalize_fixed_param_value(col, val)
                for (col, val) in self.fixed_parameters.items()
                if col not in df.columns
            }
            new_columns["udp_id"] = self.udp_id
            new_columns["udp_namespace"] = self.udp_namespace
            df = df.assign(**new_columns)

            if len(geojson_params) == 1:
                # TODO: this is very limited, expand to include more complex cases:
                #  - bbox instead of json
                if geojson_params[0] not in df.columns:
                    df.rename_geometry(geojson_params[0], inplace=True)
            elif len(geojson_params) > 1:
                for p in geojson_params:
                    if p not in df.columns:
                        raise ValueError(f"Multiple geojson parameters, but not all are in the dataframe. Missing column: {p}, available columns: {df.columns}")

            self._persists(df, output_file)

        def start_job(
                row: pd.Series,
                connection: openeo.Connection,
                **kwargs
        ) -> openeo.BatchJob:

            def normalize_param_value(name, value):
                # Guard against parameters without a schema (udp_parameter_schema
                # may return None).
                schema = self.udp_parameter_schema(name) or {}
                if isinstance(value, str) and schema.get("type", "") == "array":
                    # Arrays round-trip through the CSV as their repr; parse them back
                    # safely with literal_eval (never eval).
                    return ast.literal_eval(value)
                elif isinstance(value, str) and schema.get("subtype", "") == "geojson":
                    # this is a side effect of using csv + renaming geometry column:
                    # geometries are persisted as WKT strings.
                    return shapely.geometry.mapping(shapely.wkt.loads(value))
                else:
                    return value

            parameters = {k: normalize_param_value(k, row[k]) for k in udp_parameter_names}

            cube = connection.datacube_from_process(row.udp_id, row.udp_namespace, **parameters)

            # Use the row index instead of str(parameters) to avoid extremely large titles.
            title = row.get("title", f"Subjob {row.udp_id} - {row.name}")
            description = row.get("description", f"Subjob {row.udp_id} - {str(parameters)}")
            return cube.create_job(title=title, description=description)

        def start_running():
            # df=None: run_jobs picks the persisted jobs up from output_file.
            self.run_jobs(df=None, start_job=start_job, output_file=output_file)

        self.largescale_process = multiprocessing.Process(target=start_running)
        self.largescale_process.start()

    def stop_job_thread(self):
        """Terminate the background process started by start_job_thread()."""
        self.largescale_process.terminate()
32 changes: 32 additions & 0 deletions tests/test_udp_job_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from time import sleep

import openeo
from openeo.extra.udp_job_manager import UDPJobManager

import geopandas as gpd

def test_create_and_start():
    """Smoke test: create a UDP job manager against CDSE, run briefly, then stop it."""
    fixed = {
        "biopar_type": "FAPAR",
        "date": ["2023-05-01", "2023-05-30"],
    }
    manager = UDPJobManager(
        "BIOPAR",
        "https://openeo.dataspace.copernicus.eu/openeo/1.1/processes/u:3e24e251-2e9a-438f-90a9-d4500e576574/BIOPAR",
        fixed_parameters=fixed,
    )

    manager.add_jobs(LAEA_20km())
    connection = openeo.connect("openeo.dataspace.copernicus.eu").authenticate_oidc()
    manager.add_backend("cdse", connection=connection, parallel_jobs=1)

    manager.start_job_thread()
    print("started running")
    sleep(20)
    manager.stop_job_thread()


def LAEA_20km()->gpd.GeoDataFrame:
    """Return a small test grid: the first 10 LAEA 20km tiles intersecting a bbox over Belgium/NL."""
    # NOTE(review): gpd.datasets.get_path('naturalearth_lowres') is deprecated and removed
    # in geopandas >= 1.0 — TODO confirm the pinned geopandas version or migrate to geodatasets.
    countries = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'), bbox=(4, 50, 5, 52))
    # Requires network access; mask restricts the grid tiles to the countries selected above.
    df = gpd.read_file("https://artifactory.vgt.vito.be/auxdata-public/grids/LAEA-20km.gpkg",mask=countries)
    df = df.head(10)
    #udp uses 'geometry' as name for aoi
    #df.rename_geometry("polygon")
    return df
Loading