From a8a0807c60aee925203bf2477ddd26ae4782f808 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Wed, 23 Oct 2024 10:30:32 +0200 Subject: [PATCH 1/5] Test with wait_till_path_available for significant timeouts. --- openeogeotrellis/deploy/batch_job.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/openeogeotrellis/deploy/batch_job.py b/openeogeotrellis/deploy/batch_job.py index 1b148caf..b62279cc 100644 --- a/openeogeotrellis/deploy/batch_job.py +++ b/openeogeotrellis/deploy/batch_job.py @@ -356,11 +356,10 @@ def run_job( ml_model_metadata = result.get_model_metadata(str(output_file)) logger.info("Extracted ml model metadata from %s" % output_file) for name, asset in the_assets_metadata.items(): - # TODO: test in separate branch - # if not asset.get("href").lower().startswith("s3:/"): - # # fusemount could have some delay to make files accessible, so poll a bit: - # asset_path = get_abs_path_of_asset(asset["href"], job_dir) - # wait_till_path_available(asset_path) + if not asset.get("href").lower().startswith("s3:/"): + # fusemount could have some delay to make files accessible, so poll a bit: + asset_path = get_abs_path_of_asset(asset["href"], job_dir) + wait_till_path_available(asset_path) add_permissions(Path(asset["href"]), stat.S_IWGRP) logger.info(f"wrote {len(the_assets_metadata)} assets to {output_file}") assets_metadata.append(the_assets_metadata) From a3e9be6ab7abe21ffcf2c4f044d8f4ffa54163e2 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Wed, 23 Oct 2024 15:55:19 +0200 Subject: [PATCH 2/5] Use fast_sleep to accelerate tests that intentionally have missing files. https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- tests/conftest.py | 42 +++++++++++++++++++++++++++++----- tests/deploy/test_batch_job.py | 8 +++---- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 5e22d708..28350efa 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,16 +1,18 @@ -import os -from typing import Union - -import sys -from pathlib import Path +from unittest import mock import boto3 +import contextlib import flask -import pytest import moto import moto.server +import os +import pytest +import sys +import time_machine +import typing import requests_mock from _pytest.terminal import TerminalReporter +from pathlib import Path from openeo_driver.backend import OpenEoBackendImplementation, UserDefinedProcesses from openeo_driver.jobregistry import ElasticJobRegistry, JobRegistryInterface @@ -180,6 +182,34 @@ def api_version(request): return request.param +class _Sleeper: + def __init__(self): + self.history = [] + + @contextlib.contextmanager + def patch(self, time_machine: time_machine.TimeMachineFixture) -> typing.Iterator["_Sleeper"]: + def sleep(seconds): + # Note: this requires that `time_machine.move_to()` has been called before + # also see https://github.com/adamchainz/time-machine/issues/247 + time_machine.coordinates.shift(seconds) + self.history.append(seconds) + + with mock.patch("time.sleep", new=sleep): + yield self + + def did_sleep(self) -> bool: + return len(self.history) > 0 + + +@pytest.fixture +def fast_sleep(time_machine) -> typing.Iterator[_Sleeper]: + """ + Fixture using `time_machine` to make `sleep` instant and update the current time. + """ + with _Sleeper().patch(time_machine=time_machine) as sleeper: + yield sleeper + + @pytest.fixture def udf_noop(): file_name = get_test_data_file("udf_noop.py") diff --git a/tests/deploy/test_batch_job.py b/tests/deploy/test_batch_job.py index a70fcc4b..4436aa6a 100644 --- a/tests/deploy/test_batch_job.py +++ b/tests/deploy/test_batch_job.py @@ -391,7 +391,7 @@ def test_run_job(evaluate, tmp_path): @mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate") -def test_run_job_get_projection_extension_metadata(evaluate, tmp_path): +def test_run_job_get_projection_extension_metadata(evaluate, tmp_path, fast_sleep): cube_mock = MagicMock() job_dir = tmp_path / "job-402" @@ -517,9 +517,7 @@ def test_run_job_get_projection_extension_metadata(evaluate, tmp_path): @mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate") -def test_run_job_get_projection_extension_metadata_all_assets_same_epsg_and_bbox( - evaluate, tmp_path -): +def test_run_job_get_projection_extension_metadata_all_assets_same_epsg_and_bbox(evaluate, tmp_path, fast_sleep): """When there are two raster assets with the same projection metadata, it should put those metadata at the level of the item instead of the individual bands. """ @@ -959,7 +957,7 @@ def test_run_job_get_projection_extension_metadata_assets_with_different_epsg( @mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate") -def test_run_job_get_projection_extension_metadata_job_dir_is_relative_path(evaluate): +def test_run_job_get_projection_extension_metadata_job_dir_is_relative_path(evaluate, fast_sleep): cube_mock = MagicMock() # job dir should be a relative path, # We still want the test data to be cleaned up though, so we need to use From 9abd90cf5ee7004ee31483cc067a9a07889d1790 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Wed, 23 Oct 2024 16:43:47 +0200 Subject: [PATCH 3/5] Try fix against AttributeError: 'NoneType' object has no attribute 'shift' --- tests/conftest.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 28350efa..93528195 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +from datetime import datetime from unittest import mock import boto3 @@ -181,7 +182,7 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0): def api_version(request): return request.param - +# TODO: Deduplicate code with openeo-python-driver class _Sleeper: def __init__(self): self.history = [] @@ -206,6 +207,8 @@ def fast_sleep(time_machine) -> typing.Iterator[_Sleeper]: """ Fixture using `time_machine` to make `sleep` instant and update the current time. """ + now = datetime.now().isoformat() + time_machine.move_to(now) with _Sleeper().patch(time_machine=time_machine) as sleeper: yield sleeper From 874e473c83b78b64994e6cd837887be12ea5e4be Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Wed, 23 Oct 2024 17:33:06 +0200 Subject: [PATCH 4/5] More robust check on href type. --- openeogeotrellis/deploy/batch_job.py | 7 +++++-- tests/conftest.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/openeogeotrellis/deploy/batch_job.py b/openeogeotrellis/deploy/batch_job.py index b62279cc..ac80eaa7 100644 --- a/openeogeotrellis/deploy/batch_job.py +++ b/openeogeotrellis/deploy/batch_job.py @@ -356,9 +356,12 @@ def run_job( ml_model_metadata = result.get_model_metadata(str(output_file)) logger.info("Extracted ml model metadata from %s" % output_file) for name, asset in the_assets_metadata.items(): - if not asset.get("href").lower().startswith("s3:/"): + href = asset["href"] + url = urlparse(href) + if url.scheme in ["", "file"]: + file_path = url.path # fusemount could have some delay to make files accessible, so poll a bit: - asset_path = get_abs_path_of_asset(asset["href"], job_dir) + asset_path = get_abs_path_of_asset(file_path, job_dir) wait_till_path_available(asset_path) add_permissions(Path(asset["href"]), stat.S_IWGRP) logger.info(f"wrote {len(the_assets_metadata)} assets to {output_file}") diff --git a/tests/conftest.py b/tests/conftest.py index 93528195..24b519ae 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -182,7 +182,7 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0): def api_version(request): return request.param -# TODO: Deduplicate code with openeo-python-driver +# TODO: Deduplicate code with openeo-python-client class _Sleeper: def __init__(self): self.history = [] From 594a80ebacafc06591ddcde1d5363cf38cc42e7c Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Wed, 23 Oct 2024 18:50:24 +0200 Subject: [PATCH 5/5] more optimistic retry --- openeogeotrellis/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/openeogeotrellis/utils.py b/openeogeotrellis/utils.py index 4bfe11c5..d3b345f4 100644 --- a/openeogeotrellis/utils.py +++ b/openeogeotrellis/utils.py @@ -773,13 +773,13 @@ def to_jsonable(x): def wait_till_path_available(path: Path): retry = 0 - max_tries = 5 + max_tries = 20 # Almost 2 minutes while not os.path.exists(path): if retry < max_tries: retry += 1 - seconds = int(math.pow(2, retry + 2)) # exponential backoff + seconds = 5 logger.info(f"Waiting for path to be available. Try {retry}/{max_tries} (sleep:{seconds}seconds): {path}") time.sleep(seconds) else: logger.warning(f"Path is not available after {max_tries} tries: {path}") - return # TODO: Throw error instead + return # TODO: Throw error instead?