From cc55c5abfb9892db460d6f40ba9390f01eab5c51 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Wed, 18 Sep 2024 17:57:25 +0200
Subject: [PATCH] Issue #150 Do some renaming now that design dust has settled
 a bit

also add consistency check of _GraphViewer node map
---
 .../partitionedjobs/crossbackend.py         |  95 +++++++-----
 tests/partitionedjobs/test_crossbackend.py  | 139 +++++++++++-------
 2 files changed, 139 insertions(+), 95 deletions(-)

diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py
index 082b0b8..4689994 100644
--- a/src/openeo_aggregator/partitionedjobs/crossbackend.py
+++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py
@@ -448,18 +448,24 @@ def run_partitioned_job(pjob: PartitionedJob, connection: openeo.Connection, fai
     }
 
 
+def to_frozenset(value: Union[Iterable[str], str]) -> frozenset[str]:
+    """Coerce value to frozenset of strings"""
+    if isinstance(value, str):
+        value = [value]
+    return frozenset(value)
+
+
 @dataclasses.dataclass(frozen=True, init=False, eq=True)
-class _FrozenNode:
+class _GVNode:
     """
-    Node in a _FrozenGraph, with pointers to other nodes it depends on (needs data/input from)
-    and nodes to which it is input to.
+    Node in a _GraphViewer, with pointers to other nodes it depends on (needs data/input from)
+    and nodes to which it provides input.
 
-    This is as immutable as possible (as far as Python allows) to
-    be used and reused in iterative/recursive graph handling algorithms,
-    without having to worry about accidentally changing state.
+    This structure is designed to be as immutable as possible (as far as Python allows)
+    to be (re)used in iterative/recursive graph handling algorithms,
+    without having to worry about accidentally propagating changed state to other parts of the graph.
     """
 
-    # TODO: better name for this class?
     # TODO: type coercion in __init__ of frozen dataclasses is bit ugly. Use attrs with field converters instead?
 
     # Node ids of other nodes this node depends on (aka parents)
@@ -469,43 +475,54 @@ class _FrozenNode:
 
     # Backend ids this node is marked to be supported on
     # value None means it is unknown/unconstrained for this node
-    # TODO: Move this to _FrozenGraph as responsibility?
+    # TODO: Move this to _GraphViewer as responsibility?
     backend_candidates: Union[frozenset[BackendId], None]
 
     def __init__(
         self,
         *,
-        depends_on: Optional[Iterable[NodeId]] = None,
-        flows_to: Optional[Iterable[NodeId]] = None,
-        backend_candidates: Union[BackendId, Iterable[BackendId], None] = None,
+        depends_on: Union[Iterable[NodeId], NodeId, None] = None,
+        flows_to: Union[Iterable[NodeId], NodeId, None] = None,
+        backend_candidates: Union[Iterable[BackendId], BackendId, None] = None,
     ):
         super().__init__()
-        object.__setattr__(self, "depends_on", frozenset(depends_on or []))
-        object.__setattr__(self, "flows_to", frozenset(flows_to or []))
-        if isinstance(backend_candidates, str):
-            backend_candidates = frozenset([backend_candidates])
-        elif backend_candidates is None:
-            backend_candidates = None
-        else:
-            backend_candidates = frozenset(backend_candidates)
+        object.__setattr__(self, "depends_on", to_frozenset(depends_on or []))
+        object.__setattr__(self, "flows_to", to_frozenset(flows_to or []))
+        backend_candidates = to_frozenset(backend_candidates) if backend_candidates is not None else None
         object.__setattr__(self, "backend_candidates", backend_candidates)
 
     def __repr__(self):
         return f"<{type(self).__name__}({self.depends_on}, {self.flows_to}, {self.backend_candidates})>"
 
 
-class _FrozenGraph:
+class _GraphViewer:
     """
-    Graph of _FrozenNode objects.
+    Internal utility to have a read-only view on the topological structure of a process graph
+    and track the flow of backend support.
+    """
 
-    # TODO: find better class name: e.g. SplitGraphView, GraphSplitUtility, GraphSplitter, ...?
     # TODO: add more logging of what is happening under the hood
 
-    def __init__(self, graph: dict[NodeId, _FrozenNode]):
+    def __init__(self, node_map: dict[NodeId, _GVNode]):
+        self._check_consistency(node_map=node_map)
         # Work with a read-only proxy to prevent accidental changes
-        # TODO: check consistency of references?
-        self._graph: Mapping[NodeId, _FrozenNode] = types.MappingProxyType(graph)
+        self._graph: Mapping[NodeId, _GVNode] = types.MappingProxyType(node_map)
+
+    @staticmethod
+    def _check_consistency(node_map: dict[NodeId, _GVNode]):
+        """Check (link) consistency of given node map"""
+        key_ids = set(node_map.keys())
+        linked_ids = set(k for n in node_map.values() for k in n.depends_on.union(n.flows_to))
+        unknown = linked_ids.difference(key_ids)
+        if unknown:
+            raise GraphSplitException(f"Inconsistent node map: {key_ids=} != {linked_ids=}: {unknown=}")
+        bad_links = set()
+        for node_id, node in node_map.items():
+            bad_links.update((other, node_id) for other in node.depends_on if node_id not in node_map[other].flows_to)
+            bad_links.update((node_id, other) for other in node.flows_to if node_id not in node_map[other].depends_on)
+        if bad_links:
+            raise GraphSplitException(f"Inconsistent node map: {bad_links=}")
 
     def __repr__(self):
         return f"<{type(self).__name__}({self._graph})>"
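For illustration only (not part of the patch): a minimal sketch of the link consistency that the new `_check_consistency` helper enforces on the node map, assuming `_GraphViewer`, `_GVNode` and `GraphSplitException` can be imported from `openeo_aggregator.partitionedjobs.crossbackend` (the module changed in this hunk).

    from openeo_aggregator.partitionedjobs.crossbackend import (
        GraphSplitException,
        _GraphViewer,
        _GVNode,
    )

    # Consistent node map: the a->b link is declared on both sides.
    _GraphViewer(node_map={"a": _GVNode(flows_to="b"), "b": _GVNode(depends_on="a")})

    # Inconsistent node map: "a" flows to "b", but "b" does not declare the
    # mirroring depends_on link, so construction fails.
    try:
        _GraphViewer(node_map={"a": _GVNode(flows_to="b"), "b": _GVNode()})
    except GraphSplitException as exc:
        print(exc)  # Inconsistent node map: bad_links={('a', 'b')}
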
@@ -513,7 +530,7 @@ def __repr__(self):
     @classmethod
     def from_flat_graph(cls, flat_graph: FlatPG, supporting_backends: SupportingBackendsMapper = (lambda n, d: None)):
         """
-        Build _FrozenGraph from a flat process graph representation
+        Build _GraphViewer from a flat process graph representation
         """
         # Extract dependency links between nodes
         depends_on = collections.defaultdict(list)
@@ -525,14 +542,14 @@ def from_flat_graph(cls, flat_graph: FlatPG, supporting_backends: SupportingBack
                 depends_on[node_id].append(from_node)
                 flows_to[from_node].append(node_id)
         graph = {
-            node_id: _FrozenNode(
+            node_id: _GVNode(
                 depends_on=depends_on.get(node_id, []),
                 flows_to=flows_to.get(node_id, []),
                 backend_candidates=supporting_backends(node_id, node),
             )
             for node_id, node in flat_graph.items()
         }
-        return cls(graph=graph)
+        return cls(node_map=graph)
 
     @classmethod
     def from_edges(
@@ -550,7 +567,7 @@ def from_edges(
             flows_to[parent].append(child)
 
         graph = {
-            node_id: _FrozenNode(
+            node_id: _GVNode(
                 # Note that we just use node id as process id. Do we have better options here?
                 depends_on=depends_on.get(node_id, []),
                 flows_to=flows_to.get(node_id, []),
@@ -558,14 +575,14 @@ def from_edges(
             )
             for node_id in set(depends_on.keys()).union(flows_to.keys())
         }
-        return cls(graph=graph)
+        return cls(node_map=graph)
 
-    def node(self, node_id: NodeId) -> _FrozenNode:
+    def node(self, node_id: NodeId) -> _GVNode:
         if node_id not in self._graph:
             raise GraphSplitException(f"Invalid node id {node_id}.")
         return self._graph[node_id]
 
-    def iter_nodes(self) -> Iterator[Tuple[NodeId, _FrozenNode]]:
+    def iter_nodes(self) -> Iterator[Tuple[NodeId, _GVNode]]:
         """Iterate through node_id-node pairs"""
         yield from self._graph.items()
 
@@ -686,12 +703,12 @@ def get_flow_weights(node_id: NodeId) -> Dict[NodeId, fractions.Fraction]:
 
         # Select articulation points: nodes where all flows have weight 1
         return set(node_id for node_id, flows in flow_weights.items() if all(w == 1 for w in flows.values()))
 
-    def split_at(self, split_node_id: NodeId) -> Tuple[_FrozenGraph, _FrozenGraph]:
+    def split_at(self, split_node_id: NodeId) -> Tuple[_GraphViewer, _GraphViewer]:
         """
         Split graph at given node id (must be articulation point),
         creating two new graphs, containing original nodes and adaptation of the split node.
-        :return: two _FrozenGraph objects: the upstream subgraph and the downstream subgraph
+        :return: two _GraphViewer objects: the upstream subgraph and the downstream subgraph
         """
         split_node = self.node(split_node_id)
@@ -710,20 +727,20 @@ def next_nodes(node_id: NodeId) -> Iterable[NodeId]:
 
         up_graph = {n: self.node(n) for n in up_node_ids}
         # Replacement of original split node: no `flows_to` links
-        up_graph[split_node_id] = _FrozenNode(
+        up_graph[split_node_id] = _GVNode(
             depends_on=split_node.depends_on,
             backend_candidates=split_node.backend_candidates,
         )
-        up = _FrozenGraph(graph=up_graph)
+        up = _GraphViewer(node_map=up_graph)
 
         down_graph = {n: node for n, node in self.iter_nodes() if n not in up_node_ids}
         # Replacement of original split node: no `depends_on` links
         # and perhaps more importantly: do not copy over the original `backend_candidates``
-        down_graph[split_node_id] = _FrozenNode(
+        down_graph[split_node_id] = _GVNode(
             flows_to=split_node.flows_to,
             backend_candidates=None,
         )
-        down = _FrozenGraph(graph=down_graph)
+        down = _GraphViewer(node_map=down_graph)
 
         return up, down
 
@@ -797,7 +814,7 @@ def __init__(self, supporting_backends: SupportingBackendsMapper):
         self._supporting_backends_mapper = supporting_backends
 
     def split(self, process_graph: FlatPG) -> _PGSplitResult:
-        graph = _FrozenGraph.from_flat_graph(
+        graph = _GraphViewer.from_flat_graph(
             flat_graph=process_graph, supporting_backends=self._supporting_backends_mapper
         )
 
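Before the test changes below, a quick usage sketch (again, not part of the patch) of how the renamed classes fit together, mirroring the tests that follow; the import path is taken from the file header above.

    from openeo_aggregator.partitionedjobs.crossbackend import _GraphViewer, _GVNode

    # _GVNode coerces single strings and arbitrary iterables to frozensets,
    # so equivalent nodes compare equal regardless of the input container type.
    assert _GVNode(depends_on="a", flows_to=["b", "c"]) == _GVNode(depends_on=("a",), flows_to={"c", "b"})

    # _GraphViewer.from_edges() builds a node map from parent->child edge tuples
    # (and runs the new consistency check on it).
    viewer = _GraphViewer.from_edges([("a", "b"), ("b", "c")])
    for node_id, node in sorted(viewer.iter_nodes()):
        print(node_id, node)
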
diff --git a/tests/partitionedjobs/test_crossbackend.py b/tests/partitionedjobs/test_crossbackend.py
index f8f7707..28a10e8 100644
--- a/tests/partitionedjobs/test_crossbackend.py
+++ b/tests/partitionedjobs/test_crossbackend.py
@@ -19,8 +19,8 @@
     LoadCollectionGraphSplitter,
     SubGraphId,
     SupportingBackendsMapper,
-    _FrozenGraph,
-    _FrozenNode,
+    _GraphViewer,
+    _GVNode,
     _PGSplitResult,
     _SubGraphData,
     run_partitioned_job,
@@ -439,26 +439,40 @@ def test_basic(self, aggregator: _FakeAggregator):
         }
 
 
-class TestFrozenNode:
-    def test_default(self):
-        node = _FrozenNode()
+class TestGVNode:
+    def test_defaults(self):
+        node = _GVNode()
+        assert isinstance(node.depends_on, frozenset)
         assert node.depends_on == frozenset()
+        assert isinstance(node.flows_to, frozenset)
         assert node.flows_to == frozenset()
         assert node.backend_candidates is None
 
     def test_basic(self):
-        node = _FrozenNode(depends_on=["a", "b"], flows_to=["c", "d"], backend_candidates="X")
+        node = _GVNode(depends_on=["a", "b"], flows_to=["c", "d"], backend_candidates=["X"])
+        assert isinstance(node.depends_on, frozenset)
         assert node.depends_on == frozenset(["a", "b"])
+        assert isinstance(node.flows_to, frozenset)
         assert node.flows_to == frozenset(["c", "d"])
+        assert isinstance(node.backend_candidates, frozenset)
         assert node.backend_candidates == frozenset(["X"])
 
+    def test_single_strings(self):
+        node = _GVNode(depends_on="apple", flows_to="banana", backend_candidates="coconut")
+        assert isinstance(node.depends_on, frozenset)
+        assert node.depends_on == frozenset(["apple"])
+        assert isinstance(node.flows_to, frozenset)
+        assert node.flows_to == frozenset(["banana"])
+        assert isinstance(node.backend_candidates, frozenset)
+        assert node.backend_candidates == frozenset(["coconut"])
+
     def test_eq(self):
-        assert _FrozenNode() == _FrozenNode()
-        assert _FrozenNode(
+        assert _GVNode() == _GVNode()
+        assert _GVNode(
             depends_on=["a", "b"],
             flows_to=["c", "d"],
             backend_candidates="X",
-        ) == _FrozenNode(
+        ) == _GVNode(
             depends_on=("b", "a"),
             flows_to={"d", "c"},
             backend_candidates=["X"],
@@ -469,35 +483,48 @@ def supporting_backends_from_node_id_dict(data: dict) -> SupportingBackendsMappe
     return lambda node_id, node: data.get(node_id)
 
 
-class TestFrozenGraph:
+class TestGraphViewer:
     def test_empty(self):
-        graph = _FrozenGraph(graph={})
+        graph = _GraphViewer(node_map={})
         assert list(graph.iter_nodes()) == []
 
+    @pytest.mark.parametrize(
+        ["node_map", "expected_error"],
+        [
+            ({"a": _GVNode(flows_to="b")}, r"Inconsistent.*unknown=\{'b'\}"),
+            ({"b": _GVNode(depends_on="a")}, r"Inconsistent.*unknown=\{'a'\}"),
+            ({"a": _GVNode(flows_to="b"), "b": _GVNode()}, r"Inconsistent.*bad_links=\{\('a', 'b'\)\}"),
+            ({"b": _GVNode(depends_on="a"), "a": _GVNode()}, r"Inconsistent.*bad_links=\{\('a', 'b'\)\}"),
+        ],
+    )
+    def test_check_consistency(self, node_map, expected_error):
+        with pytest.raises(GraphSplitException, match=expected_error):
+            _ = _GraphViewer(node_map=node_map)
+
     def test_from_flat_graph_basic(self):
         flat = {
             "lc1": {"process_id": "load_collection", "arguments": {"id": "B1_NDVI"}},
             "ndvi1": {"process_id": "ndvi", "arguments": {"data": {"from_node": "lc1"}}, "result": True},
         }
-        graph = _FrozenGraph.from_flat_graph(
+        graph = _GraphViewer.from_flat_graph(
             flat, supporting_backends=supporting_backends_from_node_id_dict({"lc1": ["b1"]})
         )
         assert sorted(graph.iter_nodes()) == [
-            ("lc1", _FrozenNode(flows_to=["ndvi1"], backend_candidates="b1")),
-            ("ndvi1", _FrozenNode(depends_on=["lc1"])),
+            ("lc1", _GVNode(flows_to=["ndvi1"], backend_candidates="b1")),
+            ("ndvi1", _GVNode(depends_on=["lc1"])),
         ]
 
     # TODO: test from_flat_graph with more complex graphs
 
     def test_from_edges(self):
-        graph = _FrozenGraph.from_edges([("a", "c"), ("b", "d"), ("c", "e"), ("d", "e"), ("e", "f")])
+        graph = _GraphViewer.from_edges([("a", "c"), ("b", "d"), ("c", "e"), ("d", "e"), ("e", "f")])
         assert sorted(graph.iter_nodes()) == [
-            ("a", _FrozenNode(flows_to=["c"])),
-            ("b", _FrozenNode(flows_to=["d"])),
-            ("c", _FrozenNode(depends_on=["a"], flows_to=["e"])),
-            ("d", _FrozenNode(depends_on=["b"], flows_to=["e"])),
-            ("e", _FrozenNode(depends_on=["c", "d"], flows_to=["f"])),
-            ("f", _FrozenNode(depends_on=["e"])),
+            ("a", _GVNode(flows_to=["c"])),
+            ("b", _GVNode(flows_to=["d"])),
+            ("c", _GVNode(depends_on=["a"], flows_to=["e"])),
+            ("d", _GVNode(depends_on=["b"], flows_to=["e"])),
+            ("e", _GVNode(depends_on=["c", "d"], flows_to=["f"])),
+            ("f", _GVNode(depends_on=["e"])),
         ]
 
     @pytest.mark.parametrize(
@@ -522,7 +549,7 @@ def test_from_edges(self):
         ],
     )
     def test_walk_upstream_nodes(self, seed, include_seeds, expected):
-        graph = _FrozenGraph.from_edges(
+        graph = _GraphViewer.from_edges(
            # a   b
            # |   |
            # c   d
@@ -535,7 +562,7 @@ def test_walk_upstream_nodes(self, seed, include_seeds, expected):
        assert list(graph.walk_upstream_nodes(seed, include_seeds)) == expected
 
    def test_get_backend_candidates_basic(self):
-        graph = _FrozenGraph.from_edges(
+        graph = _GraphViewer.from_edges(
            # a
            # |
            # b   c
@@ -558,7 +585,7 @@ def test_get_backend_candidates_basic(self):
        assert graph.get_backend_candidates_for_node_set(["a", "b", "d"]) == set()
 
    def test_get_backend_candidates_none(self):
-        graph = _FrozenGraph.from_edges(
+        graph = _GraphViewer.from_edges(
            # a
            # |
            # b   c
@@ -575,7 +602,7 @@ def test_get_backend_candidates_none(self):
        assert graph.get_backend_candidates_for_node_set(["a", "b", "c"]) is None
 
    def test_get_backend_candidates_intersection(self):
-        graph = _FrozenGraph.from_edges(
+        graph = _GraphViewer.from_edges(
            # a   b   c
            #  \ / \ /
            #   d   e
@@ -598,7 +625,7 @@ def test_get_backend_candidates_intersection(self):
        assert graph.get_backend_candidates_for_node_set(["c", "d"]) == set()
 
    def test_find_forsaken_nodes(self):
-        graph = _FrozenGraph.from_edges(
+        graph = _GraphViewer.from_edges(
            # a   b   c
            #  \ / \ /
            #   d   e
@@ -618,7 +645,7 @@ def test_find_articulation_points_basic(self):
            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
            "ndvi1": {"process_id": "ndvi", "arguments": {"data": {"from_node": "lc1"}}, "result": True},
        }
-        graph = _FrozenGraph.from_flat_graph(flat)
+        graph = _GraphViewer.from_flat_graph(flat)
        assert graph.find_articulation_points() == {"lc1", "ndvi1"}
 
    @pytest.mark.parametrize(
@@ -693,61 +720,61 @@ def test_find_articulation_points_basic(self):
        ],
    )
    def test_find_articulation_points(self, flat, expected):
-        graph = _FrozenGraph.from_flat_graph(flat)
+        graph = _GraphViewer.from_flat_graph(flat)
        assert graph.find_articulation_points() == expected
 
    def test_split_at_minimal(self):
-        graph = _FrozenGraph.from_edges(
+        graph = _GraphViewer.from_edges(
            [("a", "b")], supporting_backends_mapper=supporting_backends_from_node_id_dict({"a": "A"})
        )
        # Split at a
        up, down = graph.split_at("a")
        assert sorted(up.iter_nodes()) == [
-            ("a", _FrozenNode(backend_candidates=["A"])),
+            ("a", _GVNode(backend_candidates=["A"])),
        ]
        assert sorted(down.iter_nodes()) == [
-            ("a", _FrozenNode(flows_to=["b"])),
-            ("b", _FrozenNode(depends_on=["a"])),
+            ("a", _GVNode(flows_to=["b"])),
+            ("b", _GVNode(depends_on=["a"])),
        ]
        # Split at b
        up, down = graph.split_at("b")
        assert sorted(up.iter_nodes()) == [
-            ("a", _FrozenNode(flows_to=["b"], backend_candidates=["A"])),
-            ("b", _FrozenNode(depends_on=["a"])),
+            ("a", _GVNode(flows_to=["b"], backend_candidates=["A"])),
+            ("b", _GVNode(depends_on=["a"])),
        ]
        assert sorted(down.iter_nodes()) == [
-            ("b", _FrozenNode()),
+            ("b", _GVNode()),
        ]
 
    def test_split_at_basic(self):
-        graph = _FrozenGraph.from_edges(
+        graph = _GraphViewer.from_edges(
            [("a", "b"), ("b", "c")],
            supporting_backends_mapper=supporting_backends_from_node_id_dict({"a": "A"}),
        )
        up, down = graph.split_at("b")
        assert sorted(up.iter_nodes()) == [
-            ("a", _FrozenNode(flows_to=["b"], backend_candidates=["A"])),
-            ("b", _FrozenNode(depends_on=["a"])),
+            ("a", _GVNode(flows_to=["b"], backend_candidates=["A"])),
+            ("b", _GVNode(depends_on=["a"])),
        ]
        assert sorted(down.iter_nodes()) == [
-            ("b", _FrozenNode(flows_to=["c"])),
-            ("c", _FrozenNode(depends_on=["b"])),
+            ("b", _GVNode(flows_to=["c"])),
+            ("c", _GVNode(depends_on=["b"])),
        ]
 
    def test_split_at_complex(self):
-        graph = _FrozenGraph.from_edges(
+        graph = _GraphViewer.from_edges(
            [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("c", "e"), ("e", "g"), ("f", "g"), ("X", "Y")]
        )
        up, down = graph.split_at("e")
        assert sorted(up.iter_nodes()) == sorted(
-            _FrozenGraph.from_edges([("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("c", "e")]).iter_nodes()
+            _GraphViewer.from_edges([("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("c", "e")]).iter_nodes()
        )
        assert sorted(down.iter_nodes()) == sorted(
-            _FrozenGraph.from_edges([("e", "g"), ("f", "g"), ("X", "Y")]).iter_nodes()
+            _GraphViewer.from_edges([("e", "g"), ("f", "g"), ("X", "Y")]).iter_nodes()
        )
 
    def test_split_at_non_articulation_point(self):
-        graph = _FrozenGraph.from_edges(
+        graph = _GraphViewer.from_edges(
            # a
            # /|
            # b |
            # \|
            # c
            [("a", "b"), ("a", "c"), ("b", "c")]
        )
        with pytest.raises(GraphSplitException, match="not an articulation point"):
            _ = graph.split_at("b")
@@ -762,22 +789,22 @@ def test_split_at_non_articulation_point(self):
        # These should still work
        up, down = graph.split_at("a")
        assert sorted(up.iter_nodes()) == [
-            ("a", _FrozenNode()),
+            ("a", _GVNode()),
        ]
        assert sorted(down.iter_nodes()) == [
-            ("a", _FrozenNode(flows_to=["b", "c"])),
-            ("b", _FrozenNode(depends_on=["a"], flows_to=["c"])),
-            ("c", _FrozenNode(depends_on=["a", "b"])),
+            ("a", _GVNode(flows_to=["b", "c"])),
+            ("b", _GVNode(depends_on=["a"], flows_to=["c"])),
+            ("c", _GVNode(depends_on=["a", "b"])),
         ]
 
         up, down = graph.split_at("c")
         assert sorted(up.iter_nodes()) == [
-            ("a", _FrozenNode(flows_to=["b", "c"])),
-            ("b", _FrozenNode(depends_on=["a"], flows_to=["c"])),
-            ("c", _FrozenNode(depends_on=["a", "b"])),
+            ("a", _GVNode(flows_to=["b", "c"])),
+            ("b", _GVNode(depends_on=["a"], flows_to=["c"])),
+            ("c", _GVNode(depends_on=["a", "b"])),
         ]
         assert sorted(down.iter_nodes()) == [
-            ("c", _FrozenNode()),
+            ("c", _GVNode()),
         ]
 
     def test_produce_split_locations_simple(self):
@@ -789,7 +816,7 @@ def test_produce_split_locations_simple(self):
             "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
             "ndvi1": {"process_id": "ndvi", "arguments": {"data": {"from_node": "lc1"}}, "result": True},
         }
-        graph = _FrozenGraph.from_flat_graph(
+        graph = _GraphViewer.from_flat_graph(
             flat, supporting_backends=supporting_backends_from_node_id_dict({"lc1": "b1"})
         )
         assert list(graph.produce_split_locations()) == [[]]
@@ -810,7 +837,7 @@ def test_produce_split_locations_merge_basic(self):
                 "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
             },
         }
-        graph = _FrozenGraph.from_flat_graph(
+        graph = _GraphViewer.from_flat_graph(
             flat,
             supporting_backends=supporting_backends_from_node_id_dict({"lc1": ["b1"], "lc2": ["b2"]}),
         )
@@ -832,7 +859,7 @@ def test_produce_split_locations_merge_longer(self):
                 "arguments": {"cube1": {"from_node": "bands1"}, "cube2": {"from_node": "bands2"}},
             },
         }
-        graph = _FrozenGraph.from_flat_graph(
+        graph = _GraphViewer.from_flat_graph(
             flat,
             supporting_backends=supporting_backends_from_node_id_dict({"lc1": ["b1"], "lc2": ["b2"]}),
         )
@@ -861,7 +888,7 @@ def test_produce_split_locations_merge_longer_triangle(self):
                 "arguments": {"cube1": {"from_node": "mask1"}, "cube2": {"from_node": "bands2"}},
             },
         }
-        graph = _FrozenGraph.from_flat_graph(
+        graph = _GraphViewer.from_flat_graph(
             flat,
             supporting_backends=supporting_backends_from_node_id_dict({"lc1": ["b1"], "lc2": ["b2"]}),
         )
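To close the loop on the splitting behavior exercised above, a short usage sketch (not part of the patch), modeled on `test_split_at_basic`; the import path is assumed to be the module changed in this patch.

    from openeo_aggregator.partitionedjobs.crossbackend import _GraphViewer

    # "b" is an articulation point of a -> b -> c, so the viewer can be split there:
    # the upstream part keeps "b" without its outgoing links, the downstream part
    # keeps "b" without its incoming links (and without backend candidates).
    graph = _GraphViewer.from_edges([("a", "b"), ("b", "c")])
    up, down = graph.split_at("b")
    print(sorted(up.iter_nodes()))    # ("a", ...) and ("b", ...)
    print(sorted(down.iter_nodes()))  # ("b", ...) and ("c", ...)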