From 41a07d290c8a828d669f9e8c66d0913b68982dec Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Wed, 6 Sep 2023 19:07:32 +0200
Subject: [PATCH] Issue #115 Initial implementation of "crossbackend" splitting through API

---
 setup.py                                |   2 +-
 src/openeo_aggregator/backend.py        |  82 ++++++--
 .../partitionedjobs/__init__.py         |   1 +
 .../partitionedjobs/crossbackend.py     |   2 +-
 .../partitionedjobs/tracking.py         | 109 +++++++++-
 .../partitionedjobs/zookeeper.py        | 102 ++++++----
 tests/partitionedjobs/test_api.py       | 188 ++++++++++++++++++
 tests/partitionedjobs/test_zookeeper.py |  15 +-
 8 files changed, 444 insertions(+), 57 deletions(-)

diff --git a/setup.py b/setup.py
index a6df6ca6..053e91a8 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@
         "requests",
         "attrs",
         "openeo>=0.17.0",
-        "openeo_driver>=0.57.1.dev",
+        "openeo_driver>=0.65.0.dev",
         "flask~=2.0",
         "gunicorn~=20.0",
         "python-json-logger>=2.0.0",
diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py
index deb11cb5..8379262a 100644
--- a/src/openeo_aggregator/backend.py
+++ b/src/openeo_aggregator/backend.py
@@ -85,12 +85,17 @@
 )
 from openeo_aggregator.metadata.reporter import LoggerReporter
 from openeo_aggregator.partitionedjobs import PartitionedJob
+from openeo_aggregator.partitionedjobs.crossbackend import (
+    CrossBackendSplitter,
+    SubGraphId,
+)
 from openeo_aggregator.partitionedjobs.splitting import FlimsySplitter, TileGridSplitter
 from openeo_aggregator.partitionedjobs.tracking import (
     PartitionedJobConnection,
     PartitionedJobTracker,
 )
 from openeo_aggregator.utils import (
+    Clock,
     FlatPG,
     PGWithMetadata,
     dict_merge,
@@ -620,18 +625,29 @@ def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
         })
 
     def create_job(
-            self, user_id: str, process: dict, api_version: str,
-            metadata: dict, job_options: dict = None
+        self,
+        user_id: str,
+        process: dict,
+        api_version: str,
+        metadata: dict,
+        job_options: Optional[dict] = None,
     ) -> BatchJobMetadata:
         if "process_graph" not in process:
             raise ProcessGraphMissingException()
 
         # TODO: better, more generic/specific job_option(s)?
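+        # (Illustrative) client-side usage: cross-backend splitting is requested purely through job options,
+        # e.g. with the openeo Python client: connection.create_job(process_graph, additional={"split_strategy": "crossbackend"})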
-        if job_options and (
-            job_options.get(JOB_OPTION_SPLIT_STRATEGY)
-            or job_options.get(JOB_OPTION_TILE_GRID)
-        ):
-            return self._create_partitioned_job(
+        if job_options and (job_options.get(JOB_OPTION_SPLIT_STRATEGY) or job_options.get(JOB_OPTION_TILE_GRID)):
+            if job_options.get(JOB_OPTION_SPLIT_STRATEGY) == "crossbackend":
+                # TODO: this is a temporary feature flag to trigger "crossbackend" splitting
+                return self._create_crossbackend_job(
+                    user_id=user_id,
+                    process=process,
+                    api_version=api_version,
+                    metadata=metadata,
+                    job_options=job_options,
+                )
+            else:
+                return self._create_partitioned_job(
                 user_id=user_id,
                 process=process,
                 api_version=api_version,
                 metadata=metadata,
@@ -690,8 +706,9 @@ def _create_job_standard(
             raise OpenEOApiException(f"Failed to create job on backend {backend_id!r}: {e!r}")
         return BatchJobMetadata(
             id=JobIdMapping.get_aggregator_job_id(backend_job_id=job.job_id, backend_id=backend_id),
-            # Note: required, but unused metadata
-            status="dummy", created="dummy", process={"dummy": "dummy"}
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
         )
 
     def _create_partitioned_job(
@@ -719,11 +736,52 @@ def _create_partitioned_job(
             raise ValueError("Could not determine splitting strategy from job options")
 
         pjob: PartitionedJob = splitter.split(process=process, metadata=metadata, job_options=job_options)
-        job_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)
+        pjob_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)
+
+        return BatchJobMetadata(
+            id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
+        )
+
+    def _create_crossbackend_job(
+        self,
+        user_id: str,
+        process: PGWithMetadata,
+        api_version: str,
+        metadata: dict,
+        job_options: Optional[dict] = None,
+    ) -> BatchJobMetadata:
+        """
+        Advanced/handled batch job creation:
+
+        - split the original job into (possibly) multiple sub-jobs,
+          e.g. split the process graph based on `load_collection` availability
+        - distribute sub-jobs across (possibly) multiple back-ends
+        - keep track of them through a "parent job" in a `PartitionedJobTracker`.
+        """
+        if not self.partitioned_job_tracker:
+            raise FeatureUnsupportedException(message="Partitioned job tracking is not supported")
+
+        def backend_for_collection(collection_id) -> str:
+            return self._catalog.get_backends_for_collection(cid=collection_id)[0]
+
+        splitter = CrossBackendSplitter(
+            backend_for_collection=backend_for_collection,
+            # TODO: job option for `always_split` feature?
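+            # Note: with `always_split=True`, secondary `load_collection` sub-graphs are split off even when
+            # they would end up on the same back-end (e.g. `test_create_job_basic` splits off "b1:lc2"
+            # although both collections live on back-end "b1").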
+            always_split=True,
+        )
+
+        pjob_id = self.partitioned_job_tracker.create_crossbackend_pjob(
+            user_id=user_id, process=process, metadata=metadata, job_options=job_options, splitter=splitter
+        )
 
         return BatchJobMetadata(
-            id=JobIdMapping.get_aggregator_job_id(backend_job_id=job_id, backend_id=JobIdMapping.AGG),
-            status="dummy", created="dummy", process={"dummy": "dummy"}
+            id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
         )
 
     def _get_connection_and_backend_job_id(
diff --git a/src/openeo_aggregator/partitionedjobs/__init__.py b/src/openeo_aggregator/partitionedjobs/__init__.py
index c8230626..7c099506 100644
--- a/src/openeo_aggregator/partitionedjobs/__init__.py
+++ b/src/openeo_aggregator/partitionedjobs/__init__.py
@@ -34,6 +34,7 @@ def to_subjobs_dict(
         """Helper to convert a collection of SubJobs to a dictionary"""
         # TODO: hide this logic in a setter or __init__ (e.g. when outgrowing the constraints of typing.NamedTuple)
         if isinstance(subjobs, Sequence):
+            # TODO: eliminate this `Sequence` code path, and just always work with dict?
             return {f"{i:04d}": j for i, j in enumerate(subjobs)}
         elif isinstance(subjobs, dict):
             return {str(k): v for k, v in subjobs.items()}
diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py
index 0a8a83b5..b74850e5 100644
--- a/src/openeo_aggregator/partitionedjobs/crossbackend.py
+++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py
@@ -93,7 +93,7 @@ def split_streaming(
             (e.g. creating openEO batch jobs on the fly and injecting the corresponding batch job ids appropriately).
 
         :return: tuple containing:
-            - subgraph id
+            - subgraph id; recommended to handle it as an opaque id (but it usually has the format '{backend_id}:{node_id}')
             - SubJob
             - dependencies as list of subgraph ids
         """
diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py
index 522c6ba4..a684699e 100644
--- a/src/openeo_aggregator/partitionedjobs/tracking.py
+++ b/src/openeo_aggregator/partitionedjobs/tracking.py
@@ -1,13 +1,14 @@
 import collections
 import contextlib
+import dataclasses
 import datetime
 import logging
 import threading
-from typing import List, Optional
+from typing import Dict, List, Optional, Union
 
 import flask
 from openeo.api.logs import LogEntry
-from openeo.rest.job import ResultAsset
+from openeo.rest.job import BatchJob, ResultAsset
 from openeo.util import TimingLogger, rfc3339
 from openeo_driver.errors import JobNotFinishedException
 from openeo_driver.users import User
@@ -21,10 +22,15 @@
     STATUS_INSERTED,
     STATUS_RUNNING,
     PartitionedJob,
+    SubJob,
+)
+from openeo_aggregator.partitionedjobs.crossbackend import (
+    CrossBackendSplitter,
+    SubGraphId,
 )
 from openeo_aggregator.partitionedjobs.splitting import TileGridSplitter
 from openeo_aggregator.partitionedjobs.zookeeper import ZooKeeperPartitionedJobDB
-from openeo_aggregator.utils import _UNSET, timestamp_to_rfc3339
+from openeo_aggregator.utils import _UNSET, Clock, PGWithMetadata, timestamp_to_rfc3339
 
 _log = logging.getLogger(__name__)
 
@@ -57,6 +63,103 @@ def create(self, user_id: str, pjob: PartitionedJob, flask_request: flask.Reques
             self.create_sjobs(user_id=user_id, pjob_id=pjob_id, flask_request=flask_request)
         return pjob_id
 
+    def create_crossbackend_pjob(
+        self,
+        *,
+        user_id: str,
+        process: PGWithMetadata,
+        metadata: dict,
+        job_options: Optional[dict] = None,
+        splitter: CrossBackendSplitter,
+    ) -> str:
+        """
+        Cross-backend partitioned job creation differs from the original partitioned job creation
+        because of the dependencies between sub-jobs: the batch jobs have to be created in the right
+        order on their respective back-ends before the sub-process graphs are finalised,
+        and only then can their metadata be persisted in the ZooKeeperPartitionedJobDB.
+        """
+        # Start with reserving a new partitioned job id based on initial metadata
+        pjob_node_value = self._db.serialize(
+            user_id=user_id,
+            created=Clock.time(),
+            process=process,
+            metadata=metadata,
+            job_options=job_options,
+        )
+        pjob_id = self._db.obtain_new_pjob_id(user_id=user_id, initial_value=pjob_node_value)
+        self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_INSERTED, create=True)
+
+        # Create batch jobs on respective backends, and build the PartitionedJob components along the way
+        subjobs: Dict[str, SubJob] = {}
+        dependencies: Dict[str, List[str]] = {}
+        batch_jobs: Dict[SubGraphId, BatchJob] = {}
+        create_stats = collections.Counter()
+
+        def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
+            # TODO: use `load_stac` instead of `load_result`, and use canonical URL?
+            nonlocal batch_jobs
+            job_id = batch_jobs[subgraph_id].job_id
+            return {
+                node_id: {
+                    "process_id": "load_result",
+                    "arguments": {"id": job_id},
+                }
+            }
+
+        for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
+            process_graph=process["process_graph"], get_replacement=get_replacement
+        ):
+            subjobs[sjob_id] = subjob
+            dependencies[sjob_id] = subjob_dependencies
+            try:
+                # TODO: how to handle errors in job creation here? Fail the whole partitioned job, or try to finish what is possible?
+                con = self._backends.get_connection(subjob.backend_id)
+                with con.authenticated_from_request(request=flask.request), con.override(
+                    default_timeout=CONNECTION_TIMEOUT_JOB_START
+                ):
+                    with TimingLogger(title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info):
+                        job = con.create_job(
+                            process_graph=subjob.process_graph,
+                            title=f"Crossbackend job {pjob_id}:{sjob_id}",
+                            plan=metadata.get("plan"),
+                            budget=metadata.get("budget"),
+                            additional=job_options,
+                        )
+                        _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
+                batch_jobs[sjob_id] = job
+                title = f"Partitioned job {pjob_id=} {sjob_id=}"
+                self._db.insert_sjob(
+                    user_id=user_id,
+                    pjob_id=pjob_id,
+                    sjob_id=sjob_id,
+                    subjob=subjob,
+                    title=title,
+                    status=STATUS_CREATED,
+                )
+                self._db.set_backend_job_id(
+                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
+                )
+                create_stats[STATUS_CREATED] += 1
+            except Exception as exc:
+                _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
+                msg = f"Create failed: {exc}"
+                self._db.set_sjob_status(
+                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
+                )
+                create_stats[STATUS_ERROR] += 1
+
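+        # Wrap-up bookkeeping (below): the parent job is marked "created" as soon as at least one sub-job
+        # could be created on a back-end, "error" otherwise; the per-status creation counts end up in the
+        # status message and the overall progress starts at 0.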
+        # TODO: this is currently unused, don't bother building it at all?
+        partitioned_job = PartitionedJob(
+            process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
+        )
+
+        pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
+        self._db.set_pjob_status(
+            user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
+        )
+
+        return pjob_id
+
     def create_sjobs(self, user_id: str, pjob_id: str, flask_request: flask.Request):
         """Create all sub-jobs on remote back-end for given partitioned job"""
         pjob_metadata = self._db.get_pjob_metadata(user_id=user_id, pjob_id=pjob_id)
diff --git a/src/openeo_aggregator/partitionedjobs/zookeeper.py b/src/openeo_aggregator/partitionedjobs/zookeeper.py
index fe920fd5..c93269a5 100644
--- a/src/openeo_aggregator/partitionedjobs/zookeeper.py
+++ b/src/openeo_aggregator/partitionedjobs/zookeeper.py
@@ -12,6 +12,7 @@
     STATUS_INSERTED,
     PartitionedJob,
     PartitionedJobFailure,
+    SubJob,
 )
 from openeo_aggregator.utils import Clock, strip_join, timestamp_to_rfc3339
 
@@ -90,6 +91,21 @@ def deserialize(value: bytes) -> dict:
         """Deserialize bytes (assuming UTF8 encoded JSON mapping)"""
         return json.loads(value.decode("utf8"))
 
+    def obtain_new_pjob_id(self, user_id: str, initial_value: bytes = b"", attempts: int = 3) -> str:
+        """Obtain new, unique partitioned job id"""
+        # A couple of pjob_id attempts: start with current time based name and a suffix to counter collisions (if any)
+        base_pjob_id = "pj-" + Clock.utcnow().strftime("%Y%m%d-%H%M%S")
+        for pjob_id in [base_pjob_id] + [f"{base_pjob_id}-{i}" for i in range(1, attempts)]:
+            try:
+                self._client.create(path=self._path(user_id, pjob_id), value=initial_value, makepath=True)
+                # We obtained our unique id
+                return pjob_id
+            except NodeExistsError:
+                # TODO: check that NodeExistsError is thrown on existing job_ids
+                # TODO: add a sleep() to back off a bit?
+                continue
+        raise PartitionedJobFailure("Too many attempts to create new pjob_id")
+
     def insert(self, user_id: str, pjob: PartitionedJob) -> str:
         """
         Insert a new partitioned job.
@@ -98,7 +114,7 @@ def insert(self, user_id: str, pjob: PartitionedJob) -> str:
         """
         with self._connect():
             # Insert parent node, with "static" (write once) metadata as associated data
-            job_node_value = self.serialize(
+            pjob_node_value = self.serialize(
                 user_id=user_id,
                 # TODO: more BatchJobMetdata fields
                 created=Clock.time(),
@@ -107,24 +123,9 @@ def insert(self, user_id: str, pjob: PartitionedJob) -> str:
                 job_options=pjob.job_options,
                 # TODO: pjob.dependencies #95
             )
-            # A couple of pjob_id attempts: start with current time based name and a suffix to counter collisions (if any)
-            base_pjob_id = "pj-" + Clock.utcnow().strftime("%Y%m%d-%H%M%S")
-            for pjob_id in [base_pjob_id] + [f"{base_pjob_id}-{i}" for i in range(1, 3)]:
-                try:
-                    self._client.create(path=self._path(user_id, pjob_id), value=job_node_value, makepath=True)
-                    break
-                except NodeExistsError:
-                    # TODO: check that NodeExistsError is thrown on existing job_ids
-                    # TODO: add a sleep() to back off a bit?
-                    continue
-            else:
-                raise PartitionedJobFailure("Too much attempts to create new pjob_id")
-
+            pjob_id = self.obtain_new_pjob_id(user_id=user_id, initial_value=pjob_node_value)
             # Updatable metadata
-            self._client.create(
-                path=self._path(user_id, pjob_id, "status"),
-                value=self.serialize(status=STATUS_INSERTED)
-            )
+            self.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_INSERTED, create=True)
 
             # Insert subjobs
             # TODO #95 some subjobs are not fully defined if they have dependencies
@@ -132,22 +133,27 @@ def insert(self, user_id: str, pjob: PartitionedJob) -> str:
             # Only create them when fully concrete,,
             # or allow updates on this metadata?
             for i, (sjob_id, subjob) in enumerate(pjob.subjobs.items()):
-                self._client.create(
-                    path=self._path(user_id, pjob_id, "sjobs", sjob_id),
-                    value=self.serialize(
-                        process_graph=subjob.process_graph,
-                        backend_id=subjob.backend_id,
-                        title=f"Partitioned job {pjob_id} part {sjob_id} ({i + 1}/{len(pjob.subjobs)})",
-                    ),
-                    makepath=True,
-                )
-                self._client.create(
-                    path=self._path(user_id, pjob_id, "sjobs", sjob_id, "status"),
-                    value=self.serialize(status=STATUS_INSERTED),
-                )
+                title = f"Partitioned job {pjob_id} part {sjob_id} ({i + 1}/{len(pjob.subjobs)})"
+                self.insert_sjob(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, subjob=subjob, title=title)
 
         return pjob_id
 
+    def insert_sjob(
+        self,
+        user_id: str,
+        pjob_id: str,
+        sjob_id: str,
+        subjob: SubJob,
+        title: Optional[str] = None,
+        status: str = STATUS_INSERTED,
+    ):
+        self._client.create(
+            path=self._path(user_id, pjob_id, "sjobs", sjob_id),
+            value=self.serialize(process_graph=subjob.process_graph, backend_id=subjob.backend_id, title=title),
+            makepath=True,
+        )
+        self.set_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=status, create=True)
+
     def get_pjob_metadata(self, user_id: str, pjob_id: str) -> dict:
         """Get metadata of partitioned job, given by storage id."""
         with self._connect():
@@ -194,20 +200,32 @@ def get_backend_job_id(self, user_id: str, pjob_id: str, sjob_id: str) -> str:
                 raise NoJobIdForSubJobException(f"No job_id for {pjob_id}:{sjob_id}.")
             return self.deserialize(value)["job_id"]
 
-    def set_pjob_status(self, user_id: str, pjob_id: str, status: str, message: Optional[str] = None,
-                        progress: int = None):
+    def set_pjob_status(
+        self,
+        user_id: str,
+        pjob_id: str,
+        status: str,
+        message: Optional[str] = None,
+        progress: int = None,
+        create: bool = False,
+    ):
         """
         Store status of partitioned job (with optional message).
 
         :param pjob_id: (storage) id of partitioned job
         :param status: global status of partitioned job
         :param message: optional message, e.g. describing error
+        :param create: whether to create the node instead of updating
         """
         with self._connect():
-            self._client.set(
+            kwargs = dict(
                 path=self._path(user_id, pjob_id, "status"),
                 value=self.serialize(status=status, message=message, timestamp=Clock.time(), progress=progress)
             )
+            if create:
+                self._client.create(**kwargs)
+            else:
+                self._client.set(**kwargs)
 
     def get_pjob_status(self, user_id: str, pjob_id: str) -> dict:
         """
@@ -222,13 +240,25 @@ def get_pjob_status(self, user_id: str, pjob_id: str) -> dict:
             value, stat = self._client.get(self._path(user_id, pjob_id, "status"))
             return self.deserialize(value)
 
-    def set_sjob_status(self, user_id: str, pjob_id: str, sjob_id: str, status: str, message: Optional[str] = None):
+    def set_sjob_status(
+        self,
+        user_id: str,
+        pjob_id: str,
+        sjob_id: str,
+        status: str,
+        message: Optional[str] = None,
+        create: bool = False,
+    ):
         """Store status of sub-job (with optional message)"""
         with self._connect():
-            self._client.set(
+            kwargs = dict(
                 path=self._path(user_id, pjob_id, "sjobs", sjob_id, "status"),
                 value=self.serialize(status=status, message=message, timestamp=Clock.time()),
             )
+            if create:
+                self._client.create(**kwargs)
+            else:
+                self._client.set(**kwargs)
 
     def get_sjob_status(self, user_id: str, pjob_id: str, sjob_id: str) -> dict:
         """Get status of sub-job"""
diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py
index 14bd3f20..89f5d691 100644
--- a/tests/partitionedjobs/test_api.py
+++ b/tests/partitionedjobs/test_api.py
@@ -33,6 +33,7 @@ class _Now:
     """Helper to mock "now" to given datetime"""
 
     # TODO: move to testing utilities and reuse more?
+    # TODO: just migrate to a more standard time mock library like time_machine?
 
     def __init__(self, date: str):
         self.rfc3339 = rfc3339.normalize(date)
@@ -609,3 +610,190 @@ def test_job_results_basic(self, flask_app, api100, dummy1):
         assert res.json == DictSubSet({"type": "FeatureCollection"})
 
     # TODO: more/full TileGridSplitter batch job tests
+
+
+class TestCrossBackendSplitting:
+    now = _Now("2022-01-19T12:34:56Z")
+
+    @now.mock
+    def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
+        """Handling of single "load_collection" process graph"""
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+
+        pg = {"lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}, "result": True}}
+
+        res = api100.post(
+            "/jobs",
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": "crossbackend"},
+            },
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["Location"] == f"http://oeoa.test/openeo/1.0.0/jobs/{expected_job_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "created",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
+
+        # Inspect stored parent job metadata
+        assert zk_db.get_pjob_metadata(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "user_id": TEST_USER,
+            "created": self.now.epoch,
+            "process": {"process_graph": pg},
+            "metadata": {},
+            "job_options": {"split_strategy": "crossbackend"},
+        }
+
+        assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "status": "created",
+            "message": approx_str_contains("{'created': 1}"),
+            "timestamp": pytest.approx(self.now.epoch, abs=1),
+            "progress": 0,
+        }
+
+        # Inspect stored subjob metadata
+        subjobs = zk_db.list_subjobs(user_id=TEST_USER, pjob_id=pjob_id)
+        assert subjobs == {
+            "main": {
+                "backend_id": "b1",
+                "process_graph": {"lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection", "result": True}},
+                "title": "Partitioned job pjob_id='pj-20220119-123456' " "sjob_id='main'",
+            }
+        }
+        sjob_id = "main"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": pytest.approx(self.now.epoch, abs=1),
+            "message": None,
+        }
+        job_id = zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id)
+        assert job_id == "1-jb-0"
+
+        assert dummy1.get_job_status(TEST_USER, job_id) == "created"
+        pg = dummy1.get_job_data(TEST_USER, job_id).create["process"]["process_graph"]
+        assert pg == {"lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection", "result": True}}
+
+    @now.mock
+    def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+
+        pg = {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                "result": True,
+            },
+        }
+
+        res = api100.post(
+            "/jobs",
+            json={"process": {"process_graph": pg}, "job_options": {"split_strategy": "crossbackend"}},
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["Location"] == f"http://oeoa.test/openeo/1.0.0/jobs/{expected_job_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "created",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
+
+        # Inspect stored parent job metadata
+        assert zk_db.get_pjob_metadata(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "user_id": TEST_USER,
+            "created": self.now.epoch,
+            "process": {"process_graph": pg},
+            "metadata": {},
+            "job_options": {"split_strategy": "crossbackend"},
+        }
+
+        assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "status": "created",
+            "message": approx_str_contains("{'created': 2}"),
+            "timestamp": pytest.approx(self.now.epoch, abs=5),
+            "progress": 0,
+        }
+
+        # Inspect stored subjob metadata
+        subjobs = zk_db.list_subjobs(user_id=TEST_USER, pjob_id=pjob_id)
+        assert subjobs == {
+            "b1:lc2": {
+                "backend_id": "b1",
+                "process_graph": {
+                    "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+                    "sr1": {
+                        "process_id": "save_result",
+                        "arguments": {"data": {"from_node": "lc2"}, "format": "GTiff"},
+                        "result": True,
+                    },
+                },
+                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='b1:lc2'",
+            },
+            "main": {
+                "backend_id": "b1",
+                "process_graph": {
+                    "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+                    "lc2": {"process_id": "load_result", "arguments": {"id": "1-jb-0"}},
+                    "merge": {
+                        "process_id": "merge_cubes",
+                        "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                        "result": True,
+                    },
+                },
+                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='main'",
+            },
+        }
+
+        sjob_id = "main"
+        expected_job_id = "1-jb-1"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": self.now.epoch,
+            "message": None,
+        }
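+        # Note: the "main" sub-job is created after the "b1:lc2" sub-job it depends on,
+        # hence it gets the dummy back-end's second job id ("1-jb-1") while "b1:lc2" got "1-jb-0".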
+        assert zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == expected_job_id
+        assert dummy1.get_job_status(TEST_USER, expected_job_id) == "created"
+        assert dummy1.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_result", "arguments": {"id": "1-jb-0"}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                "result": True,
+            },
+        }
+
+        sjob_id = "b1:lc2"
+        expected_job_id = "1-jb-0"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": self.now.epoch,
+            "message": None,
+        }
+        assert zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == expected_job_id
+        assert dummy1.get_job_status(TEST_USER, expected_job_id) == "created"
+        assert dummy1.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "sr1": {
+                "process_id": "save_result",
+                "arguments": {"data": {"from_node": "lc2"}, "format": "GTiff"},
+                "result": True,
+            },
+        }
diff --git a/tests/partitionedjobs/test_zookeeper.py b/tests/partitionedjobs/test_zookeeper.py
index bb926ada..923e044d 100644
--- a/tests/partitionedjobs/test_zookeeper.py
+++ b/tests/partitionedjobs/test_zookeeper.py
@@ -66,6 +66,9 @@ def test_insert_basic(self, pjob, zk_client, zk_db):
             },
             "/o-a/tstsr/pj-20220117-174800/status": {
                 "status": "inserted",
+                "message": None,
+                "timestamp": approx_now(),
+                "progress": None,
             },
             "/o-a/tstsr/pj-20220117-174800/sjobs/0000": {
                 "process_graph": PG12,
@@ -73,7 +76,9 @@ def test_insert_basic(self, pjob, zk_client, zk_db):
                 "backend_id": "b1",
                 "title": "Partitioned job pj-20220117-174800 part 0000 (1/2)"
             },
             "/o-a/tstsr/pj-20220117-174800/sjobs/0000/status": {
-                "status": "inserted"
+                "status": "inserted",
+                "message": None,
+                "timestamp": approx_now(),
             },
             "/o-a/tstsr/pj-20220117-174800/sjobs/0001": {
                 "process_graph": PG23,
@@ -81,7 +86,9 @@ def test_insert_basic(self, pjob, zk_client, zk_db):
                 "backend_id": "b2",
                 "title": "Partitioned job pj-20220117-174800 part 0001 (2/2)"
             },
             "/o-a/tstsr/pj-20220117-174800/sjobs/0001/status": {
-                "status": "inserted"
+                "status": "inserted",
+                "message": None,
+                "timestamp": approx_now(),
             },
         }
@@ -146,7 +153,7 @@ def test_set_get_pjob_status(self, pjob, zk_db):
         zk_db.insert(pjob=pjob, user_id=TEST_USER)
 
         status = zk_db.get_pjob_status(user_id=TEST_USER, pjob_id="pj-20220117-174800")
-        assert status == {"status": "inserted"}
+        assert status == {"status": "inserted", "message": None, "timestamp": approx_now(), "progress": None}
 
         zk_db.set_pjob_status(user_id=TEST_USER, pjob_id="pj-20220117-174800", status="running", message="goin' on")
         status = zk_db.get_pjob_status(user_id=TEST_USER, pjob_id="pj-20220117-174800")
@@ -166,7 +173,7 @@ def test_set_get_sjob_status(self, pjob, zk_db):
         zk_db.insert(pjob=pjob, user_id=TEST_USER)
 
         status = zk_db.get_sjob_status(user_id=TEST_USER, pjob_id="pj-20220117-174800", sjob_id="0000")
-        assert status == {"status": "inserted"}
+        assert status == {"status": "inserted", "message": None, "timestamp": approx_now()}
 
         zk_db.set_sjob_status(
             user_id=TEST_USER, pjob_id="pj-20220117-174800", sjob_id="0000", status="running",