Skip to content

Commit

Permalink
change structure again
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshKarpel committed Jun 26, 2024
1 parent c1cf3ea commit cf22ab3
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 44 deletions.
37 changes: 19 additions & 18 deletions examples/dag.yaml
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
flows:
default:
1:
target: sleep-and-echo
2:
target: sleep-and-echo
3:
target: sleep-and-echo
4:
target: sleep-and-echo
trigger:
after: ["1", "2"]
5:
target: sleep-and-echo
trigger:
after: ["3"]
6:
target: sleep-and-echo
trigger:
after: ["4", "5"]
nodes:
1:
target: sleep-and-echo
2:
target: sleep-and-echo
3:
target: sleep-and-echo
4:
target: sleep-and-echo
trigger:
after: ["1", "2"]
5:
target: sleep-and-echo
trigger:
after: ["3"]
6:
target: sleep-and-echo
trigger:
after: ["4", "5"]

targets:
sleep-and-echo:
Expand Down
21 changes: 11 additions & 10 deletions synth.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
flows:
default:
tests:
target: tests
trigger: code-changes
types:
target: types
trigger: code-changes
docs:
target: docs
trigger:
type: restart
nodes:
tests:
target: tests
trigger: code-changes
types:
target: types
trigger: code-changes
docs:
target: docs
trigger:
type: restart

targets:
tests:
Expand Down
36 changes: 28 additions & 8 deletions synthesize/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,14 @@ class FlowNode(Model):

color: str

def write_script(self, tmp_dir: Path) -> Path:
def write_script(self, tmp_dir: Path, flow_args: Args) -> Path:
path = tmp_dir / f"{self.id}-{md5(self.model_dump_json().encode())}"

path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
self.target.render(
args=self.args
args=flow_args
| self.args
| {
"id": self.id,
}
Expand Down Expand Up @@ -178,8 +179,30 @@ def resolve(
)


UnresolvedFlow = dict[str, UnresolvedFlowNode]
Flow = dict[str, FlowNode]
class Flow(Model):
nodes: dict[str, FlowNode]
args: Args = Field(default_factory=frozendict)
envs: Envs = Field(default_factory=frozendict)


class UnresolvedFlow(Model):
nodes: dict[str, UnresolvedFlowNode]
args: Args = Field(default_factory=frozendict)
envs: Envs = Field(default_factory=frozendict)

def resolve(
self,
targets: dict[str, Target],
triggers: dict[str, AnyTrigger],
) -> Flow:
return Flow(
nodes={
node_id: node.resolve(node_id, targets, triggers)
for node_id, node in self.nodes.items()
},
args=self.args,
envs=self.envs,
)


class Config(Model):
Expand All @@ -198,9 +221,6 @@ def from_file(cls, file: Path) -> Config:

def resolve(self) -> dict[str, Flow]:
return {
flow_id: {
node_id: node.resolve(node_id, self.targets, self.triggers)
for node_id, node in flow.items()
}
flow_id: flow.resolve(self.targets, self.triggers)
for flow_id, flow in self.flows.items()
}
10 changes: 7 additions & 3 deletions synthesize/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from pathlib import Path
from signal import SIGKILL, SIGTERM

from synthesize.config import FlowNode
from frozendict import frozendict

from synthesize.config import Flow, FlowNode
from synthesize.messages import ExecutionCompleted, ExecutionOutput, ExecutionStarted, Message


Expand All @@ -26,17 +28,19 @@ class Execution:
async def start(
cls,
node: FlowNode,
flow: Flow,
events: Queue[Message],
tmp_dir: Path,
width: int = 80,
) -> Execution:
path = node.write_script(tmp_dir=tmp_dir)
path = node.write_script(tmp_dir=tmp_dir, flow_args=flow.args)

process = await create_subprocess_exec(
program=path,
stdout=PIPE,
stderr=STDOUT,
env=os.environ
env=frozendict(os.environ)
| flow.envs
| node.envs
| {
"FORCE_COLOR": "1",
Expand Down
3 changes: 2 additions & 1 deletion synthesize/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ async def start_ready_targets(self, tmp_dir: Path) -> None:
async def start() -> None:
e = await Execution.start(
node=ready_node,
flow=self.flow,
events=self.inbox,
width=self.console.width - self.renderer.prefix_width,
tmp_dir=tmp_dir,
Expand All @@ -136,7 +137,7 @@ async def start() -> None:
await start()

async def start_watchers(self) -> None:
for node in self.flow.values():
for node in self.flow.nodes.values():
if isinstance(node.trigger, Watch):
self.watchers[node.id] = create_task(
watch(
Expand Down
8 changes: 4 additions & 4 deletions synthesize/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class FlowState:
def from_flow(cls, flow: Flow) -> FlowState:
graph = DiGraph()

for id, node in flow.items():
for id, node in flow.nodes.items():
graph.add_node(id)
if isinstance(node.trigger, After):
for predecessor_id in node.trigger.after:
Expand All @@ -39,14 +39,14 @@ def from_flow(cls, flow: Flow) -> FlowState:

def running_nodes(self) -> set[FlowNode]:
return {
self.flow[id]
self.flow.nodes[id]
for id, status in self.statuses.items()
if status is FlowNodeStatus.Running
}

def ready_nodes(self) -> set[FlowNode]:
return {
self.flow[id]
self.flow.nodes[id]
for id in self.graph.nodes
if self.statuses[id] is FlowNodeStatus.Pending
and all(self.statuses[a] is FlowNodeStatus.Succeeded for a in ancestors(self.graph, id))
Expand Down Expand Up @@ -75,7 +75,7 @@ def num_nodes(self) -> int:
return len(self.graph)

def nodes(self) -> set[FlowNode]:
return set(self.flow.values())
return set(self.flow.nodes.values())


def _ancestors(graph: DiGraph, nodes: set[str]) -> set[str]:
Expand Down

0 comments on commit cf22ab3

Please sign in to comment.