From 35c1d9379f692f349d1703114386019bac62b4da Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Mon, 4 Sep 2023 18:45:38 +0200
Subject: [PATCH 01/11] Issue #115 CrossBackendSplitter: internalize
 backend_for_collection caching

---
 scripts/crossbackend-processing-poc.py         |  1 -
 .../partitionedjobs/crossbackend.py            | 18 +++++++++---------
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/scripts/crossbackend-processing-poc.py b/scripts/crossbackend-processing-poc.py
index 25d33925..3d2f7b01 100644
--- a/scripts/crossbackend-processing-poc.py
+++ b/scripts/crossbackend-processing-poc.py
@@ -58,7 +58,6 @@ def main():
     with TimingLogger(title=f"Connecting to {backend_url}", logger=_log):
         connection = openeo.connect(url=backend_url).authenticate_oidc()
 
-    @functools.lru_cache(maxsize=100)
     def backend_for_collection(collection_id) -> str:
         metadata = connection.describe_collection(collection_id)
         return metadata["summaries"][STAC_PROPERTY_FEDERATION_BACKENDS][0]
diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py
index 909e22bc..74195348 100644
--- a/src/openeo_aggregator/partitionedjobs/crossbackend.py
+++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py
@@ -48,14 +48,14 @@ def split(
         process_graph = process["process_graph"]
 
         # Extract necessary back-ends from `load_collection` usage
-        backend_usage = collections.Counter(
-            self.backend_for_collection(node["arguments"]["id"])
-            for node in process_graph.values()
-            if node["process_id"] == "load_collection"
-        )
-        _log.info(
-            f"Extracted backend usage from `load_collection` nodes: {backend_usage}"
-        )
+        backend_per_collection: Dict[str, str] = {
+            cid: self.backend_for_collection(cid)
+            for cid in (
+                node["arguments"]["id"] for node in process_graph.values() if node["process_id"] == "load_collection"
+            )
+        }
+        backend_usage = collections.Counter(backend_per_collection.values())
+        _log.info(f"Extracted backend usage from `load_collection` nodes: {backend_usage=} {backend_per_collection=}")
 
         primary_backend = backend_usage.most_common(1)[0][0] if backend_usage else None
         secondary_backends = {b for b in backend_usage if b != primary_backend}
@@ -70,7 +70,7 @@ def split(
 
         for node_id, node in process_graph.items():
             if node["process_id"] == "load_collection":
-                bid = self.backend_for_collection(node["arguments"]["id"])
+                bid = backend_per_collection[node["arguments"]["id"]]
                 if bid == primary_backend and not (
                     self._always_split and primary_has_load_collection
                 ):

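Note: for context, a minimal sketch (not part of the patch) of the caching pattern this commit internalizes: the collection-to-backend lookup is resolved once into a dict, so the possibly expensive `backend_for_collection` callback is not queried again in later steps (previously mitigated with `functools.lru_cache` in the PoC script). The toy process graph and the `cid.split("_")[0]` lambda are illustrative assumptions.

import collections
from typing import Callable, Dict

def resolve_backends(process_graph: dict, backend_for_collection: Callable[[str], str]) -> Dict[str, str]:
    # Build a collection id -> backend id mapping up front; later steps
    # (usage counting, node assignment) reuse this dict instead of calling the callback again.
    return {
        node["arguments"]["id"]: backend_for_collection(node["arguments"]["id"])
        for node in process_graph.values()
        if node["process_id"] == "load_collection"
    }

pg = {
    "lc1": {"process_id": "load_collection", "arguments": {"id": "B1_NDVI"}},
    "lc2": {"process_id": "load_collection", "arguments": {"id": "B2_FAPAR"}},
}
backend_per_collection = resolve_backends(pg, backend_for_collection=lambda cid: cid.split("_")[0])
backend_usage = collections.Counter(backend_per_collection.values())
print(backend_usage)  # Counter({'B1': 1, 'B2': 1})
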
From 40474c88921ff797dbd77c99636996baed91609f Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Tue, 5 Sep 2023 17:55:08 +0200
Subject: [PATCH 02/11] Issue #115 allow AggregatorBatchJobs to get backend id
 for given collection id

---
 src/openeo_aggregator/backend.py              | 20 ++++++++++++++-----
 .../partitionedjobs/crossbackend.py           |  1 +
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py
index faf24e33..deb11cb5 100644
--- a/src/openeo_aggregator/backend.py
+++ b/src/openeo_aggregator/backend.py
@@ -111,6 +111,7 @@ def set_backends_for_collection(self, cid: str, backends: Iterable[str]):
         self._data[cid]["backends"] = list(backends)
 
     def get_backends_for_collection(self, cid: str) -> List[str]:
+        """Get backend ids that provide given collection id."""
         if cid not in self._data:
             raise CollectionNotFoundException(collection_id=cid)
         return self._data[cid]["backends"]
@@ -205,6 +206,11 @@ def evaluate(backend_id, pg):
 
         return [functools.partial(evaluate, pg=pg) for pg in process_graphs]
 
+    def get_backends_for_collection(self, cid: str) -> List[str]:
+        """Get backend ids that provide given collection id."""
+        metadata, internal = self._get_all_metadata_cached()
+        return internal.get_backends_for_collection(cid=cid)
+
     def get_backend_candidates_for_collections(self, collections: Iterable[str]) -> List[str]:
         """
         Get backend ids providing all given collections
@@ -568,13 +574,16 @@ def _process_load_ml_model(
 class AggregatorBatchJobs(BatchJobs):
 
     def __init__(
-            self,
-            backends: MultiBackendConnection,
-            processing: AggregatorProcessing,
-            partitioned_job_tracker: Optional[PartitionedJobTracker] = None,
+        self,
+        *,
+        backends: MultiBackendConnection,
+        catalog: AggregatorCollectionCatalog,
+        processing: AggregatorProcessing,
+        partitioned_job_tracker: Optional[PartitionedJobTracker] = None,
     ):
         super(AggregatorBatchJobs, self).__init__()
         self.backends = backends
+        self._catalog = catalog
         self.processing = processing
         self.partitioned_job_tracker = partitioned_job_tracker
 
@@ -1127,8 +1136,9 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
 
         batch_jobs = AggregatorBatchJobs(
             backends=backends,
+            catalog=catalog,
             processing=processing,
-            partitioned_job_tracker=partitioned_job_tracker
+            partitioned_job_tracker=partitioned_job_tracker,
         )
 
         secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing, config=config)
diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py
index 74195348..36d1c169 100644
--- a/src/openeo_aggregator/partitionedjobs/crossbackend.py
+++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py
@@ -186,6 +186,7 @@ def run_partitioned_job(
 ) -> dict:
     """
     Run partitioned job (probably with dependencies between subjobs)
+    with an active polling loop for tracking and scheduling the subjobs
 
     .. warning::
         this is experimental functionality

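Note: a minimal, hypothetical sketch (FakeCatalog is a stand-in, not aggregator code) of why `AggregatorBatchJobs` now gets catalog access: the cross-backend splitter only needs a `collection id -> backend id` function, which can be derived from the `get_backends_for_collection` lookup added here by picking the first listed back-end.

from typing import Callable, Dict, List

class FakeCatalog:
    """Stand-in for the aggregator's collection catalog."""

    def __init__(self, backends_per_collection: Dict[str, List[str]]):
        self._data = backends_per_collection

    def get_backends_for_collection(self, cid: str) -> List[str]:
        # Get backend ids that provide given collection id.
        return self._data[cid]

def make_backend_for_collection(catalog: FakeCatalog) -> Callable[[str], str]:
    # Pick the first back-end that provides the collection (the convention used later in this series).
    return lambda collection_id: catalog.get_backends_for_collection(cid=collection_id)[0]

catalog = FakeCatalog({"S2": ["b1", "b2"], "S1": ["b2"]})
backend_for_collection = make_backend_for_collection(catalog)
assert backend_for_collection("S2") == "b1"
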
From 329469b3de72b3d13aa460f47245031b4be4a6d1 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Wed, 6 Sep 2023 10:17:25 +0200
Subject: [PATCH 03/11] Issue #115 CrossBackendSplitter: add "streamed" split
 to allow injecting batch job ids on the fly

---
 .../partitionedjobs/crossbackend.py           | 118 +++++++++----
 tests/partitionedjobs/test_crossbackend.py    | 165 +++++++++++++++++-
 2 files changed, 247 insertions(+), 36 deletions(-)

diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py
index 36d1c169..0a8a83b5 100644
--- a/src/openeo_aggregator/partitionedjobs/crossbackend.py
+++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py
@@ -5,7 +5,7 @@
 import logging
 import time
 from contextlib import nullcontext
-from typing import Callable, Dict, List, Sequence
+from typing import Callable, Dict, Iterator, List, Optional, Protocol, Sequence, Tuple
 
 import openeo
 from openeo import BatchJob
@@ -20,6 +20,42 @@
 
 _LOAD_RESULT_PLACEHOLDER = "_placeholder:"
 
+# Some type annotation aliases to make things more self-documenting
+SubGraphId = str
+
+
+class GetReplacementCallable(Protocol):
+    """
+    Type annotation for callback functions that produce a node replacement
+    for a node that is split off from the main process graph
+
+    Also see `_default_get_replacement`
+    """
+
+    def __call__(self, node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
+        """
+        :param node_id: original id of the node in the process graph (e.g. `loadcollection2`)
+        :param node: original node in the process graph (e.g. `{"process_id": "load_collection", "arguments": {...}}`)
+        :param subgraph_id: id of the corresponding dependency subgraph
+            (to be handled as opaque id, but possibly something like `backend1:loadcollection2`)
+
+        :return: new process graph nodes. Should contain at least a node keyed under `node_id`
+        """
+        ...
+
+
+def _default_get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
+    """
+    Default `get_replacement` function to replace a node that has been split off.
+    """
+    return {
+        node_id: {
+            # TODO: use `load_stac` instead of `load_result`
+            "process_id": "load_result",
+            "arguments": {"id": f"{_LOAD_RESULT_PLACEHOLDER}{subgraph_id}"},
+        }
+    }
+
 
 class CrossBackendSplitter(AbstractJobSplitter):
     """
@@ -42,10 +78,25 @@ def __init__(
         self.backend_for_collection = backend_for_collection
         self._always_split = always_split
 
-    def split(
-        self, process: PGWithMetadata, metadata: dict = None, job_options: dict = None
-    ) -> PartitionedJob:
-        process_graph = process["process_graph"]
+    def split_streaming(
+        self,
+        process_graph: FlatPG,
+        get_replacement: GetReplacementCallable = _default_get_replacement,
+    ) -> Iterator[Tuple[SubGraphId, SubJob, List[SubGraphId]]]:
+        """
+        Split given process graph into sub-process graphs and return these as an iterator
+        in an order so that a subgraph comes after all subgraphs it depends on
+        (e.g. main "primary" graph comes last).
+
+        The iterator approach allows working with a dynamic `get_replacement` implementation
+        that adapts to previously produced subgraphs
+        (e.g. creating openEO batch jobs on the fly and injecting the corresponding batch job ids appropriately).
+
+        :return: tuple containing:
+            - subgraph id
+            - SubJob
+            - dependencies as list of subgraph ids
+        """
 
         # Extract necessary back-ends from `load_collection` usage
         backend_per_collection: Dict[str, str] = {
@@ -57,55 +108,60 @@ def split(
         backend_usage = collections.Counter(backend_per_collection.values())
         _log.info(f"Extracted backend usage from `load_collection` nodes: {backend_usage=} {backend_per_collection=}")
 
+        # TODO: more options to determine primary backend?
         primary_backend = backend_usage.most_common(1)[0][0] if backend_usage else None
         secondary_backends = {b for b in backend_usage if b != primary_backend}
         _log.info(f"Backend split: {primary_backend=} {secondary_backends=}")
 
         primary_id = "main"
-        primary_pg = SubJob(process_graph={}, backend_id=primary_backend)
+        primary_pg = {}
         primary_has_load_collection = False
-
-        subjobs: Dict[str, SubJob] = {primary_id: primary_pg}
-        dependencies: Dict[str, List[str]] = {primary_id: []}
+        primary_dependencies = []
 
         for node_id, node in process_graph.items():
             if node["process_id"] == "load_collection":
                 bid = backend_per_collection[node["arguments"]["id"]]
-                if bid == primary_backend and not (
-                    self._always_split and primary_has_load_collection
-                ):
+                if bid == primary_backend and (not self._always_split or not primary_has_load_collection):
                     # Add to primary pg
-                    primary_pg.process_graph[node_id] = node
+                    primary_pg[node_id] = node
                     primary_has_load_collection = True
                 else:
                     # New secondary pg
-                    pg = {
+                    sub_id = f"{bid}:{node_id}"
+                    sub_pg = {
                         node_id: node,
                         "sr1": {
                             # TODO: other/better choices for save_result format (e.g. based on backend support)?
-                            # TODO: particular format options?
                             "process_id": "save_result",
                             "arguments": {
                                 "data": {"from_node": node_id},
+                                # TODO: particular format options?
                                 # "format": "NetCDF",
                                 "format": "GTiff",
                             },
                             "result": True,
                         },
                     }
-                    dependency_id = f"{bid}:{node_id}"
-                    subjobs[dependency_id] = SubJob(process_graph=pg, backend_id=bid)
-                    dependencies[primary_id].append(dependency_id)
-                    # Link to primary pg with load_result
-                    primary_pg.process_graph[node_id] = {
-                        # TODO: encapsulate this placeholder process/id better?
-                        "process_id": "load_result",
-                        "arguments": {
-                            "id": f"{_LOAD_RESULT_PLACEHOLDER}{dependency_id}"
-                        },
-                    }
+
+                    yield (sub_id, SubJob(process_graph=sub_pg, backend_id=bid), [])
+
+                    # Link secondary pg into primary pg
+                    primary_pg.update(get_replacement(node_id=node_id, node=node, subgraph_id=sub_id))
+                    primary_dependencies.append(sub_id)
             else:
-                primary_pg.process_graph[node_id] = node
+                primary_pg[node_id] = node
+
+        yield (primary_id, SubJob(process_graph=primary_pg, backend_id=primary_backend), primary_dependencies)
+
+    def split(self, process: PGWithMetadata, metadata: dict = None, job_options: dict = None) -> PartitionedJob:
+        """Split given process graph into a `PartitionedJob`"""
+
+        subjobs: Dict[SubGraphId, SubJob] = {}
+        dependencies: Dict[SubGraphId, List[SubGraphId]] = {}
+        for sub_id, subjob, sub_dependencies in self.split_streaming(process_graph=process["process_graph"]):
+            subjobs[sub_id] = subjob
+            if sub_dependencies:
+                dependencies[sub_id] = sub_dependencies
 
         return PartitionedJob(
             process=process,
@@ -116,9 +172,7 @@ def split(
         )
 
 
-def resolve_dependencies(
-    process_graph: FlatPG, batch_jobs: Dict[str, BatchJob]
-) -> FlatPG:
+def _resolve_dependencies(process_graph: FlatPG, batch_jobs: Dict[str, BatchJob]) -> FlatPG:
     """
     Replace placeholders in given process graph
     based on given subjob_id to batch_job_id mapping.
@@ -235,9 +289,7 @@ def run_partitioned_job(
             # Handle job (start, poll status, ...)
             if states[subjob_id] == SUBJOB_STATES.READY:
                 try:
-                    process_graph = resolve_dependencies(
-                        subjob.process_graph, batch_jobs=batch_jobs
-                    )
+                    process_graph = _resolve_dependencies(subjob.process_graph, batch_jobs=batch_jobs)
 
                     _log.info(
                         f"Starting new batch job for subjob {subjob_id!r} on backend {subjob.backend_id!r}"
diff --git a/tests/partitionedjobs/test_crossbackend.py b/tests/partitionedjobs/test_crossbackend.py
index 8d1e2c82..9c26740e 100644
--- a/tests/partitionedjobs/test_crossbackend.py
+++ b/tests/partitionedjobs/test_crossbackend.py
@@ -1,5 +1,6 @@
 import dataclasses
 import re
+import types
 from typing import Dict, List, Optional
 from unittest import mock
 
@@ -13,12 +14,13 @@
 from openeo_aggregator.partitionedjobs import PartitionedJob, SubJob
 from openeo_aggregator.partitionedjobs.crossbackend import (
     CrossBackendSplitter,
+    SubGraphId,
     run_partitioned_job,
 )
 
 
 class TestCrossBackendSplitter:
-    def test_simple(self):
+    def test_split_simple(self):
         process_graph = {
             "add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}
         }
@@ -26,9 +28,16 @@ def test_simple(self):
         res = splitter.split({"process_graph": process_graph})
 
         assert res.subjobs == {"main": SubJob(process_graph, backend_id=None)}
-        assert res.dependencies == {"main": []}
+        assert res.dependencies == {}
 
-    def test_basic(self):
+    def test_split_streaming_simple(self):
+        process_graph = {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}
+        splitter = CrossBackendSplitter(backend_for_collection=lambda cid: "foo")
+        res = splitter.split_streaming(process_graph)
+        assert isinstance(res, types.GeneratorType)
+        assert list(res) == [("main", SubJob(process_graph, backend_id=None), [])]
+
+    def test_split_basic(self):
         process_graph = {
             "lc1": {"process_id": "load_collection", "arguments": {"id": "B1_NDVI"}},
             "lc2": {"process_id": "load_collection", "arguments": {"id": "B2_FAPAR"}},
@@ -93,6 +102,156 @@ def test_basic(self):
         }
         assert res.dependencies == {"main": ["B2:lc2"]}
 
+    def test_split_streaming_basic(self):
+        process_graph = {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "B1_NDVI"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "B2_FAPAR"}},
+            "mc1": {
+                "process_id": "merge_cubes",
+                "arguments": {
+                    "cube1": {"from_node": "lc1"},
+                    "cube2": {"from_node": "lc2"},
+                },
+            },
+            "sr1": {
+                "process_id": "save_result",
+                "arguments": {"data": {"from_node": "mc1"}, "format": "NetCDF"},
+                "result": True,
+            },
+        }
+        splitter = CrossBackendSplitter(backend_for_collection=lambda cid: cid.split("_")[0])
+        result = splitter.split_streaming(process_graph)
+        assert isinstance(result, types.GeneratorType)
+
+        assert list(result) == [
+            (
+                "B2:lc2",
+                SubJob(
+                    process_graph={
+                        "lc2": {
+                            "process_id": "load_collection",
+                            "arguments": {"id": "B2_FAPAR"},
+                        },
+                        "sr1": {
+                            "process_id": "save_result",
+                            "arguments": {"data": {"from_node": "lc2"}, "format": "GTiff"},
+                            "result": True,
+                        },
+                    },
+                    backend_id="B2",
+                ),
+                [],
+            ),
+            (
+                "main",
+                SubJob(
+                    process_graph={
+                        "lc1": {"process_id": "load_collection", "arguments": {"id": "B1_NDVI"}},
+                        "lc2": {"process_id": "load_result", "arguments": {"id": "_placeholder:B2:lc2"}},
+                        "mc1": {
+                            "process_id": "merge_cubes",
+                            "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                        },
+                        "sr1": {
+                            "process_id": "save_result",
+                            "arguments": {"data": {"from_node": "mc1"}, "format": "NetCDF"},
+                            "result": True,
+                        },
+                    },
+                    backend_id="B1",
+                ),
+                ["B2:lc2"],
+            ),
+        ]
+
+    def test_split_streaming_get_replacement(self):
+        process_graph = {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "B1_NDVI"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "B2_FAPAR"}},
+            "lc3": {"process_id": "load_collection", "arguments": {"id": "B3_SCL"}},
+            "merge": {
+                "process_id": "merge",
+                "arguments": {
+                    "cube1": {"from_node": "lc1"},
+                    "cube2": {"from_node": "lc2"},
+                    "cube3": {"from_node": "lc3"},
+                },
+                "result": True,
+            },
+        }
+        splitter = CrossBackendSplitter(backend_for_collection=lambda cid: cid.split("_")[0])
+
+        batch_jobs = {}
+
+        def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
+            return {
+                node_id: {
+                    "process_id": "load_batch_job",
+                    "arguments": {"batch_job": batch_jobs[subgraph_id]},
+                }
+            }
+
+        substream = splitter.split_streaming(process_graph, get_replacement=get_replacement)
+
+        result = []
+        for subgraph_id, subjob, dependencies in substream:
+            batch_jobs[subgraph_id] = f"job-{111 * (len(batch_jobs) + 1)}"
+            result.append((subgraph_id, subjob, dependencies))
+
+        assert list(result) == [
+            (
+                "B2:lc2",
+                SubJob(
+                    process_graph={
+                        "lc2": {"process_id": "load_collection", "arguments": {"id": "B2_FAPAR"}},
+                        "sr1": {
+                            "process_id": "save_result",
+                            "arguments": {"data": {"from_node": "lc2"}, "format": "GTiff"},
+                            "result": True,
+                        },
+                    },
+                    backend_id="B2",
+                ),
+                [],
+            ),
+            (
+                "B3:lc3",
+                SubJob(
+                    process_graph={
+                        "lc3": {"process_id": "load_collection", "arguments": {"id": "B3_SCL"}},
+                        "sr1": {
+                            "process_id": "save_result",
+                            "arguments": {"data": {"from_node": "lc3"}, "format": "GTiff"},
+                            "result": True,
+                        },
+                    },
+                    backend_id="B3",
+                ),
+                [],
+            ),
+            (
+                "main",
+                SubJob(
+                    process_graph={
+                        "lc1": {"process_id": "load_collection", "arguments": {"id": "B1_NDVI"}},
+                        "lc2": {"process_id": "load_batch_job", "arguments": {"batch_job": "job-111"}},
+                        "lc3": {"process_id": "load_batch_job", "arguments": {"batch_job": "job-222"}},
+                        "merge": {
+                            "process_id": "merge",
+                            "arguments": {
+                                "cube1": {"from_node": "lc1"},
+                                "cube2": {"from_node": "lc2"},
+                                "cube3": {"from_node": "lc3"},
+                            },
+                            "result": True,
+                        },
+                    },
+                    backend_id="B1",
+                ),
+                ["B2:lc2", "B3:lc3"],
+            ),
+        ]
+
 
 @dataclasses.dataclass
 class _FakeJob:

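Note: a minimal sketch of consuming `split_streaming` with an on-the-fly `get_replacement`, mirroring `test_split_streaming_get_replacement` above; `create_job` is a hypothetical stand-in for real batch job creation (the production flow is added in the next patch).

from typing import Dict

from openeo_aggregator.partitionedjobs.crossbackend import CrossBackendSplitter, SubGraphId

splitter = CrossBackendSplitter(backend_for_collection=lambda cid: cid.split("_")[0])

job_ids: Dict[SubGraphId, str] = {}

def create_job(process_graph: dict, backend_id: str) -> str:
    # Stand-in for creating a real openEO batch job on the given back-end.
    return f"job-{backend_id}-{len(job_ids)}"

def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
    # Replace a split-off load_collection node with a reference to the batch job
    # that was already created for that subgraph.
    return {node_id: {"process_id": "load_result", "arguments": {"id": job_ids[subgraph_id]}}}

process_graph = {
    "lc1": {"process_id": "load_collection", "arguments": {"id": "B1_NDVI"}},
    "lc2": {"process_id": "load_collection", "arguments": {"id": "B2_FAPAR"}},
    "mc1": {
        "process_id": "merge_cubes",
        "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
        "result": True,
    },
}

for subgraph_id, subjob, dependencies in splitter.split_streaming(process_graph, get_replacement=get_replacement):
    # Subgraphs are yielded before the subgraphs that depend on them, so the job id needed
    # by `get_replacement` is available by the time the "main" subgraph is produced.
    job_ids[subgraph_id] = create_job(subjob.process_graph, backend_id=subjob.backend_id)
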
From 56bf3f58c4d2d7f50cbdb619a3bc04e31d54166e Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Wed, 6 Sep 2023 19:07:32 +0200
Subject: [PATCH 04/11] Issue #115 Initial implementation of "crossbackend"
 splitting through API

---
 setup.py                                      |   2 +-
 src/openeo_aggregator/backend.py              |  82 ++++++--
 .../partitionedjobs/__init__.py               |   1 +
 .../partitionedjobs/crossbackend.py           |   2 +-
 .../partitionedjobs/tracking.py               | 109 +++++++++-
 .../partitionedjobs/zookeeper.py              | 102 ++++++----
 tests/partitionedjobs/test_api.py             | 188 ++++++++++++++++++
 tests/partitionedjobs/test_zookeeper.py       |  15 +-
 8 files changed, 444 insertions(+), 57 deletions(-)

diff --git a/setup.py b/setup.py
index a6df6ca6..053e91a8 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@
         "requests",
         "attrs",
         "openeo>=0.17.0",
-        "openeo_driver>=0.57.1.dev",
+        "openeo_driver>=0.65.0.dev",
         "flask~=2.0",
         "gunicorn~=20.0",
         "python-json-logger>=2.0.0",
diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py
index deb11cb5..8379262a 100644
--- a/src/openeo_aggregator/backend.py
+++ b/src/openeo_aggregator/backend.py
@@ -85,12 +85,17 @@
 )
 from openeo_aggregator.metadata.reporter import LoggerReporter
 from openeo_aggregator.partitionedjobs import PartitionedJob
+from openeo_aggregator.partitionedjobs.crossbackend import (
+    CrossBackendSplitter,
+    SubGraphId,
+)
 from openeo_aggregator.partitionedjobs.splitting import FlimsySplitter, TileGridSplitter
 from openeo_aggregator.partitionedjobs.tracking import (
     PartitionedJobConnection,
     PartitionedJobTracker,
 )
 from openeo_aggregator.utils import (
+    Clock,
     FlatPG,
     PGWithMetadata,
     dict_merge,
@@ -620,18 +625,29 @@ def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
         })
 
     def create_job(
-            self, user_id: str, process: dict, api_version: str,
-            metadata: dict, job_options: dict = None
+        self,
+        user_id: str,
+        process: dict,
+        api_version: str,
+        metadata: dict,
+        job_options: Optional[dict] = None,
     ) -> BatchJobMetadata:
         if "process_graph" not in process:
             raise ProcessGraphMissingException()
 
         # TODO: better, more generic/specific job_option(s)?
-        if job_options and (
-            job_options.get(JOB_OPTION_SPLIT_STRATEGY)
-            or job_options.get(JOB_OPTION_TILE_GRID)
-        ):
-            return self._create_partitioned_job(
+        if job_options and (job_options.get(JOB_OPTION_SPLIT_STRATEGY) or job_options.get(JOB_OPTION_TILE_GRID)):
+            if job_options.get(JOB_OPTION_SPLIT_STRATEGY) == "crossbackend":
+                # TODO: this is a temporary feature flag to trigger "crossbackend" splitting
+                return self._create_crossbackend_job(
+                    user_id=user_id,
+                    process=process,
+                    api_version=api_version,
+                    metadata=metadata,
+                    job_options=job_options,
+                )
+            else:
+                return self._create_partitioned_job(
                 user_id=user_id,
                 process=process,
                 api_version=api_version,
@@ -690,8 +706,9 @@ def _create_job_standard(
                 raise OpenEOApiException(f"Failed to create job on backend {backend_id!r}: {e!r}")
         return BatchJobMetadata(
             id=JobIdMapping.get_aggregator_job_id(backend_job_id=job.job_id, backend_id=backend_id),
-            # Note: required, but unused metadata
-            status="dummy", created="dummy", process={"dummy": "dummy"}
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
         )
 
     def _create_partitioned_job(
@@ -719,11 +736,52 @@ def _create_partitioned_job(
             raise ValueError("Could not determine splitting strategy from job options")
         pjob: PartitionedJob = splitter.split(process=process, metadata=metadata, job_options=job_options)
 
-        job_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)
+        pjob_id = self.partitioned_job_tracker.create(user_id=user_id, pjob=pjob, flask_request=flask.request)
+
+        return BatchJobMetadata(
+            id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
+        )
+
+    def _create_crossbackend_job(
+        self,
+        user_id: str,
+        process: PGWithMetadata,
+        api_version: str,
+        metadata: dict,
+        job_options: Optional[dict] = None,
+    ) -> BatchJobMetadata:
+        """
+        Advanced/handled batch job creation:
+
+        - split original job in (possibly) multiple sub-jobs,
+          e.g. split the process graph based on `load_collection` availability
+        - distribute sub-jobs across (possibly) multiple back-ends
+        - keep track of them through a "parent job" in a `PartitionedJobTracker`.
+        """
+        if not self.partitioned_job_tracker:
+            raise FeatureUnsupportedException(message="Partitioned job tracking is not supported")
+
+        def backend_for_collection(collection_id) -> str:
+            return self._catalog.get_backends_for_collection(cid=collection_id)[0]
+
+        splitter = CrossBackendSplitter(
+            backend_for_collection=backend_for_collection,
+            # TODO: job option for `always_split` feature?
+            always_split=True,
+        )
+
+        pjob_id = self.partitioned_job_tracker.create_crossbackend_pjob(
+            user_id=user_id, process=process, metadata=metadata, job_options=job_options, splitter=splitter
+        )
 
         return BatchJobMetadata(
-            id=JobIdMapping.get_aggregator_job_id(backend_job_id=job_id, backend_id=JobIdMapping.AGG),
-            status="dummy", created="dummy", process={"dummy": "dummy"}
+            id=JobIdMapping.get_aggregator_job_id(backend_job_id=pjob_id, backend_id=JobIdMapping.AGG),
+            # Note: additional required, but unused metadata
+            status="dummy",
+            created="dummy",
         )
 
     def _get_connection_and_backend_job_id(
diff --git a/src/openeo_aggregator/partitionedjobs/__init__.py b/src/openeo_aggregator/partitionedjobs/__init__.py
index c8230626..7c099506 100644
--- a/src/openeo_aggregator/partitionedjobs/__init__.py
+++ b/src/openeo_aggregator/partitionedjobs/__init__.py
@@ -34,6 +34,7 @@ def to_subjobs_dict(
         """Helper to convert a collection of SubJobs to a dictionary"""
         # TODO: hide this logic in a setter or __init__ (e.g. when outgrowing the constraints of typing.NamedTuple)
         if isinstance(subjobs, Sequence):
+            # TODO: eliminate this `Sequence` code path, and just always work with dict?
             return {f"{i:04d}": j for i, j in enumerate(subjobs)}
         elif isinstance(subjobs, dict):
             return {str(k): v for k, v in subjobs.items()}
diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py
index 0a8a83b5..b74850e5 100644
--- a/src/openeo_aggregator/partitionedjobs/crossbackend.py
+++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py
@@ -93,7 +93,7 @@ def split_streaming(
         (e.g. creating openEO batch jobs on the fly and injecting the corresponding batch job ids appropriately).
 
         :return: tuple containing:
-            - subgraph id
+            - subgraph id; recommended to handle as an opaque id (but usually of the format '{backend_id}:{node_id}')
             - SubJob
             - dependencies as list of subgraph ids
         """
diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py
index 522c6ba4..a684699e 100644
--- a/src/openeo_aggregator/partitionedjobs/tracking.py
+++ b/src/openeo_aggregator/partitionedjobs/tracking.py
@@ -1,13 +1,14 @@
 import collections
 import contextlib
+import dataclasses
 import datetime
 import logging
 import threading
-from typing import List, Optional
+from typing import Dict, List, Optional, Union
 
 import flask
 from openeo.api.logs import LogEntry
-from openeo.rest.job import ResultAsset
+from openeo.rest.job import BatchJob, ResultAsset
 from openeo.util import TimingLogger, rfc3339
 from openeo_driver.errors import JobNotFinishedException
 from openeo_driver.users import User
@@ -21,10 +22,15 @@
     STATUS_INSERTED,
     STATUS_RUNNING,
     PartitionedJob,
+    SubJob,
+)
+from openeo_aggregator.partitionedjobs.crossbackend import (
+    CrossBackendSplitter,
+    SubGraphId,
 )
 from openeo_aggregator.partitionedjobs.splitting import TileGridSplitter
 from openeo_aggregator.partitionedjobs.zookeeper import ZooKeeperPartitionedJobDB
-from openeo_aggregator.utils import _UNSET, timestamp_to_rfc3339
+from openeo_aggregator.utils import _UNSET, Clock, PGWithMetadata, timestamp_to_rfc3339
 
 _log = logging.getLogger(__name__)
 
@@ -57,6 +63,103 @@ def create(self, user_id: str, pjob: PartitionedJob, flask_request: flask.Reques
         self.create_sjobs(user_id=user_id, pjob_id=pjob_id, flask_request=flask_request)
         return pjob_id
 
+    def create_crossbackend_pjob(
+        self,
+        *,
+        user_id: str,
+        process: PGWithMetadata,
+        metadata: dict,
+        job_options: Optional[dict] = None,
+        splitter: CrossBackendSplitter,
+    ) -> str:
+        """
+        Cross-backend partitioned job creation differs from the original partitioned job creation
+        because of the dependencies between sub-jobs:
+        the batch jobs have to be created in the right order on the respective backends
+        before the sub-process graphs are finalized, after which their metadata can be persisted in the ZooKeeperPartitionedJobDB
+        """
+        # Start with reserving a new partitioned job id based on initial metadata
+        pjob_node_value = self._db.serialize(
+            user_id=user_id,
+            created=Clock.time(),
+            process=process,
+            metadata=metadata,
+            job_options=job_options,
+        )
+        pjob_id = self._db.obtain_new_pjob_id(user_id=user_id, initial_value=pjob_node_value)
+        self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_INSERTED, create=True)
+
+        # Create batch jobs on respective backends, and build the PartitionedJob components along the way
+        subjobs: Dict[str, SubJob] = {}
+        dependencies: Dict[str, List[str]] = {}
+        batch_jobs: Dict[SubGraphId, BatchJob] = {}
+        create_stats = collections.Counter()
+
+        def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
+            # TODO: use `load_stac` instead of `load_result`, and use canonical URL?
+            nonlocal batch_jobs
+            job_id = batch_jobs[subgraph_id].job_id
+            return {
+                node_id: {
+                    "process_id": "load_result",
+                    "arguments": {"id": job_id},
+                }
+            }
+
+        for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
+            process_graph=process["process_graph"], get_replacement=get_replacement
+        ):
+            subjobs[sjob_id] = subjob
+            dependencies[sjob_id] = subjob_dependencies
+            try:
+                # TODO: how to handle errors here (e.g. failed job creation)? Fail the whole partitioned job, or try to finish what is possible?
+                con = self._backends.get_connection(subjob.backend_id)
+                with con.authenticated_from_request(request=flask.request), con.override(
+                    default_timeout=CONNECTION_TIMEOUT_JOB_START
+                ):
+                    with TimingLogger(title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info):
+                        job = con.create_job(
+                            process_graph=subjob.process_graph,
+                            title=f"Crossbackend job {pjob_id}:{sjob_id}",
+                            plan=metadata.get("plan"),
+                            budget=metadata.get("budget"),
+                            additional=job_options,
+                        )
+                        _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
+                        batch_jobs[sjob_id] = job
+                        title = f"Partitioned job {pjob_id=} {sjob_id=}"
+                        self._db.insert_sjob(
+                            user_id=user_id,
+                            pjob_id=pjob_id,
+                            sjob_id=sjob_id,
+                            subjob=subjob,
+                            title=title,
+                            status=STATUS_CREATED,
+                        )
+                        self._db.set_backend_job_id(
+                            user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
+                        )
+                        create_stats[STATUS_CREATED] += 1
+            except Exception as exc:
+                _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
+                msg = f"Create failed: {exc}"
+                self._db.set_sjob_status(
+                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
+                )
+                create_stats[STATUS_ERROR] += 1
+
+        # TODO: this is currently unused, don't bother building it at all?
+        partitioned_job = PartitionedJob(
+            process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
+        )
+
+        pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
+        self._db.set_pjob_status(
+            user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
+        )
+
+        return pjob_id
+
     def create_sjobs(self, user_id: str, pjob_id: str, flask_request: flask.Request):
         """Create all sub-jobs on remote back-end for given partitioned job"""
         pjob_metadata = self._db.get_pjob_metadata(user_id=user_id, pjob_id=pjob_id)
diff --git a/src/openeo_aggregator/partitionedjobs/zookeeper.py b/src/openeo_aggregator/partitionedjobs/zookeeper.py
index fe920fd5..c93269a5 100644
--- a/src/openeo_aggregator/partitionedjobs/zookeeper.py
+++ b/src/openeo_aggregator/partitionedjobs/zookeeper.py
@@ -12,6 +12,7 @@
     STATUS_INSERTED,
     PartitionedJob,
     PartitionedJobFailure,
+    SubJob,
 )
 from openeo_aggregator.utils import Clock, strip_join, timestamp_to_rfc3339
 
@@ -90,6 +91,21 @@ def deserialize(value: bytes) -> dict:
         """Deserialize bytes (assuming UTF8 encoded JSON mapping)"""
         return json.loads(value.decode("utf8"))
 
+    def obtain_new_pjob_id(self, user_id: str, initial_value: bytes = b"", attempts: int = 3) -> str:
+        """Obtain new, unique partitioned job id"""
+        # A couple of pjob_id attempts: start with a current-time-based name and add a suffix to counter collisions (if any)
+        base_pjob_id = "pj-" + Clock.utcnow().strftime("%Y%m%d-%H%M%S")
+        for pjob_id in [base_pjob_id] + [f"{base_pjob_id}-{i}" for i in range(1, attempts)]:
+            try:
+                self._client.create(path=self._path(user_id, pjob_id), value=initial_value, makepath=True)
+                # We obtained our unique id
+                return pjob_id
+            except NodeExistsError:
+                # TODO: check that NodeExistsError is thrown on existing job_ids
+                # TODO: add a sleep() to back off a bit?
+                continue
+        raise PartitionedJobFailure("Too many attempts to create new pjob_id")
+
     def insert(self, user_id: str, pjob: PartitionedJob) -> str:
         """
         Insert a new partitioned job.
@@ -98,7 +114,7 @@ def insert(self, user_id: str, pjob: PartitionedJob) -> str:
         """
         with self._connect():
             # Insert parent node, with "static" (write once) metadata as associated data
-            job_node_value = self.serialize(
+            pjob_node_value = self.serialize(
                 user_id=user_id,
                 # TODO: more BatchJobMetdata fields
                 created=Clock.time(),
@@ -107,24 +123,9 @@ def insert(self, user_id: str, pjob: PartitionedJob) -> str:
                 job_options=pjob.job_options,
                 # TODO: pjob.dependencies #95
             )
-            # A couple of pjob_id attempts: start with current time based name and a suffix to counter collisions (if any)
-            base_pjob_id = "pj-" + Clock.utcnow().strftime("%Y%m%d-%H%M%S")
-            for pjob_id in [base_pjob_id] + [f"{base_pjob_id}-{i}" for i in range(1, 3)]:
-                try:
-                    self._client.create(path=self._path(user_id, pjob_id), value=job_node_value, makepath=True)
-                    break
-                except NodeExistsError:
-                    # TODO: check that NodeExistsError is thrown on existing job_ids
-                    # TODO: add a sleep() to back off a bit?
-                    continue
-            else:
-                raise PartitionedJobFailure("Too much attempts to create new pjob_id")
-
+            pjob_id = self.obtain_new_pjob_id(user_id=user_id, initial_value=pjob_node_value)
             # Updatable metadata
-            self._client.create(
-                path=self._path(user_id, pjob_id, "status"),
-                value=self.serialize(status=STATUS_INSERTED)
-            )
+            self.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_INSERTED, create=True)
 
             # Insert subjobs
             # TODO #95 some subjobs are not fully defined if they have dependencies
@@ -132,22 +133,27 @@ def insert(self, user_id: str, pjob: PartitionedJob) -> str:
             #       Only create them when fully concrete,
             #       or allow updates on this metadata?
             for i, (sjob_id, subjob) in enumerate(pjob.subjobs.items()):
-                self._client.create(
-                    path=self._path(user_id, pjob_id, "sjobs", sjob_id),
-                    value=self.serialize(
-                        process_graph=subjob.process_graph,
-                        backend_id=subjob.backend_id,
-                        title=f"Partitioned job {pjob_id} part {sjob_id} ({i + 1}/{len(pjob.subjobs)})",
-                    ),
-                    makepath=True,
-                )
-                self._client.create(
-                    path=self._path(user_id, pjob_id, "sjobs", sjob_id, "status"),
-                    value=self.serialize(status=STATUS_INSERTED),
-                )
+                title = f"Partitioned job {pjob_id} part {sjob_id} ({i + 1}/{len(pjob.subjobs)})"
+                self.insert_sjob(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, subjob=subjob, title=title)
 
         return pjob_id
 
+    def insert_sjob(
+        self,
+        user_id: str,
+        pjob_id: str,
+        sjob_id: str,
+        subjob: SubJob,
+        title: Optional[str] = None,
+        status: str = STATUS_INSERTED,
+    ):
+        self._client.create(
+            path=self._path(user_id, pjob_id, "sjobs", sjob_id),
+            value=self.serialize(process_graph=subjob.process_graph, backend_id=subjob.backend_id, title=title),
+            makepath=True,
+        )
+        self.set_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=status, create=True)
+
     def get_pjob_metadata(self, user_id: str, pjob_id: str) -> dict:
         """Get metadata of partitioned job, given by storage id."""
         with self._connect():
@@ -194,20 +200,32 @@ def get_backend_job_id(self, user_id: str, pjob_id: str, sjob_id: str) -> str:
                 raise NoJobIdForSubJobException(f"No job_id for {pjob_id}:{sjob_id}.")
             return self.deserialize(value)["job_id"]
 
-    def set_pjob_status(self, user_id: str, pjob_id: str, status: str, message: Optional[str] = None,
-                        progress: int = None):
+    def set_pjob_status(
+        self,
+        user_id: str,
+        pjob_id: str,
+        status: str,
+        message: Optional[str] = None,
+        progress: int = None,
+        create: bool = False,
+    ):
         """
         Store status of partitioned job (with optional message).
 
         :param pjob_id: (storage) id of partitioned job
         :param status: global status of partitioned job
         :param message: optional message, e.g. describing error
+        :param create: whether to create the node instead of updating
         """
         with self._connect():
-            self._client.set(
+            kwargs = dict(
                 path=self._path(user_id, pjob_id, "status"),
                 value=self.serialize(status=status, message=message, timestamp=Clock.time(), progress=progress)
             )
+            if create:
+                self._client.create(**kwargs)
+            else:
+                self._client.set(**kwargs)
 
     def get_pjob_status(self, user_id: str, pjob_id: str) -> dict:
         """
@@ -222,13 +240,25 @@ def get_pjob_status(self, user_id: str, pjob_id: str) -> dict:
             value, stat = self._client.get(self._path(user_id, pjob_id, "status"))
             return self.deserialize(value)
 
-    def set_sjob_status(self, user_id: str, pjob_id: str, sjob_id: str, status: str, message: Optional[str] = None):
+    def set_sjob_status(
+        self,
+        user_id: str,
+        pjob_id: str,
+        sjob_id: str,
+        status: str,
+        message: Optional[str] = None,
+        create: bool = False,
+    ):
         """Store status of sub-job (with optional message)"""
         with self._connect():
-            self._client.set(
+            kwargs = dict(
                 path=self._path(user_id, pjob_id, "sjobs", sjob_id, "status"),
                 value=self.serialize(status=status, message=message, timestamp=Clock.time()),
             )
+            if create:
+                self._client.create(**kwargs)
+            else:
+                self._client.set(**kwargs)
 
     def get_sjob_status(self, user_id: str, pjob_id: str, sjob_id: str) -> dict:
         """Get status of sub-job"""
diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py
index 14bd3f20..89f5d691 100644
--- a/tests/partitionedjobs/test_api.py
+++ b/tests/partitionedjobs/test_api.py
@@ -33,6 +33,7 @@ class _Now:
     """Helper to mock "now" to given datetime"""
 
     # TODO: move to testing utilities and reuse  more?
+    # TODO: just migrate to a more standard time mock library like time_machine?
 
     def __init__(self, date: str):
         self.rfc3339 = rfc3339.normalize(date)
@@ -609,3 +610,190 @@ def test_job_results_basic(self, flask_app, api100, dummy1):
         assert res.json == DictSubSet({"type": "FeatureCollection"})
 
     # TODO: more/full TileGridSplitter batch job tests
+
+
+class TestCrossBackendSplitting:
+    now = _Now("2022-01-19T12:34:56Z")
+
+    @now.mock
+    def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
+        """Handling of single "load_collection" process graph"""
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+
+        pg = {"lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}, "result": True}}
+
+        res = api100.post(
+            "/jobs",
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": "crossbackend"},
+            },
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["Location"] == f"http://oeoa.test/openeo/1.0.0/jobs/{expected_job_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "created",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
+
+        # Inspect stored parent job metadata
+        assert zk_db.get_pjob_metadata(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "user_id": TEST_USER,
+            "created": self.now.epoch,
+            "process": {"process_graph": pg},
+            "metadata": {},
+            "job_options": {"split_strategy": "crossbackend"},
+        }
+
+        assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "status": "created",
+            "message": approx_str_contains("{'created': 1}"),
+            "timestamp": pytest.approx(self.now.epoch, abs=1),
+            "progress": 0,
+        }
+
+        # Inspect stored subjob metadata
+        subjobs = zk_db.list_subjobs(user_id=TEST_USER, pjob_id=pjob_id)
+        assert subjobs == {
+            "main": {
+                "backend_id": "b1",
+                "process_graph": {"lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection", "result": True}},
+                "title": "Partitioned job pjob_id='pj-20220119-123456' " "sjob_id='main'",
+            }
+        }
+        sjob_id = "main"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": pytest.approx(self.now.epoch, abs=1),
+            "message": None,
+        }
+        job_id = zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id)
+        assert job_id == "1-jb-0"
+
+        assert dummy1.get_job_status(TEST_USER, job_id) == "created"
+        pg = dummy1.get_job_data(TEST_USER, job_id).create["process"]["process_graph"]
+        assert pg == {"lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection", "result": True}}
+
+    @now.mock
+    def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+
+        pg = {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                "result": True,
+            },
+        }
+
+        res = api100.post(
+            "/jobs",
+            json={"process": {"process_graph": pg}, "job_options": {"split_strategy": "crossbackend"}},
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["Location"] == f"http://oeoa.test/openeo/1.0.0/jobs/{expected_job_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "created",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
+
+        # Inspect stored parent job metadata
+        assert zk_db.get_pjob_metadata(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "user_id": TEST_USER,
+            "created": self.now.epoch,
+            "process": {"process_graph": pg},
+            "metadata": {},
+            "job_options": {"split_strategy": "crossbackend"},
+        }
+
+        assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
+            "status": "created",
+            "message": approx_str_contains("{'created': 2}"),
+            "timestamp": pytest.approx(self.now.epoch, abs=5),
+            "progress": 0,
+        }
+
+        # Inspect stored subjob metadata
+        subjobs = zk_db.list_subjobs(user_id=TEST_USER, pjob_id=pjob_id)
+        assert subjobs == {
+            "b1:lc2": {
+                "backend_id": "b1",
+                "process_graph": {
+                    "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+                    "sr1": {
+                        "process_id": "save_result",
+                        "arguments": {"data": {"from_node": "lc2"}, "format": "GTiff"},
+                        "result": True,
+                    },
+                },
+                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='b1:lc2'",
+            },
+            "main": {
+                "backend_id": "b1",
+                "process_graph": {
+                    "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+                    "lc2": {"process_id": "load_result", "arguments": {"id": "1-jb-0"}},
+                    "merge": {
+                        "process_id": "merge_cubes",
+                        "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                        "result": True,
+                    },
+                },
+                "title": "Partitioned job pjob_id='pj-20220119-123456' sjob_id='main'",
+            },
+        }
+
+        sjob_id = "main"
+        expected_job_id = "1-jb-1"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": self.now.epoch,
+            "message": None,
+        }
+        assert zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == expected_job_id
+        assert dummy1.get_job_status(TEST_USER, expected_job_id) == "created"
+        assert dummy1.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_result", "arguments": {"id": "1-jb-0"}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                "result": True,
+            },
+        }
+
+        sjob_id = "b1:lc2"
+        expected_job_id = "1-jb-0"
+        assert zk_db.get_sjob_status(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == {
+            "status": "created",
+            "timestamp": self.now.epoch,
+            "message": None,
+        }
+        assert zk_db.get_backend_job_id(user_id=TEST_USER, pjob_id=pjob_id, sjob_id=sjob_id) == expected_job_id
+        assert dummy1.get_job_status(TEST_USER, expected_job_id) == "created"
+        assert dummy1.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "sr1": {
+                "process_id": "save_result",
+                "arguments": {"data": {"from_node": "lc2"}, "format": "GTiff"},
+                "result": True,
+            },
+        }
diff --git a/tests/partitionedjobs/test_zookeeper.py b/tests/partitionedjobs/test_zookeeper.py
index bb926ada..923e044d 100644
--- a/tests/partitionedjobs/test_zookeeper.py
+++ b/tests/partitionedjobs/test_zookeeper.py
@@ -66,6 +66,9 @@ def test_insert_basic(self, pjob, zk_client, zk_db):
             },
             "/o-a/tstsr/pj-20220117-174800/status": {
                 "status": "inserted",
+                "message": None,
+                "timestamp": approx_now(),
+                "progress": None,
             },
             "/o-a/tstsr/pj-20220117-174800/sjobs/0000": {
                 "process_graph": PG12,
@@ -73,7 +76,9 @@ def test_insert_basic(self, pjob, zk_client, zk_db):
                 "title": "Partitioned job pj-20220117-174800 part 0000 (1/2)"
             },
             "/o-a/tstsr/pj-20220117-174800/sjobs/0000/status": {
-                "status": "inserted"
+                "status": "inserted",
+                "message": None,
+                "timestamp": approx_now(),
             },
             "/o-a/tstsr/pj-20220117-174800/sjobs/0001": {
                 "process_graph": PG23,
@@ -81,7 +86,9 @@ def test_insert_basic(self, pjob, zk_client, zk_db):
                 "title": "Partitioned job pj-20220117-174800 part 0001 (2/2)"
             },
             "/o-a/tstsr/pj-20220117-174800/sjobs/0001/status": {
-                "status": "inserted"
+                "status": "inserted",
+                "message": None,
+                "timestamp": approx_now(),
             },
         }
 
@@ -146,7 +153,7 @@ def test_set_get_pjob_status(self, pjob, zk_db):
         zk_db.insert(pjob=pjob, user_id=TEST_USER)
 
         status = zk_db.get_pjob_status(user_id=TEST_USER, pjob_id="pj-20220117-174800")
-        assert status == {"status": "inserted"}
+        assert status == {"status": "inserted", "message": None, "timestamp": approx_now(), "progress": None}
 
         zk_db.set_pjob_status(user_id=TEST_USER, pjob_id="pj-20220117-174800", status="running", message="goin' on")
         status = zk_db.get_pjob_status(user_id=TEST_USER, pjob_id="pj-20220117-174800")
@@ -166,7 +173,7 @@ def test_set_get_sjob_status(self, pjob, zk_db):
         zk_db.insert(pjob=pjob, user_id=TEST_USER)
 
         status = zk_db.get_sjob_status(user_id=TEST_USER, pjob_id="pj-20220117-174800", sjob_id="0000")
-        assert status == {"status": "inserted"}
+        assert status == {"status": "inserted", "message": None, "timestamp": approx_now()}
 
         zk_db.set_sjob_status(
             user_id=TEST_USER, pjob_id="pj-20220117-174800", sjob_id="0000", status="running",

From f402fa6eaf4f269c7dd5074e6c1200fb8f84c33c Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Thu, 7 Sep 2023 14:10:11 +0200
Subject: [PATCH 05/11] Issue #115 only collect assets of main subjob for
 crossbackend job

---
 .../partitionedjobs/crossbackend.py           |  3 +-
 .../partitionedjobs/tracking.py               | 24 +++++-
 tests/partitionedjobs/test_api.py             | 76 +++++++++++++++++++
 3 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py
index b74850e5..1a33de0b 100644
--- a/src/openeo_aggregator/partitionedjobs/crossbackend.py
+++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py
@@ -82,6 +82,7 @@ def split_streaming(
         self,
         process_graph: FlatPG,
         get_replacement: GetReplacementCallable = _default_get_replacement,
+        main_subgraph_id: SubGraphId = "main",
     ) -> Iterator[Tuple[SubGraphId, SubJob, List[SubGraphId]]]:
         """
         Split given process graph in sub-process graphs and return these as an iterator
@@ -113,7 +114,7 @@ def split_streaming(
         secondary_backends = {b for b in backend_usage if b != primary_backend}
         _log.info(f"Backend split: {primary_backend=} {secondary_backends=}")
 
-        primary_id = "main"
+        primary_id = main_subgraph_id
         primary_pg = {}
         primary_has_load_collection = False
         primary_dependencies = []
diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py
index a684699e..129fae2f 100644
--- a/src/openeo_aggregator/partitionedjobs/tracking.py
+++ b/src/openeo_aggregator/partitionedjobs/tracking.py
@@ -32,6 +32,8 @@
 from openeo_aggregator.partitionedjobs.zookeeper import ZooKeeperPartitionedJobDB
 from openeo_aggregator.utils import _UNSET, Clock, PGWithMetadata, timestamp_to_rfc3339
 
+PJOB_METADATA_FIELD_RESULT_JOBS = "result_jobs"
+
 _log = logging.getLogger(__name__)
 
 
@@ -79,12 +81,16 @@ def create_crossbackend_pjob(
         before we have finalised sub-processgraphs, whose metadata can then be persisted in the ZooKeeperPartitionedJobDB
         """
         # Start with reserving a new partitioned job id based on initial metadata
+        main_subgraph_id = "main"
         pjob_node_value = self._db.serialize(
             user_id=user_id,
             created=Clock.time(),
             process=process,
             metadata=metadata,
             job_options=job_options,
+            **{
+                PJOB_METADATA_FIELD_RESULT_JOBS: [main_subgraph_id],
+            },
         )
         pjob_id = self._db.obtain_new_pjob_id(user_id=user_id, initial_value=pjob_node_value)
         self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_INSERTED, create=True)
@@ -107,7 +113,7 @@ def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
             }
 
         for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
-            process_graph=process["process_graph"], get_replacement=get_replacement
+            process_graph=process["process_graph"], get_replacement=get_replacement, main_subgraph_id=main_subgraph_id
         ):
             subjobs[sjob_id] = subjob
             dependencies[sjob_id] = subjob_dependencies
@@ -380,9 +386,21 @@ def get_assets(self, user_id: str, pjob_id: str, flask_request: flask.Request) -
         # TODO: do a sync if latest sync is too long ago?
         pjob_metadata = self._db.get_pjob_metadata(user_id=user_id, pjob_id=pjob_id)
         sjobs = self._db.list_subjobs(user_id=user_id, pjob_id=pjob_id)
+        if pjob_metadata.get(PJOB_METADATA_FIELD_RESULT_JOBS):
+            result_jobs = set(pjob_metadata[PJOB_METADATA_FIELD_RESULT_JOBS])
+            result_sjob_ids = [s for s in sjobs if s in result_jobs]
+            log_msg = f"Collect {pjob_id} subjob assets: subset {result_sjob_ids} (from {len(sjobs)})"
+        else:
+            # Collect results of all subjobs by default
+            result_sjob_ids = list(sjobs.keys())
+            log_msg = f"Collect {pjob_id} subjob assets: all {len(sjobs)}"
+
         assets = []
-        with TimingLogger(title=f"Collect assets of {pjob_id} ({len(sjobs)} sub-jobs)", logger=_log):
-            for sjob_id, sjob_metadata in sjobs.items():
+        with TimingLogger(title=log_msg, logger=_log):
+            for sjob_id in result_sjob_ids:
+                sjob_metadata = sjobs[sjob_id]
+
+                # TODO: skip subjobs that are just dependencies for a main/grouping job
                 sjob_status = self._db.get_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id)["status"]
                 if sjob_status in {STATUS_INSERTED, STATUS_CREATED, STATUS_RUNNING}:
                     raise JobNotFinishedException
diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py
index 89f5d691..4a98f595 100644
--- a/tests/partitionedjobs/test_api.py
+++ b/tests/partitionedjobs/test_api.py
@@ -44,6 +44,7 @@ def __init__(self, date: str):
 
 @pytest.fixture
 def dummy1(backend1, requests_mock) -> DummyBackend:
+    # TODO: rename this fixture to dummy_backend1
     dummy = DummyBackend(requests_mock=requests_mock, backend_url=backend1, job_id_template="1-jb-{i}")
     dummy.setup_basic_requests_mocks()
     dummy.register_user(bearer_token=TEST_USER_BEARER_TOKEN, user_id=TEST_USER)
@@ -651,6 +652,7 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
             "process": {"process_graph": pg},
             "metadata": {},
             "job_options": {"split_strategy": "crossbackend"},
+            "result_jobs": ["main"],
         }
 
         assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
@@ -722,6 +724,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
             "process": {"process_graph": pg},
             "metadata": {},
             "job_options": {"split_strategy": "crossbackend"},
+            "result_jobs": ["main"],
         }
 
         assert zk_db.get_pjob_status(user_id=TEST_USER, pjob_id=pjob_id) == {
@@ -797,3 +800,76 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
                 "result": True,
             },
         }
+
+    @now.mock
+    def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
+        """Run the jobs and get results"""
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+
+        pg = {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                "result": True,
+            },
+        }
+
+        res = api100.post(
+            "/jobs",
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": "crossbackend"},
+            },
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "created",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
+
+        # start job
+        api100.post(f"/jobs/{expected_job_id}/results").assert_status_code(202)
+        dummy1.set_job_status(TEST_USER, "1-jb-0", status="running")
+        dummy1.set_job_status(TEST_USER, "1-jb-1", status="queued")
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == DictSubSet({"id": expected_job_id, "status": "running", "progress": 0})
+
+        # First job is ready
+        dummy1.set_job_status(TEST_USER, "1-jb-0", status="finished")
+        dummy1.setup_assets(job_id="1-jb-0", assets=["1-jb-0-result.tif"])
+        dummy1.set_job_status(TEST_USER, "1-jb-1", status="running")
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == DictSubSet({"id": expected_job_id, "status": "running", "progress": 50})
+
+        # Main job is ready too
+        dummy1.set_job_status(TEST_USER, "1-jb-1", status="finished")
+        dummy1.setup_assets(job_id="1-jb-1", assets=["1-jb-1-result.tif"])
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == DictSubSet({"id": expected_job_id, "status": "finished", "progress": 100})
+
+        # Get results
+        res = api100.get(f"/jobs/{expected_job_id}/results").assert_status_code(200)
+        assert res.json == DictSubSet(
+            {
+                "id": expected_job_id,
+                "assets": {
+                    "main-1-jb-1-result.tif": {
+                        "file:nodata": [None],
+                        "href": "https://b1.test/v1/jobs/1-jb-1/results/1-jb-1-result.tif",
+                        "roles": ["data"],
+                        "title": "main-1-jb-1-result.tif",
+                        "type": "application/octet-stream",
+                    },
+                },
+            }
+        )
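
Note: the asset-collection change in `get_assets` above boils down to a filter over the
subjob ids. A minimal standalone sketch of that selection logic (the function name and the
simplified `pjob_metadata`/`sjobs` arguments are assumptions for illustration, not part of
the patch):

    from typing import Dict, List

    PJOB_METADATA_FIELD_RESULT_JOBS = "result_jobs"

    def select_result_sjob_ids(pjob_metadata: dict, sjobs: Dict[str, dict]) -> List[str]:
        """Pick the subjob ids whose assets should be collected."""
        if pjob_metadata.get(PJOB_METADATA_FIELD_RESULT_JOBS):
            result_jobs = set(pjob_metadata[PJOB_METADATA_FIELD_RESULT_JOBS])
            return [s for s in sjobs if s in result_jobs]
        # Default: collect results of all subjobs
        return list(sjobs.keys())

    # e.g. select_result_sjob_ids({"result_jobs": ["main"]}, {"main": {}, "b1:lc2": {}})
    # -> ["main"]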

From ca32d6097ac7bacd059827e05056073554ff219c Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Thu, 7 Sep 2023 17:54:13 +0200
Subject: [PATCH 06/11] Issue #115 fix ConnectionClosedError issue

---
 .../partitionedjobs/zookeeper.py              | 32 ++++++++++---------
 src/openeo_aggregator/testing.py              |  9 ++++++
 2 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/src/openeo_aggregator/partitionedjobs/zookeeper.py b/src/openeo_aggregator/partitionedjobs/zookeeper.py
index c93269a5..a14230de 100644
--- a/src/openeo_aggregator/partitionedjobs/zookeeper.py
+++ b/src/openeo_aggregator/partitionedjobs/zookeeper.py
@@ -95,15 +95,16 @@ def obtain_new_pjob_id(self, user_id: str, initial_value: bytes = b"", attempts:
         """Obtain new, unique partitioned job id"""
         # A couple of pjob_id attempts: start with current time based name and a suffix to counter collisions (if any)
         base_pjob_id = "pj-" + Clock.utcnow().strftime("%Y%m%d-%H%M%S")
-        for pjob_id in [base_pjob_id] + [f"{base_pjob_id}-{i}" for i in range(1, attempts)]:
-            try:
-                self._client.create(path=self._path(user_id, pjob_id), value=initial_value, makepath=True)
-                # We obtained our unique id
-                return pjob_id
-            except NodeExistsError:
-                # TODO: check that NodeExistsError is thrown on existing job_ids
-                # TODO: add a sleep() to back off a bit?
-                continue
+        with self._connect():
+            for pjob_id in [base_pjob_id] + [f"{base_pjob_id}-{i}" for i in range(1, attempts)]:
+                try:
+                    self._client.create(path=self._path(user_id, pjob_id), value=initial_value, makepath=True)
+                    # We obtained our unique id
+                    return pjob_id
+                except NodeExistsError:
+                    # TODO: check that NodeExistsError is thrown on existing job_ids
+                    # TODO: add a sleep() to back off a bit?
+                    continue
         raise PartitionedJobFailure("Too much attempts to create new pjob_id")
 
     def insert(self, user_id: str, pjob: PartitionedJob) -> str:
@@ -147,12 +148,13 @@ def insert_sjob(
         title: Optional[str] = None,
         status: str = STATUS_INSERTED,
     ):
-        self._client.create(
-            path=self._path(user_id, pjob_id, "sjobs", sjob_id),
-            value=self.serialize(process_graph=subjob.process_graph, backend_id=subjob.backend_id, title=title),
-            makepath=True,
-        )
-        self.set_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=status, create=True)
+        with self._connect():
+            self._client.create(
+                path=self._path(user_id, pjob_id, "sjobs", sjob_id),
+                value=self.serialize(process_graph=subjob.process_graph, backend_id=subjob.backend_id, title=title),
+                makepath=True,
+            )
+            self.set_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=status, create=True)
 
     def get_pjob_metadata(self, user_id: str, pjob_id: str) -> dict:
         """Get metadata of partitioned job, given by storage id."""
diff --git a/src/openeo_aggregator/testing.py b/src/openeo_aggregator/testing.py
index 77e3df59..54db1bfb 100644
--- a/src/openeo_aggregator/testing.py
+++ b/src/openeo_aggregator/testing.py
@@ -32,7 +32,12 @@ def stop(self):
         assert self.state == "open"
         self.state = "closed"
 
+    def _assert_open(self):
+        if self.state != "open":
+            raise kazoo.exceptions.ConnectionClosedError("Connection has been closed")
+
     def create(self, path: str, value, makepath: bool = False):
+        self._assert_open()
         if path in self.data:
             raise kazoo.exceptions.NodeExistsError()
         parent = str(pathlib.Path(path).parent)
@@ -44,20 +49,24 @@ def create(self, path: str, value, makepath: bool = False):
         self.data[path] = value
 
     def exists(self, path):
+        self._assert_open()
         return path in self.data
 
     def get(self, path):
+        self._assert_open()
         if path not in self.data:
             raise kazoo.exceptions.NoNodeError()
         return (self.data[path], None)
 
     def get_children(self, path):
+        self._assert_open()
         if path not in self.data:
             raise kazoo.exceptions.NoNodeError()
         parent = path.split("/")
         return [p.split("/")[-1] for p in self.data if p.split("/")[:-1] == parent]
 
     def set(self, path, value, version=-1):
+        self._assert_open()
         if path not in self.data:
             raise kazoo.exceptions.NoNodeError()
         self.data[path] = value
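
Note: the ConnectionClosedError fix above keeps multi-operation sequences inside a single
`self._connect()` context instead of issuing them on a possibly closed client. A condensed,
self-contained sketch of that pattern (class and method names here are illustrative
assumptions, not the actual ZooKeeperPartitionedJobDB API):

    import contextlib

    class ZkStoreSketch:
        """Keep related ZooKeeper operations within one connected session."""

        def __init__(self, client):
            self._client = client  # assumed: a kazoo.client.KazooClient-like object

        @contextlib.contextmanager
        def _connect(self):
            # Assumption: start()/stop() semantics comparable to kazoo's KazooClient
            self._client.start()
            try:
                yield self._client
            finally:
                self._client.stop()

        def create_pair(self, path: str, value: bytes):
            # Both create() calls run inside one connected session,
            # so the second one cannot hit a closed connection.
            with self._connect():
                self._client.create(path + "/a", value, makepath=True)
                self._client.create(path + "/b", value, makepath=True)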

From adae14a584864ca906b227de8381eb7736b872fd Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Thu, 7 Sep 2023 18:16:44 +0200
Subject: [PATCH 07/11] Issue #115 Improve handling of failed sub batch job
 creation

---
 .../partitionedjobs/tracking.py               | 102 +++++++++---------
 tests/partitionedjobs/conftest.py             |   5 +-
 tests/partitionedjobs/test_api.py             |  37 +++++++
 3 files changed, 94 insertions(+), 50 deletions(-)

diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py
index 129fae2f..dc07dfd4 100644
--- a/src/openeo_aggregator/partitionedjobs/tracking.py
+++ b/src/openeo_aggregator/partitionedjobs/tracking.py
@@ -112,57 +112,61 @@ def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
                 }
             }
 
-        for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
-            process_graph=process["process_graph"], get_replacement=get_replacement, main_subgraph_id=main_subgraph_id
-        ):
-            subjobs[sjob_id] = subjob
-            dependencies[sjob_id] = subjob_dependencies
-            try:
-                # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
-                con = self._backends.get_connection(subjob.backend_id)
-                with con.authenticated_from_request(request=flask.request), con.override(
-                    default_timeout=CONNECTION_TIMEOUT_JOB_START
-                ):
-                    with TimingLogger(title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info):
-                        job = con.create_job(
-                            process_graph=subjob.process_graph,
-                            title=f"Crossbackend job {pjob_id}:{sjob_id}",
-                            plan=metadata.get("plan"),
-                            budget=metadata.get("budget"),
-                            additional=job_options,
-                        )
-                        _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
-                        batch_jobs[sjob_id] = job
-                        title = f"Partitioned job {pjob_id=} {sjob_id=}"
-                        self._db.insert_sjob(
-                            user_id=user_id,
-                            pjob_id=pjob_id,
-                            sjob_id=sjob_id,
-                            subjob=subjob,
-                            title=title,
-                            status=STATUS_CREATED,
-                        )
-                        self._db.set_backend_job_id(
-                            user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
-                        )
-                        create_stats[STATUS_CREATED] += 1
-            except Exception as exc:
-                _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
-                msg = f"Create failed: {exc}"
-                self._db.set_sjob_status(
-                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
-                )
-                create_stats[STATUS_ERROR] += 1
+        try:
+            for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
+                process_graph=process["process_graph"],
+                get_replacement=get_replacement,
+                main_subgraph_id=main_subgraph_id,
+            ):
+                subjobs[sjob_id] = subjob
+                dependencies[sjob_id] = subjob_dependencies
+                try:
+                    title = f"Partitioned job {pjob_id=} {sjob_id=}"
+                    self._db.insert_sjob(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, subjob=subjob, title=title)
+
+                    # TODO: how should job creation failures be handled here? Fail the whole partitioned job or try to finish what is possible?
+                    con = self._backends.get_connection(subjob.backend_id)
+                    with con.authenticated_from_request(request=flask.request), con.override(
+                        default_timeout=CONNECTION_TIMEOUT_JOB_START
+                    ):
+                        with TimingLogger(
+                            title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info
+                        ):
+                            job = con.create_job(
+                                process_graph=subjob.process_graph,
+                                title=f"Crossbackend job {pjob_id}:{sjob_id}",
+                                plan=metadata.get("plan"),
+                                budget=metadata.get("budget"),
+                                additional=job_options,
+                            )
+                            _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
+                            batch_jobs[sjob_id] = job
+                            self._db.set_sjob_status(
+                                user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_CREATED
+                            )
+                            self._db.set_backend_job_id(
+                                user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
+                            )
+                            create_stats[STATUS_CREATED] += 1
+                except Exception as exc:
+                    _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
+                    msg = f"Create failed: {exc}"
+                    self._db.set_sjob_status(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
+                    )
+                    create_stats[STATUS_ERROR] += 1
 
-        # TODO: this is currently unused, don't bother building it at all?
-        partitioned_job = PartitionedJob(
-            process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
-        )
+            # TODO: this is currently unused, don't bother building it at all?
+            partitioned_job = PartitionedJob(
+                process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
+            )
 
-        pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
-        self._db.set_pjob_status(
-            user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
-        )
+            pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
+            self._db.set_pjob_status(
+                user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
+            )
+        except Exception as exc:
+            self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_ERROR, message=str(exc))
 
         return pjob_id
 
diff --git a/tests/partitionedjobs/conftest.py b/tests/partitionedjobs/conftest.py
index c986ab25..3911ce96 100644
--- a/tests/partitionedjobs/conftest.py
+++ b/tests/partitionedjobs/conftest.py
@@ -65,6 +65,7 @@ def __init__(self, requests_mock, backend_url: str, job_id_template: str = "job{
         self.job_id_template = job_id_template
         self.jobs: Dict[Tuple[str, str], DummyBatchJobData] = {}
         self.users: Dict[str, str] = {}
+        self.fail_create_job = False
 
     def register_user(self, bearer_token: str, user_id: str):
         self.users[bearer_token] = user_id
@@ -77,7 +78,7 @@ def get_user_id(self, request: requests.Request):
 
     def get_job_data(self, user_id, job_id) -> DummyBatchJobData:
         if (user_id, job_id) not in self.jobs:
-            raise JobNotFoundException
+            raise JobNotFoundException(job_id=job_id)
         return self.jobs[user_id, job_id]
 
     def setup_basic_requests_mocks(self):
@@ -127,6 +128,8 @@ def _handle_get_jobs(self, request: requests.Request, context):
 
     def _handle_post_jobs(self, request: requests.Request, context):
         """`POST /jobs` handler (create job)"""
+        if self.fail_create_job:
+            raise RuntimeError("nope!")
         user_id = self.get_user_id(request)
         job_id = self.job_id_template.format(i=len(self.jobs))
         assert (user_id, job_id) not in self.jobs
diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py
index 4a98f595..37b334d4 100644
--- a/tests/partitionedjobs/test_api.py
+++ b/tests/partitionedjobs/test_api.py
@@ -873,3 +873,40 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
                 },
             }
         )
+
+    @now.mock
+    def test_failing_create(self, flask_app, api100, zk_db, dummy1):
+        """Check what happens when creation of a sub batch job fails on the upstream backend"""
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+        dummy1.fail_create_job = True
+
+        pg = {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                "result": True,
+            },
+        }
+
+        res = api100.post(
+            "/jobs",
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": "crossbackend"},
+            },
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "error",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }
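
Note: the restructuring above amounts to two-level error handling: a failure while creating
one sub batch job only marks that subjob as "error", while a failure of the splitting loop
itself marks the whole partitioned job as "error". A condensed sketch of that control flow
(the function name and the duck-typed `db`/`create_job` arguments are assumptions):

    from typing import Callable, Iterable, Tuple

    def create_subjobs_sketch(
        db,
        split_results: Iterable[Tuple[str, object, list]],
        user_id: str,
        pjob_id: str,
        create_job: Callable[[object], str],
    ):
        created = 0
        try:
            for sjob_id, subjob, _dependencies in split_results:
                # Register the subjob first, so a creation failure can be recorded against it.
                db.insert_sjob(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, subjob=subjob)
                try:
                    job_id = create_job(subjob)
                    db.set_backend_job_id(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job_id)
                    db.set_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status="created")
                    created += 1
                except Exception as exc:
                    db.set_sjob_status(
                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status="error", message=f"Create failed: {exc}"
                    )
            db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status="created" if created else "error")
        except Exception as exc:
            # The splitting itself failed: fail the whole partitioned job.
            db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status="error", message=str(exc))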

From 5eb7608b81c564f7dcb75b832a7d2dcfb9de0cbd Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Fri, 8 Sep 2023 09:35:41 +0200
Subject: [PATCH 08/11] Issue #115 crossbackend: switch to load_stac + partial
 job results

---
 src/openeo_aggregator/partitionedjobs/tracking.py |  9 +++++----
 tests/partitionedjobs/test_api.py                 | 10 ++++++++--
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py
index dc07dfd4..1335bd1f 100644
--- a/src/openeo_aggregator/partitionedjobs/tracking.py
+++ b/src/openeo_aggregator/partitionedjobs/tracking.py
@@ -102,13 +102,14 @@ def create_crossbackend_pjob(
         create_stats = collections.Counter()
 
         def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
-            # TODO: use `load_stac` iso `load_result`, and use canonical URL?
             nonlocal batch_jobs
-            job_id = batch_jobs[subgraph_id].job_id
+            # TODO: use canonical URL to circumvent auth issues
+            #   but how does `partial` work there? (https://github.com/Open-EO/openeo-api/issues/507)
+            stac_url = batch_jobs[subgraph_id].get_results_metadata_url(full=True) + "?partial=true"
             return {
                 node_id: {
-                    "process_id": "load_result",
-                    "arguments": {"id": job_id},
+                    "process_id": "load_stac",
+                    "arguments": {"url": stac_url},
                 }
             }
 
diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py
index 37b334d4..64f949c6 100644
--- a/tests/partitionedjobs/test_api.py
+++ b/tests/partitionedjobs/test_api.py
@@ -753,7 +753,10 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
                 "backend_id": "b1",
                 "process_graph": {
                     "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
-                    "lc2": {"process_id": "load_result", "arguments": {"id": "1-jb-0"}},
+                    "lc2": {
+                        "process_id": "load_stac",
+                        "arguments": {"url": "https://b1.test/v1/jobs/1-jb-0/results?partial=true"},
+                    },
                     "merge": {
                         "process_id": "merge_cubes",
                         "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
@@ -775,7 +778,10 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
         assert dummy1.get_job_status(TEST_USER, expected_job_id) == "created"
         assert dummy1.get_job_data(TEST_USER, expected_job_id).create["process"]["process_graph"] == {
             "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
-            "lc2": {"process_id": "load_result", "arguments": {"id": "1-jb-0"}},
+            "lc2": {
+                "process_id": "load_stac",
+                "arguments": {"url": "https://b1.test/v1/jobs/1-jb-0/results?partial=true"},
+            },
             "merge": {
                 "process_id": "merge_cubes",
                 "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},

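Note: with this patch the replacement node for a cross-backend dependency points `load_stac`
at the dependency job's results metadata URL with `?partial=true`, so the main job can be
created while the dependency job is still running. A minimal sketch of that construction
(the helper name is an assumption):

    def load_stac_replacement(node_id: str, results_metadata_url: str) -> dict:
        """Build a process graph node that loads a dependency job's results via STAC."""
        return {
            node_id: {
                "process_id": "load_stac",
                # partial=true: the dependency job may not be finished yet
                "arguments": {"url": results_metadata_url + "?partial=true"},
            }
        }

    # e.g. load_stac_replacement("lc2", "https://b1.test/v1/jobs/1-jb-0/results")
    # -> {"lc2": {"process_id": "load_stac",
    #             "arguments": {"url": "https://b1.test/v1/jobs/1-jb-0/results?partial=true"}}}
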
From ab5c315674e1cc09cdfacc70de9d822f79a93bde Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Fri, 8 Sep 2023 11:56:21 +0200
Subject: [PATCH 09/11] Issue #115 eliminate some warnings from
 TestCrossBackendSplitting tests

---
 tests/partitionedjobs/conftest.py |  9 +++++----
 tests/partitionedjobs/test_api.py | 12 +++++++++++-
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/tests/partitionedjobs/conftest.py b/tests/partitionedjobs/conftest.py
index 3911ce96..b7992ec1 100644
--- a/tests/partitionedjobs/conftest.py
+++ b/tests/partitionedjobs/conftest.py
@@ -1,6 +1,6 @@
 import collections
 import re
-from typing import Dict, List, Tuple
+from typing import Dict, Iterable, List, Tuple
 
 import pytest
 import requests
@@ -81,10 +81,11 @@ def get_job_data(self, user_id, job_id) -> DummyBatchJobData:
             raise JobNotFoundException(job_id=job_id)
         return self.jobs[user_id, job_id]
 
-    def setup_basic_requests_mocks(self):
+    def setup_basic_requests_mocks(self, collections: Iterable[str] = ("S2",)):
         # Basic collections
-        self.requests_mock.get(self.backend_url + "/collections", json={"collections": [{"id": "S2"}]})
-        self.requests_mock.get(self.backend_url + "/collections/S2", json={"id": "S2"})
+        for cid in collections:
+            self.requests_mock.get(self.backend_url + "/collections", json={"collections": [{"id": cid}]})
+            self.requests_mock.get(self.backend_url + f"/collections/{cid}", json={"id": cid})
         # Batch job handling: list jobs
         self.requests_mock.get(self.backend_url + "/jobs", json=self._handle_get_jobs)
         # Batch job handling: create job
diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py
index 64f949c6..dfc8b941 100644
--- a/tests/partitionedjobs/test_api.py
+++ b/tests/partitionedjobs/test_api.py
@@ -44,13 +44,22 @@ def __init__(self, date: str):
 
 @pytest.fixture
 def dummy1(backend1, requests_mock) -> DummyBackend:
-    # TODO: rename this fixture to dummy_backend1
+    # TODO: rename this fixture to dummy_backend1 for clarity
     dummy = DummyBackend(requests_mock=requests_mock, backend_url=backend1, job_id_template="1-jb-{i}")
     dummy.setup_basic_requests_mocks()
     dummy.register_user(bearer_token=TEST_USER_BEARER_TOKEN, user_id=TEST_USER)
     return dummy
 
 
+@pytest.fixture
+def dummy2(backend2, requests_mock) -> DummyBackend:
+    # TODO: rename this fixture to dummy_backend2 for clarity
+    dummy = DummyBackend(requests_mock=requests_mock, backend_url=backend2, job_id_template="2-jb-{i}")
+    dummy.setup_basic_requests_mocks(collections=["S22"])
+    dummy.register_user(bearer_token=TEST_USER_BEARER_TOKEN, user_id=TEST_USER)
+    return dummy
+
+
 class TestFlimsyBatchJobSplitting:
     now = _Now("2022-01-19T12:34:56Z")
 
@@ -613,6 +622,7 @@ def test_job_results_basic(self, flask_app, api100, dummy1):
     # TODO: more/full TileGridSplitter batch job tests
 
 
+@pytest.mark.usefixtures("dummy1", "dummy2")
 class TestCrossBackendSplitting:
     now = _Now("2022-01-19T12:34:56Z")
 

From 044380510690bf58ffb68e1faa129d554df6ec0c Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Fri, 8 Sep 2023 12:09:55 +0200
Subject: [PATCH 10/11] Issue #115 switch to canonical batch job result URL

---
 .../partitionedjobs/tracking.py                | 18 +++++++++++++++---
 tests/partitionedjobs/test_api.py              | 18 ++++++++++++++----
 2 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py
index 1335bd1f..6c63ce68 100644
--- a/src/openeo_aggregator/partitionedjobs/tracking.py
+++ b/src/openeo_aggregator/partitionedjobs/tracking.py
@@ -22,6 +22,7 @@
     STATUS_INSERTED,
     STATUS_RUNNING,
     PartitionedJob,
+    PartitionedJobFailure,
     SubJob,
 )
 from openeo_aggregator.partitionedjobs.crossbackend import (
@@ -103,9 +104,20 @@ def create_crossbackend_pjob(
 
         def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
             nonlocal batch_jobs
-            # TODO: use canonical URL to circumvent auth issues
-            #   but how does `partial` work there? (https://github.com/Open-EO/openeo-api/issues/507)
-            stac_url = batch_jobs[subgraph_id].get_results_metadata_url(full=True) + "?partial=true"
+            try:
+                job: BatchJob = batch_jobs[subgraph_id]
+                with job.connection.authenticated_from_request(flask.request):
+                    result_url = job.get_results_metadata_url(full=True)
+                    result_metadata = job.connection.get(
+                        result_url, params={"partial": "true"}, expected_status=200
+                    ).json()
+                # Will canonical link also be partial? (https://github.com/Open-EO/openeo-api/issues/507)
+                canonical_links = [link for link in result_metadata.get("links", []) if link.get("rel") == "canonical"]
+                stac_url = canonical_links[0]["href"]
+            except Exception as e:
+                msg = f"Failed to obtain partial canonical batch job result URL for {subgraph_id=}: {e}"
+                _log.exception(msg)
+                raise PartitionedJobFailure(msg) from e
             return {
                 node_id: {
                     "process_id": "load_stac",
diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py
index dfc8b941..20807c02 100644
--- a/tests/partitionedjobs/test_api.py
+++ b/tests/partitionedjobs/test_api.py
@@ -695,7 +695,7 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
         assert pg == {"lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection", "result": True}}
 
     @now.mock
-    def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
+    def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock):
         api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
 
         pg = {
@@ -708,6 +708,11 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
             },
         }
 
+        requests_mock.get(
+            "https://b1.test/v1/jobs/1-jb-0/results?partial=true",
+            json={"links": [{"rel": "canonical", "href": "https://data.b1.test/123abc"}]},
+        )
+
         res = api100.post(
             "/jobs",
             json={"process": {"process_graph": pg}, "job_options": {"split_strategy": "crossbackend"}},
@@ -765,7 +770,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
                     "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
                     "lc2": {
                         "process_id": "load_stac",
-                        "arguments": {"url": "https://b1.test/v1/jobs/1-jb-0/results?partial=true"},
+                        "arguments": {"url": "https://data.b1.test/123abc"},
                     },
                     "merge": {
                         "process_id": "merge_cubes",
@@ -790,7 +795,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
             "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
             "lc2": {
                 "process_id": "load_stac",
-                "arguments": {"url": "https://b1.test/v1/jobs/1-jb-0/results?partial=true"},
+                "arguments": {"url": "https://data.b1.test/123abc"},
             },
             "merge": {
                 "process_id": "merge_cubes",
@@ -818,7 +823,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
         }
 
     @now.mock
-    def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
+    def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, requests_mock):
         """Run the jobs and get results"""
         api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
 
@@ -832,6 +837,11 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
             },
         }
 
+        requests_mock.get(
+            "https://b1.test/v1/jobs/1-jb-0/results?partial=true",
+            json={"links": [{"rel": "canonical", "href": "https://data.b1.test/123abc"}]},
+        )
+
         res = api100.post(
             "/jobs",
             json={

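Note: the canonical-URL resolution added above comes down to fetching the (partial) job
results metadata and picking the link with rel="canonical"; the aggregator does this through
the authenticated backend connection, but the lookup itself can be sketched with plain
`requests` (the function name is an assumption):

    import requests

    def resolve_canonical_results_url(results_metadata_url: str, session: requests.Session) -> str:
        """Fetch (partial) job results metadata and return the canonical results URL."""
        resp = session.get(results_metadata_url, params={"partial": "true"})
        resp.raise_for_status()
        links = resp.json().get("links", [])
        canonical = [link for link in links if link.get("rel") == "canonical"]
        if not canonical:
            raise ValueError(f"No canonical link in results metadata at {results_metadata_url}")
        return canonical[0]["href"]
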
From f500b330b9c2c4ee90cba2c9d0d2e0e80808ec31 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens <stefaan.lippens@vito.be>
Date: Mon, 11 Sep 2023 12:00:13 +0200
Subject: [PATCH 11/11] Issue #115 bump to version 0.9.0a1

---
 CHANGELOG.md                   | 17 ++++++++++++++++-
 src/openeo_aggregator/about.py |  2 +-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index cdf98eab..b36c4b85 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,22 @@ All notable changes to this project will be documented in this file.
 
 The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
-## [Current: 0.8.x]
+
+## [Current: 0.9.x]
+
+
+### Added
+
+- Initial aggregator-level implementation of cross-backend processing
+  ([#115](https://github.com/Open-EO/openeo-aggregator/issues/115))
+
+
+### Changed
+
+### Fixed
+
+
+## [0.8.x]
 
 ### Added
 
diff --git a/src/openeo_aggregator/about.py b/src/openeo_aggregator/about.py
index 6e579fac..a032b4b1 100644
--- a/src/openeo_aggregator/about.py
+++ b/src/openeo_aggregator/about.py
@@ -1 +1 @@
-__version__ = "0.8.5a1"
+__version__ = "0.9.0a1"