From 0e5b03dcc4e3f3277ecee17167f8eb5fa55c9b5a Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 26 Jun 2024 01:33:56 -0500 Subject: [PATCH] collapse format --- examples/dag.yaml | 51 +++++++++++++++++++------------------- synth.yaml | 31 +++++++++++------------ synthesize/config.py | 31 +++++++++-------------- synthesize/orchestrator.py | 2 +- synthesize/renderer.py | 4 +-- synthesize/state.py | 40 ++++++++++++++---------------- 6 files changed, 72 insertions(+), 87 deletions(-) diff --git a/examples/dag.yaml b/examples/dag.yaml index 36f1b47..fb4f9c2 100644 --- a/examples/dag.yaml +++ b/examples/dag.yaml @@ -1,30 +1,29 @@ flows: - dag: - nodes: - - id: 1 - target: - id: sleep-and-echo - - id: 2 - target: - id: sleep-and-echo - - id: 3 - target: - id: sleep-and-echo - - id: 4 - target: - id: sleep-and-echo - trigger: - after: ["1", "2"] - - id: 5 - target: - id: sleep-and-echo - trigger: - after: ["3"] - - id: 6 - target: - id: sleep-and-echo - trigger: - after: ["4", "5"] + default: + 1: + target: + id: sleep-and-echo + 2: + target: + id: sleep-and-echo + 3: + target: + id: sleep-and-echo + 4: + target: + id: sleep-and-echo + trigger: + after: ["1", "2"] + 5: + target: + id: sleep-and-echo + trigger: + after: ["3"] + 6: + target: + id: sleep-and-echo + trigger: + after: ["4", "5"] targets: sleep-and-echo: diff --git a/synth.yaml b/synth.yaml index 25fae8f..753dad1 100644 --- a/synth.yaml +++ b/synth.yaml @@ -1,21 +1,20 @@ flows: default: - nodes: - - id: tests - target: - id: tests - trigger: - id: code-changes - - id: types - target: - id: types - trigger: - id: code-changes - - id: docs - target: - id: docs - trigger: - type: restart + tests: + target: + id: tests + trigger: + id: code-changes + types: + target: + id: types + trigger: + id: code-changes + docs: + target: + id: docs + trigger: + type: restart targets: tests: diff --git a/synthesize/config.py b/synthesize/config.py index e4a596c..c73832f 100644 --- a/synthesize/config.py +++ b/synthesize/config.py @@ -3,7 +3,6 @@ import shlex import shutil from colorsys import hsv_to_rgb -from functools import cached_property from pathlib import Path from random import random from stat import S_IEXEC @@ -137,12 +136,8 @@ class FlowNode(Model): color: str - @cached_property - def file_name(self) -> str: - return f"{self.id}-{md5(self.model_dump_json().encode())}" - def write_script(self, tmp_dir: Path) -> Path: - path = tmp_dir / self.file_name + path = tmp_dir / f"{self.id}-{md5(self.model_dump_json().encode())}" path.parent.mkdir(parents=True, exist_ok=True) path.write_text( @@ -167,8 +162,6 @@ class TriggerRef(Model): class UnresolvedFlowNode(Model): - id: str - target: Target | TargetRef args: Args envs: Envs @@ -179,11 +172,12 @@ class UnresolvedFlowNode(Model): def resolve( self, + id: str, targets: dict[str, Target], triggers: dict[str, AnyTrigger], ) -> FlowNode: return FlowNode( - id=self.id, + id=id, target=targets[self.target.id] if isinstance(self.target, TargetRef) else self.target, args=self.args, envs=self.envs, @@ -194,15 +188,8 @@ def resolve( ) -class Flow(Model): - nodes: tuple[FlowNode, ...] - - -class UnresolvedFlow(Model): - nodes: tuple[UnresolvedFlowNode, ...] - - def resolve(self, targets: dict[str, Target], triggers: dict[str, AnyTrigger]) -> Flow: - return Flow(nodes=tuple(node.resolve(targets, triggers) for node in self.nodes)) +UnresolvedFlow = dict[str, UnresolvedFlowNode] +Flow = dict[str, FlowNode] class Config(Model): @@ -220,4 +207,10 @@ def from_file(cls, file: Path) -> Config: raise NotImplementedError("Currently, only YAML files are supported.") def resolve(self) -> dict[str, Flow]: - return {id: flow.resolve(self.targets, self.triggers) for id, flow in self.flows.items()} + return { + flow_id: { + node_id: node.resolve(node_id, self.targets, self.triggers) + for node_id, node in flow.items() + } + for flow_id, flow in self.flows.items() + } diff --git a/synthesize/orchestrator.py b/synthesize/orchestrator.py index 3246b2d..b23db3f 100644 --- a/synthesize/orchestrator.py +++ b/synthesize/orchestrator.py @@ -136,7 +136,7 @@ async def start() -> None: await start() async def start_watchers(self) -> None: - for node in self.flow.nodes: + for node in self.flow.values(): if isinstance(node.trigger, Watch): self.watchers[node.id] = create_task( watch( diff --git a/synthesize/renderer.py b/synthesize/renderer.py index c9eef00..767e741 100644 --- a/synthesize/renderer.py +++ b/synthesize/renderer.py @@ -117,13 +117,11 @@ def handle_lifecycle_message( match message: case ExecutionStarted(node=node, pid=pid): parts = ( - "Node ", (node.id, node.color), f" started (pid {pid})", ) case ExecutionCompleted(node=node, pid=pid, exit_code=exit_code): parts = ( - "Node ", (node.id, node.color), f" (pid {pid}) exited with code ", (str(exit_code), "green" if exit_code == 0 else "red"), @@ -134,7 +132,7 @@ def handle_lifecycle_message( ) parts = ( - "Running node ", + "Running ", (node.id, node.color), " due to detected changes: ", changes, diff --git a/synthesize/state.py b/synthesize/state.py index 27a1c0f..b325151 100644 --- a/synthesize/state.py +++ b/synthesize/state.py @@ -18,68 +18,64 @@ class FlowNodeStatus(Enum): @dataclass(frozen=True) class FlowState: graph: DiGraph - id_to_node: dict[str, FlowNode] - id_to_status: dict[str, FlowNodeStatus] + flow: Flow + statuses: dict[str, FlowNodeStatus] @classmethod def from_flow(cls, flow: Flow) -> FlowState: - id_to_node = {node.id: node for node in flow.nodes} - graph = DiGraph() - for id, node in id_to_node.items(): - graph.add_node(node.id) + for id, node in flow.items(): + graph.add_node(id) if isinstance(node.trigger, After): for predecessor_id in node.trigger.after: graph.add_edge(predecessor_id, id) return FlowState( graph=graph, - id_to_node={id: node for id, node in id_to_node.items()}, - id_to_status={id: FlowNodeStatus.Pending for id in graph.nodes}, + flow=flow, + statuses={id: FlowNodeStatus.Pending for id in graph.nodes}, ) def running_nodes(self) -> set[FlowNode]: return { - self.id_to_node[id] - for id, status in self.id_to_status.items() + self.flow[id] + for id, status in self.statuses.items() if status is FlowNodeStatus.Running } def ready_nodes(self) -> set[FlowNode]: return { - self.id_to_node[id] + self.flow[id] for id in self.graph.nodes - if self.id_to_status[id] is FlowNodeStatus.Pending - and all( - self.id_to_status[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id) - ) + if self.statuses[id] is FlowNodeStatus.Pending + and all(self.statuses[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id)) } def mark_success(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Succeeded + self.statuses[node.id] = FlowNodeStatus.Succeeded def mark_failure(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Failed + self.statuses[node.id] = FlowNodeStatus.Failed def mark_pending(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Pending + self.statuses[node.id] = FlowNodeStatus.Pending def mark_descendants_pending(self, node: FlowNode) -> None: for t in _descendants(self.graph, {node.id}): - self.id_to_status[t] = FlowNodeStatus.Pending + self.statuses[t] = FlowNodeStatus.Pending def mark_running(self, node: FlowNode) -> None: - self.id_to_status[node.id] = FlowNodeStatus.Running + self.statuses[node.id] = FlowNodeStatus.Running def all_done(self) -> bool: - return all(status is FlowNodeStatus.Succeeded for status in self.id_to_status.values()) + return all(status is FlowNodeStatus.Succeeded for status in self.statuses.values()) def num_nodes(self) -> int: return len(self.graph) def nodes(self) -> set[FlowNode]: - return set(self.id_to_node.values()) + return set(self.flow.values()) def _ancestors(graph: DiGraph, nodes: set[str]) -> set[str]: