diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index faf24e33..deb11cb5 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -111,6 +111,7 @@ def set_backends_for_collection(self, cid: str, backends: Iterable[str]): self._data[cid]["backends"] = list(backends) def get_backends_for_collection(self, cid: str) -> List[str]: + """Get backend ids that provide given collection id.""" if cid not in self._data: raise CollectionNotFoundException(collection_id=cid) return self._data[cid]["backends"] @@ -205,6 +206,11 @@ def evaluate(backend_id, pg): return [functools.partial(evaluate, pg=pg) for pg in process_graphs] + def get_backends_for_collection(self, cid: str) -> List[str]: + """Get backend ids that provide given collection id.""" + metadata, internal = self._get_all_metadata_cached() + return internal.get_backends_for_collection(cid=cid) + def get_backend_candidates_for_collections(self, collections: Iterable[str]) -> List[str]: """ Get backend ids providing all given collections @@ -568,13 +574,16 @@ def _process_load_ml_model( class AggregatorBatchJobs(BatchJobs): def __init__( - self, - backends: MultiBackendConnection, - processing: AggregatorProcessing, - partitioned_job_tracker: Optional[PartitionedJobTracker] = None, + self, + *, + backends: MultiBackendConnection, + catalog: AggregatorCollectionCatalog, + processing: AggregatorProcessing, + partitioned_job_tracker: Optional[PartitionedJobTracker] = None, ): super(AggregatorBatchJobs, self).__init__() self.backends = backends + self._catalog = catalog self.processing = processing self.partitioned_job_tracker = partitioned_job_tracker @@ -1127,8 +1136,9 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig): batch_jobs = AggregatorBatchJobs( backends=backends, + catalog=catalog, processing=processing, - partitioned_job_tracker=partitioned_job_tracker + partitioned_job_tracker=partitioned_job_tracker, ) secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing, config=config) diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py index 74195348..36d1c169 100644 --- a/src/openeo_aggregator/partitionedjobs/crossbackend.py +++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py @@ -186,6 +186,7 @@ def run_partitioned_job( ) -> dict: """ Run partitioned job (probably with dependencies between subjobs) + with an active polling loop for tracking and scheduling the subjobs .. warning:: this is experimental functionality