diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py index b74850e5..1a33de0b 100644 --- a/src/openeo_aggregator/partitionedjobs/crossbackend.py +++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py @@ -82,6 +82,7 @@ def split_streaming( self, process_graph: FlatPG, get_replacement: GetReplacementCallable = _default_get_replacement, + main_subgraph_id: SubGraphId = "main", ) -> Iterator[Tuple[SubGraphId, SubJob, List[SubGraphId]]]: """ Split given process graph in sub-process graphs and return these as an iterator @@ -113,7 +114,7 @@ def split_streaming( secondary_backends = {b for b in backend_usage if b != primary_backend} _log.info(f"Backend split: {primary_backend=} {secondary_backends=}") - primary_id = "main" + primary_id = main_subgraph_id primary_pg = {} primary_has_load_collection = False primary_dependencies = [] diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py index a684699e..129fae2f 100644 --- a/src/openeo_aggregator/partitionedjobs/tracking.py +++ b/src/openeo_aggregator/partitionedjobs/tracking.py @@ -32,6 +32,8 @@ from openeo_aggregator.partitionedjobs.zookeeper import ZooKeeperPartitionedJobDB from openeo_aggregator.utils import _UNSET, Clock, PGWithMetadata, timestamp_to_rfc3339 +PJOB_METADATA_FIELD_RESULT_JOBS = "result_jobs" + _log = logging.getLogger(__name__) @@ -79,12 +81,16 @@ def create_crossbackend_pjob( before we have finalised sub-processgraphs, whose metadata can then be persisted in the ZooKeeperPartitionedJobDB """ # Start with reserving a new partitioned job id based on initial metadata + main_subgraph_id = "main" pjob_node_value = self._db.serialize( user_id=user_id, created=Clock.time(), process=process, metadata=metadata, job_options=job_options, + **{ + PJOB_METADATA_FIELD_RESULT_JOBS: [main_subgraph_id], + }, ) pjob_id = self._db.obtain_new_pjob_id(user_id=user_id, initial_value=pjob_node_value) self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_INSERTED, create=True) @@ -107,7 +113,7 @@ def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict: } for sjob_id, subjob, subjob_dependencies in splitter.split_streaming( - process_graph=process["process_graph"], get_replacement=get_replacement + process_graph=process["process_graph"], get_replacement=get_replacement, main_subgraph_id=main_subgraph_id ): subjobs[sjob_id] = subjob dependencies[sjob_id] = subjob_dependencies @@ -380,9 +386,21 @@ def get_assets(self, user_id: str, pjob_id: str, flask_request: flask.Request) - # TODO: do a sync if latest sync is too long ago? pjob_metadata = self._db.get_pjob_metadata(user_id=user_id, pjob_id=pjob_id) sjobs = self._db.list_subjobs(user_id=user_id, pjob_id=pjob_id) + if pjob_metadata.get(PJOB_METADATA_FIELD_RESULT_JOBS): + result_jobs = set(pjob_metadata[PJOB_METADATA_FIELD_RESULT_JOBS]) + result_sjob_ids = [s for s in sjobs if s in result_jobs] + log_msg = f"Collect {pjob_id} subjob assets: subset {result_sjob_ids} (from {len(sjobs)})" + else: + # Collect results of all subjobs by default + result_sjob_ids = list(sjobs.keys()) + log_msg = f"Collect {pjob_id} subjob assets: all {len(sjobs)})" + assets = [] - with TimingLogger(title=f"Collect assets of {pjob_id} ({len(sjobs)} sub-jobs)", logger=_log): - for sjob_id, sjob_metadata in sjobs.items(): + with TimingLogger(title=log_msg, logger=_log): + for sjob_id in result_sjob_ids: + sjob_metadata = sjobs[sjob_id] + + # TODO: skip subjobs that are just dependencies for a main/grouping job sjob_status = self._db.get_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id)["status"] if sjob_status in {STATUS_INSERTED, STATUS_CREATED, STATUS_RUNNING}: raise JobNotFinishedException diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py index 89f5d691..4a98f595 100644 --- a/tests/partitionedjobs/test_api.py +++ b/tests/partitionedjobs/test_api.py @@ -44,6 +44,7 @@ def __init__(self, date: str): @pytest.fixture def dummy1(backend1, requests_mock) -> DummyBackend: + # TODO: rename this fixture to dummy_backed1 dummy = DummyBackend(requests_mock=requests_mock, backend_url=backend1, job_id_template="1-jb-{i}") dummy.setup_basic_requests_mocks() dummy.register_user(bearer_token=TEST_USER_BEARER_TOKEN, user_id=TEST_USER) @@ -651,6 +652,7 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1): "process": {"process_graph": pg}, "metadata": {}, "job_options": {"split_strategy": "crossbackend"}, + "result_jobs": ["main"], } assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == { @@ -722,6 +724,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1): "process": {"process_graph": pg}, "metadata": {}, "job_options": {"split_strategy": "crossbackend"}, + "result_jobs": ["main"], } assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == { @@ -797,3 +800,76 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1): "result": True, }, } + + @now.mock + def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1): + """Run the jobs and get results""" + api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) + + pg = { + "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}}, + "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}}, + "merge": { + "process_id": "merge_cubes", + "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}}, + "result": True, + }, + } + + res = api100.post( + "/jobs", + json={ + "process": {"process_graph": pg}, + "job_options": {"split_strategy": "crossbackend"}, + }, + ).assert_status_code(201) + + pjob_id = "pj-20220119-123456" + expected_job_id = f"agg-{pjob_id}" + assert res.headers["OpenEO-Identifier"] == expected_job_id + + res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200) + assert res.json == { + "id": expected_job_id, + "process": {"process_graph": pg}, + "status": "created", + "created": self.now.rfc3339, + "progress": 0, + } + + # start job + api100.post(f"/jobs/{expected_job_id}/results").assert_status_code(202) + dummy1.set_job_status(TEST_USER, "1-jb-0", status="running") + dummy1.set_job_status(TEST_USER, "1-jb-1", status="queued") + res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200) + assert res.json == DictSubSet({"id": expected_job_id, "status": "running", "progress": 0}) + + # First job is ready + dummy1.set_job_status(TEST_USER, "1-jb-0", status="finished") + dummy1.setup_assets(job_id=f"1-jb-0", assets=["1-jb-0-result.tif"]) + dummy1.set_job_status(TEST_USER, "1-jb-1", status="running") + res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200) + assert res.json == DictSubSet({"id": expected_job_id, "status": "running", "progress": 50}) + + # Main job is ready too + dummy1.set_job_status(TEST_USER, "1-jb-1", status="finished") + dummy1.setup_assets(job_id=f"1-jb-1", assets=["1-jb-1-result.tif"]) + res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200) + assert res.json == DictSubSet({"id": expected_job_id, "status": "finished", "progress": 100}) + + # Get results + res = api100.get(f"/jobs/{expected_job_id}/results").assert_status_code(200) + assert res.json == DictSubSet( + { + "id": expected_job_id, + "assets": { + "main-1-jb-1-result.tif": { + "file:nodata": [None], + "href": "https://b1.test/v1/jobs/1-jb-1/results/1-jb-1-result.tif", + "roles": ["data"], + "title": "main-1-jb-1-result.tif", + "type": "application/octet-stream", + }, + }, + } + )