Skip to content

Commit

Permalink
collapse format
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshKarpel committed Jun 26, 2024
1 parent ba54630 commit 0e5b03d
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 87 deletions.
51 changes: 25 additions & 26 deletions examples/dag.yaml
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
flows:
dag:
nodes:
- id: 1
target:
id: sleep-and-echo
- id: 2
target:
id: sleep-and-echo
- id: 3
target:
id: sleep-and-echo
- id: 4
target:
id: sleep-and-echo
trigger:
after: ["1", "2"]
- id: 5
target:
id: sleep-and-echo
trigger:
after: ["3"]
- id: 6
target:
id: sleep-and-echo
trigger:
after: ["4", "5"]
default:
1:
target:
id: sleep-and-echo
2:
target:
id: sleep-and-echo
3:
target:
id: sleep-and-echo
4:
target:
id: sleep-and-echo
trigger:
after: ["1", "2"]
5:
target:
id: sleep-and-echo
trigger:
after: ["3"]
6:
target:
id: sleep-and-echo
trigger:
after: ["4", "5"]

targets:
sleep-and-echo:
Expand Down
31 changes: 15 additions & 16 deletions synth.yaml
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
flows:
default:
nodes:
- id: tests
target:
id: tests
trigger:
id: code-changes
- id: types
target:
id: types
trigger:
id: code-changes
- id: docs
target:
id: docs
trigger:
type: restart
tests:
target:
id: tests
trigger:
id: code-changes
types:
target:
id: types
trigger:
id: code-changes
docs:
target:
id: docs
trigger:
type: restart

targets:
tests:
Expand Down
31 changes: 12 additions & 19 deletions synthesize/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import shlex
import shutil
from colorsys import hsv_to_rgb
from functools import cached_property
from pathlib import Path
from random import random
from stat import S_IEXEC
Expand Down Expand Up @@ -137,12 +136,8 @@ class FlowNode(Model):

color: str

@cached_property
def file_name(self) -> str:
return f"{self.id}-{md5(self.model_dump_json().encode())}"

def write_script(self, tmp_dir: Path) -> Path:
path = tmp_dir / self.file_name
path = tmp_dir / f"{self.id}-{md5(self.model_dump_json().encode())}"

path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
Expand All @@ -167,8 +162,6 @@ class TriggerRef(Model):


class UnresolvedFlowNode(Model):
id: str

target: Target | TargetRef
args: Args
envs: Envs
Expand All @@ -179,11 +172,12 @@ class UnresolvedFlowNode(Model):

