Wait till path available #916

Merged · 5 commits · Oct 23, 2024
12 changes: 7 additions & 5 deletions openeogeotrellis/deploy/batch_job.py
@@ -356,11 +356,13 @@ def run_job(
ml_model_metadata = result.get_model_metadata(str(output_file))
logger.info("Extracted ml model metadata from %s" % output_file)
for name, asset in the_assets_metadata.items():
- # TODO: test in separate branch
- # if not asset.get("href").lower().startswith("s3:/"):
- # # fusemount could have some delay to make files accessible, so poll a bit:
- # asset_path = get_abs_path_of_asset(asset["href"], job_dir)
- # wait_till_path_available(asset_path)
+ href = asset["href"]
+ url = urlparse(href)
+ if url.scheme in ["", "file"]:
+ file_path = url.path
+ # fusemount could have some delay to make files accessible, so poll a bit:
+ asset_path = get_abs_path_of_asset(file_path, job_dir)
+ wait_till_path_available(asset_path)
add_permissions(Path(asset["href"]), stat.S_IWGRP)
logger.info(f"wrote {len(the_assets_metadata)} assets to {output_file}")
assets_metadata.append(the_assets_metadata)
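The new logic above only waits for assets whose href points to the local filesystem; hrefs with an s3 scheme are skipped. A minimal standalone sketch of that scheme check (not the project's code; the example hrefs are made up):

    from urllib.parse import urlparse

    def is_local_asset(href: str) -> bool:
        # An empty scheme means a plain path; "file" means a file:// URL.
        # Anything else (e.g. "s3") is not on the fuse mount and is not polled.
        return urlparse(href).scheme in ["", "file"]

    assert is_local_asset("openEO_2024.tif")
    assert is_local_asset("file:///data/projects/job-123/openEO_2024.tif")
    assert not is_local_asset("s3://bucket/job-123/openEO_2024.tif")
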
6 changes: 3 additions & 3 deletions openeogeotrellis/utils.py
@@ -773,13 +773,13 @@ def to_jsonable(x):

def wait_till_path_available(path: Path):
retry = 0
- max_tries = 5
+ max_tries = 20  # Almost 2 minutes
while not os.path.exists(path):
if retry < max_tries:
retry += 1
- seconds = int(math.pow(2, retry + 2)) # exponential backoff
+ seconds = 5
logger.info(f"Waiting for path to be available. Try {retry}/{max_tries} (sleep:{seconds}seconds): {path}")
time.sleep(seconds)
else:
logger.warning(f"Path is not available after {max_tries} tries: {path}")
- return # TODO: Throw error instead
+ return # TODO: Throw error instead?
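
Assembled from the interleaved old/new lines above, the updated helper is a fixed-interval poller: up to 20 tries of 5 seconds each, about 100 seconds in total, instead of the previous exponential backoff. A self-contained sketch (logger and imports added here for completeness):

    import logging
    import os
    import time
    from pathlib import Path

    logger = logging.getLogger(__name__)

    def wait_till_path_available(path: Path):
        retry = 0
        max_tries = 20  # Almost 2 minutes (20 tries x 5 s = 100 s)
        while not os.path.exists(path):
            if retry < max_tries:
                retry += 1
                seconds = 5
                logger.info(f"Waiting for path to be available. Try {retry}/{max_tries} (sleep:{seconds}seconds): {path}")
                time.sleep(seconds)
            else:
                logger.warning(f"Path is not available after {max_tries} tries: {path}")
                return  # TODO: Throw error instead?
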
45 changes: 39 additions & 6 deletions tests/conftest.py
@@ -1,16 +1,19 @@
import os
from typing import Union

import sys
from pathlib import Path
from datetime import datetime
from unittest import mock

import boto3
import contextlib
import flask
import pytest
import moto
import moto.server
import os
import pytest
import sys
import time_machine
import typing
import requests_mock
from _pytest.terminal import TerminalReporter
from pathlib import Path

from openeo_driver.backend import OpenEoBackendImplementation, UserDefinedProcesses
from openeo_driver.jobregistry import ElasticJobRegistry, JobRegistryInterface
@@ -179,6 +182,36 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0):
def api_version(request):
return request.param

# TODO: Deduplicate code with openeo-python-client
class _Sleeper:
def __init__(self):
self.history = []

@contextlib.contextmanager
def patch(self, time_machine: time_machine.TimeMachineFixture) -> typing.Iterator["_Sleeper"]:
def sleep(seconds):
# Note: this requires that `time_machine.move_to()` has been called before
# also see https://github.com/adamchainz/time-machine/issues/247
time_machine.coordinates.shift(seconds)
self.history.append(seconds)

with mock.patch("time.sleep", new=sleep):
yield self

def did_sleep(self) -> bool:
return len(self.history) > 0


@pytest.fixture
def fast_sleep(time_machine) -> typing.Iterator[_Sleeper]:
"""
Fixture using `time_machine` to make `sleep` instant and update the current time.
"""
now = datetime.now().isoformat()
time_machine.move_to(now)
with _Sleeper().patch(time_machine=time_machine) as sleeper:
yield sleeper


@pytest.fixture
def udf_noop():
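For illustration, a hypothetical test (not one from this PR) that exercises wait_till_path_available with the fast_sleep fixture: time.sleep() is patched to shift time_machine's clock instead of blocking, so the 20 x 5 second polling loop completes instantly:

    from openeogeotrellis.utils import wait_till_path_available

    def test_wait_till_path_available_missing_path(tmp_path, fast_sleep):
        # The path never appears, so the helper exhausts all retries without blocking.
        wait_till_path_available(tmp_path / "does-not-exist.tif")
        assert fast_sleep.did_sleep()
        assert fast_sleep.history == [5] * 20  # 20 tries, 5 seconds each
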
8 changes: 3 additions & 5 deletions tests/deploy/test_batch_job.py
@@ -391,7 +391,7 @@ def test_run_job(evaluate, tmp_path):


@mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate")
- def test_run_job_get_projection_extension_metadata(evaluate, tmp_path):
+ def test_run_job_get_projection_extension_metadata(evaluate, tmp_path, fast_sleep):
cube_mock = MagicMock()

job_dir = tmp_path / "job-402"
@@ -517,9 +517,7 @@ def test_run_job_get_projection_extension_metadata(evaluate, tmp_path):


@mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate")
- def test_run_job_get_projection_extension_metadata_all_assets_same_epsg_and_bbox(
-     evaluate, tmp_path
- ):
+ def test_run_job_get_projection_extension_metadata_all_assets_same_epsg_and_bbox(evaluate, tmp_path, fast_sleep):
"""When there are two raster assets with the same projection metadata, it should put
those metadata at the level of the item instead of the individual bands.
"""
@@ -959,7 +957,7 @@ def test_run_job_get_projection_extension_metadata_assets_with_different_epsg(


@mock.patch("openeo_driver.ProcessGraphDeserializer.evaluate")
- def test_run_job_get_projection_extension_metadata_job_dir_is_relative_path(evaluate):
+ def test_run_job_get_projection_extension_metadata_job_dir_is_relative_path(evaluate, fast_sleep):
cube_mock = MagicMock()
# job dir should be a relative path,
# We still want the test data to be cleaned up though, so we need to use