Commit

Issue #683/#681 align additional/job_options argument in create_job, download, ...
soxofaan committed Dec 10, 2024
1 parent a811bff commit ec66b31
Showing 9 changed files with 336 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Automatically use `load_url` when providing a URL as geometries to `DataCube.aggregate_spatial()`, `DataCube.mask_polygon()`, etc. ([#104](https://github.com/Open-EO/openeo-python-client/issues/104), [#457](https://github.com/Open-EO/openeo-python-client/issues/457))
- Allow specifying `limit` when listing batch jobs with `Connection.list_jobs()` ([#677](https://github.com/Open-EO/openeo-python-client/issues/677))
- Add `additional` and `job_options` arguments to `Connection.download()`, `DataCube.download()` and related ([#681](https://github.com/Open-EO/openeo-python-client/issues/681))

### Changed

@@ -20,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Instead, if it is a local GeoJSON file, the GeoJSON data will be loaded directly client-side.
([#104](https://github.com/Open-EO/openeo-python-client/issues/104), [#457](https://github.com/Open-EO/openeo-python-client/issues/457))
- Move `read()` method from general `JobDatabaseInterface` to more specific `FullDataFrameJobDatabase` ([#680](https://github.com/Open-EO/openeo-python-client/issues/680))
- Align `additional` and `job_options` arguments in `Connection.create_job()`, `DataCube.create_job()` and related.
Also, follow official spec more closely. ([#683](https://github.com/Open-EO/openeo-python-client/issues/683), [Open-EO/openeo-api#276](https://github.com/Open-EO/openeo-api/issues/276))

### Removed

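Taken together, the new entries above mean the synchronous and batch entry points now share the same argument pair. A minimal sketch of the new usage (backend URL, collection id and option names are hypothetical, not from this commit):

```python
import openeo

connection = openeo.connect("https://openeo.example")  # hypothetical backend URL
cube = connection.load_collection("SENTINEL2_L2A")  # hypothetical collection id

# Synchronous download: extra top-level properties plus job options.
cube.download(
    "result.tiff",
    additional={"log_level": "warning"},
    job_options={"driver-memory": "4G"},
)

# Batch job creation: the same argument pair, no more overloading of `additional`.
job = cube.create_job(
    title="example job",
    additional={"log_level": "warning"},
    job_options={"driver-memory": "4G"},
)
```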
27 changes: 26 additions & 1 deletion openeo/rest/_testing.py
@@ -32,13 +32,17 @@ class DummyBackend:
"""

# TODO: move to openeo.testing
# TODO: unify "batch_jobs", "batch_jobs_full" and "extra_job_metadata_fields"?
# TODO: unify "sync_requests" and "sync_requests_full"?

__slots__ = (
"_requests_mock",
"connection",
"file_formats",
"sync_requests",
"sync_requests_full",
"batch_jobs",
"batch_jobs_full",
"validation_requests",
"next_result",
"next_validation_errors",
@@ -60,7 +64,9 @@ def __init__(
self.connection = connection
self.file_formats = {"input": {}, "output": {}}
self.sync_requests = []
self.sync_requests_full = []
self.batch_jobs = {}
self.batch_jobs_full = {}
self.validation_requests = []
self.next_result = self.DEFAULT_RESULT
self.next_validation_errors = []
@@ -163,7 +169,9 @@ def setup_file_format(self, name: str, type: str = "output", gis_data_types: Ite

def _handle_post_result(self, request, context):
"""handler of `POST /result` (synchronous execute)"""
pg = request.json()["process"]["process_graph"]
post_data = request.json()
pg = post_data["process"]["process_graph"]
self.sync_requests_full.append(post_data)
self.sync_requests.append(pg)
result = self.next_result
if isinstance(result, (dict, list)):
@@ -185,6 +193,10 @@ def _handle_post_jobs(self, request, context):
job_id = f"job-{len(self.batch_jobs):03d}"
assert job_id not in self.batch_jobs

# Full post data dump
self.batch_jobs_full[job_id] = post_data

# Batch job essentials
job_data = {"job_id": job_id, "pg": pg, "status": "created"}
for field in ["title", "description"]:
if field in post_data:
Expand Down Expand Up @@ -272,6 +284,11 @@ def get_sync_pg(self) -> dict:
assert len(self.sync_requests) == 1
return self.sync_requests[0]

def get_sync_post_data(self) -> dict:
"""Get post data of the one and only synchronous job."""
assert len(self.sync_requests_full) == 1
return self.sync_requests_full[0]

def get_batch_pg(self) -> dict:
"""
Get process graph of the one and only batch job.
@@ -280,6 +297,14 @@ def get_batch_pg(self) -> dict:
assert len(self.batch_jobs) == 1
return self.batch_jobs[max(self.batch_jobs.keys())]["pg"]

def get_batch_post_data(self) -> dict:
"""
Get post data of the one and only batch job.
Fails when there is none or more than one.
"""
assert len(self.batch_jobs_full) == 1
return self.batch_jobs_full[max(self.batch_jobs_full.keys())]

def get_validation_pg(self) -> dict:
"""
Get process graph of the one and only validation request.
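The new `*_full` bookkeeping lets tests assert on the complete request body, not just the process graph. A sketch of how that could look (the capabilities stub and process graph are illustrative, loosely following the project's test conventions; exact fixture setup may differ):

```python
import openeo
from openeo.rest._testing import DummyBackend

API_URL = "https://openeo.test"


def test_create_job_with_job_options(requests_mock):
    # Minimal capabilities document so the Connection can be constructed.
    requests_mock.get(API_URL + "/", json={"api_version": "1.2.0"})
    connection = openeo.Connection(API_URL)
    dummy = DummyBackend(requests_mock=requests_mock, connection=connection)

    connection.create_job(
        {"add1": {"process_id": "add", "arguments": {"x": 1, "y": 2}, "result": True}},
        job_options={"driver-memory": "4G"},  # hypothetical backend-specific option
    )

    # The full POST /jobs body is captured, including the "job_options" property.
    post_data = dummy.get_batch_post_data()
    assert post_data["job_options"] == {"driver-memory": "4G"}
```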
59 changes: 52 additions & 7 deletions openeo/rest/connection.py
@@ -1644,6 +1644,8 @@ def upload_file(
def _build_request_with_process_graph(
self,
process_graph: Union[dict, FlatGraphableMixin, str, Path, List[FlatGraphableMixin]],
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
**kwargs,
) -> dict:
"""
@@ -1655,6 +1657,15 @@ def _build_request_with_process_graph(
if any(c != self for c in connections):
raise OpenEoClientException(f"Mixing different connections: {self} and {connections}.")
result = kwargs

if additional:
result.update(additional)
if job_options is not None:
# Note: this "job_options" top-level property is not in official openEO API spec,
# but a commonly used convention, e.g. in openeo-python-driver based deployments.
assert "job_options" not in result
result["job_options"] = job_options

process_graph = as_flat_graph(process_graph)
if "process_graph" not in process_graph:
process_graph = {"process_graph": process_graph}
Expand Down Expand Up @@ -1702,6 +1713,8 @@ def download(
timeout: Optional[int] = None,
validate: Optional[bool] = None,
chunk_size: int = DEFAULT_DOWNLOAD_CHUNK_SIZE,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> Union[None, bytes]:
"""
Downloads the result of a process graph synchronously,
Expand All @@ -1715,8 +1728,16 @@ def download(
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param chunk_size: chunk size for streaming response.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
.. versionadded:: 0.36.0
Added arguments ``additional`` and ``job_options``.
"""
pg_with_metadata = self._build_request_with_process_graph(process_graph=graph)
pg_with_metadata = self._build_request_with_process_graph(
process_graph=graph, additional=additional, job_options=job_options
)
self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
response = self.post(
path="/result",
@@ -1740,6 +1761,8 @@ def execute(
timeout: Optional[int] = None,
validate: Optional[bool] = None,
auto_decode: bool = True,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> Union[dict, requests.Response]:
"""
Execute a process graph synchronously and return the result. If the result is a JSON object, it will be parsed.
Expand All @@ -1749,10 +1772,18 @@ def execute(
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_decode: Boolean flag to enable/disable automatic JSON decoding of the response. Defaults to True.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:return: parsed JSON response as a dict if auto_decode is True, otherwise response object
.. versionadded:: 0.36.0
Added arguments ``additional`` and ``job_options``.
"""
pg_with_metadata = self._build_request_with_process_graph(process_graph=process_graph)
pg_with_metadata = self._build_request_with_process_graph(
process_graph=process_graph, additional=additional, job_options=job_options
)
self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
response = self.post(
path="/result",
@@ -1779,6 +1810,7 @@ def create_job(
plan: Optional[str] = None,
budget: Optional[float] = None,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
) -> BatchJob:
"""
@@ -1795,23 +1827,27 @@
:param plan: The billing plan to process and charge the job with
:param budget: Maximum budget to be spent on executing the job.
Note that some backends do not honor this limit.
:param additional: additional job options to pass to the backend
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:return: Created job
.. versionchanged:: 0.35.0
Add :ref:`multi-result support <multi-result-process-graphs>`.
.. versionadded:: 0.36.0
Added argument ``job_options``.
"""
# TODO move all this (BatchJob factory) logic to BatchJob?

pg_with_metadata = self._build_request_with_process_graph(
process_graph=process_graph,
additional=additional,
job_options=job_options,
**dict_no_none(title=title, description=description, plan=plan, budget=budget)
)
if additional:
# TODO: get rid of this non-standard field? https://github.com/Open-EO/openeo-api/issues/276
pg_with_metadata["job_options"] = additional

self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
response = self.post("/jobs", json=pg_with_metadata, expected_status=201)
@@ -1871,9 +1907,12 @@ def load_disk_collection(
def as_curl(
self,
data: Union[dict, DataCube, FlatGraphableMixin],
*,
path="/result",
method="POST",
obfuscate_auth: bool = False,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> str:
"""
Build curl command to evaluate given process graph or data cube
@@ -1891,14 +1930,20 @@
or ``"/jobs"`` for batch jobs
:param method: HTTP method to use (typically ``"POST"``)
:param obfuscate_auth: don't show actual bearer token
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:return: curl command as a string
.. versionadded:: 0.36.0
Added arguments ``additional`` and ``job_options``.
"""
cmd = ["curl", "-i", "-X", method]
cmd += ["-H", "Content-Type: application/json"]
if isinstance(self.auth, BearerAuth):
cmd += ["-H", f"Authorization: Bearer {'...' if obfuscate_auth else self.auth.bearer}"]
pg_with_metadata = self._build_request_with_process_graph(data)
pg_with_metadata = self._build_request_with_process_graph(data, additional=additional, job_options=job_options)
if path == "/validation":
pg_with_metadata = pg_with_metadata["process"]
post_json = json.dumps(pg_with_metadata, separators=(",", ":"))
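A sketch of inspecting the resulting request with `as_curl`, reusing the `cube` from the earlier sketch (output abridged and approximate):

```python
cmd = connection.as_curl(cube, job_options={"driver-memory": "4G"})
print(cmd)
# Roughly:
# curl -i -X POST -H 'Content-Type: application/json' \
#   --data '{"job_options":{"driver-memory":"4G"},"process":{"process_graph":{...}}}' \
#   https://openeo.example/result
```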
35 changes: 30 additions & 5 deletions openeo/rest/datacube.py
@@ -2329,6 +2329,8 @@ def download(
*,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> Union[None, bytes]:
"""
Execute synchronously and download the raster data cube, e.g. as GeoTIFF.
@@ -2342,11 +2344,17 @@
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:return: None if the result is stored to disk, or a bytes object returned by the backend.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
.. versionadded:: 0.36.0
Added arguments ``additional`` and ``job_options``.
"""
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = self
@@ -2359,7 +2367,9 @@
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.download()",
)
return self._connection.download(cube.flat_graph(), outputfile, validate=validate)
return self._connection.download(
cube.flat_graph(), outputfile, validate=validate, additional=additional, job_options=job_options
)

def validate(self) -> List[dict]:
"""
@@ -2463,6 +2473,7 @@ def execute_batch(
print: typing.Callable[[str], None] = print,
max_poll_interval: float = 60,
connection_retry_interval: float = 30,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
@@ -2477,13 +2488,18 @@
:param outputfile: The path of a file to which a result can be written
:param out_format: (optional) File format to use for the job result.
:param job_options:
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
.. versionadded:: 0.36.0
Added argument ``additional``.
"""
# TODO: start showing deprecation warnings about these inconsistent argument names
if "format" in format_options and not out_format:
@@ -2506,6 +2522,7 @@
description=description,
plan=plan,
budget=budget,
additional=additional,
job_options=job_options,
validate=validate,
auto_add_save_result=False,
@@ -2523,6 +2540,7 @@ def create_job(
description: Optional[str] = None,
plan: Optional[str] = None,
budget: Optional[float] = None,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
@@ -2543,15 +2561,20 @@
:param plan: The billing plan to process and charge the job with
:param budget: Maximum budget to be spent on executing the job.
Note that some backends do not honor this limit.
:param job_options: custom job options.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:return: Created job.
.. versionchanged:: 0.32.0
.. versionadded:: 0.32.0
Added ``auto_add_save_result`` option
.. versionadded:: 0.36.0
Added ``additional`` argument.
"""
# TODO: add option to also automatically start the job?
# TODO: avoid using all kwargs as format_options
@@ -2572,7 +2595,8 @@
plan=plan,
budget=budget,
validate=validate,
additional=job_options,
additional=additional,
job_options=job_options,
)

send_job = legacy_alias(create_job, name="send_job", since="0.10.0")
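On the `DataCube` side the same pair is now forwarded unchanged (previously `job_options` was passed on as `additional`). A one-shot batch sketch, reusing the `cube` from the earlier example (format and option names hypothetical):

```python
cube.execute_batch(
    outputfile="result.nc",
    out_format="netCDF",
    additional={"log_level": "warning"},  # hypothetical top-level property
    job_options={"driver-memory": "8G"},  # hypothetical job option
)
```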
@@ -2617,6 +2641,7 @@ def execute(self, *, validate: Optional[bool] = None, auto_decode: bool = True)
:return: parsed JSON response as a dict if auto_decode is True, otherwise response object
"""
# TODO: deprecate this. It's ill-defined how to "execute" a data cube without downloading it.
return self._connection.execute(self.flat_graph(), validate=validate, auto_decode=auto_decode)

@staticmethod