Skip to content

Commit

Permalink
add _graph module
Browse files Browse the repository at this point in the history
The evaluable module has two different implementations for producing graphs:
asciitree and graphviz. While there is not much overlap between the two graphs
at this point, that will change when we have loops embedded inside evaluables.

This commit adds a `_graph` module, for internal use only, that provides a
unified interface for drawing acyclic, directed graphs, with two backends:
asciitree and graphviz.
  • Loading branch information
joostvanzwieten committed Dec 11, 2020
1 parent a057ef2 commit 77b9ad3
Show file tree
Hide file tree
Showing 3 changed files with 537 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ jobs:
with:
name: python-package
path: dist/
- name: Install Graphviz
if: ${{ matrix.os == 'ubuntu-latest' }}
run: sudo apt install -y graphviz
- name: Install Nutils and dependencies
id: install
env:
Expand Down
222 changes: 222 additions & 0 deletions nutils/_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
# Copyright (c) 2020 Evalf
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

from typing import Mapping, MutableMapping, Optional, Iterator, Iterable, Generator, Sequence, List, MutableSet, Callable, Tuple, Generic, TypeVar, Dict
import typing, itertools, treelog, subprocess, abc, html

Metadata = TypeVar('Metadata')
GraphvizColorCallback = Callable[['Node'], Optional[str]]

class Subgraph:

def __init__(self, label: str, parent: Optional['Subgraph'] = None) -> None:
self.label = label
self.parent = parent

class Node(Generic[Metadata], metaclass=abc.ABCMeta):

def __init__(self, metadata: Metadata, subgraph: Optional[Subgraph] = None) -> None:
self.metadata = metadata
self.subgraph = subgraph

@abc.abstractmethod
def __bool__(self) -> bool:
raise NotImplementedError # pragma: no cover

@abc.abstractmethod
def _generate_asciitree_nodes(self, cache: MutableMapping['Node[Metadata]', str], subgraph_ids: Mapping[Optional[Subgraph], str], id_gen: Iterator[str], select: str, bridge: str) -> Generator[str, None, None]:
raise NotImplementedError # pragma: no cover

@abc.abstractmethod
def _collect_graphviz_nodes_edges(self, cache: MutableMapping['Node[Metadata]', str], id_gen: Iterator[str], nodes: MutableMapping[Optional[Subgraph], List[str]], edges: List[str], parent_subgraph: Optional[Subgraph], fill_color: Optional[GraphvizColorCallback] = None) -> Optional[str]:
raise NotImplementedError # pragma: no cover

def walk(self, seen: MutableSet['Node[Metadata]']) -> Iterator['Node[Metadata]']:
raise NotImplementedError # pragma: no cover

def generate_asciitree(self, richoutput: bool = False) -> str:
subgraph_children = _collect_subgraphs(self)
if len(subgraph_children) > 1:
subgraph_ids = {} # type: Dict[Optional[Subgraph], str]
parts = ['SUBGRAPHS\n'], _generate_asciitree_subgraphs(subgraph_children, subgraph_ids, None, '', ''), ['NODES\n'] # type: Sequence[Iterable[str]]
else:
subgraph_ids = {None: ''}
parts = []
asciitree = ''.join(itertools.chain(*parts, self._generate_asciitree_nodes({}, subgraph_ids, map(str, itertools.count()), '', '')))
if not richoutput:
asciitree = asciitree.replace('├', ':').replace('└', ':').replace('│', '|')
return asciitree

def generate_graphviz_source(self, *, fill_color: Optional[GraphvizColorCallback] = None) -> str:
edges = [] # type: List[str]
nodes = {} # type: Dict[Optional[Subgraph], List[str]]
subgraph_children = _collect_subgraphs(self)
id_gen = map(str, itertools.count())
self._collect_graphviz_nodes_edges({}, id_gen, nodes, edges, None, fill_color)
return ''.join(itertools.chain(['digraph {graph [dpi=72];'], _generate_graphviz_subgraphs(subgraph_children, nodes, None, id_gen), edges, ['}']))

