diff --git a/openeogeotrellis/deploy/batch_job.py b/openeogeotrellis/deploy/batch_job.py index 1b148caf..ac80eaa7 100644 --- a/openeogeotrellis/deploy/batch_job.py +++ b/openeogeotrellis/deploy/batch_job.py @@ -356,11 +356,13 @@ def run_job( ml_model_metadata = result.get_model_metadata(str(output_file)) logger.info("Extracted ml model metadata from %s" % output_file) for name, asset in the_assets_metadata.items(): - # TODO: test in separate branch - # if not asset.get("href").lower().startswith("s3:/"): - # # fusemount could have some delay to make files accessible, so poll a bit: - # asset_path = get_abs_path_of_asset(asset["href"], job_dir) - # wait_till_path_available(asset_path) + href = asset["href"] + url = urlparse(href) + if url.scheme in ["", "file"]: + file_path = url.path + # fusemount could have some delay to make files accessible, so poll a bit: + asset_path = get_abs_path_of_asset(file_path, job_dir) + wait_till_path_available(asset_path) add_permissions(Path(asset["href"]), stat.S_IWGRP) logger.info(f"wrote {len(the_assets_metadata)} assets to {output_file}") assets_metadata.append(the_assets_metadata) diff --git a/openeogeotrellis/utils.py b/openeogeotrellis/utils.py index 4bfe11c5..d3b345f4 100644 --- a/openeogeotrellis/utils.py +++ b/openeogeotrellis/utils.py @@ -773,13 +773,13 @@ def to_jsonable(x): def wait_till_path_available(path: Path): retry = 0 - max_tries = 5 + max_tries = 20 # Almost 2 minutes while not os.path.exists(path): if retry < max_tries: retry += 1 - seconds = int(math.pow(2, retry + 2)) # exponential backoff + seconds = 5 logger.info(f"Waiting for path to be available. Try {retry}/{max_tries} (sleep:{seconds}seconds): {path}") time.sleep(seconds) else: logger.warning(f"Path is not available after {max_tries} tries: {path}") - return # TODO: Throw error instead + return # TODO: Throw error instead? diff --git a/tests/conftest.py b/tests/conftest.py index 5e22d708..24b519ae 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,16 +1,19 @@ -import os -from typing import Union - -import sys -from pathlib import Path +from datetime import datetime +from unittest import mock import boto3 +import contextlib import flask -import pytest import moto import moto.server +import os +import pytest +import sys +import time_machine +import typing import requests_mock from _pytest.terminal import TerminalReporter +from pathlib import Path from openeo_driver.backend import OpenEoBackendImplementation, UserDefinedProcesses from openeo_driver.jobregistry import ElasticJobRegistry, JobRegistryInterface @@ -179,6 +182,36 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0): def api_version(request): return request.param +# TODO: Deduplicate code with openeo-python-client +class _Sleeper: + def __init__(self): + self.history = [] + + @contextlib.contextmanager + def patch(self, time_machine: time_machine.TimeMachineFixture) -> typing.Iterator["_Sleeper"]: + def sleep(seconds): + # Note: this requires that `time_machine.move_to()` has been called before + # also see https://github.com/adamchainz/time-machine/issues/247 + time_machine.coordinates.shift(seconds) + self.history.append(seconds) + + with mock.patch("time.sleep", new=sleep): + yield self + + def did_sleep(self) -> bool: + return len(self.history) > 0 + + +@pytest.fixture +def fast_sleep(time_machine) -> typing.Iterator[_Sleeper]: + """ + Fixture using `time_machine` to make `sleep` instant and update the current time. + """ + now = datetime.now().isoformat() + time_machine.move_to(now) + with _Sleeper().patch(time_machine=time_machine) as sleeper: + yield sleeper + @pytest.fixture def udf_noop(): diff --git a/tests/deploy/test_batch_job.py b/tests/deploy/test_batch_job.py index a70fcc4b..4436aa6a 100644 --- a/tests/deploy/test_batch_job.py +++ b/tests/deploy/test_batch_job.py @@ -391,7 +391,7 @@ def test_run_job(evaluate, tmp_path): @mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate") -def test_run_job_get_projection_extension_metadata(evaluate, tmp_path): +def test_run_job_get_projection_extension_metadata(evaluate, tmp_path, fast_sleep): cube_mock = MagicMock() job_dir = tmp_path / "job-402" @@ -517,9 +517,7 @@ def test_run_job_get_projection_extension_metadata(evaluate, tmp_path): @mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate") -def test_run_job_get_projection_extension_metadata_all_assets_same_epsg_and_bbox( - evaluate, tmp_path -): +def test_run_job_get_projection_extension_metadata_all_assets_same_epsg_and_bbox(evaluate, tmp_path, fast_sleep): """When there are two raster assets with the same projection metadata, it should put those metadata at the level of the item instead of the individual bands. """ @@ -959,7 +957,7 @@ def test_run_job_get_projection_extension_metadata_assets_with_different_epsg( @mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate") -def test_run_job_get_projection_extension_metadata_job_dir_is_relative_path(evaluate): +def test_run_job_get_projection_extension_metadata_job_dir_is_relative_path(evaluate, fast_sleep): cube_mock = MagicMock() # job dir should be a relative path, # We still want the test data to be cleaned up though, so we need to use