def resolve(
self,
id: str,
targets: dict[str, Target],
triggers: dict[str, AnyTrigger],
) -> FlowNode:
return FlowNode(
id=self.id,
id=id,
target=targets[self.target.id] if isinstance(self.target, TargetRef) else self.target,
args=self.args,
envs=self.envs,
Expand All @@ -194,15 +188,8 @@ def resolve(
)


class Flow(Model):
nodes: tuple[FlowNode, ...]


class UnresolvedFlow(Model):
nodes: tuple[UnresolvedFlowNode, ...]

def resolve(self, targets: dict[str, Target], triggers: dict[str, AnyTrigger]) -> Flow:
return Flow(nodes=tuple(node.resolve(targets, triggers) for node in self.nodes))
UnresolvedFlow = dict[str, UnresolvedFlowNode]
Flow = dict[str, FlowNode]


class Config(Model):
Expand All @@ -220,4 +207,10 @@ def from_file(cls, file: Path) -> Config:
raise NotImplementedError("Currently, only YAML files are supported.")

def resolve(self) -> dict[str, Flow]:
return {id: flow.resolve(self.targets, self.triggers) for id, flow in self.flows.items()}
return {
flow_id: {
node_id: node.resolve(node_id, self.targets, self.triggers)
for node_id, node in flow.items()
}
for flow_id, flow in self.flows.items()
}
2 changes: 1 addition & 1 deletion synthesize/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async def start() -> None:
await start()

async def start_watchers(self) -> None:
for node in self.flow.nodes:
for node in self.flow.values():
if isinstance(node.trigger, Watch):
self.watchers[node.id] = create_task(
watch(
Expand Down
4 changes: 1 addition & 3 deletions synthesize/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,11 @@ def handle_lifecycle_message(
match message:
case ExecutionStarted(node=node, pid=pid):
parts = (
"Node ",
(node.id, node.color),
f" started (pid {pid})",
)
case ExecutionCompleted(node=node, pid=pid, exit_code=exit_code):
parts = (
"Node ",
(node.id, node.color),
f" (pid {pid}) exited with code ",
(str(exit_code), "green" if exit_code == 0 else "red"),
Expand All @@ -134,7 +132,7 @@ def handle_lifecycle_message(
)

parts = (
"Running node ",
"Running ",
(node.id, node.color),
" due to detected changes: ",
changes,
Expand Down
40 changes: 18 additions & 22 deletions synthesize/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,68 +18,64 @@ class FlowNodeStatus(Enum):
@dataclass(frozen=True)
class FlowState:
graph: DiGraph
id_to_node: dict[str, FlowNode]
id_to_status: dict[str, FlowNodeStatus]
flow: Flow
statuses: dict[str, FlowNodeStatus]

@classmethod
def from_flow(cls, flow: Flow) -> FlowState:
id_to_node = {node.id: node for node in flow.nodes}

graph = DiGraph()

for id, node in id_to_node.items():
graph.add_node(node.id)
for id, node in flow.items():
graph.add_node(id)
if isinstance(node.trigger, After):
for predecessor_id in node.trigger.after:
graph.add_edge(predecessor_id, id)

return FlowState(
graph=graph,
id_to_node={id: node for id, node in id_to_node.items()},
id_to_status={id: FlowNodeStatus.Pending for id in graph.nodes},
flow=flow,
statuses={id: FlowNodeStatus.Pending for id in graph.nodes},
)

def running_nodes(self) -> set[FlowNode]:
return {
self.id_to_node[id]
for id, status in self.id_to_status.items()
self.flow[id]
for id, status in self.statuses.items()
if status is FlowNodeStatus.Running
}

def ready_nodes(self) -> set[FlowNode]:
return {
self.id_to_node[id]
self.flow[id]
for id in self.graph.nodes
if self.id_to_status[id] is FlowNodeStatus.Pending
and all(
self.id_to_status[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id)
)
if self.statuses[id] is FlowNodeStatus.Pending
and all(self.statuses[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id))
}

def mark_success(self, node: FlowNode) -> None:
self.id_to_status[node.id] = FlowNodeStatus.Succeeded
self.statuses[node.id] = FlowNodeStatus.Succeeded

def mark_failure(self, node: FlowNode) -> None:
self.id_to_status[node.id] = FlowNodeStatus.Failed
self.statuses[node.id] = FlowNodeStatus.Failed

def mark_pending(self, node: FlowNode) -> None:
self.id_to_status[node.id] = FlowNodeStatus.Pending
self.statuses[node.id] = FlowNodeStatus.Pending

def mark_descendants_pending(self, node: FlowNode) -> None:
for t in _descendants(self.graph, {node.id}):
self.id_to_status[t] = FlowNodeStatus.Pending
self.statuses[t] = FlowNodeStatus.Pending

def mark_running(self, node: FlowNode) -> None:
self.id_to_status[node.id] = FlowNodeStatus.Running
self.statuses[node.id] = FlowNodeStatus.Running

def all_done(self) -> bool:
return all(status is FlowNodeStatus.Succeeded for status in self.id_to_status.values())
return all(status is FlowNodeStatus.Succeeded for status in self.statuses.values())

def num_nodes(self) -> int:
return len(self.graph)

def nodes(self) -> set[FlowNode]:
return set(self.id_to_node.values())
return set(self.flow.values())


def _ancestors(graph: DiGraph, nodes: set[str]) -> set[str]:
Expand Down

0 comments on commit 0e5b03d

Please sign in to comment.