def export_graphviz(self, *, fill_color: Optional[GraphvizColorCallback] = None, dot_path: str = 'dot', image_type: str = 'png') -> None:
src = self.generate_graphviz_source(fill_color=fill_color)
with treelog.infofile('dot.'+image_type, 'wb') as img:
src = src.replace(';', ';\n')
status = subprocess.run([dot_path,'-Gstart=1','-T'+image_type], input=src.encode(), stdout=subprocess.PIPE)
if status.returncode:
for i, line in enumerate(src.split('\n'), 1):
print('{:04d} {}'.format(i, line))
treelog.warning('graphviz failed for error code', status.returncode)
img.write(status.stdout)

class RegularNode(Node[Metadata]):

def __init__(self, label: str, args: Sequence[Node[Metadata]], kwargs: Mapping[str, Node[Metadata]], metadata: Metadata, subgraph: Optional[Subgraph] = None) -> None:
self._label = label
self._args = tuple(args)
self._kwargs = dict(kwargs)
super().__init__(metadata, subgraph)

def __bool__(self) -> bool:
return True

def _generate_asciitree_nodes(self, cache: MutableMapping[Node[Metadata], str], subgraph_ids: Mapping[Optional[Subgraph], str], id_gen: Iterator[str], select: str, bridge: str) -> Generator[str, None, None]:
if self in cache:
yield '{}{}\n'.format(select, cache[self])
else:
subgraph_id = subgraph_ids[self.subgraph]
cache[self] = id = '%{}{}'.format(subgraph_id, next(id_gen))
yield '{}{} = {}\n'.format(select, id, self._label.replace('\n', '; '))
args = tuple(('', arg) for arg in self._args if arg) + tuple(('{} = '.format(name), arg) for name, arg in self._kwargs.items())
for i, (prefix, arg) in enumerate(args, 1-len(args)):
yield from arg._generate_asciitree_nodes(cache, subgraph_ids, id_gen, bridge+('├ ' if i else '└ ')+prefix, bridge+('│ ' if i else ' '))

def _collect_graphviz_nodes_edges(self, cache: MutableMapping[Node[Metadata], str], id_gen: Iterator[str], nodes: MutableMapping[Optional[Subgraph], List[str]], edges: List[str], parent_subgraph: Optional[Subgraph], fill_color: Optional[GraphvizColorCallback] = None) -> Optional[str]:
if self in cache:
return cache[self]
cache[self] = id = next(id_gen)
if self._kwargs:
table = ['<TABLE BORDER="1" CELLBORDER="0" CELLSPACING="0">']
table += ['<TR>', *('<TD PORT="kwarg{}"><FONT POINT-SIZE="10">{}</FONT></TD>'.format(ikwarg, html.escape(name)) for ikwarg, name in enumerate(self._kwargs)), '</TR>']
table += ['<TR><TD COLSPAN="{}">{}</TD></TR>'.format(len(self._kwargs), html.escape(line)) for line in self._label.split('\n')]
table += ['</TABLE>']
attributes = ['shape=plain', 'label=<{}>'.format(''.join(table))]
else:
attributes = ['shape=box', 'label="{}"'.format(self._label.replace('"', '\\"'))]
attributes.extend(_graphviz_fill_color_attributes(self, fill_color))
nodes.setdefault(self.subgraph, []).append('{} [{}];'.format(id, ','.join(attributes)))
for arg in self._args:
arg_id = arg._collect_graphviz_nodes_edges(cache, id_gen, nodes, edges, self.subgraph, fill_color)
if arg_id:
edges.append('{} -> {};'.format(arg_id, id))
for ikwarg, arg in enumerate(self._kwargs.values()):
arg_id = arg._collect_graphviz_nodes_edges(cache, id_gen, nodes, edges, self.subgraph, fill_color)
if arg_id:
edges.append('{} -> {}:kwarg{}:n;'.format(arg_id, id, ikwarg))
return id

def walk(self, seen: MutableSet[Node[Metadata]]) -> Iterator[Node[Metadata]]:
if self in seen:
return
seen.add(self)
yield self
for arg in self._args:
yield from arg.walk(seen)
for arg in self._kwargs.values():
yield from arg.walk(seen)

class DuplicatedLeafNode(Node[Metadata]):

def __init__(self, label: str, metadata: Metadata) -> None:
self._label = label
super().__init__(metadata)

def __bool__(self) -> bool:
return True

