diff --git a/docs/changelog.md b/docs/changelog.md index 3bcaafb..a82ecd9 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -26,6 +26,11 @@ - [#41](https://github.com/JoshKarpel/synthesize/pull/41) Execution duration is printed in the completion message. +### Fixed + +- [#45](https://github.com/JoshKarpel/synthesize/pull/45) + `Restart` triggers now allow for the node's children to run again after the node completes. + ## `0.0.2` Released `2023-02-12` diff --git a/docs/examples/after.yaml b/docs/examples/after.yaml index eadb6e1..a3d96e5 100644 --- a/docs/examples/after.yaml +++ b/docs/examples/after.yaml @@ -1,24 +1,24 @@ flows: default: nodes: - 1: + A: target: sleep-and-echo - 2: + B: target: sleep-and-echo - 3: + C: target: sleep-and-echo - 4: + D: target: sleep-and-echo trigger: - after: ["1", "2"] - 5: + after: ["A", "B"] + E: target: sleep-and-echo trigger: - after: ["3"] - 6: + after: ["C"] + F: target: sleep-and-echo trigger: - after: ["4", "5"] + after: ["D", "E"] targets: sleep-and-echo: diff --git a/docs/examples/once.yaml b/docs/examples/once.yaml index bd35e4e..664256b 100644 --- a/docs/examples/once.yaml +++ b/docs/examples/once.yaml @@ -1,11 +1,11 @@ flows: default: nodes: - 1: + A: target: sleep-and-echo - 2: + B: target: sleep-and-echo - 3: + C: target: sleep-and-echo targets: diff --git a/docs/examples/restart-after.yaml b/docs/examples/restart-after.yaml new file mode 100644 index 0000000..cc096f9 --- /dev/null +++ b/docs/examples/restart-after.yaml @@ -0,0 +1,22 @@ +flows: + default: + nodes: + A: + target: sleep-and-echo + trigger: + type: restart + delay: 10 + B: + target: sleep-and-echo + trigger: + after: ["A"] + C: + target: sleep-and-echo + trigger: + after: ["B"] + +targets: + sleep-and-echo: + commands: | + sleep 2 + echo "Hi from {{ id }}!" diff --git a/docs/examples/restart.yaml b/docs/examples/restart.yaml index 0659037..5b75809 100644 --- a/docs/examples/restart.yaml +++ b/docs/examples/restart.yaml @@ -1,12 +1,12 @@ flows: default: nodes: - 1: + A: target: sleep-and-echo trigger: type: restart delay: 3 - 2: + B: target: sleep-and-echo trigger: type: restart diff --git a/docs/examples/watch.yaml b/docs/examples/watch.yaml index ec22430..5048240 100644 --- a/docs/examples/watch.yaml +++ b/docs/examples/watch.yaml @@ -1,12 +1,12 @@ flows: default: nodes: - 1: + A: target: sleep-and-echo trigger: type: watch paths: ["synthesize/", "tests/"] - 2: + B: target: sleep-and-echo trigger: type: watch diff --git a/docs/hooks/mermaid.py b/docs/hooks/mermaid.py index 089a10c..822741d 100644 --- a/docs/hooks/mermaid.py +++ b/docs/hooks/mermaid.py @@ -14,7 +14,7 @@ def on_page_markdown( ) -> str: lines = [] for line in markdown.splitlines(): - if match := re.match(r"@mermaid\(([\w\.\/]+)\)", line): + if match := re.match(r"@mermaid\(([\w\.\/\-]+)\)", line): lines.append("```mermaid") cmd = subprocess.run( ("synth", "--mermaid", "--config", match.group(1)), diff --git a/docs/index.md b/docs/index.md index da8afd3..b11d674 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1 +1,7 @@ # Synthesize + +```yaml +--8<-- "synth.yaml" +``` + +@mermaid(synth.yaml) diff --git a/docs/triggers.md b/docs/triggers.md index 2b3594c..edd482e 100644 --- a/docs/triggers.md +++ b/docs/triggers.md @@ -54,8 +54,10 @@ Use this trigger to run a node in reaction to changes in the filesystem. ## Combining Triggers +### Restart + After + ```yaml ---8<-- "synth.yaml" +--8<-- "docs/examples/restart-after.yaml" ``` -@mermaid(synth.yaml) +@mermaid(docs/examples/restart-after.yaml) diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index ff11505..590b3ad 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -20,7 +20,7 @@ WatchPathChanged, ) from synthesize.renderer import Renderer -from synthesize.state import FlowState +from synthesize.state import FlowState, Status from synthesize.utils import delay @@ -87,12 +87,15 @@ async def handle_messages(self, tmp_dir: Path) -> None: else: self.state.mark_failure(node) + self.state.mark_pending(*self.state.children(node)) + case WatchPathChanged(node=node): if e := self.executions.get(node.id): + self.waiters[node.id].add_done_callback( + lambda _: self.state.mark_pending(node) + ) e.terminate() - self.state.mark_descendants_pending(node) - case Quit(): return @@ -112,28 +115,28 @@ async def heartbeat() -> None: self.heartbeat = create_task(heartbeat()) async def start_ready_targets(self, tmp_dir: Path) -> None: - for ready_node in self.state.ready_nodes(): - if e := self.executions.get(ready_node.id): + for node in self.state.ready_nodes(): + if e := self.executions.get(node.id): if not e.has_exited: continue - self.state.mark_running(ready_node) - async def start() -> None: e = await Execution.start( - node=ready_node, + node=node, args=self.flow.args, envs=self.flow.envs, tmp_dir=tmp_dir, width=self.console.width - self.renderer.prefix_width, events=self.inbox, ) - self.executions[ready_node.id] = e - self.waiters[ready_node.id] = create_task(e.wait()) + self.executions[node.id] = e + self.waiters[node.id] = create_task(e.wait()) + self.state.mark_running(node) # When restarting after first execution, delay - if isinstance(ready_node.trigger, Restart) and ready_node.id in self.executions: - delay(ready_node.trigger.delay, start) + if isinstance(node.trigger, Restart) and node.id in self.executions: + self.state.mark(node, status=Status.Waiting) + delay(node.trigger.delay, start) else: await start() diff --git a/synthesize/renderer.py b/synthesize/renderer.py index 9ff2af0..ddb6a33 100644 --- a/synthesize/renderer.py +++ b/synthesize/renderer.py @@ -21,7 +21,7 @@ Message, WatchPathChanged, ) -from synthesize.state import FlowState +from synthesize.state import FlowState, Status prefix_format = "{timestamp:%H:%M:%S} {id} " internal_format = "{timestamp:%H:%M:%S}" @@ -61,28 +61,43 @@ def handle_message(self, message: Message) -> None: self.update(message) def info(self, event: Message) -> RenderableType: - table = Table.grid(padding=(1, 1, 0, 0)) + table = Table.grid(padding=(1, 1, 0, 0), expand=False) + + status_table = Table.grid(padding=(2, 2, 0, 0), expand=False) + + nodes_by_status = self.state.nodes_by_status() + node_status_displays = [] + for status in ( + Status.Running, + Status.Waiting, + Status.Pending, + Status.Succeeded, + Status.Failed, + ): + nodes_with_status = nodes_by_status[status] + if nodes_with_status: + node_status_displays.append( + Text.assemble( + status.value.capitalize(), + " ", + Text(" ").join( + Text(t.id, style=Style(color="black", bgcolor=t.color)) + for t in sorted(nodes_with_status, key=lambda t: t.id) + ), + ) + ) - running_nodes = self.state.running_nodes() - - running = ( - Text.assemble( - "Running ", - Text(" ").join( - Text(t.id, style=Style(color="black", bgcolor=t.color)) - for t in sorted(running_nodes, key=lambda t: t.id) - ), - ) - if running_nodes - else Text() - ) + status_table.add_row(*node_status_displays) table.add_row( internal_format.format_map({"timestamp": event.timestamp}), - running, + status_table, ) - return Group(Rule(style=(Style(color="green" if running_nodes else "yellow"))), table) + return Group( + Rule(style=(Style(color="green" if nodes_by_status[Status.Running] else "yellow"))), + table, + ) def render_prefix( self, message: ExecutionOutput | ExecutionStarted | ExecutionCompleted | WatchPathChanged @@ -109,7 +124,7 @@ def handle_lifecycle_message( ) -> None: prefix = Text.from_markup( self.render_prefix(message), - style=Style(color=message.node.color), + style=Style(color=message.node.color, dim=True), ) parts: tuple[str | tuple[str, str] | tuple[str, Style] | Text, ...] diff --git a/synthesize/state.py b/synthesize/state.py index 3464298..92cbece 100644 --- a/synthesize/state.py +++ b/synthesize/state.py @@ -1,6 +1,7 @@ from __future__ import annotations -from collections.abc import Collection, Iterator +from collections import defaultdict +from collections.abc import Collection, Iterator, Mapping from dataclasses import dataclass from enum import Enum @@ -9,8 +10,9 @@ from synthesize.config import After, Flow, FlowNode -class FlowNodeStatus(Enum): +class Status(Enum): Pending = "pending" + Waiting = "waiting" Running = "running" Succeeded = "succeeded" Failed = "failed" @@ -20,7 +22,7 @@ class FlowNodeStatus(Enum): class FlowState: graph: DiGraph flow: Flow - statuses: dict[str, FlowNodeStatus] + statuses: dict[str, Status] @classmethod def from_flow(cls, flow: Flow) -> FlowState: @@ -35,53 +37,54 @@ def from_flow(cls, flow: Flow) -> FlowState: return FlowState( graph=graph, flow=flow, - statuses={id: FlowNodeStatus.Pending for id in graph.nodes}, + statuses={id: Status.Pending for id in graph.nodes}, ) - def running_nodes(self) -> Collection[FlowNode]: - return tuple( - self.flow.nodes[id] - for id, status in self.statuses.items() - if status is FlowNodeStatus.Running - ) + def nodes_by_status(self) -> Mapping[Status, Collection[FlowNode]]: + d = defaultdict(list) + for id, s in self.statuses.items(): + d[s].append(self.flow.nodes[id]) + return d def ready_nodes(self) -> Collection[FlowNode]: return tuple( self.flow.nodes[id] for id in self.graph.nodes - if self.statuses[id] is FlowNodeStatus.Pending - and all(self.statuses[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id)) + if self.statuses[id] is Status.Pending + and all( + self.statuses[a] + in ( + Status.Succeeded, + Status.Waiting, + ) + for a in ancestors(self.graph, id) + ) ) - def mark_success(self, node: FlowNode) -> None: - self.statuses[node.id] = FlowNodeStatus.Succeeded + def mark_success(self, *nodes: FlowNode) -> None: + self.mark(*nodes, status=Status.Succeeded) - def mark_failure(self, node: FlowNode) -> None: - self.statuses[node.id] = FlowNodeStatus.Failed + def mark_failure(self, *nodes: FlowNode) -> None: + self.mark(*nodes, status=Status.Failed) - def mark_pending(self, node: FlowNode) -> None: - self.statuses[node.id] = FlowNodeStatus.Pending + def mark_pending(self, *nodes: FlowNode) -> None: + self.mark(*nodes, status=Status.Pending) - def mark_descendants_pending(self, node: FlowNode) -> None: - for t in _descendants(self.graph, {node.id}): - self.statuses[t] = FlowNodeStatus.Pending + def mark_running(self, *nodes: FlowNode) -> None: + self.mark(*nodes, status=Status.Running) - def mark_running(self, node: FlowNode) -> None: - self.statuses[node.id] = FlowNodeStatus.Running + def mark(self, *nodes: FlowNode, status: Status) -> None: + for node in nodes: + self.statuses[node.id] = status - def all_done(self) -> bool: - return all(status is FlowNodeStatus.Succeeded for status in self.statuses.values()) + def children(self, node: FlowNode) -> Collection[FlowNode]: + return tuple(self.flow.nodes[id] for id in self.graph.successors(node.id)) + + def descendants(self, node: FlowNode) -> Collection[FlowNode]: + return tuple(self.flow.nodes[id] for id in descendants(self.graph, node.id)) - def num_nodes(self) -> int: - return len(self.graph) + def all_done(self) -> bool: + return all(status is Status.Succeeded for status in self.statuses.values()) def nodes(self) -> Iterator[FlowNode]: yield from self.flow.nodes.values() - - -def _ancestors(graph: DiGraph, nodes: set[str]) -> set[str]: - return nodes.union(*(ancestors(graph, n) for n in nodes)) - - -def _descendants(graph: DiGraph, nodes: set[str]) -> set[str]: - return nodes.union(*(descendants(graph, n) for n in nodes))