def _generate_asciitree_nodes(self, cache: MutableMapping[Node[Metadata], str], subgraph_ids: Mapping[Optional[Subgraph], str], id_gen: Iterator[str], select: str, bridge: str) -> Generator[str, None, None]:
yield '{}{}\n'.format(select, self._label.replace('\n', '; '))

def _collect_graphviz_nodes_edges(self, cache: MutableMapping[Node[Metadata], str], id_gen: Iterator[str], nodes: MutableMapping[Optional[Subgraph], List[str]], edges: List[str], parent_subgraph: Optional[Subgraph], fill_color: Optional[GraphvizColorCallback] = None) -> Optional[str]:
id = next(id_gen)
attributes = ['shape=box', 'label="{}"'.format(self._label.replace('"', '\\"')), *_graphviz_fill_color_attributes(self, fill_color)]
nodes.setdefault(parent_subgraph, []).append('{} [{}];'.format(id, ','.join(attributes)))
return id

def walk(self, seen: MutableSet[Node[Metadata]]) -> Iterator[Node[Metadata]]:
if self in seen:
return
seen.add(self)
yield self

class InvisibleNode(Node[Metadata]):

def __init__(self, metadata: Metadata) -> None:
super().__init__(metadata)

def __bool__(self) -> bool:
return False

def _generate_asciitree_nodes(self, cache: MutableMapping[Node[Metadata], str], subgraph_ids: Mapping[Optional[Subgraph], str], id_gen: Iterator[str], select: str, bridge: str) -> Generator[str, None, None]:
yield '{}\n'.format(select)

def _collect_graphviz_nodes_edges(self, cache: MutableMapping[Node[Metadata], str], id_gen: Iterator[str], nodes: MutableMapping[Optional[Subgraph], List[str]], edges: List[str], parent_subgraph: Optional[Subgraph], fill_color: Optional[GraphvizColorCallback] = None) -> Optional[str]:
return None

def walk(self, seen: MutableSet[Node[Metadata]]) -> Iterator[Node[Metadata]]:
if self in seen:
return
seen.add(self)
yield self

def _graphviz_fill_color_attributes(node: Node[Metadata], fill_color: Optional[GraphvizColorCallback]) -> Sequence[str]:
if not fill_color:
return ()
value = fill_color(node)
if value is None:
return ()
return 'style=filled', 'fillcolor="{}"'.format(value)

def _collect_subgraphs(node: Node[Metadata]) -> Dict[Optional[Subgraph], List[Subgraph]]:
children = {None: []} # type: Dict[Optional[Subgraph], List[Subgraph]]
for node in node.walk(set()):
subgraph = node.subgraph
if subgraph and subgraph not in children:
children[subgraph] = []
while subgraph and subgraph.parent not in children:
children[subgraph.parent] = [subgraph]
subgraph = subgraph.parent
subgraph = typing.cast(Subgraph, subgraph)
children[subgraph.parent].append(subgraph)
return children

def _generate_asciitree_subgraphs(children: Mapping[Optional[Subgraph], Sequence[Subgraph]], subgraph_ids: MutableMapping[Optional[Subgraph], str], subgraph: Optional[Subgraph], select: str, bridge: str) -> Iterator[str]:
assert subgraph not in subgraph_ids
subgraph_ids[subgraph] = id = chr(ord('A') + len(subgraph_ids))
if subgraph:
yield '{}{} = {}\n'.format(select, id, subgraph.label.replace('\n', '; '))
else:
yield '{}{}\n'.format(select, id)
for i, child in enumerate(children[subgraph], 1-len(children[subgraph])):
yield from _generate_asciitree_subgraphs(children, subgraph_ids, child, bridge+('├ ' if i else '└ '), bridge+('│ ' if i else ' '))

def _generate_graphviz_subgraphs(children: Mapping[Optional[Subgraph], Sequence[Subgraph]], nodes: Mapping[Optional[Subgraph], Sequence[str]], subgraph: Optional[Subgraph], id_gen: Iterator[str]) -> Iterator[str]:
for child in children[subgraph]:
yield 'subgraph cluster{} {{'.format(next(id_gen))
yield from _generate_graphviz_subgraphs(children, nodes, child, id_gen)
yield '}'
yield from nodes.get(subgraph, ())
Loading

0 comments on commit 77b9ad3

Please sign in to comment.